NVIDIA NIM 多 Key 负载均衡代理服务:突破单 Key 限制,同时兼容 OpenAI 与 Anthropic 双协议
一个本地部署的智能代理网关,通过多 API Key 负载均衡突破 NVIDIA NIM 的单 Key RPM 限制,同时原生支持 OpenAI 和 Anthropic 两种 API 格式,内置可视化监控面板与模型管理功能。
目录
背景:为什么需要这个项目?
NVIDIA 推出的 NIM (NVIDIA Inference Microservices) API 提供了免费调用大语言模型的机会,但每个 API Key 都有严格的 RPM (Requests Per Minute) 限制,通常为 40 RPM。
痛点分析
| 场景 | 单 Key 局限 | 实际需求 |
|---|---|---|
| 个人学习 | 40 RPM 够用 | ✅ 无压力 |
| 小团队协作 | 3-5 人同时使用 | ❌ 频繁触发 429 |
| 聊天机器人 | 高并发用户访问 | ❌ 完全不够用 |
| 批量处理 | 数据分析/文本生成 | ❌ 效率极低 |
解决方案
NVIDIA NIM Load Balancer 通过多 Key 智能调度,将 N 个 Key 的 RPM 配额池化使用:
单 Key: ████████░░░░░░░░░░░░░ 40 RPM
5 Keys: ██████████████████████ 200 RPM (理论上限)
10 Keys: ██████████████████████ 400 RPM (理论上限)
更重要的是,本项目不仅解决了限流问题,还实现了 OpenAI / Anthropic 双协议透明代理 —— 你可以用同一套基础设施同时服务两种 SDK 的客户端。
架构概览
┌─────────────────────────────────────────────────────────────────────┐
│ 客户端层 │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ ChatBox │ │ LobeChat │ │Claude Code│ │ curl │ │
│ │(OpenAI) │ │(OpenAI) │ │(Anthropic)│ │(任意格式) │ │
│ └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘ │
└────────┼────────────┼────────────┼────────────┼────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────────────────────────────────────────────────────────────────┐
│ FastAPI 代理网关 :8000 │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 协议适配层 │ │
│ │ │ │
│ │ ┌─────────────────────┐ ┌─────────────────────────┐ │ │
│ │ │ OpenAI 兼容接口 │ │ Anthropic 兼容接口 │ │ │
│ │ │ │ │ │ │ │
│ │ │ POST /v1/chat/ │◄──►│ POST /v1/messages │ │ │
│ │ │ completions │ │ GET /v1/models │ │ │
│ │ │ GET /v1/models │ │ POST /v1/messages/ │ │ │
│ │ │ │ │ count_tokens │ │ │
│ │ └──────────┬──────────┘ └───────────┬─────────────┘ │ │
│ │ │ │ │ │
│ │ └───────────┬───────────────┘ │ │
│ │ ▼ │ │
│ │ anthropic_adapter.py │ │
│ │ (双向格式转换 + 模型映射 + 截断) │ │
│ └─────────────────────────┬───────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────▼───────────────────────────────────┐ │
│ │ 核心引擎层 │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ │
│ │ │ Admission │ │ LoadBalancer │ │ NvidiaProxy │ │ │
│ │ │ Control │→ │ (策略选择) │→ │ (请求转发+重试) │ │ │
│ │ │ (准入控制) │ │ │ │ │ │ │
│ │ └──────────────┘ └──────┬───────┘ └────────┬─────────┘ │ │
│ │ │ │ │ │
│ │ ▼ │ │ │
│ │ ┌─────────────────┐ │ │ │
│ │ │ Key Pool │◄──────────┘ │ │
│ │ │ (滑动窗口计数) │ │ │
│ │ │ ┌───┬───┬───┐ │ │ │
│ │ │ │K1 │K2 │K3 │ │ │ │
│ │ │ │35 │封禁│38 │ │ │ │
│ │ │ └───┴───┴───┘ │ │ │
│ │ └─────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 管理与监控层 │ │
│ │ │ │
│ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ │
│ │ │ /dashboard │ │ /api/stats/* │ │ /api/models/* │ │ │
│ │ │ (Web 面板) │ │ (统计分析) │ │ (模型管理) │ │ │
│ │ └──────────────┘ └──────────────┘ └──────────────────┘ │ │
│ └─────────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────────┐ │
│ │ 持久化层 │ │
│ │ SQLite + WriteBuffer (异步批量写入) │ │
│ └─────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────────┘
│
│ HTTPS
▼
┌─────────────────────────────┐
│ NVIDIA NIM API │
│ integrate.api.nvidia.com │
└─────────────────────────────┘
核心特性详解
1. 双协议兼容引擎
这是本项目的核心竞争力:一个代理服务同时支持 OpenAI 和 Anthropic 两种主流 API 格式,客户端无需修改任何代码即可接入。
工作原理
客户端请求 → 协议检测 → 格式转换 → 统一处理 → 响应回转 → 返回客户端
协议检测机制:通过 anthropic-version 请求头自动识别客户端类型
| 客户端类型 | 识别方式 | 目标端点 | 转换器 |
|---|---|---|---|
| OpenAI SDK | 无特殊 header | /v1/chat/completions |
无需转换 |
| Anthropic SDK | anthropic-version: 2023-06-01 |
/v1/messages |
anthropic_adapter |
OpenAI 接口使用示例
# 标准 Chat Completions(流式)
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "meta/llama-3.1-70b-instruct",
"messages": [{"role": "user", "content": "你好,请介绍一下自己"}],
"stream": true,
"temperature": 0.7
}'
# 响应格式(标准 OpenAI SSE)
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你好"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"}}]}
data: [DONE]
Anthropic 接口使用示例
# Messages API(完全兼容 Claude SDK)
curl http://localhost:8000/v1/messages \
-H "Content-Type: application/json" \
-H "anthropic-version: 2023-06-01" \
-H "x-api-key: any-string" \
-d '{
"model": "claude-sonnet-4-6",
"max_tokens": 1024,
"system": "你是一个有帮助的助手",
"messages": [
{"role": "user", "content": "你好,请介绍一下自己"}
]
}'
# 响应格式(标准 Anthropic JSON)
{
"id": "msg_xxx",
"type": "message",
"role": "assistant",
"content": [{"type": "text", "text": "你好!我是..."}],
"model": "claude-sonnet-4-6",
"stop_reason": "end_turn"
}
Anthropic 特有功能支持矩阵
| 功能 | 支持状态 | 配置示例 |
|---|---|---|
| Thinking(扩展思考) | ✅ 完整支持 | "thinking": {"type": "enabled", "budget_tokens": 1024} |
| Tool Use(工具调用) | ✅ 完整支持 | "tools": [...], "tool_choice": {"type": "auto"} |
| System Prompt | ✅ 多格式 | 字符串或 [{type: "text", text: "..."}] 数组 |
| 流式输出(SSE) | ✅ 事件流转换 | content_block_start/delta/stop 事件 |
| 消息截断 | ✅ 自动管理 | 超出上下文窗口时按策略截断 |
| Token 计数 | ✅ 估算接口 | POST /v1/messages/count_tokens |
| 错误格式转换 | ✅ 自动映射 | NVIDIA 错误 → Anthropic 错误结构 |
| 模型名称映射 | ✅ 可配置 | claude-sonnet-4-6 → qwen/qwen3.5-397b-a17b |
模型映射配置
在 config.yaml 中自定义 Anthropic 到 NVIDIA 的模型映射关系:
anthropic:
default_model: "claude-haiku-4-5-20251001"
# 模型名称映射表
model_mapping:
"claude-opus-4-7": "moonshotai/kimi-k2.5"
"claude-opus-4-6": "moonshotai/kimi-k2.5"
"claude-sonnet-4-6": "qwen/qwen3.5-397b-a17b"
"claude-haiku-4-5-20251001": "moonshotai/kimi-k2-instruct-0905"
# 各模型的上下文窗口大小
context_windows:
"claude-opus-4-7": 1000000
"claude-opus-4-6": 1000000
"claude-sonnet-4-6": 256000
"claude-haiku-4-5-20251001": 200000
# 截断策略
truncation_strategy: "recent" # recent: 保留最近 | middle: 从中间截断
truncation_buffer_ratio: 0.10 # 预留 10% 给输出 token
# Thinking 模式
enable_thinking: true
default_thinking_budget: 1024
# Tool Choice
enable_tool_choice: true
实际效果演示
# 使用 Anthropic Python SDK 直接连接本地代理
import anthropic
client = anthropic.Anthropic(
base_url="http://localhost:8000", # 指向本地代理
api_key="any-string", # 随意填写,代理会忽略
)
message = client.messages.create(
model="claude-sonnet-4-6", # 会自动映射为 qwen/qwen3.5-397b-a17b
max_tokens=1024,
messages=[
{"role": "user", "content": "Hello!"}
],
)
print(message.content[0].text) # 正常输出
print(message.model) # "claude-sonnet-4-6" (回映原始模型名)
2. 智能负载均衡系统
三种均衡策略对比
| 策略 | 算法原理 | 适用场景 | 推荐度 |
|---|---|---|---|
most_remaining |
选择当前剩余配额最多的 Key | 最大化吞吐量 | ⭐⭐⭐ 推荐默认 |
round_robin |
严格轮询,依次分配 | 均匀分散请求 | ⭐⭐ |
least_used |
选择历史累计使用最少的 Key | 冷启动/新 Key | ⭐ |
滑动窗口限流机制
每个 Key 维护一个 60 秒滑动窗口,精确追踪实时 RPM 使用量:
class APIKey:
def __init__(self, key: str, rpm_limit: int = 40):
self.key = key
self.rpm_limit = rpm_limit
self._timestamps: deque = deque() # 存储过去 60s 的请求时间戳
@property
def remaining(self) -> int:
"""计算剩余配额"""
self._clean_old_timestamps()
return self.rpm_limit - len(self._timestamps)
def _clean_old_timestamps(self):
"""清理过期时间戳(超过 60 秒的)"""
cutoff = time.time() - 60
while self._timestamps and self._timestamps[0] < cutoff:
self._timestamps.popleft()
def record_request(self):
"""记录一次请求"""
self._timestamps.append(time.time())
时间线示意:
Key A (limit=40): |●●●●●●●●●●|●●●●●●●●●●|●●●●●●●●●●|░░░░░░░░░░|
↑ t=0s ↑ t=20s ↑ t=40s ↑ t=60s
当前: 30次 当前: 30次 当前: 30次 过期清理后: 0次
剩余: 10 剩余: 10 剩余: 10 剩余: 40
自动故障转移流程
当某个 Key 触发 429 限流时:
请求到达 → 选择 Key A → 发送请求 → 收到 429 错误
↓
┌─────────────────┐
│ 封禁 Key A 60s │
│ 更新状态为 banned│
└────────┬────────┘
↓
Full Jitter 退避
random(0, min(2^attempt, 8)) 秒
↓
选择下一个可用 Key B → 重试请求
↓
成功?→ 返回结果
失败?→ 重复上述流程(最多 max_retries 次)
健康检查后台任务
独立协程周期性检测所有 Key 的健康状态:
async def health_check_loop():
while True:
for key in key_pool.keys:
if key.status == "banned":
# 尝试解禁:发送测试请求
success = await test_key(key.key)
if success:
key.unban()
elif key.consecutive_failures >= 3:
# 连续失败多次,主动检测
await test_key(key.key)
await asyncio.sleep(30) # 每 30 秒检查一次
3. 前置准入控制 (Admission Control)
这是防止 429 雪崩效应 的关键设计。在高并发场景下,如果所有请求同时涌向 NVIDIA,即使有多 Key 也可能瞬间触发全部限流。
问题场景
无准入控制:
100 个并发请求 → 同时占用 100 个配额 → 所有 Key 瞬间超限 → 全部返回 429 → 客户端重试风暴 → 雪崩
解决方案:原子性预扣配额
在请求发出之前就锁定配额,确保总并发量不超过所有 Key 的总配额:
def try_acquire(self, select_fn=None) -> Optional[APIKey]:
"""
原子性地获取一个可用 Key 并预扣配额
使用锁保证并发安全
"""
with self._acquire_lock: # 互斥锁
available = [k for k in self.keys if k.is_available()]
if not available:
return None # 无可用 Key
selected = select_fn(available) # 应用选择策略
selected.pre_acquire() # 预扣配额(原子操作)
return selected
两种模式对比
| 模式 | 行为 | 适用场景 | 资源消耗 |
|---|---|---|---|
reject_fast |
配额不足立即返回 HTTP 429 | 高并发 / API 调用 | ⚡ 零等待 |
queue_wait |
智能排队等待配额释放 | 交互式聊天 / 低延迟要求 | 🔄 占用协程 |
reject_fast 模式(推荐高并发):
balancer:
admission_mode: "reject_fast"
# 效果:429 响应包含 Retry-After header,客户端可据此自动重试
queue_wait 模式(推荐聊天场景):
balancer:
admission_mode: "queue_wait"
queue_wait_timeout: 30 # 最大等待 30 秒
# 效果:用户无感知,请求会自动等待直到有配额
4. 可视化监控面板
访问 http://localhost:8000/dashboard 即可打开内置监控面板。
面板功能一览
| 模块 | 展示内容 | 更新频率 |
|---|---|---|
| 📈 请求趋势图 | 过去 N 小时的请求量、Token 消耗、错误率 | 实时轮询 |
| 🥧 模型分布饼图 | 各模型调用量占比、Token 消耗占比 | 实时 |
| 📊 Key 状态柱状图 | 各 Key 的 Token 使用对比、RPM 利用率 | 实时 |
| 📋 请求记录表 | 最近 50 条完整调用记录(含延迟、Token、状态) | 实时 |
| ⚡ 性能指标卡片 | TTFT P50/P95、Tokens/s P50/P95、总请求数 | 实时 |
| 🔑 Key 池概览 | 每个 Key 的实时 RPM、健康状态、累计统计 | 实时 |
性能指标说明
| 指标 | 英文全称 | 含义 | 优化目标 |
|---|---|---|---|
| TTFT | Time To First Token | 从发请求到收到第一个 Token 的延迟 | 越低越好 (< 1s) |
| Tokens/s | Tokens per Second | 文本生成速度 | 越高越好 (> 30) |
| P50 | 50th Percentile | 中位数(典型值) | 参考基准 |
| P95 | 95th Percentile | 95 分位(尾部情况) | 关注长尾 |
统计数据持久化
所有统计数据通过 WriteBuffer 异步批量写入 SQLite,保证:
- ✅ 服务重启不丢失历史数据
- ✅ 高频写入不影响主线程性能
- ✅ 支持查询最长 24 小时的时序数据
class WriteBuffer:
"""异步批量写入缓冲区"""
def __init__(self, engine, flush_interval=5, batch_size=100):
self._buffer = []
self._flush_interval = flush_interval # 每 5 秒刷写一次
self._batch_size = batch_size # 或累积 100 条时刷写
async def add(self, record: RequestRecord):
self._buffer.append(record)
if len(self._buffer) >= self._batch_size:
await self.flush()
async def flush(self):
"""批量写入数据库"""
if not self._buffer:
return
batch = self._buffer.copy()
self._buffer.clear()
# 使用 SQLAlchemy bulk_insert_mappings 高效插入
await self._engine.execute(...)
5. 模型管理平台
除了作为代理转发请求,本服务还提供了完整的 模型生命周期管理 功能。
管理接口清单
| 操作 | 方法 | 路径 | 说明 |
|---|---|---|---|
| 查看所有模型 | GET | /api/models |
按厂商分组展示 |
| 启用模型 | POST | /api/models/{id}/enable |
加入可用列表 |
| 禁用模型 | POST | /api/models/{id}/disable |
从可用列表移除 |
| 切换状态 | POST | /api/models/{id}/toggle |
快速启用/禁用 |
| 全部启用 | POST | /api/models/enable-all |
一键启用所有 |
| 全部禁用 | POST | /api/models/disable-all |
仅保留默认模型 |
| 刷新列表 | POST | /api/models/fetch |
从 NVIDIA 重新拉取 |
使用场景
# 场景 1:只保留常用的几个模型,减少列表长度
curl -X POST http://localhost:8000/api/models/disable-all
curl -X POST http://localhost:8000/api/models/meta/llama-3.1-70b-instruct/enable
# 场景 2:发现 NVIDIA 上架了新模型,刷新列表
curl -X POST http://localhost:8000/api/models/fetch
# 场景 3:查看当前启用的模型及统计
curl http://localhost:8000/api/models | jq .
# 输出:
# {
# "stats": {"total": 156, "enabled": 12, "disabled": 144},
# "groups": {
# "meta": [{...}, {...}],
# "nvidia": [{...}],
# "moonshotai": [{...}],
# "qwen": [{...}]
# }
# }
自动拉取配置
models:
auto_fetch: true # 启动时自动从 NVIDIA 拉取最新模型列表
fallback_list: # 如果拉取失败,使用这些备用模型
- "meta/llama-3.1-70b-instruct"
- "meta/llama-3.1-405b-instruct"
- "nvidia/llama-3.1-nemotron-70b-instruct"
快速上手
环境准备
# 要求 Python >= 3.10
python --version # 确保 >= 3.10
# 克隆项目
git clone https://github.com/Xiaoma716/NVIDIA-NIM.git
cd NVIDIA-NIM
# 创建虚拟环境(推荐)
python -m venv venv
# Windows
venv\Scripts\activate
# Linux/macOS
source venv/bin/activate
# 安装依赖
pip install -r requirements.txt
配置文件
复制模板并编辑:
cp config.example.yaml config.yaml
最小可用配置(只需填入 API Key):
keys:
- key: "nvapi-你的第一个key"
- key: "nvapi-你的第二个key" # 可选,多个 Key 实现负载均衡
nvidia:
base_url: "https://integrate.api.nvidia.com/v1"
default_model: "meta/llama-3.1-70b-instruct"
rpm_limit: 40
server:
port: 8000
💡 其他配置项都有合理默认值,可以先跑起来再逐步调优。
启动服务
python main.py
看到以下输出说明启动成功:
2024-xx-xx 12:00:00 | INFO | 配置加载成功
2024-xx-xx 12:00:00 | INFO | Key Pool 初始化完成: 3 个 Key 可用
2024-xx-xx 12:00:00 | INFO | 模型列表加载完成: 156 个模型 (12 个已启用)
2024-xx-xx 12:00:00 | INFO | 数据库初始化完成
2024-xx-xx 12:00:00 | INFO | Uvicorn running on http://0.0.0.0:8000
验证运行
# 1. 健康检查
curl http://localhost:8000/health
# 预期输出:{"status": "healthy", "summary": {"available_keys": 3, ...}}
# 2. OpenAI 格式测试
curl http://localhost:8000/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{"model": "meta/llama-3.1-70b-instruct", "messages": [{"role": "user", "content": "hi"}]}'
# 3. Anthropic 格式测试
curl http://localhost:8000/v1/messages \
-H "Content-Type: application/json" \
-H "anthropic-version: 2023-06-01" \
-d '{"model": "claude-haiku-4-5-20251001", "max_tokens": 64, "messages": [{"role": "user", "content": "hi"}]}'
# 4. 打开监控面板
# 浏览器访问: http://localhost:8000/dashboard
接入你的应用
方式一:OpenAI 兼容客户端
| 客户端 | 配置位置 | Base URL 填写 |
|---|---|---|
| ChatBox | 设置 → API 地址 | http://localhost:8000/v1 |
| LobeChat | 设置 → OpenAI API | http://localhost:8000/v1 |
| NextChat | 设置 → API 地址 | http://localhost:8000/v1 |
| 自定义 Python | openai.OpenAI(base_url="...") |
http://localhost:8000/v1 |
方式二:Anthropic 兼容客户端
| 客户端 | 配置方式 |
|---|---|
| Claude Code CLI | export ANTHROPIC_BASE_URL=http://localhost:8000 |
| Claude Desktop | 在 config.json 中设置 apiBaseUrl |
| Python SDK | anthropic.Anthropic(base_url="http://localhost:8000") |
| TypeScript SDK | new Anthropic({baseURL: "http://localhost:8000"}) |
技术实现深挖
项目目录结构
NVIDIA-NIM/
├── main.py # 启动入口,含配置校验
├── config.example.yaml # 配置模板
├── requirements.txt # 依赖清单
│
├── core/ # 🔧 核心逻辑层
│ ├── config.py # 单例配置管理,支持点号路径访问
│ ├── key_pool.py # Key 池:滑动窗口 + 状态机 + 健康检查
│ ├── balancer.py # 负载均衡器:3 种策略 + 准入控制
│ ├── proxy.py # 请求转发:重试 + 流式 + Token 提取
│ ├── model_manager.py # 模型管理:CRUD + 缓存 + 分组
│ ├── stats_manager.py # 统计引擎:时序 + Token + 性能指标
│ ├── database.py # SQLite ORM 初始化
│ ├── write_buffer.py # 异步批量写入缓冲区
│ └── anthropic_adapter.py # ★ 双协议转换核心
│
├── api/ # 🌐 HTTP 接口层
│ ├── router.py # 路由定义(FastAPI Depends 注入)
│ └── dashboard.py # 监控面板 HTML(内嵌 ECharts)
│
├── static/
│ └── dashboard.html # 静态资源
│
├── tests/ # 🧪 测试
│ ├── test_core.py
│ └── test_prompt_token_leak.py
│
├── data/ # 💾 运行时数据(SQLite)
└── logs/ # 📝 日志文件
关键技术点
1. Full Jitter 退避算法
避免重试风暴的关键:
@staticmethod
def _backoff(attempt: int, base: float = 1.0, cap: float = 8.0) -> float:
"""
Full Jitter: sleep = random(0, min(cap, base * 2^attempt))
相比固定退避或指数退避,Full Jitter 能有效分散重试时间,
降低多个客户端同时重试导致碰撞的概率。
"""
exp = base * (2 ** attempt) # 指数增长: 1, 2, 4, 8...
capped = min(exp, cap) # 上限封顶: 最大 8 秒
return random.uniform(0, capped) # 随机抖动: [0, capped]
# 重试时间序列示例:
# attempt=0: random(0, 1) ≈ 0.3s
# attempt=1: random(0, 2) ≈ 1.2s
# attempt=2: random(0, 4) ≈ 2.7s
# attempt=3: random(0, 8) ≈ 5.1s
2. Anthropic ↔ OpenAI 格式转换核心
这是整个双协议兼容的核心模块,负责处理两种 API 格式的所有差异:
# 请求转换:Anthropic → OpenAI
def convert_request(anthropic_req: dict) -> dict:
"""将 Anthropic Messages API 请求转换为 OpenAI Chat Completions 格式"""
openai_messages = []
# 处理 system prompt(Anthropic 独有字段)
if "system" in anthropic_req:
system_content = _convert_system_content(anthropic_req["system"])
openai_messages.append({"role": "system", "content": system_content})
# 转换消息(处理 tool_result、thinking 等特殊块)
for msg in anthropic_req["messages"]:
converted = convert_message(msg)
openai_messages.append(converted)
# 映射模型名称
nvidia_model = map_model_to_nvidia(anthropic_req["model"])
return {
"model": nvidia_model,
"messages": openai_messages,
"temperature": anthropic_req.get("temperature"),
"max_tokens": anthropic_req.get("max_tokens", 4096),
"stream": anthropic_req.get("stream", False),
# ... 其他字段映射
}
# 响应转换:OpenAI → Anthropic
def convert_response(openai_resp: dict, original_model: str) -> dict:
"""将 OpenAI 响应转换回 Anthropic 格式"""
return {
"id": generate_msg_id(),
"type": "message",
"role": "assistant",
"content": [
{
"type": "text",
"text": openai_resp["choices"][0]["message"]["content"]
}
],
"model": original_model, # 回映原始 Anthropic 模型名
"stop_reason": map_stop_reason(openai_resp["choices"][0].get("finish_reason")),
"usage": {
"input_tokens": openai_resp["usage"]["prompt_tokens"],
"output_tokens": openai_resp["usage"]["completion_tokens"],
}
}
3. 流式响应的双向转换
SSE 流式输出的转换是最复杂的部分:
async def convert_stream(
openai_stream: AsyncGenerator,
original_model: str
) -> AsyncGenerator[str, None]:
"""
将 OpenAI SSE 流转换为 Anthropic SSE 事件流
OpenAI 事件格式:
data: {"choices":[{"delta":{"content":"Hello"}}]}
Anthropic 事件格式:
event: content_block_start
data: {"index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"index":{},"delta":{"type":"text_delta","text":"Hello"}}
event: content_block_stop
data: {"index":0}
event: message_start
event: message_delta
event: message_stop
"""
# 发送 message_start 事件
yield _make_sse_event("message_start", {...})
# 发送 content_block_start 事件
yield _make_sse_event("content_block_start", {
"index": 0,
"content_block": {"type": "text", "text": ""}
})
# 转换每个 chunk
async for chunk in openai_stream:
delta = chunk["choices"][0].get("delta", {})
content = delta.get("content", "")
if content:
# 发送 content_block_delta 事件
yield _make_sse_event("content_block_delta", {
"index": 0,
"delta": {"type": "text_delta", "text": content}
})
# 发送结束事件
yield _make_sse_event("content_block_stop", {"index": 0})
yield _make_sse_event("message_delta", {"stop_reason": "end_turn"})
yield _make_sse_event("message_stop", {})
4. 消息截断策略
当消息总长度超过模型的上下文窗口时,自动截断:
def truncate_messages(
messages: List[Dict],
max_context_tokens: int,
buffer_ratio: float = 0.10, # 预留 10%
strategy: str = "recent" # 截断策略
) -> tuple:
"""
截断消息以适应上下文窗口
Args:
messages: 消息列表
max_context_tokens: 上下文窗口大小
buffer_ratio: 为输出预留的比例
strategy: "recent" 保留最近的消息
"middle" 从中间删除旧消息
Returns:
(截断后的消息, 是否被截断, 移除的 token 数)
"""
effective_limit = int(max_context_tokens * (1 - buffer_ratio))
current_tokens = estimate_tokens_for_messages(messages)
if current_tokens <= effective_limit:
return messages, False, 0
if strategy == "recent":
# 从开头移除旧消息,保留最近的
while messages and estimate_tokens_for_messages(messages) > effective_limit:
removed = messages.pop(0)
elif strategy == "middle":
# 保留 system + 最近几条,删除中间的历史
pass # 实现略...
removed_tokens = current_tokens - estimate_tokens_for_messages(messages)
return messages, True, removed_tokens
技术栈总览
| 层级 | 技术选型 | 选型理由 |
|---|---|---|
| Web 框架 | FastAPI + Uvicorn | 异步高性能,自动生成 API 文档 |
| HTTP 客户端 | httpx (HTTP/2 支持) | 异步原生,连接池复用 |
| AI SDK | OpenAI Python SDK | 统一接口抽象,简化 NVIDIA API 调用 |
| ORM | SQLAlchemy 2.0 + aiosqlite | 异步 ORM + 异步 SQLite 驱动 |
| 日志 | Loguru | 开箱即用的结构化日志,支持轮转 |
| 配置 | PyYAML | 人类可读的配置格式 |
| 前端 | ECharts (内嵌 HTML) | 零依赖,功能丰富的数据可视化 |
性能指标
在实际部署中,本服务的性能表现如下(基于 5 个 Key、most_remaining 策略):
| 指标 | 典型值 | 说明 |
|---|---|---|
| 吞吐量提升 | 5x (5 Keys) | 线性扩展,N 个 Key ≈ N 倍吞吐 |
| TTFT 中位数 (P50) | 300-800ms | 取决于模型大小和网络延迟 |
| TTFT 95 分位 (P95) | 1-3s | 包含重试和 Key 切换的情况 |
| 生成速度 | 30-80 tokens/s | 取决于具体模型 |
| 代理额外开销 | < 5ms | 本地转发几乎无延迟 |
| 内存占用 | ~50MB (空闲) | 轻量级,适合个人服务器 |
| Key 切换耗时 | < 1ms | 内存中的策略选择 |
与直接调用的对比
| 对比项 | 直接调用 NVIDIA API | 通过本代理 |
|---|---|---|
| 单 Key RPM 上限 | 40 | 40(相同) |
| 总可用 RPM | 40 | 40 × N(N 个 Key) |
| 429 处理 | 客户端自行处理 | 自动切换 + 重试 |
| 协议支持 | 仅 OpenAI 格式 | OpenAI + Anthropic |
| 监控能力 | 无 | 完整 Dashboard |
| 统计数据 | 无 | 持久化存储 |
| 模型管理 | 手动输入 | 自动拉取 + 启停控制 |
总结与展望
项目价值
本项目的核心价值在于:
- 突破限制:通过多 Key 池化,线性扩展 API 调用能力
- 双协议透明代理:一套服务同时服务 OpenAI 和 Anthropic 生态
- 生产级可靠性:前置准入控制、自动故障转移、健康检查
- 可观测性:完整的监控面板和性能指标
- 易用性:零代码改动即可接入现有应用
适用场景
- ✅ 个人学习与研究(免费调用大模型)
- ✅ 小团队内部工具(多人共享 API 配额)
- ✅ 聊天机器人后端(高并发场景)
- ✅ 批量文本处理(数据分析、翻译等)
- ✅ Claude Code / ChatGPT 替代方案(本地代理)
未来规划
- 支持更多 API 协议(如 Google Gemini 格式)
- 分布式部署模式(多实例协同)
- Web UI 配置管理界面
- Docker 一键部署
- Key 用量告警与通知
- 请求日志的全文搜索
开源信息
| 项目 | 信息 |
|---|---|
| GitHub | MaZhenyu-Dev/NVIDIA-NIM |
| License | MIT |
| Python | >= 3.10 |
| 维护者 | @MaZhenyu-Dev |
Star 历史
如果这个项目对你有帮助,欢迎给一个 ⭐ 支持一下!
你的 Star 是我持续更新的动力 ⭐
免责声明
⚠️ 重要提示:本项目为个人学习与研究目的开发,仅供本地使用。
- 使用者需自行承担使用 NVIDIA API 所产生的一切费用与责任
- 本项目与 NVIDIA 公司无任何关联
- 请遵守 NVIDIA API 的使用条款和服务协议
- 本项目不提供任何形式的服务级别保证(SLA)
最后更新:2026 年 5 月 | 版本:v2.2