fix(proxy): exclude cache_read and cache_creation from input on Claude←OpenAI paths

Builds on #2774 (which fixed cache_read for the streaming openai_chat path).
Two gaps remained, both double-counting cache tokens when a Claude client
meters as app_type="claude" (input_includes_cache_read=false):

1. cache_read was still added to input on the non-streaming openai_chat path
   (transform.rs openai_to_anthropic) and the whole openai_responses family
   (transform_responses.rs build_anthropic_usage_from_responses, covering the
   non-streaming call site and both streaming_responses call sites).

2. cache_creation was never subtracted on any converted path, including the
   streaming openai_chat path #2774 had already touched. Claude billing treats
   cache_creation as a separate bucket, so an inclusive upstream carrying a
   direct cache_creation_input_tokens field billed it twice.

All four metering points now compute:
  input = prompt_tokens - cache_read - cache_creation
restoring the invariant input + cache_read + cache_creation == prompt_tokens.
Pure OpenAI upstreams are unaffected (no cache_creation concept/field).

Tests: update direct-cache assertions (40->20), add a streaming conservation
regression test, and pin prompt<cache underflow (saturating clamp to 0) for all
three metering functions. cargo test 1573 pass, clippy clean.

Note: fix is forward-only; historical rows are not recomputed (cost is frozen at
log time and app_type="claude" mixes native + converted rows).
This commit is contained in:
Jason
2026-06-09 21:39:09 +08:00
Unverified
parent 36a103bbe4
commit cb01593f7d
3 changed files with 189 additions and 22 deletions
+93 -7
View File
@@ -100,9 +100,14 @@ struct ToolBlockState {
const INFINITE_WHITESPACE_THRESHOLD: usize = 500;
fn build_anthropic_usage_json(usage: &Usage) -> Value {
// OpenAI prompt_tokens 含缓存,Anthropic input_tokens 不含,需减去
// OpenAI prompt_tokens 含缓存,Anthropic input_tokens 不含,需减去 cache_read 与 cache_creation
// (三桶互斥,恒等 input + cache_read + cache_creation == prompt_tokens)。
let cached = extract_cache_read_tokens(usage).unwrap_or(0);
let input_tokens = usage.prompt_tokens.saturating_sub(cached);
let cache_creation = usage.cache_creation_input_tokens.unwrap_or(0);
let input_tokens = usage
.prompt_tokens
.saturating_sub(cached)
.saturating_sub(cache_creation);
let mut usage_json = json!({
"input_tokens": input_tokens,
"output_tokens": usage.completion_tokens
@@ -110,8 +115,8 @@ fn build_anthropic_usage_json(usage: &Usage) -> Value {
if cached > 0 {
usage_json["cache_read_input_tokens"] = json!(cached);
}
if let Some(created) = usage.cache_creation_input_tokens {
usage_json["cache_creation_input_tokens"] = json!(created);
if cache_creation > 0 {
usage_json["cache_creation_input_tokens"] = json!(cache_creation);
}
usage_json
}
@@ -227,13 +232,19 @@ pub fn create_anthropic_sse_stream<E: std::error::Error + Send + 'static>(
});
if let Some(u) = &chunk.usage {
let cached = extract_cache_read_tokens(u).unwrap_or(0);
let input = u.prompt_tokens.saturating_sub(cached);
let cache_creation =
u.cache_creation_input_tokens.unwrap_or(0);
let input = u
.prompt_tokens
.saturating_sub(cached)
.saturating_sub(cache_creation);
start_usage["input_tokens"] = json!(input);
if cached > 0 {
start_usage["cache_read_input_tokens"] = json!(cached);
}
if let Some(created) = u.cache_creation_input_tokens {
start_usage["cache_creation_input_tokens"] = json!(created);
if cache_creation > 0 {
start_usage["cache_creation_input_tokens"] =
json!(cache_creation);
}
}
@@ -1043,6 +1054,81 @@ mod tests {
);
}
#[tokio::test]
async fn test_usage_chunk_subtracts_cache_read_and_creation_from_input() {
    // The upstream prompt_tokens (1000) already contains cache_read (600) and
    // cache_creation (300). After conversion to the Anthropic shape, input_tokens must carry
    // only the remainder, so the three buckets add back up to the prompt:
    // input(100) + cache_read(600) + cache_creation(300) == prompt(1000).
    let sse_body = concat!(
        "data: {\"id\":\"chatcmpl_cc\",\"model\":\"glm-5.1\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"tool-1\",\"type\":\"function\",\"function\":{\"name\":\"Bash\",\"arguments\":\"{\\\"command\\\":\\\"pwd\\\"}\"}}]}}]}\n\n",
        "data: {\"id\":\"chatcmpl_cc\",\"model\":\"glm-5.1\",\"choices\":[{\"delta\":{},\"finish_reason\":\"tool_calls\"}]}\n\n",
        "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":1000,\"completion_tokens\":50,\"prompt_tokens_details\":{\"cached_tokens\":600},\"cache_creation_input_tokens\":300}}\n\n",
        "data: [DONE]\n\n"
    );

    let events = collect_anthropic_events(sse_body).await;
    let delta = events
        .iter()
        .find(|ev| event_type(ev) == Some("message_delta"))
        .expect("should emit message_delta with usage");

    // Reads one numeric field out of the event's `usage` object, if it is present.
    let usage_field = |field: &str| {
        delta
            .get("usage")
            .and_then(|usage| usage.get(field))
            .and_then(|v| v.as_u64())
    };

    // Fresh input = 1000 - 600 - 300 = 100.
    assert_eq!(usage_field("input_tokens"), Some(100));
    // Both cache buckets are forwarded unchanged.
    assert_eq!(usage_field("cache_read_input_tokens"), Some(600));
    assert_eq!(usage_field("cache_creation_input_tokens"), Some(300));
}
#[tokio::test]
async fn test_usage_chunk_clamps_input_to_zero_when_cache_exceeds_prompt() {
    // prompt(100) < cache_read(80) + cache_creation(50) = 130, so the subtraction would
    // underflow; the expected input_tokens is 0.
    // Regression pin: guards against a future change replacing saturating_sub with plain
    // subtraction (debug panic / release wrap).
    let sse_body = concat!(
        "data: {\"id\":\"chatcmpl_uf\",\"model\":\"glm-5.1\",\"choices\":[{\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"tool-1\",\"type\":\"function\",\"function\":{\"name\":\"Bash\",\"arguments\":\"{\\\"command\\\":\\\"pwd\\\"}\"}}]}}]}\n\n",
        "data: {\"id\":\"chatcmpl_uf\",\"model\":\"glm-5.1\",\"choices\":[{\"delta\":{},\"finish_reason\":\"tool_calls\"}]}\n\n",
        "data: {\"choices\":[],\"usage\":{\"prompt_tokens\":100,\"completion_tokens\":50,\"prompt_tokens_details\":{\"cached_tokens\":80},\"cache_creation_input_tokens\":50}}\n\n",
        "data: [DONE]\n\n"
    );

    let events = collect_anthropic_events(sse_body).await;
    let delta = events
        .iter()
        .find(|ev| event_type(ev) == Some("message_delta"))
        .expect("should emit message_delta with usage");

    // Reads one numeric field out of the event's `usage` object, if it is present.
    let usage_field = |field: &str| {
        delta
            .get("usage")
            .and_then(|usage| usage.get(field))
            .and_then(|v| v.as_u64())
    };

    // Input is clamped to zero; the cache buckets keep the upstream values.
    assert_eq!(usage_field("input_tokens"), Some(0));
    assert_eq!(usage_field("cache_read_input_tokens"), Some(80));
    assert_eq!(usage_field("cache_creation_input_tokens"), Some(50));
}
#[tokio::test]
async fn test_message_delta_includes_zero_usage_when_stream_has_no_usage() {
let input = concat!(
+55 -13
View File
@@ -644,10 +644,31 @@ pub fn openai_to_anthropic(body: Value) -> Result<Value, ProxyError> {
// usage — map cache tokens from OpenAI format to Anthropic format
let usage = body.get("usage").cloned().unwrap_or(json!({}));
// OpenAI prompt_tokens 含缓存命中,Anthropic input_tokens 不含 → 减去 cache_read 与
// cache_creation,使 input 成为 fresh input。本路径以 app_type="claude" 记账(calculator
// 不再扣减),若不减则缓存会被计入 input 与各 cache 桶两次。三桶互斥,恒等:
// input + cache_read + cache_creation == prompt_tokens(inclusive 上游)。
// 与流式 build_anthropic_usage_json (#2774) 及 transform_gemini 的 saturating_sub 对称。
// 最终 cache_read:直传字段优先于 nested;cache_creation 仅来自直传字段(OpenAI 无此概念)。
let cached = usage
.get("cache_read_input_tokens")
.and_then(|v| v.as_u64())
.or_else(|| {
usage
.pointer("/prompt_tokens_details/cached_tokens")
.and_then(|v| v.as_u64())
})
.unwrap_or(0);
let cache_creation = usage
.get("cache_creation_input_tokens")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let input_tokens = usage
.get("prompt_tokens")
.and_then(|v| v.as_u64())
.unwrap_or(0) as u32;
.unwrap_or(0)
.saturating_sub(cached)
.saturating_sub(cache_creation) as u32;
let output_tokens = usage
.get("completion_tokens")
.and_then(|v| v.as_u64())
@@ -658,19 +679,11 @@ pub fn openai_to_anthropic(body: Value) -> Result<Value, ProxyError> {
"output_tokens": output_tokens
});
// OpenAI standard: prompt_tokens_details.cached_tokens
if let Some(cached) = usage
.pointer("/prompt_tokens_details/cached_tokens")
.and_then(|v| v.as_u64())
{
if cached > 0 {
usage_json["cache_read_input_tokens"] = json!(cached);
}
// Some compatible servers return these fields directly
if let Some(v) = usage.get("cache_read_input_tokens") {
usage_json["cache_read_input_tokens"] = v.clone();
}
if let Some(v) = usage.get("cache_creation_input_tokens") {
usage_json["cache_creation_input_tokens"] = v.clone();
if cache_creation > 0 {
usage_json["cache_creation_input_tokens"] = json!(cache_creation);
}
let result = json!({
@@ -1314,7 +1327,8 @@ mod tests {
});
let result = openai_to_anthropic(input).unwrap();
assert_eq!(result["usage"]["input_tokens"], 100);
// prompt_tokens(100) 含 cached(80),转换后 input 应为 fresh = 100 - 80 = 20
assert_eq!(result["usage"]["input_tokens"], 20);
assert_eq!(result["usage"]["output_tokens"], 50);
assert_eq!(result["usage"]["cache_read_input_tokens"], 80);
}
@@ -1338,10 +1352,38 @@ mod tests {
});
let result = openai_to_anthropic(input).unwrap();
// cache_read(60)+cache_creation(20) 均从 prompt(100) 扣除,fresh = 100 - 60 - 20 = 20
// 守恒:input(20) + cache_read(60) + cache_creation(20) == prompt(100)
assert_eq!(result["usage"]["input_tokens"], 20);
assert_eq!(result["usage"]["cache_read_input_tokens"], 60);
assert_eq!(result["usage"]["cache_creation_input_tokens"], 20);
}
#[test]
fn test_openai_to_anthropic_clamps_input_when_cache_exceeds_prompt() {
    // prompt(100) < cache_read(60) + cache_creation(50) = 110, so the subtraction would
    // underflow; the expected input_tokens is 0.
    // Regression pin: guards against a future change replacing saturating_sub with plain
    // subtraction (debug panic / release wrap).
    let upstream_body = json!({
        "id": "chatcmpl-uf",
        "model": "gpt-4",
        "choices": [{
            "index": 0,
            "message": {"role": "assistant", "content": "x"},
            "finish_reason": "stop"
        }],
        "usage": {
            "prompt_tokens": 100,
            "completion_tokens": 10,
            "cache_read_input_tokens": 60,
            "cache_creation_input_tokens": 50
        }
    });

    let converted = openai_to_anthropic(upstream_body).unwrap();
    let usage = &converted["usage"];

    // Input is clamped to zero; the cache buckets keep the upstream values.
    assert_eq!(usage["input_tokens"], 0);
    assert_eq!(usage["cache_read_input_tokens"], 60);
    assert_eq!(usage["cache_creation_input_tokens"], 50);
}
#[test]
fn test_openai_to_anthropic_finish_reason_content_filter_maps_end_turn() {
let input = json!({
@@ -355,6 +355,23 @@ pub(crate) fn build_anthropic_usage_from_responses(usage: Option<&Value>) -> Val
result["cache_creation_input_tokens"] = v.clone();
}
// OpenAI/Responses 的 input(prompt_tokens/input_tokens)含缓存命中,Anthropic input_tokens 不含
// → 减去 cache_read 与 cache_creation,使其成为 fresh input。本函数在计量意义上是 claude 专属
// (Codex Responses 透传走 from_codex_response_*,不调用本函数),故可安全在此扣减。三桶互斥,
// 恒等:input + cache_read + cache_creation == 上游 input(inclusive)。与 build_anthropic_usage_json
// (#2774) 及 transform_gemini 的 saturating_sub 对称;一处同时覆盖非流式与流式(streaming_responses)。
let cached = result
.get("cache_read_input_tokens")
.and_then(|v| v.as_u64())
.unwrap_or(0);
let cache_creation = result
.get("cache_creation_input_tokens")
.and_then(|v| v.as_u64())
.unwrap_or(0);
if cached > 0 || cache_creation > 0 {
result["input_tokens"] = json!(input.saturating_sub(cached).saturating_sub(cache_creation));
}
result
}
@@ -1156,7 +1173,8 @@ mod tests {
});
let result = responses_to_anthropic(input).unwrap();
assert_eq!(result["usage"]["input_tokens"], 100);
// input_tokens(100) 含 cached(80),转换后 input 应为 fresh = 100 - 80 = 20
assert_eq!(result["usage"]["input_tokens"], 20);
assert_eq!(result["usage"]["output_tokens"], 50);
assert_eq!(result["usage"]["cache_read_input_tokens"], 80);
}
@@ -1180,6 +1198,9 @@ mod tests {
});
let result = responses_to_anthropic(input).unwrap();
// cache_read(60)+cache_creation(20) 均从 input(100) 扣除,fresh = 100 - 60 - 20 = 20
// 守恒:input(20) + cache_read(60) + cache_creation(20) == 上游 input(100)
assert_eq!(result["usage"]["input_tokens"], 20);
assert_eq!(result["usage"]["cache_read_input_tokens"], 60);
assert_eq!(result["usage"]["cache_creation_input_tokens"], 20);
}
@@ -1642,7 +1663,8 @@ mod tests {
"cached_tokens": 80
}
})));
assert_eq!(result["input_tokens"], json!(100));
// input_tokens(100) 含 nested cached(80),转换后 input 应为 fresh = 100 - 80 = 20
assert_eq!(result["input_tokens"], json!(20));
assert_eq!(result["output_tokens"], json!(50));
assert_eq!(result["cache_read_input_tokens"], json!(80));
}
@@ -1657,9 +1679,26 @@ mod tests {
},
"cache_read_input_tokens": 100
})));
// 直传 cache_read(100) 优先于 nested(80);input(100) - 100 = 0(fresh)
assert_eq!(result["input_tokens"], json!(0));
assert_eq!(result["cache_read_input_tokens"], json!(100)); // Direct field overrides nested
}
#[test]
fn test_build_usage_clamps_input_when_cache_exceeds_input() {
    // input(100) < cache_read(60) + cache_creation(50) = 110, so the subtraction would
    // underflow; the expected input_tokens is 0.
    // Regression pin: guards against a future change replacing saturating_sub with plain
    // subtraction (debug panic / release wrap).
    let upstream_usage = json!({
        "input_tokens": 100,
        "output_tokens": 10,
        "cache_read_input_tokens": 60,
        "cache_creation_input_tokens": 50
    });

    let converted = build_anthropic_usage_from_responses(Some(&upstream_usage));

    // Input is clamped to zero; the cache buckets keep the upstream values.
    assert_eq!(converted["input_tokens"], json!(0));
    assert_eq!(converted["cache_read_input_tokens"], json!(60));
    assert_eq!(converted["cache_creation_input_tokens"], json!(50));
}
#[test]
fn test_build_usage_cache_tokens_without_input_output() {
let result = build_anthropic_usage_from_responses(Some(&json!({