Ma Zhenyu's Blog
技术笔记发布于: 2026年5月1日5283

NVIDIA NIM 多 Key 负载均衡代理服务:突破单 Key 限制,同时兼容 OpenAI 与 Anthropic 双协议

Python

一个本地部署的智能代理网关,通过多 API Key 负载均衡突破 NVIDIA NIM 的单 Key RPM 限制,同时原生支持 OpenAI 和 Anthropic 两种 API 格式,内置可视化监控面板与模型管理功能。


目录


背景:为什么需要这个项目?

NVIDIA 推出的 NIM (NVIDIA Inference Microservices) API 提供了免费调用大语言模型的机会,但每个 API Key 都有严格的 RPM (Requests Per Minute) 限制,通常为 40 RPM

痛点分析

场景 单 Key 局限 实际需求
个人学习 40 RPM 够用 ✅ 无压力
小团队协作 3-5 人同时使用 ❌ 频繁触发 429
聊天机器人 高并发用户访问 ❌ 完全不够用
批量处理 数据分析/文本生成 ❌ 效率极低

解决方案

NVIDIA NIM Load Balancer 通过多 Key 智能调度,将 N 个 Key 的 RPM 配额池化使用:

单 Key:     ████████░░░░░░░░░░░░░  40 RPM
5 Keys:     ██████████████████████  200 RPM (理论上限)
10 Keys:    ██████████████████████  400 RPM (理论上限)

更重要的是,本项目不仅解决了限流问题,还实现了 OpenAI / Anthropic 双协议透明代理 —— 你可以用同一套基础设施同时服务两种 SDK 的客户端。


架构概览

┌─────────────────────────────────────────────────────────────────────┐
│                        客户端层                                      │
│   ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐             │
│   │ ChatBox  │ │ LobeChat │ │Claude Code│ │  curl    │             │
│   │(OpenAI)  │ │(OpenAI)  │ │(Anthropic)│ │(任意格式) │             │
│   └────┬─────┘ └────┬─────┘ └────┬─────┘ └────┬─────┘             │
└────────┼────────────┼────────────┼────────────┼────────────────────┘
         │            │            │            │
         ▼            ▼            ▼            ▼
┌─────────────────────────────────────────────────────────────────────┐
│                    FastAPI 代理网关 :8000                            │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    协议适配层                                 │   │
│  │                                                             │   │
│  │  ┌─────────────────────┐    ┌─────────────────────────┐     │   │
│  │  │  OpenAI 兼容接口     │    │ Anthropic 兼容接口       │     │   │
│  │  │                     │    │                         │     │   │
│  │  │ POST /v1/chat/      │◄──►│ POST /v1/messages       │     │   │
│  │  │ completions          │    │ GET  /v1/models         │     │   │
│  │  │ GET  /v1/models      │    │ POST /v1/messages/      │     │   │
│  │  │                     │    │ count_tokens            │     │   │
│  │  └──────────┬──────────┘    └───────────┬─────────────┘     │   │
│  │             │                           │                    │   │
│  │             └───────────┬───────────────┘                    │   │
│  │                         ▼                                    │   │
│  │              anthropic_adapter.py                             │   │
│  │         (双向格式转换 + 模型映射 + 截断)                       │   │
│  └─────────────────────────┬───────────────────────────────────┘   │
│                            │                                       │
│  ┌─────────────────────────▼───────────────────────────────────┐   │
│  │                    核心引擎层                                 │   │
│  │                                                             │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │   │
│  │  │ Admission    │  │ LoadBalancer │  │ NvidiaProxy      │  │   │
│  │  │ Control      │→ │ (策略选择)   │→ │ (请求转发+重试)  │  │   │
│  │  │ (准入控制)   │  │              │  │                  │  │   │
│  │  └──────────────┘  └──────┬───────┘  └────────┬─────────┘  │   │
│  │                           │                   │             │   │
│  │                           ▼                   │             │   │
│  │                 ┌─────────────────┐           │             │   │
│  │                 │    Key Pool     │◄──────────┘             │   │
│  │                 │  (滑动窗口计数)  │                          │   │
│  │                 │  ┌───┬───┬───┐  │                          │   │
│  │                 │  │K1 │K2 │K3 │  │                          │   │
│  │                 │  │35 │封禁│38 │  │                          │   │
│  │                 │  └───┴───┴───┘  │                          │   │
│  │                 └─────────────────┘                          │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    管理与监控层                               │   │
│  │                                                             │   │
│  │  ┌──────────────┐  ┌──────────────┐  ┌──────────────────┐  │   │
│  │  │ /dashboard   │  │ /api/stats/* │  │ /api/models/*    │  │   │
│  │  │ (Web 面板)   │  │ (统计分析)   │  │ (模型管理)       │  │   │
│  │  └──────────────┘  └──────────────┘  └──────────────────┘  │   │
│  └─────────────────────────────────────────────────────────────┘   │
│                                                                     │
│  ┌─────────────────────────────────────────────────────────────┐   │
│  │                    持久化层                                   │   │
│  │  SQLite + WriteBuffer (异步批量写入)                         │   │
│  └─────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────┘
                              │
                              │ HTTPS
                              ▼
               ┌─────────────────────────────┐
               │   NVIDIA NIM API            │
               │   integrate.api.nvidia.com  │
               └─────────────────────────────┘

核心特性详解

1. 双协议兼容引擎

这是本项目的核心竞争力:一个代理服务同时支持 OpenAI 和 Anthropic 两种主流 API 格式,客户端无需修改任何代码即可接入。

工作原理

客户端请求 → 协议检测 → 格式转换 → 统一处理 → 响应回转 → 返回客户端

协议检测机制:通过 anthropic-version 请求头自动识别客户端类型

客户端类型 识别方式 目标端点 转换器
OpenAI SDK 无特殊 header /v1/chat/completions 无需转换
Anthropic SDK anthropic-version: 2023-06-01 /v1/messages anthropic_adapter

OpenAI 接口使用示例

# 标准 Chat Completions(流式)
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{
    "model": "meta/llama-3.1-70b-instruct",
    "messages": [{"role": "user", "content": "你好,请介绍一下自己"}],
    "stream": true,
    "temperature": 0.7
  }'

# 响应格式(标准 OpenAI SSE)
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"你好"}}]}
data: {"id":"chatcmpl-xxx","choices":[{"delta":{"content":"!"}}]}
data: [DONE]

Anthropic 接口使用示例

# Messages API(完全兼容 Claude SDK)
curl http://localhost:8000/v1/messages \
  -H "Content-Type: application/json" \
  -H "anthropic-version: 2023-06-01" \
  -H "x-api-key: any-string" \
  -d '{
    "model": "claude-sonnet-4-6",
    "max_tokens": 1024,
    "system": "你是一个有帮助的助手",
    "messages": [
      {"role": "user", "content": "你好,请介绍一下自己"}
    ]
  }'

# 响应格式(标准 Anthropic JSON)
{
  "id": "msg_xxx",
  "type": "message",
  "role": "assistant",
  "content": [{"type": "text", "text": "你好!我是..."}],
  "model": "claude-sonnet-4-6",
  "stop_reason": "end_turn"
}

Anthropic 特有功能支持矩阵

功能 支持状态 配置示例
Thinking(扩展思考) ✅ 完整支持 "thinking": {"type": "enabled", "budget_tokens": 1024}
Tool Use(工具调用) ✅ 完整支持 "tools": [...], "tool_choice": {"type": "auto"}
System Prompt ✅ 多格式 字符串或 [{type: "text", text: "..."}] 数组
流式输出(SSE) ✅ 事件流转换 content_block_start/delta/stop 事件
消息截断 ✅ 自动管理 超出上下文窗口时按策略截断
Token 计数 ✅ 估算接口 POST /v1/messages/count_tokens
错误格式转换 ✅ 自动映射 NVIDIA 错误 → Anthropic 错误结构
模型名称映射 ✅ 可配置 claude-sonnet-4-6qwen/qwen3.5-397b-a17b

模型映射配置

config.yaml 中自定义 Anthropic 到 NVIDIA 的模型映射关系:

anthropic:
  default_model: "claude-haiku-4-5-20251001"
  
  # 模型名称映射表
  model_mapping:
    "claude-opus-4-7": "moonshotai/kimi-k2.5"
    "claude-opus-4-6": "moonshotai/kimi-k2.5"
    "claude-sonnet-4-6": "qwen/qwen3.5-397b-a17b"
    "claude-haiku-4-5-20251001": "moonshotai/kimi-k2-instruct-0905"
  
  # 各模型的上下文窗口大小
  context_windows:
    "claude-opus-4-7": 1000000
    "claude-opus-4-6": 1000000
    "claude-sonnet-4-6": 256000
    "claude-haiku-4-5-20251001": 200000
  
  # 截断策略
  truncation_strategy: "recent"        # recent: 保留最近 | middle: 从中间截断
  truncation_buffer_ratio: 0.10         # 预留 10% 给输出 token
  
  # Thinking 模式
  enable_thinking: true
  default_thinking_budget: 1024
  
  # Tool Choice
  enable_tool_choice: true

实际效果演示

# 使用 Anthropic Python SDK 直接连接本地代理
import anthropic

client = anthropic.Anthropic(
    base_url="http://localhost:8000",  # 指向本地代理
    api_key="any-string",               # 随意填写,代理会忽略
)

message = client.messages.create(
    model="claude-sonnet-4-6",          # 会自动映射为 qwen/qwen3.5-397b-a17b
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "Hello!"}
    ],
)

print(message.content[0].text)          # 正常输出
print(message.model)                    # "claude-sonnet-4-6" (回映原始模型名)

2. 智能负载均衡系统

三种均衡策略对比

策略 算法原理 适用场景 推荐度
most_remaining 选择当前剩余配额最多的 Key 最大化吞吐量 ⭐⭐⭐ 推荐默认
round_robin 严格轮询,依次分配 均匀分散请求 ⭐⭐
least_used 选择历史累计使用最少的 Key 冷启动/新 Key

滑动窗口限流机制

每个 Key 维护一个 60 秒滑动窗口,精确追踪实时 RPM 使用量:

class APIKey:
    def __init__(self, key: str, rpm_limit: int = 40):
        self.key = key
        self.rpm_limit = rpm_limit
        self._timestamps: deque = deque()  # 存储过去 60s 的请求时间戳
    
    @property
    def remaining(self) -> int:
        """计算剩余配额"""
        self._clean_old_timestamps()
        return self.rpm_limit - len(self._timestamps)
    
    def _clean_old_timestamps(self):
        """清理过期时间戳(超过 60 秒的)"""
        cutoff = time.time() - 60
        while self._timestamps and self._timestamps[0] < cutoff:
            self._timestamps.popleft()
    
    def record_request(self):
        """记录一次请求"""
        self._timestamps.append(time.time())

时间线示意

Key A (limit=40):  |●●●●●●●●●●|●●●●●●●●●●|●●●●●●●●●●|░░░░░░░░░░|
                    ↑ t=0s      ↑ t=20s     ↑ t=40s     ↑ t=60s
                    当前: 30次   当前: 30次   当前: 30次   过期清理后: 0次
                    剩余: 10     剩余: 10     剩余: 10     剩余: 40

自动故障转移流程

当某个 Key 触发 429 限流时:

请求到达 → 选择 Key A → 发送请求 → 收到 429 错误
                                        ↓
                              ┌─────────────────┐
                              │ 封禁 Key A 60s  │
                              │ 更新状态为 banned│
                              └────────┬────────┘
                                       ↓
                              Full Jitter 退避
                              random(0, min(2^attempt, 8)) 秒
                                       ↓
                              选择下一个可用 Key B → 重试请求
                                       ↓
                              成功?→ 返回结果
                              失败?→ 重复上述流程(最多 max_retries 次)

健康检查后台任务

独立协程周期性检测所有 Key 的健康状态:

async def health_check_loop():
    while True:
        for key in key_pool.keys:
            if key.status == "banned":
                # 尝试解禁:发送测试请求
                success = await test_key(key.key)
                if success:
                    key.unban()
            elif key.consecutive_failures >= 3:
                # 连续失败多次,主动检测
                await test_key(key.key)
        
        await asyncio.sleep(30)  # 每 30 秒检查一次

3. 前置准入控制 (Admission Control)

这是防止 429 雪崩效应 的关键设计。在高并发场景下,如果所有请求同时涌向 NVIDIA,即使有多 Key 也可能瞬间触发全部限流。

问题场景

无准入控制:
100 个并发请求 → 同时占用 100 个配额 → 所有 Key 瞬间超限 → 全部返回 429 → 客户端重试风暴 → 雪崩

解决方案:原子性预扣配额

在请求发出之前就锁定配额,确保总并发量不超过所有 Key 的总配额:

def try_acquire(self, select_fn=None) -> Optional[APIKey]:
    """
    原子性地获取一个可用 Key 并预扣配额
    使用锁保证并发安全
    """
    with self._acquire_lock:  # 互斥锁
        available = [k for k in self.keys if k.is_available()]
        
        if not available:
            return None  # 无可用 Key
        
        selected = select_fn(available)  # 应用选择策略
        selected.pre_acquire()  # 预扣配额(原子操作)
        return selected

两种模式对比

模式 行为 适用场景 资源消耗
reject_fast 配额不足立即返回 HTTP 429 高并发 / API 调用 ⚡ 零等待
queue_wait 智能排队等待配额释放 交互式聊天 / 低延迟要求 🔄 占用协程

reject_fast 模式(推荐高并发):

balancer:
  admission_mode: "reject_fast"
  # 效果:429 响应包含 Retry-After header,客户端可据此自动重试

queue_wait 模式(推荐聊天场景):

balancer:
  admission_mode: "queue_wait"
  queue_wait_timeout: 30  # 最大等待 30 秒
  # 效果:用户无感知,请求会自动等待直到有配额

4. 可视化监控面板

访问 http://localhost:8000/dashboard 即可打开内置监控面板。

面板功能一览

模块 展示内容 更新频率
📈 请求趋势图 过去 N 小时的请求量、Token 消耗、错误率 实时轮询
🥧 模型分布饼图 各模型调用量占比、Token 消耗占比 实时
📊 Key 状态柱状图 各 Key 的 Token 使用对比、RPM 利用率 实时
📋 请求记录表 最近 50 条完整调用记录(含延迟、Token、状态) 实时
⚡ 性能指标卡片 TTFT P50/P95、Tokens/s P50/P95、总请求数 实时
🔑 Key 池概览 每个 Key 的实时 RPM、健康状态、累计统计 实时

性能指标说明

指标 英文全称 含义 优化目标
TTFT Time To First Token 从发请求到收到第一个 Token 的延迟 越低越好 (< 1s)
Tokens/s Tokens per Second 文本生成速度 越高越好 (> 30)
P50 50th Percentile 中位数(典型值) 参考基准
P95 95th Percentile 95 分位(尾部情况) 关注长尾

统计数据持久化

所有统计数据通过 WriteBuffer 异步批量写入 SQLite,保证:

  • ✅ 服务重启不丢失历史数据
  • ✅ 高频写入不影响主线程性能
  • ✅ 支持查询最长 24 小时的时序数据
class WriteBuffer:
    """异步批量写入缓冲区"""
    
    def __init__(self, engine, flush_interval=5, batch_size=100):
        self._buffer = []
        self._flush_interval = flush_interval   # 每 5 秒刷写一次
        self._batch_size = batch_size           # 或累积 100 条时刷写
    
    async def add(self, record: RequestRecord):
        self._buffer.append(record)
        if len(self._buffer) >= self._batch_size:
            await self.flush()
    
    async def flush(self):
        """批量写入数据库"""
        if not self._buffer:
            return
        batch = self._buffer.copy()
        self._buffer.clear()
        # 使用 SQLAlchemy bulk_insert_mappings 高效插入
        await self._engine.execute(...)

5. 模型管理平台

除了作为代理转发请求,本服务还提供了完整的 模型生命周期管理 功能。

管理接口清单

操作 方法 路径 说明
查看所有模型 GET /api/models 按厂商分组展示
启用模型 POST /api/models/{id}/enable 加入可用列表
禁用模型 POST /api/models/{id}/disable 从可用列表移除
切换状态 POST /api/models/{id}/toggle 快速启用/禁用
全部启用 POST /api/models/enable-all 一键启用所有
全部禁用 POST /api/models/disable-all 仅保留默认模型
刷新列表 POST /api/models/fetch 从 NVIDIA 重新拉取

使用场景

# 场景 1:只保留常用的几个模型,减少列表长度
curl -X POST http://localhost:8000/api/models/disable-all
curl -X POST http://localhost:8000/api/models/meta/llama-3.1-70b-instruct/enable

# 场景 2:发现 NVIDIA 上架了新模型,刷新列表
curl -X POST http://localhost:8000/api/models/fetch

# 场景 3:查看当前启用的模型及统计
curl http://localhost:8000/api/models | jq .
# 输出:
# {
#   "stats": {"total": 156, "enabled": 12, "disabled": 144},
#   "groups": {
#     "meta": [{...}, {...}],
#     "nvidia": [{...}],
#     "moonshotai": [{...}],
#     "qwen": [{...}]
#   }
# }

自动拉取配置

models:
  auto_fetch: true  # 启动时自动从 NVIDIA 拉取最新模型列表
  fallback_list:    # 如果拉取失败,使用这些备用模型
    - "meta/llama-3.1-70b-instruct"
    - "meta/llama-3.1-405b-instruct"
    - "nvidia/llama-3.1-nemotron-70b-instruct"

快速上手

环境准备

# 要求 Python >= 3.10
python --version  # 确保 >= 3.10

# 克隆项目
git clone https://github.com/Xiaoma716/NVIDIA-NIM.git
cd NVIDIA-NIM

# 创建虚拟环境(推荐)
python -m venv venv

# Windows
venv\Scripts\activate

# Linux/macOS
source venv/bin/activate

# 安装依赖
pip install -r requirements.txt

配置文件

复制模板并编辑:

cp config.example.yaml config.yaml

最小可用配置(只需填入 API Key):

keys:
  - key: "nvapi-你的第一个key"
  - key: "nvapi-你的第二个key"   # 可选,多个 Key 实现负载均衡

nvidia:
  base_url: "https://integrate.api.nvidia.com/v1"
  default_model: "meta/llama-3.1-70b-instruct"
  rpm_limit: 40

server:
  port: 8000

💡 其他配置项都有合理默认值,可以先跑起来再逐步调优。

启动服务

python main.py

看到以下输出说明启动成功:

2024-xx-xx 12:00:00 | INFO     | 配置加载成功
2024-xx-xx 12:00:00 | INFO     | Key Pool 初始化完成: 3 个 Key 可用
2024-xx-xx 12:00:00 | INFO     | 模型列表加载完成: 156 个模型 (12 个已启用)
2024-xx-xx 12:00:00 | INFO     | 数据库初始化完成
2024-xx-xx 12:00:00 | INFO     | Uvicorn running on http://0.0.0.0:8000

验证运行

# 1. 健康检查
curl http://localhost:8000/health
# 预期输出:{"status": "healthy", "summary": {"available_keys": 3, ...}}

# 2. OpenAI 格式测试
curl http://localhost:8000/v1/chat/completions \
  -H "Content-Type: application/json" \
  -d '{"model": "meta/llama-3.1-70b-instruct", "messages": [{"role": "user", "content": "hi"}]}'

# 3. Anthropic 格式测试
curl http://localhost:8000/v1/messages \
  -H "Content-Type: application/json" \
  -H "anthropic-version: 2023-06-01" \
  -d '{"model": "claude-haiku-4-5-20251001", "max_tokens": 64, "messages": [{"role": "user", "content": "hi"}]}'

# 4. 打开监控面板
# 浏览器访问: http://localhost:8000/dashboard

接入你的应用

方式一:OpenAI 兼容客户端

客户端 配置位置 Base URL 填写
ChatBox 设置 → API 地址 http://localhost:8000/v1
LobeChat 设置 → OpenAI API http://localhost:8000/v1
NextChat 设置 → API 地址 http://localhost:8000/v1
自定义 Python openai.OpenAI(base_url="...") http://localhost:8000/v1

方式二:Anthropic 兼容客户端

客户端 配置方式
Claude Code CLI export ANTHROPIC_BASE_URL=http://localhost:8000
Claude Desktop 在 config.json 中设置 apiBaseUrl
Python SDK anthropic.Anthropic(base_url="http://localhost:8000")
TypeScript SDK new Anthropic({baseURL: "http://localhost:8000"})

技术实现深挖

项目目录结构

NVIDIA-NIM/
├── main.py                      # 启动入口,含配置校验
├── config.example.yaml          # 配置模板
├── requirements.txt             # 依赖清单
│
├── core/                        # 🔧 核心逻辑层
│   ├── config.py                #   单例配置管理,支持点号路径访问
│   ├── key_pool.py              #   Key 池:滑动窗口 + 状态机 + 健康检查
│   ├── balancer.py              #   负载均衡器:3 种策略 + 准入控制
│   ├── proxy.py                 #   请求转发:重试 + 流式 + Token 提取
│   ├── model_manager.py         #   模型管理:CRUD + 缓存 + 分组
│   ├── stats_manager.py         #   统计引擎:时序 + Token + 性能指标
│   ├── database.py              #   SQLite ORM 初始化
│   ├── write_buffer.py          #   异步批量写入缓冲区
│   └── anthropic_adapter.py     #   ★ 双协议转换核心
│
├── api/                         # 🌐 HTTP 接口层
│   ├── router.py                #   路由定义(FastAPI Depends 注入)
│   └── dashboard.py             #   监控面板 HTML(内嵌 ECharts)
│
├── static/
│   └── dashboard.html           # 静态资源
│
├── tests/                       # 🧪 测试
│   ├── test_core.py
│   └── test_prompt_token_leak.py
│
├── data/                        # 💾 运行时数据(SQLite)
└── logs/                        # 📝 日志文件

关键技术点

1. Full Jitter 退避算法

避免重试风暴的关键:

@staticmethod
def _backoff(attempt: int, base: float = 1.0, cap: float = 8.0) -> float:
    """
    Full Jitter: sleep = random(0, min(cap, base * 2^attempt))
    
    相比固定退避或指数退避,Full Jitter 能有效分散重试时间,
    降低多个客户端同时重试导致碰撞的概率。
    """
    exp = base * (2 ** attempt)      # 指数增长: 1, 2, 4, 8...
    capped = min(exp, cap)           # 上限封顶: 最大 8 秒
    return random.uniform(0, capped) # 随机抖动: [0, capped]
    
# 重试时间序列示例:
# attempt=0: random(0, 1)   ≈ 0.3s
# attempt=1: random(0, 2)   ≈ 1.2s
# attempt=2: random(0, 4)   ≈ 2.7s
# attempt=3: random(0, 8)   ≈ 5.1s

2. Anthropic ↔ OpenAI 格式转换核心

这是整个双协议兼容的核心模块,负责处理两种 API 格式的所有差异:

# 请求转换:Anthropic → OpenAI
def convert_request(anthropic_req: dict) -> dict:
    """将 Anthropic Messages API 请求转换为 OpenAI Chat Completions 格式"""
    openai_messages = []
    
    # 处理 system prompt(Anthropic 独有字段)
    if "system" in anthropic_req:
        system_content = _convert_system_content(anthropic_req["system"])
        openai_messages.append({"role": "system", "content": system_content})
    
    # 转换消息(处理 tool_result、thinking 等特殊块)
    for msg in anthropic_req["messages"]:
        converted = convert_message(msg)
        openai_messages.append(converted)
    
    # 映射模型名称
    nvidia_model = map_model_to_nvidia(anthropic_req["model"])
    
    return {
        "model": nvidia_model,
        "messages": openai_messages,
        "temperature": anthropic_req.get("temperature"),
        "max_tokens": anthropic_req.get("max_tokens", 4096),
        "stream": anthropic_req.get("stream", False),
        # ... 其他字段映射
    }

# 响应转换:OpenAI → Anthropic
def convert_response(openai_resp: dict, original_model: str) -> dict:
    """将 OpenAI 响应转换回 Anthropic 格式"""
    return {
        "id": generate_msg_id(),
        "type": "message",
        "role": "assistant",
        "content": [
            {
                "type": "text",
                "text": openai_resp["choices"][0]["message"]["content"]
            }
        ],
        "model": original_model,  # 回映原始 Anthropic 模型名
        "stop_reason": map_stop_reason(openai_resp["choices"][0].get("finish_reason")),
        "usage": {
            "input_tokens": openai_resp["usage"]["prompt_tokens"],
            "output_tokens": openai_resp["usage"]["completion_tokens"],
        }
    }

3. 流式响应的双向转换

SSE 流式输出的转换是最复杂的部分:

async def convert_stream(
    openai_stream: AsyncGenerator, 
    original_model: str
) -> AsyncGenerator[str, None]:
    """
    将 OpenAI SSE 流转换为 Anthropic SSE 事件流
    
    OpenAI 事件格式:
      data: {"choices":[{"delta":{"content":"Hello"}}]}
    
    Anthropic 事件格式:
      event: content_block_start
      data: {"index":0,"content_block":{"type":"text","text":""}}
      
      event: content_block_delta
      data: {"index":{},"delta":{"type":"text_delta","text":"Hello"}}
      
      event: content_block_stop
      data: {"index":0}
      
      event: message_start
      event: message_delta
      event: message_stop
    """
    
    # 发送 message_start 事件
    yield _make_sse_event("message_start", {...})
    
    # 发送 content_block_start 事件
    yield _make_sse_event("content_block_start", {
        "index": 0,
        "content_block": {"type": "text", "text": ""}
    })
    
    # 转换每个 chunk
    async for chunk in openai_stream:
        delta = chunk["choices"][0].get("delta", {})
        content = delta.get("content", "")
        
        if content:
            # 发送 content_block_delta 事件
            yield _make_sse_event("content_block_delta", {
                "index": 0,
                "delta": {"type": "text_delta", "text": content}
            })
    
    # 发送结束事件
    yield _make_sse_event("content_block_stop", {"index": 0})
    yield _make_sse_event("message_delta", {"stop_reason": "end_turn"})
    yield _make_sse_event("message_stop", {})

4. 消息截断策略

当消息总长度超过模型的上下文窗口时,自动截断:

def truncate_messages(
    messages: List[Dict],
    max_context_tokens: int,
    buffer_ratio: float = 0.10,  # 预留 10%
    strategy: str = "recent"       # 截断策略
) -> tuple:
    """
    截断消息以适应上下文窗口
    
    Args:
        messages: 消息列表
        max_context_tokens: 上下文窗口大小
        buffer_ratio: 为输出预留的比例
        strategy: "recent" 保留最近的消息
                  "middle"  从中间删除旧消息
    
    Returns:
        (截断后的消息, 是否被截断, 移除的 token 数)
    """
    effective_limit = int(max_context_tokens * (1 - buffer_ratio))
    
    current_tokens = estimate_tokens_for_messages(messages)
    
    if current_tokens <= effective_limit:
        return messages, False, 0
    
    if strategy == "recent":
        # 从开头移除旧消息,保留最近的
        while messages and estimate_tokens_for_messages(messages) > effective_limit:
            removed = messages.pop(0)
            
    elif strategy == "middle":
        # 保留 system + 最近几条,删除中间的历史
        pass  # 实现略...
    
    removed_tokens = current_tokens - estimate_tokens_for_messages(messages)
    return messages, True, removed_tokens

技术栈总览

层级 技术选型 选型理由
Web 框架 FastAPI + Uvicorn 异步高性能,自动生成 API 文档
HTTP 客户端 httpx (HTTP/2 支持) 异步原生,连接池复用
AI SDK OpenAI Python SDK 统一接口抽象,简化 NVIDIA API 调用
ORM SQLAlchemy 2.0 + aiosqlite 异步 ORM + 异步 SQLite 驱动
日志 Loguru 开箱即用的结构化日志,支持轮转
配置 PyYAML 人类可读的配置格式
前端 ECharts (内嵌 HTML) 零依赖,功能丰富的数据可视化

性能指标

在实际部署中,本服务的性能表现如下(基于 5 个 Key、most_remaining 策略):

指标 典型值 说明
吞吐量提升 5x (5 Keys) 线性扩展,N 个 Key ≈ N 倍吞吐
TTFT 中位数 (P50) 300-800ms 取决于模型大小和网络延迟
TTFT 95 分位 (P95) 1-3s 包含重试和 Key 切换的情况
生成速度 30-80 tokens/s 取决于具体模型
代理额外开销 < 5ms 本地转发几乎无延迟
内存占用 ~50MB (空闲) 轻量级,适合个人服务器
Key 切换耗时 < 1ms 内存中的策略选择

与直接调用的对比

对比项 直接调用 NVIDIA API 通过本代理
单 Key RPM 上限 40 40(相同)
总可用 RPM 40 40 × N(N 个 Key)
429 处理 客户端自行处理 自动切换 + 重试
协议支持 仅 OpenAI 格式 OpenAI + Anthropic
监控能力 完整 Dashboard
统计数据 持久化存储
模型管理 手动输入 自动拉取 + 启停控制

总结与展望

项目价值

本项目的核心价值在于:

  1. 突破限制:通过多 Key 池化,线性扩展 API 调用能力
  2. 双协议透明代理:一套服务同时服务 OpenAI 和 Anthropic 生态
  3. 生产级可靠性:前置准入控制、自动故障转移、健康检查
  4. 可观测性:完整的监控面板和性能指标
  5. 易用性:零代码改动即可接入现有应用

适用场景

  • ✅ 个人学习与研究(免费调用大模型)
  • ✅ 小团队内部工具(多人共享 API 配额)
  • ✅ 聊天机器人后端(高并发场景)
  • ✅ 批量文本处理(数据分析、翻译等)
  • ✅ Claude Code / ChatGPT 替代方案(本地代理)

未来规划

  • 支持更多 API 协议(如 Google Gemini 格式)
  • 分布式部署模式(多实例协同)
  • Web UI 配置管理界面
  • Docker 一键部署
  • Key 用量告警与通知
  • 请求日志的全文搜索

开源信息

项目 信息
GitHub MaZhenyu-Dev/NVIDIA-NIM
License MIT
Python >= 3.10
维护者 @MaZhenyu-Dev

Star 历史

如果这个项目对你有帮助,欢迎给一个 ⭐ 支持一下!

你的 Star 是我持续更新的动力 ⭐

免责声明

⚠️ 重要提示:本项目为个人学习与研究目的开发,仅供本地使用。

  • 使用者需自行承担使用 NVIDIA API 所产生的一切费用与责任
  • 本项目与 NVIDIA 公司无任何关联
  • 请遵守 NVIDIA API 的使用条款和服务协议
  • 本项目不提供任何形式的服务级别保证(SLA)

最后更新:2026 年 5 月 | 版本:v2.2

评论区