SGLang 原生 API#
除了与 OpenAI 兼容的 API 外,SGLang 运行时还提供了其原生服务器 API。我们介绍以下 API:
/generate(文本生成模型)/get_model_info/get_server_info/health/health_generate/flush_cache/update_weights/encode(嵌入模型)/v1/rerank(交叉编码器重排模型)/classify(奖励模型)/start_expert_distribution_record/stop_expert_distribution_record/dump_expert_distribution_record/tokenize/detokenize这些 API 的完整列表可以在 http_server.py 中找到
在下面的示例中,我们主要使用 requests 来测试这些 API。您也可以使用 curl。
启动服务器#
[ ]:
from sglang.test.doc_patch import launch_server_cmd
from sglang.utils import wait_for_server, print_highlight, terminate_process
server_process, port = launch_server_cmd(
"python3 -m sglang.launch_server --model-path qwen/qwen2.5-0.5b-instruct --host 0.0.0.0 --log-level warning"
)
wait_for_server(f"http://localhost:{port}")
生成(文本生成模型)#
生成文本补全内容。这与 OpenAI API 中的 /v1/completions 类似。详细参数可以在 采样参数 中找到。
[ ]:
import requests
url = f"http://localhost:{port}/generate"
data = {"text": "法国的首都是什么?"}
response = requests.post(url, json=data)
print_highlight(response.json())
获取模型信息#
获取模型的信息。
model_path:模型的路径/名称。is_generation:模型是否用作生成模型或嵌入模型。tokenizer_path:分词器的路径/名称。preferred_sampling_params:通过--preferred-sampling-params指定的默认采样参数。在此示例中返回None,因为我们没有在服务器参数中明确配置它。weight_version:此字段包含模型权重的版本。这通常用于跟踪模型训练参数的更改或更新。has_image_understanding:模型是否具有图像理解能力。has_audio_understanding:模型是否具有音频理解能力。
[ ]:
url = f"http://localhost:{port}/get_model_info"
response = requests.get(url)
response_json = response.json()
print_highlight(response_json)
assert response_json["model_path"] == "qwen/qwen2.5-0.5b-instruct"
assert response_json["is_generation"] is True
assert response_json["tokenizer_path"] == "qwen/qwen2.5-0.5b-instruct"
assert response_json["preferred_sampling_params"] is None
assert response_json.keys() == {
"model_path",
"is_generation",
"tokenizer_path",
"preferred_sampling_params",
"weight_version",
"has_image_understanding",
"has_audio_understanding",
}
获取服务器信息#
获取服务器信息,包括 CLI 参数、令牌限制和内存池大小。
注意:
get_server_info合并了以下已弃用的端点:get_server_argsget_memory_pool_sizeget_max_total_num_tokens
[ ]:
url = f"http://localhost:{port}/get_server_info"
response = requests.get(url)
print_highlight(response.text)
健康检查#
/health:检查服务器的健康状况。/health_generate:通过生成一个令牌来检查服务器的健康状况。
[ ]:
url = f"http://localhost:{port}/health_generate"
response = requests.get(url)
print_highlight(response.text)
[ ]:
url = f"http://localhost:{port}/health"
response = requests.get(url)
print_highlight(response.text)
刷新缓存#
刷新基数树缓存。当模型权重通过 /update_weights API 更新时,它会自动触发。
[ ]:
url = f"http://localhost:{port}/flush_cache"
response = requests.post(url)
print_highlight(response.text)
从磁盘更新权重#
无需重启服务器即可从磁盘更新模型权重。仅适用于具有相同架构和参数大小的模型。
SGLang 支持 update_weights_from_disk API,用于在训练过程中进行持续评估(将检查点保存到磁盘并从磁盘更新权重)。
[ ]:
# 成功更新,具有相同架构和大小
url = f"http://localhost:{port}/update_weights_from_disk"
data = {"model_path": "qwen/qwen2.5-0.5b-instruct"}
response = requests.post(url, json=data)
print_highlight(response.text)
assert response.json()["success"] is True
assert response.json()["message"] == "模型权重更新成功。"
[ ]:
# 参数大小不同或名称错误导致更新失败
url = f"http://localhost:{port}/update_weights_from_disk"
data = {"model_path": "qwen/qwen2.5-0.5b-instruct-wrong"}
response = requests.post(url, json=data)
response_json = response.json()
print_highlight(response_json)
assert response_json["success"] is False
assert response_json["message"] == (
"获取权重迭代器失败: "
"qwen/qwen2.5-0.5b-instruct-wrong"
"(未找到仓库)。"
)
[ ]:
terminate_process(server_process)
编码(嵌入模型)#
将文本编码为嵌入向量。请注意,此 API 仅适用于嵌入模型,对于生成模型会引发错误。 因此,我们启动一个新服务器来服务嵌入模型。
[ ]:
embedding_process, port = launch_server_cmd(
"""
python3 -m sglang.launch_server --model-path Alibaba-NLP/gte-Qwen2-1.5B-instruct \
--host 0.0.0.0 --is-embedding --log-level warning
"""
)
wait_for_server(f"http://localhost:{port}")
[ ]:
# 嵌入模型成功编码
url = f"http://localhost:{port}/encode"
data = {"model": "Alibaba-NLP/gte-Qwen2-1.5B-instruct", "text": "从前有座山"}
response = requests.post(url, json=data)
response_json = response.json()
print_highlight(f"文本嵌入(前10个):{response_json['embedding'][:10]}")
[ ]:
terminate_process(embedding_process)
v1/rerank(交叉编码器重排模型)#
使用交叉编码器模型根据查询对文档列表进行重排排序。请注意,此 API 仅适用于具有 attention-backend 为 triton 和 torch_native 的交叉编码器模型,如 BAAI/bge-reranker-v2-m3。
[ ]:
reranker_process, port = launch_server_cmd(
"""
python3 -m sglang.launch_server --model-path BAAI/bge-reranker-v2-m3 \
--host 0.0.0.0 --disable-radix-cache --chunked-prefill-size -1 --attention-backend triton --is-embedding --log-level warning
"""
)
wait_for_server(f"http://localhost:{port}")
[ ]:
# 计算查询和文档的重排分数
url = f"http://localhost:{port}/v1/rerank"
data = {
"model": "BAAI/bge-reranker-v2-m3",
"query": "熊猫是什么?",
"documents": [
"你好",
"大熊猫(Ailuropoda melanoleuca),有时被称为熊猫或熊猫,是中国特有的熊种。",
],
}
response = requests.post(url, json=data)
response_json = response.json()
for item in response_json:
print_highlight(f"分数: {item['score']:.2f} - 文档: '{item['document']}'")
[ ]:
terminate_process(reranker_process)
分类(奖励模型)#
SGLang 运行时也支持奖励模型。在这里,我们使用奖励模型来成对生成内容的分类质量。
[ ]:
# 请注意,SGLang 现在将嵌入模型和奖励模型视为同一类型的模型。
# 这将在未来更新。
reward_process, port = launch_server_cmd(
"""
python3 -m sglang.launch_server --model-path Skywork/Skywork-Reward-Llama-3.1-8B-v0.2 --host 0.0.0.0 --is-embedding --log-level warning
"""
)
wait_for_server(f"http://localhost:{port}")
[ ]:
from transformers import AutoTokenizer
PROMPT = (
"神经网络中 Sigmoid 节点的数值输出范围是多少?"
)
RESPONSE1 = "Sigmoid 节点的输出值在 -1 和 1 之间。"
RESPONSE2 = "Sigmoid 节点的输出值在 0 和 1 之间。"
CONVS = [
[{"role": "user", "content": PROMPT}, {"role": "assistant", "content": RESPONSE1}],
[{"role": "user", "content": PROMPT}, {"role": "assistant", "content": RESPONSE2}],
]
tokenizer = AutoTokenizer.from_pretrained("Skywork/Skywork-Reward-Llama-3.1-8B-v0.2")
prompts = tokenizer.apply_chat_template(CONVS, tokenize=False, return_dict=False)
url = f"http://localhost:{port}/classify"
data = {"model": "Skywork/Skywork-Reward-Llama-3.1-8B-v0.2", "text": prompts}
responses = requests.post(url, json=data).json()
for response in responses:
print_highlight(f"奖励分数: {response['embedding'][0]}")
[ ]:
terminate_process(reward_process)
在 MoE 模型中捕获专家选择分布#
SGLang 运行时支持记录 MoE 模型运行中每个专家被选择的次数。这在分析模型吞吐量和规划优化时非常有用。
注意:为了更好的可读性,我们只打印了下面 csv 的前 10 行。如果您想更深入地分析结果,请相应地进行调整。
[ ]:
expert_record_server_process, port = launch_server_cmd(
"python3 -m sglang.launch_server --model-path Qwen/Qwen1.5-MoE-A2.7B --host 0.0.0.0 --expert-distribution-recorder-mode stat --log-level warning"
)
wait_for_server(f"http://localhost:{port}")
[ ]:
response = requests.post(f"http://localhost:{port}/start_expert_distribution_record")
print_highlight(response)
url = f"http://localhost:{port}/generate"
data = {"text": "法国的首都是什么?"}
response = requests.post(url, json=data)
print_highlight(response.json())
response = requests.post(f"http://localhost:{port}/stop_expert_distribution_record")
print_highlight(response)
response = requests.post(f"http://localhost:{port}/dump_expert_distribution_record")
print_highlight(response)
[ ]:
terminate_process(expert_record_server_process)
分词/反分词示例(往返过程)#
本示例演示如何一起使用 /tokenize 和 /detokenize 端点。我们首先将字符串分词,然后将生成的 ID 反分词以重建原始文本。当您需要在外部处理分词但仍希望利用服务器进行反分词时,此工作流程非常有用。
[ ]:
tokenizer_free_server_process, port = launch_server_cmd(
"""
python3 -m sglang.launch_server --model-path qwen/qwen2.5-0.5b-instruct
"""
)
wait_for_server(f"http://localhost:{port}")
[ ]:
import requests
from sglang.utils import print_highlight
base_url = f"http://localhost:{port}"
tokenize_url = f"{base_url}/tokenize"
detokenize_url = f"{base_url}/detokenize"
model_name = "qwen/qwen2.5-0.5b-instruct"
input_text = "SGLang 提供高效的分词端点。"
print_highlight(f"原始输入文本:\n'{input_text}'")
# --- 对输入文本进行分词 ---
tokenize_payload = {
"model": model_name,
"prompt": input_text,
"add_special_tokens": False,
}
try:
tokenize_response = requests.post(tokenize_url, json=tokenize_payload)
tokenize_response.raise_for_status()
tokenization_result = tokenize_response.json()
token_ids = tokenization_result.get("tokens")
if not token_ids:
raise ValueError("分词返回空令牌。")
print_highlight(f"\n分词输出(ID):\n{token_ids}")
print_highlight(f"令牌数量: {tokenization_result.get('count')}")
print_highlight(f"模型最大长度: {tokenization_result.get('max_model_len')}")
# --- 对获取的令牌 ID 进行反分词 ---
detokenize_payload = {
"model": model_name,
"tokens": token_ids,
"skip_special_tokens": True,
}
detokenize_response = requests.post(detokenize_url, json=detokenize_payload)
detokenize_response.raise_for_status()
detokenization_result = detokenize_response.json()
reconstructed_text = detokenization_result.get("text")
print_highlight(f"\n反分词输出(文本):\n'{reconstructed_text}'")
if input_text == reconstructed_text:
print_highlight(
"\n往返成功:原始文本和重建文本匹配。"
)
else:
print_highlight(
"\n往返不匹配:原始文本和重建文本不同。"
)
except requests.exceptions.RequestException as e:
print_highlight(f"\nHTTP 请求错误: {e}")
except Exception as e:
print_highlight(f"\n发生错误: {e}")
[ ]:
terminate_process(tokenizer_free_server_process)