fix(proxy): correct usage accounting on format-conversion paths

Audited all proxy format-conversion paths (Chat<->Message, Chat<->Response,
Gemini<->Message) for usage/cache metering. Five issues found and fixed.
The dedup mechanism (request_id PK, proxy/session source isolation) is
untouched, so no double-counting is introduced.

- A (Claude + openai_chat, streaming): inject stream_options.include_usage
  so OpenAI-compatible upstreams emit usage in the SSE tail. Without it the
  converted Anthropic message_delta was all-zero and the whole request's
  input/output/cache was dropped. Same root cause as the already-fixed
  Codex Chat path; the injection is extracted into a shared helper
  (transform::inject_openai_stream_include_usage) reused by both paths.

- C (Claude + gemini_native): subtract cachedContentTokenCount from
  input_tokens in build_anthropic_usage so input becomes fresh input
  (Anthropic semantics). Previously the cache-hit tokens were billed twice
  because this path meters as app_type="claude" (input_includes_cache_read
  = false) while Gemini's promptTokenCount includes the cache.

- D (Codex + openai_chat, streaming): gate log_usage on
  has_billable_tokens() to skip the synthetic all-zero usage the converter
  emits when a non-compliant upstream omits usage, preventing empty-row
  request-count inflation.

- P2 (from_claude_stream_events): use has_billable_tokens() for the return
  gate instead of input>0||output>0, so a fully-cached streamed request
  (cache_read>0, input==output==0) is still recorded. Affects all
  Claude-streaming paths, not just Gemini.

- P3 (Codex Chat->Responses, non-streaming): apply the same
  has_billable_tokens() filter the streaming branch got, since the
  synthesized all-zero usage makes from_codex_response return Some and
  bypass the `if let Some` guard.

Add TokenUsage::has_billable_tokens() as the unified predicate. New tests
cover include_usage injection, gemini input subtraction, the gate itself,
cache-only stream recording, and synthetic all-zero codex usage.
Full lib suite: 1569 passed.
This commit is contained in:
Jason
2026-06-09 13:15:13 +08:00
Unverified
parent 05bc14e82b
commit 36a103bbe4
6 changed files with 188 additions and 25 deletions
+16 -1
View File
@@ -765,6 +765,15 @@ async fn handle_codex_chat_to_responses_transform(
move |events, first_token_ms| {
let usage =
TokenUsage::from_codex_stream_events_auto(&events).unwrap_or_default();
// 上游遵守 OpenAI 语义省略 usage 时,Chat→Responses 转换器会合成一个
// 全 0 的 response.completed;from_codex_response 对 input/output 字段存在(哪怕=0)即返回 Some。缺 nonzero 闸门会让全 0 usage 也被写入:
// 存在(哪怕=0)即返回 Some。缺 nonzero 闸门会让全 0 usage 也被写入:
// message_id=None → dedup_request_id 退化为随机 UUID,无法去重,每笔
// 请求插入一条无意义空行、虚增请求数。对齐 Claude transform handler 的 skip。
if !usage.has_billable_tokens() {
log::debug!("[Codex] 流式响应 usage 全 0 或缺失,跳过消费记录");
return;
}
let model = usage.model.clone().unwrap_or_else(|| request_model.clone());
let latency_ms = start_time.elapsed().as_millis() as u64;
@@ -844,7 +853,13 @@ async fn handle_codex_chat_to_responses_transform(
.record_response(&responses_response)
.await;
if let Some(usage) = TokenUsage::from_codex_response_auto(&responses_response) {
// 上游非流式 Chat 省略 usage 时,chat_usage_to_responses_usage 会合成全 0 usage
// (transform_codex_chat.rs:1581),from_codex_response 对 input/output 字段存在(哪怕=0)
// 即返回 Some。用 has_billable_tokens 闸门跳过全 0,避免空行虚增请求数——与流式分支
// 及 Claude transform handler 的 skip 行为对齐。
if let Some(usage) = TokenUsage::from_codex_response_auto(&responses_response)
.filter(TokenUsage::has_billable_tokens)
{
let model = responses_response
.get("model")
.and_then(|m| m.as_str())
+41
View File
@@ -409,6 +409,10 @@ pub fn transform_claude_request_for_api_format(
{
result["prompt_cache_key"] = serde_json::json!(key);
}
// 流式请求必须注入 stream_options.include_usage,否则 OpenAI 兼容上游
// 不在 SSE 末尾吐 usage → 转换出的 Anthropic message_delta 全 0 →
// 整笔 input/output/cache 漏记(与 Codex Responses→Chat 路径同源)。
super::transform::inject_openai_stream_include_usage(&mut result);
Ok(result)
}
"gemini_native" => super::transform_gemini::anthropic_to_gemini_with_shadow(
@@ -1617,6 +1621,43 @@ mod tests {
assert!(transformed.get("max_output_tokens").is_some());
}
#[test]
fn test_transform_claude_request_openai_chat_streaming_injects_include_usage() {
    // A streaming request must leave the transform with
    // `stream_options.include_usage` set. Without it an OpenAI-compatible
    // upstream emits no usage chunk at the end of the SSE stream, the converted
    // Anthropic `message_delta` is all-zero, and the request's usage goes
    // unrecorded.
    let streaming_request = json!({
        "model": "moonshotai/kimi-k2",
        "messages": [{ "role": "user", "content": "hello" }],
        "max_tokens": 128,
        "stream": true
    });
    let openrouter = create_provider(json!({
        "env": { "ANTHROPIC_BASE_URL": "https://openrouter.ai/api/v1" }
    }));

    let converted = transform_claude_request_for_api_format(
        streaming_request,
        &openrouter,
        "openai_chat",
        None,
        None,
    )
    .unwrap();

    // The stream flag itself must survive the conversion, and the usage opt-in
    // must have been added next to it.
    assert_eq!(converted["stream"], true);
    assert_eq!(converted["stream_options"]["include_usage"], true);
}
#[test]
fn test_transform_claude_request_openai_chat_non_streaming_omits_stream_options() {
    // A non-streaming request must not gain `stream_options`: usage is always
    // present in a non-streaming response body, so no opt-in is needed.
    let plain_request = json!({
        "model": "moonshotai/kimi-k2",
        "messages": [{ "role": "user", "content": "hello" }],
        "max_tokens": 128
    });
    let openrouter = create_provider(json!({
        "env": { "ANTHROPIC_BASE_URL": "https://openrouter.ai/api/v1" }
    }));

    let converted = transform_claude_request_for_api_format(
        plain_request,
        &openrouter,
        "openai_chat",
        None,
        None,
    )
    .unwrap();

    assert!(converted.get("stream_options").is_none());
}
#[test]
fn test_transform_claude_request_for_codex_oauth_uses_session_cache_key() {
let provider = create_provider_with_meta(
@@ -225,6 +225,33 @@ pub fn anthropic_to_openai_with_reasoning_content(
Ok(result)
}
/// Injects `stream_options.include_usage` into a streaming OpenAI Chat
/// Completions request body.
///
/// OpenAI-compatible upstreams do not return usage in the SSE stream by
/// default; the request must explicitly declare `include_usage` for a usage
/// chunk to be emitted at the end. Without this injection a streaming request
/// loses its token / cost / cache accounting entirely (input/output/cache all
/// recorded as 0).
///
/// Any other `stream_options` fields the client passed through are preserved;
/// only `include_usage` is set. Requests whose `stream` field is absent,
/// non-boolean, or `false` are left untouched.
///
/// Shared by the Claude→openai_chat (claude.rs) and Codex Responses→Chat
/// (transform_codex_chat.rs) conversion paths so both client directions behave
/// the same way.
pub(crate) fn inject_openai_stream_include_usage(result: &mut Value) {
    // Guard clause: only a literal `"stream": true` qualifies as streaming.
    if result.get("stream").and_then(Value::as_bool) != Some(true) {
        return;
    }

    if let Some(Value::Object(options)) = result.get_mut("stream_options") {
        // Existing options object: add/overwrite the one key, keep the rest.
        options.insert("include_usage".to_string(), json!(true));
    } else {
        // Missing or not an object: replace it with a fresh options object.
        result["stream_options"] = json!({ "include_usage": true });
    }
}
/// Translate an Anthropic `tool_choice` into the OpenAI Chat Completions form.
///
/// Anthropic forms:
@@ -336,21 +336,8 @@ pub fn responses_to_chat_completions_with_reasoning(
// include_usage 才会在末尾吐 usage chunk。Codex CLI 用 Responses 协议、
// 自身不带 stream_options,缺这一注入会导致 kimi/MiniMax 等第三方流式请求的
// token/成本/缓存命中率全部漏记(input/output/cache 全为 0)。
let is_stream = result
.get("stream")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if is_stream {
match result.get_mut("stream_options") {
// 保留客户端可能透传的其它 stream_options 字段,仅补 include_usage。
Some(Value::Object(opts)) => {
opts.insert("include_usage".to_string(), json!(true));
}
_ => {
result["stream_options"] = json!({ "include_usage": true });
}
}
}
// 与 Claude→openai_chat 路径共用同一 helper,保证两个客户端方向一致。
super::transform::inject_openai_stream_include_usage(&mut result);
Ok(result)
}
@@ -1101,7 +1101,7 @@ pub(crate) fn build_anthropic_usage(usage: Option<&Value>) -> Value {
});
};
let input_tokens = usage
let prompt_tokens = usage
.get("promptTokenCount")
.and_then(|value| value.as_u64())
.unwrap_or(0);
@@ -1109,18 +1109,26 @@ pub(crate) fn build_anthropic_usage(usage: Option<&Value>) -> Value {
.get("totalTokenCount")
.and_then(|value| value.as_u64())
.unwrap_or(0);
let output_tokens = total_tokens.saturating_sub(input_tokens);
let cached_tokens = usage
.get("cachedContentTokenCount")
.and_then(|value| value.as_u64())
.unwrap_or(0);
// Gemini 的 promptTokenCount 含缓存命中(cachedContentTokenCount);而 Anthropic
// 语义下 input_tokens 必须是不含 cache 的 fresh input、cache_read 单列。本路径转成
// Anthropic 后以 app_type=claude 记账,calculator 对 claude 设 input_includes_cache_read
// =false 不再从 input 扣 cache,因此这里必须先扣减,否则缓存 token 会被双重计费
// (一次按完整 input 价、一次按 cache_read 价)。output 仍按 total-prompt 计算
// (prompt 是总输入,扣减只作用于 input/cache 的拆分,不影响 output)。
let input_tokens = prompt_tokens.saturating_sub(cached_tokens);
let output_tokens = total_tokens.saturating_sub(prompt_tokens);
let mut result = json!({
"input_tokens": input_tokens,
"output_tokens": output_tokens
});
if let Some(cached) = usage
.get("cachedContentTokenCount")
.and_then(|value| value.as_u64())
{
result["cache_read_input_tokens"] = json!(cached);
if cached_tokens > 0 {
result["cache_read_input_tokens"] = json!(cached_tokens);
}
result
@@ -1370,7 +1378,11 @@ mod tests {
assert_eq!(result["content"][0]["type"], "text");
assert_eq!(result["content"][0]["text"], "Hello from Gemini");
assert_eq!(result["stop_reason"], "end_turn");
assert_eq!(result["usage"]["input_tokens"], 12);
// input_tokens = promptTokenCount(12) - cachedContentTokenCount(3) = 9(fresh input)。
// Gemini 的 promptTokenCount 含缓存命中,但 Anthropic 语义要求 input 不含 cache、
// cache_read 单列;二者相加(9+3)=总输入 12。扣减避免本路径以 app_type=claude
// 记账时把缓存 token 双重计费。
assert_eq!(result["usage"]["input_tokens"], 9);
assert_eq!(result["usage"]["output_tokens"], 8);
assert_eq!(result["usage"]["cache_read_input_tokens"], 3);
}
+82 -1
View File
@@ -37,6 +37,18 @@ impl TokenUsage {
.map(|mid| format!("{SESSION_REQUEST_ID_PREFIX}{mid}"))
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string())
}
/// Returns `true` when at least one billable token dimension is non-zero.
///
/// Used to filter out empty, all-zero usage before it is written. When an
/// OpenAI-compatible upstream omits usage on a streaming response, the
/// converter synthesizes an all-zero terminal event; with no `message_id`,
/// `dedup_request_id` falls back to a random UUID, so every such request
/// would insert a meaningless empty row and inflate the request count.
pub fn has_billable_tokens(&self) -> bool {
    // Regular prompt / completion tokens.
    let any_direct = self.input_tokens > 0 || self.output_tokens > 0;
    // Cache reads and cache writes are billable in their own right.
    let any_cache = self.cache_read_tokens > 0 || self.cache_creation_tokens > 0;
    any_direct || any_cache
}
}
/// API 类型
@@ -185,7 +197,11 @@ impl TokenUsage {
}
}
if usage.input_tokens > 0 || usage.output_tokens > 0 {
// 用 has_billable_tokens 而非仅看 input/output:完全缓存命中、无输出的流式请求
// (input==0 && output==0 但 cache_read>0)是真实的 cache-read 计费,必须保留。
// Gemini→Anthropic 路径在 input 改为 fresh(promptTokenCount - cachedContentTokenCount)
// 后尤其会出现这种全缓存场景;旧 gate 会把它当成"无 usage"丢弃。
if usage.has_billable_tokens() {
usage.model = model;
usage.message_id = message_id;
Some(usage)
@@ -522,6 +538,71 @@ mod tests {
assert_eq!(usage.model, Some("claude-sonnet-4-20250514".to_string()));
}
#[test]
fn test_has_billable_tokens_gates_empty_usage() {
    // All-zero usage (e.g. the all-zero terminal event synthesized when the
    // upstream omits usage) must not count as billable. This is the gate
    // behind fix D (Codex streaming empty-row over-counting).
    let empty = TokenUsage::default();
    assert!(!empty.has_billable_tokens());

    // Ordinary input/output usage is billable.
    let regular = TokenUsage {
        input_tokens: 10,
        output_tokens: 5,
        ..TokenUsage::default()
    };
    assert!(regular.has_billable_tokens());

    // Cache reads alone are real billable tokens and must count too.
    let cache_read_only = TokenUsage {
        cache_read_tokens: 100,
        ..TokenUsage::default()
    };
    assert!(cache_read_only.has_billable_tokens());
}
#[test]
fn test_claude_stream_cache_only_request_is_recorded() {
    // P2 regression: a fully cache-served streamed request with no output
    // (input == 0 && output == 0 but cache_read > 0) is real billing and must
    // be kept. The old gate `input>0 || output>0` would have dropped it.
    let message_start = json!({
        "type": "message_start",
        "message": {
            "id": "msg_cacheonly",
            "model": "claude-opus-4-8",
            "usage": {
                "input_tokens": 0,
                "cache_read_input_tokens": 50000,
                "cache_creation_input_tokens": 0
            }
        }
    });
    let message_delta = json!({
        "type": "message_delta",
        "usage": { "output_tokens": 0 }
    });
    let events = vec![message_start, message_delta];

    let recorded = TokenUsage::from_claude_stream_events(&events)
        .expect("cache-only 流式请求必须被记录,不能被 input/output gate 丢弃");

    assert_eq!(recorded.input_tokens, 0);
    assert_eq!(recorded.output_tokens, 0);
    assert_eq!(recorded.cache_read_tokens, 50000);
    assert_eq!(recorded.message_id.as_deref(), Some("msg_cacheonly"));
}
#[test]
fn test_codex_response_auto_returns_some_for_synthetic_all_zero() {
    // P3 regression: when a non-streaming Chat upstream omits usage, the
    // converter synthesizes all-zero usage. `from_codex_response_auto` still
    // returns `Some` for it (the fields exist and there is no positivity
    // check), which shows that handlers need the `has_billable_tokens` gate
    // to block empty rows; `if let Some` alone is not enough.
    let all_zero_response = json!({
        "usage": { "input_tokens": 0, "output_tokens": 0, "total_tokens": 0 }
    });

    let parsed = TokenUsage::from_codex_response_auto(&all_zero_response)
        .expect("全 0 usage 字段存在时 from_codex_response_auto 返回 Some");
    let billable = parsed.has_billable_tokens();

    assert!(
        !billable,
        "全 0 usage 必须被 has_billable_tokens 判为非计费,由 handlers 闸门跳过"
    );
}
#[test]
fn test_claude_response_parsing_no_model() {
let response = json!({