From a6d718d0fccdff7832a9482237fc23a6a9ac806c Mon Sep 17 00:00:00 2001 From: Jason Date: Thu, 11 Jun 2026 08:29:21 +0800 Subject: [PATCH] fix(proxy): aggregate mislabeled SSE bodies in transform fallback (#2234) The Claude/Codex format-transform non-stream branch returned an opaque 422 "Failed to parse upstream response" whenever a 2xx upstream body was not valid JSON. The common case: MaaS gateways force-stream a stream:false request and return an SSE body with a non-SSE Content-Type, defeating the header-only is_sse() check. On serde failure, sniff for SSE and aggregate the chunks into a single JSON, then run the existing converter so clients still receive a valid non-stream response. - chat_sse_to_response_value: aggregate chat.completion.chunk SSE (content / reasoning / refusal / tool_calls / legacy function_call), tool_calls index-keyed via BTreeMap to avoid unbounded densification, first-wins finish_reason, message-snapshot override, completeness and error-event guards; synthesize an id when the upstream omits one - responses_sse_to_response_value: process the residual trailing block, tolerating truncation and skipping it once a completed event was seen - enrich remaining parse failures with content-type / content-encoding / body-snippet diagnostics - deflate: try zlib (RFC 9110) before raw; keep the content-encoding header for unsupported encodings - gate zero-usage rows on the Claude transform path --- src-tauri/src/proxy/handlers.rs | 1078 ++++++++++++++++++++- src-tauri/src/proxy/response_processor.rs | 66 +- 2 files changed, 1118 insertions(+), 26 deletions(-) diff --git a/src-tauri/src/proxy/handlers.rs b/src-tauri/src/proxy/handlers.rs index 9560d849a..c399f1ba0 100644 --- a/src-tauri/src/proxy/handlers.rs +++ b/src-tauri/src/proxy/handlers.rs @@ -16,6 +16,7 @@ use super::{ }, handler_context::RequestContext, providers::{ + codex_chat_common::extract_reasoning_field_text, codex_chat_history::record_responses_sse_stream, get_adapter, get_claude_api_format, streaming::create_anthropic_sse_stream, streaming_codex_chat::create_responses_sse_stream_from_chat_with_context, @@ -431,10 +432,39 @@ async fn handle_claude_transform( let upstream_response: Value = if aggregate_codex_oauth_responses_sse { responses_sse_to_response_value(&body_str)? } else { - serde_json::from_slice(&body_bytes).map_err(|e| { - log::error!("[Claude] 解析上游响应失败: {e}, body: {body_str}"); - ProxyError::TransformError(format!("Failed to parse upstream response: {e}")) - })? + match serde_json::from_slice(&body_bytes) { + Ok(value) => value, + // 兜底嗅探(#2234):部分网关对 stream:false 强制返回 SSE 体,却把 + // Content-Type 标成 application/json 等,is_sse() 的 header 检查失效。 + // 此时按 SSE 聚合成单个 JSON 再走既有非流转换器,客户端仍收到 + // Anthropic JSON,非流语义不变。gemini_native 暂无聚合器,落诊断错误。 + Err(_) if body_looks_like_sse(&body_str) && api_format != "gemini_native" => { + log::warn!( + "[Claude] 上游对非流请求返回未标记的 SSE 体(api_format={api_format}),按 SSE 聚合兜底" + ); + let aggregated = if api_format == "openai_responses" { + responses_sse_to_response_value(&body_str) + } else { + chat_sse_to_response_value(&body_str) + }; + // 聚合也失败时:保留全量 body 服务端日志,并给客户端错误附带同款 + // 现场诊断(content-type/body 摘要),否则命中嗅探臂的用户只拿到 + // 裸聚合错误、丢失非嗅探臂已有的诊断增强(C7) + aggregated.map_err(|e| { + log::error!("[Claude] SSE 聚合兜底失败: {e}, body: {body_str}"); + aggregate_fallback_error(e, &response_headers, &body_str) + })? + } + Err(e) => { + log::error!("[Claude] 解析上游响应失败: {e}, body: {body_str}"); + return Err(upstream_body_parse_error( + "Failed to parse upstream response", + &e, + &response_headers, + &body_str, + )); + } + } }; // 根据 api_format 选择非流式转换器 @@ -457,7 +487,11 @@ async fn handle_claude_transform( })?; // 记录使用量 - if let Some(usage) = TokenUsage::from_claude_response(&anthropic_response) { + // 全 0 usage 不落账(对齐 Codex 流式收集器的 skip):SSE 聚合兜底救回的流 + // 在上游缺 stream_options.include_usage 时没有 usage,写入只会产生无意义空行 + if let Some(usage) = + TokenUsage::from_claude_response(&anthropic_response).filter(|u| u.has_billable_tokens()) + { // 转换后的响应缺失/合成空 model 时,回退到映射后的出站模型(接管真值), // 再回退到客户端请求别名 let model = anthropic_response @@ -877,10 +911,28 @@ async fn handle_codex_chat_to_responses_transform( let (mut response_headers, status, body_bytes) = read_decoded_body(response, ctx.tag, body_timeout).await?; let body_str = String::from_utf8_lossy(&body_bytes); - let chat_response: Value = serde_json::from_slice(&body_bytes).map_err(|e| { - log::error!("[Codex] 解析 Chat 上游响应失败: {e}, body: {body_str}"); - ProxyError::TransformError(format!("Failed to parse upstream chat response: {e}")) - })?; + let chat_response: Value = match serde_json::from_slice(&body_bytes) { + Ok(value) => value, + // 与 Claude 侧 handle_claude_transform 对称的兜底嗅探(#2234): + // 上游对 stream:false 返回未标记 Content-Type 的 SSE 体时按 SSE 聚合。 + Err(_) if body_looks_like_sse(&body_str) => { + log::warn!("[Codex] 上游对非流请求返回未标记的 SSE 体,按 Chat SSE 聚合兜底"); + // 聚合也失败时:保留全量 body 服务端日志,并给客户端错误附带现场诊断(C7) + chat_sse_to_response_value(&body_str).map_err(|e| { + log::error!("[Codex] SSE 聚合兜底失败: {e}, body: {body_str}"); + aggregate_fallback_error(e, &response_headers, &body_str) + })? + } + Err(e) => { + log::error!("[Codex] 解析 Chat 上游响应失败: {e}, body: {body_str}"); + return Err(upstream_body_parse_error( + "Failed to parse upstream chat response", + &e, + &response_headers, + &body_str, + )); + } + }; let responses_response = transform_codex_chat::chat_completion_to_response_with_context( chat_response, &tool_context, @@ -1320,15 +1372,24 @@ fn should_use_claude_transform_streaming( /// 复用 `proxy::sse` 的 `take_sse_block`/`strip_sse_field`:`take_sse_block` 同时支持 /// `\n\n` 与 `\r\n\r\n` 两种分隔符,`strip_sse_field` 兼容带/不带空格的字段写法。 fn responses_sse_to_response_value(body: &str) -> Result { - let mut buffer = body.to_string(); + let mut buffer = body.trim_start_matches('\u{feff}').to_string(); let mut completed_response: Option = None; let mut output_items = Vec::new(); - while let Some(block) = take_sse_block(&mut buffer) { + // strict=false 用于残余尾块:截断的半截 JSON 忽略而非报错,避免破坏 + // 已聚合好的完整响应(codex_oauth 聚合路径也复用本函数) + let mut process_block = |block: &str, strict: bool| -> Result<(), ProxyError> { + // 残余尾块(strict=false)在已拿到 completed 后整体跳过——codex_oauth 聚合 + // 路径也复用本函数,已完成后再执行残余里的完整 response.failed/杂事件会把 + // 成功响应翻成 422(C8)。 + if !strict && completed_response.is_some() { + return Ok(()); + } let mut event_name = ""; let mut data_lines: Vec<&str> = Vec::new(); for line in block.lines() { + let line = line.trim_start(); if let Some(evt) = strip_sse_field(line, "event") { event_name = evt.trim(); } else if let Some(d) = strip_sse_field(line, "data") { @@ -1337,17 +1398,23 @@ fn responses_sse_to_response_value(body: &str) -> Result { } if data_lines.is_empty() { - continue; + return Ok(()); } let data_str = data_lines.join("\n"); if data_str.trim() == "[DONE]" { - continue; + return Ok(()); } - let data: Value = serde_json::from_str(&data_str).map_err(|e| { - ProxyError::TransformError(format!("Failed to parse upstream SSE event: {e}")) - })?; + let data: Value = match serde_json::from_str(&data_str) { + Ok(v) => v, + Err(_) if !strict => return Ok(()), + Err(e) => { + return Err(ProxyError::TransformError(format!( + "Failed to parse upstream SSE event: {e}" + ))) + } + }; match event_name { "response.output_item.done" => { @@ -1367,7 +1434,16 @@ fn responses_sse_to_response_value(body: &str) -> Result { } _ => {} } + Ok(()) + }; + + while let Some(block) = take_sse_block(&mut buffer) { + process_block(&block, true)?; } + // 最后一个事件后可能没有空行分隔(错标 SSE 兜底/非规范上游常见): + // 残余 buffer 当最后一块处理,否则尾部的 response.completed 会被丢掉。 + // 已完成时的跳过判定在闭包内(C8)。 + process_block(&buffer, false)?; let mut response = completed_response.ok_or_else(|| { ProxyError::TransformError("No response.completed event in upstream SSE".to_string()) @@ -1386,6 +1462,448 @@ fn responses_sse_to_response_value(body: &str) -> Result { Ok(response) } +/// 判断响应体是否"看起来像" SSE 文本(#2234 兜底嗅探)。 +/// +/// 仅在 JSON 解析已失败后调用:合法 JSON 不可能以这些前缀开头,误判面为零。 +/// 覆盖 SSE 规范的全部四种字段行;包含 ":" 是因为 OpenRouter 等会在流前发 +/// `: PROCESSING` 注释行。 +fn body_looks_like_sse(body: &str) -> bool { + let trimmed = body.trim_start_matches('\u{feff}').trim_start(); + ["data:", "event:", "id:", "retry:", ":"] + .iter() + .any(|prefix| trimmed.starts_with(prefix)) +} + +/// 构造带现场诊断的上游解析错误:附 content-type / content-encoding 与 body +/// 前缀摘要,让客户端收到的报错自带根因判别("data:"=错标 SSE、"<"=HTML +/// 拦截页、� 乱码=未解压二进制),不再依赖向用户索要服务端日志。 +fn upstream_body_parse_error( + prefix: &str, + err: &serde_json::Error, + headers: &axum::http::HeaderMap, + body: &str, +) -> ProxyError { + ProxyError::TransformError(format!( + "{prefix}: {err} {}", + body_diagnostics_suffix(headers, body) + )) +} + +/// SSE 聚合兜底失败时,给聚合器内部错误附加同款现场诊断(content-type/ +/// content-encoding/body 摘要),使命中 #2234 嗅探臂的客户端也拿到根因线索, +/// 而非仅 "No chat completion choices in upstream SSE" 这类无 header/body 的裸消息。 +fn aggregate_fallback_error( + err: ProxyError, + headers: &axum::http::HeaderMap, + body: &str, +) -> ProxyError { + let base = match &err { + ProxyError::TransformError(m) => m.clone(), + other => other.to_string(), + }; + ProxyError::TransformError(format!("{base} {}", body_diagnostics_suffix(headers, body))) +} + +/// 现场诊断后缀:content-type、content-encoding 与 body 前 120 字符摘要。 +fn body_diagnostics_suffix(headers: &axum::http::HeaderMap, body: &str) -> String { + let header_str = |name: &str| { + headers + .get(name) + .and_then(|v| v.to_str().ok()) + .unwrap_or("") + }; + format!( + "(content-type: {}; content-encoding: {}; body[..120]: '{}')", + header_str("content-type"), + header_str("content-encoding"), + body_snippet(body, 120), + ) +} + +/// 从 SSE chunk 的 error 字段提取可报告的错误消息。占位形状(空对象、空消息、 +/// false、空字符串等,常见于 OpenAI 兼容网关每 chunk 附带的 error 字段)返回 +/// None——不应据此判定整条流失败(否则会把成功流误杀成 422,C12/C2234 目标人群)。 +fn error_event_message(error: &Value) -> Option { + if let Some(msg) = error.get("message").and_then(|m| m.as_str()) { + return (!msg.is_empty()).then(|| msg.to_string()); + } + if let Some(s) = error.as_str() { + return (!s.is_empty()).then(|| s.to_string()); + } + None +} + +/// 取 body 前 `max_chars` 个字符的单行摘要:\r 丢弃、\n 折叠为字面 \n、 +/// 其余控制字符替换为 �,超长加省略号。 +fn body_snippet(body: &str, max_chars: usize) -> String { + let mut snippet = String::new(); + for c in body.chars().take(max_chars) { + match c { + '\n' => snippet.push_str("\\n"), + '\r' => {} + c if c.is_control() => snippet.push('\u{FFFD}'), + c => snippet.push(c), + } + } + if body.chars().nth(max_chars).is_some() { + snippet.push('…'); + } + snippet +} + +/// 解析单个 SSE 块的 event 名与 data 负载(多行 data 按规范以 \n 连接)。 +/// 行首允许前导空白后再匹配字段名——与 body_looks_like_sse 的 trim 宽容度对齐, +/// 否则缩进的 ` data:` 行被嗅探接受却在此静默丢失(C4)。返回 None 表示无 data 行。 +fn sse_block_parts(block: &str) -> Option<(String, String)> { + let mut event_name = String::new(); + let mut data_lines: Vec<&str> = Vec::new(); + for line in block.lines() { + let line = line.trim_start(); + if let Some(evt) = strip_sse_field(line, "event") { + event_name = evt.trim().to_string(); + } else if let Some(d) = strip_sse_field(line, "data") { + data_lines.push(d); + } + } + (!data_lines.is_empty()).then(|| (event_name, data_lines.join("\n"))) +} + +/// 把 Chat Completions 流式 SSE 聚合为单个 chat.completion JSON(#2234 兜底)。 +/// +/// 专供非流式分支使用:上游对 stream:false 返回了 SSE 体但 Content-Type 没标 +/// text/event-stream,header 检查(is_sse)失效。聚合后喂给既有非流转换器 +/// (Claude 侧 openai_to_anthropic、Codex 侧 chat_completion_to_response_with_context), +/// 客户端拿到的仍是合法 JSON,非流语义不变。 +/// 增量合并语义与 providers/streaming.rs 对齐:tool_calls 按 delta.index 定位, +/// id/name 出现即覆盖、arguments 字符串拼接;reasoning 各形态(reasoning_content / +/// reasoning / reasoning_details)经 codex_chat_common 公共提取器并入同一累加器; +/// finish_reason 首个非 null 即锁定(kimi-k2.6 会在 tool_use 后再发带 +/// finish_reason 的尾块,见 streaming.rs)。 +fn chat_sse_to_response_value(body: &str) -> Result { + // 剥 BOM:嗅探器接受 BOM 开头,但 strip_sse_field 按行首精确匹配, + // 不剥会让首个 data 行静默丢失 + let mut buffer = body.trim_start_matches('\u{feff}').to_string(); + + let mut id = Value::Null; + let mut created = Value::Null; + let mut model = Value::Null; + let mut content = String::new(); + let mut reasoning_content = String::new(); + // tool_calls 以 BTreeMap 按 index 聚合:上游可控的 index(u64)不会 densify + // 数组——旧的 `while len() <= index { push }` 写法遇到 index=4e9 会 OOM 整个 + // 进程(C1)。BTreeMap 既免去无界分配,又天然保持 index 有序输出。 + let mut tool_calls: std::collections::BTreeMap = + std::collections::BTreeMap::new(); + let mut finish_reason = Value::Null; + let mut usage = Value::Null; + let mut saw_choice = false; + let mut saw_done = false; + + // strict=false 用于残余尾块:截断的半截 JSON 忽略而非报错,与 + // responses_sse_to_response_value 的残余处理对称(C2),否则一个被掐断的 + // 尾块会把已聚合完整的响应误杀成 422。 + let mut process_event = + |event_name: &str, data_str: &str, strict: bool| -> Result<(), ProxyError> { + let trimmed = data_str.trim(); + if trimmed == "[DONE]" { + saw_done = true; + return Ok(()); + } + if trimmed.is_empty() { + return Ok(()); + } + let chunk: Value = match serde_json::from_str(data_str) { + Ok(v) => v, + Err(_) if !strict => return Ok(()), + Err(e) => { + return Err(ProxyError::TransformError(format!( + "Failed to parse upstream SSE chunk: {e}" + ))) + } + }; + + // `event: error` 事件:错误由事件名标记,data 体未必有 error 键(直接是 + // 错误对象)。即便此前已聚合完整 choice 也要据此判失败,否则会把网关的 + // 配额/限流错误伪装成成功(C18)。 + if event_name.eq_ignore_ascii_case("error") { + let message = chunk + .get("error") + .and_then(error_event_message) + .or_else(|| error_event_message(&chunk)) + .unwrap_or_else(|| "upstream error event in SSE stream".to_string()); + return Err(ProxyError::TransformError(message)); + } + // 网关把错误作为普通 data chunk 下发({"error":{...}}):仅在 error 含 + // 可报告消息时判失败。空对象 / 空消息 / null / false 等占位形状(部分 + // OpenAI 兼容网关每 chunk 都带)不能据此误杀成功流(C12)。 + if let Some(message) = chunk + .get("error") + .filter(|e| !e.is_null()) + .and_then(error_event_message) + { + return Err(ProxyError::TransformError(message)); + } + + // 首个"有意义"的值锁定 envelope。Azure 的 content-filter 前置块带 + // ""/0 占位(streaming.rs 有同款空串守卫),不能让占位值冻结字段 + for (slot, key) in [ + (&mut id, "id"), + (&mut created, "created"), + (&mut model, "model"), + ] { + if slot.is_null() { + if let Some(v) = chunk.get(key).filter(|v| envelope_value_meaningful(v)) { + *slot = v.clone(); + } + } + } + // OpenAI 语义:usage 只在最终 chunk 非 null + if let Some(u) = chunk.get("usage").filter(|u| !u.is_null()) { + usage = u.clone(); + } + + // 代理上下文只存在单选择(n=1),仅聚合 index==0 的 choice + let Some(choice) = chunk + .get("choices") + .and_then(|c| c.as_array()) + .and_then(|arr| { + arr.iter() + .find(|ch| ch.get("index").and_then(|i| i.as_u64()).unwrap_or(0) == 0) + }) + else { + return Ok(()); + }; + + // "见过响应"的证据必须是 choice payload:metadata/usage-only chunk + + // [DONE] 的流(全程无 choice)若也算数,会绕过下方两道守卫、 + // 包装出空内容假成功 + saw_choice = true; + + // finish_reason 首个非 null 即锁定(对齐 streaming.rs 的 first-wins: + // 多 finish_reason 上游的尾块 "stop" 不能覆盖先到的 "tool_calls") + if finish_reason.is_null() { + if let Some(fr) = choice.get("finish_reason").filter(|v| !v.is_null()) { + finish_reason = fr.clone(); + } + } + // payload 选择:正常增量走 delta;但假流式中转会把完整 chat.completion + // 包成单事件(message 而非 delta),有的还附带空 delta:{}。delta 为空对象 + // 且存在 message 时改用 message 快照(覆盖此前累计的增量,防混合形态双计), + // 否则内容被静默丢弃、完成性守卫又被其 finish_reason 击穿 → 空内容假成功(C3)。 + let delta_nonempty = choice + .get("delta") + .and_then(|d| d.as_object()) + .is_some_and(|o| !o.is_empty()); + let (payload, is_full_message) = if delta_nonempty { + (choice.get("delta").unwrap(), false) + } else if let Some(message) = choice.get("message") { + (message, true) + } else if let Some(delta) = choice.get("delta") { + // 空 delta 且无 message:正常的纯 finish_reason 收尾块 + (delta, false) + } else { + return Ok(()); + }; + if is_full_message { + content.clear(); + reasoning_content.clear(); + tool_calls.clear(); + } + match payload.get("content") { + Some(Value::String(text)) => content.push_str(text), + Some(Value::Array(parts)) => { + for part in parts { + if let Some(text) = part.get("text").and_then(|t| t.as_str()) { + content.push_str(text); + } else if let Some(refusal) = part.get("refusal").and_then(|r| r.as_str()) { + content.push_str(refusal); + } + } + } + _ => {} + } + // refusal:OpenAI 官方拒绝形态(delta.refusal / message.refusal 字符串)。 + // 两个下游转换器都把 refusal 当可见内容,漏读会让拒绝响应变空消息假成功(C15)。 + if let Some(refusal) = payload.get("refusal").and_then(|r| r.as_str()) { + content.push_str(refusal); + } + // reasoning 字段穷举提取直接复用 codex_chat_common(reasoning_content > + // reasoning 字符串/对象 > reasoning_details),避免第三份手写实现漏档: + // MiMo/OpenRouter 等只发 reasoning_details 的 provider 否则会丢思考内容 + if let Some(text) = extract_reasoning_field_text(payload) { + reasoning_content.push_str(&text); + } + if let Some(deltas) = payload.get("tool_calls").and_then(|t| t.as_array()) { + for (pos, tc) in deltas.iter().enumerate() { + merge_tool_call_delta(&mut tool_calls, tc, pos); + } + } else if let Some(fc) = payload.get("function_call").filter(|v| !v.is_null()) { + // legacy function_call(2023 弃用但仍有中转回传)→ 当单个 tool_call。 + // 两个下游转换器都支持 function_call,漏读会让 finish_reason + // "function_call"→stop_reason "tool_use" 却零工具块、卡死 agent 循环(C17)。 + let synthetic = json!({ + "index": 0, + "id": fc.get("id").and_then(|v| v.as_str()).unwrap_or(""), + "type": "function", + "function": fc, + }); + merge_tool_call_delta(&mut tool_calls, &synthetic, 0); + } + Ok(()) + }; + + while let Some(block) = take_sse_block(&mut buffer) { + if let Some((event, data)) = sse_block_parts(&block) { + process_event(&event, &data, true)?; + } + } + // 最后一个事件后可能没有空行分隔(半截流/非规范上游):残余 buffer 当最后一块 + // 处理,strict=false 容忍被掐断的尾块(C2)。 + if let Some((event, data)) = sse_block_parts(&buffer) { + process_event(&event, &data, false)?; + } + + if !saw_choice { + return Err(ProxyError::TransformError( + "No chat completion choices in upstream SSE".to_string(), + )); + } + // 完成性守卫:close-delimited 响应的中途截断在字节层不可检测,缺少 + // finish_reason 与 [DONE] 两个完成证据时按截断处理,避免把半截内容 + // 包装成"看起来成功"的响应静默返回(比 422 更难诊断的失败形态)。 + if finish_reason.is_null() && !saw_done { + return Err(ProxyError::TransformError( + "Upstream SSE stream appears truncated (no finish_reason or [DONE] marker)".to_string(), + )); + } + + // tool_calls 终结化:全空壳(index 空洞或未收到任何字段)直接丢弃(避免幽灵 + // tool_use);缺 id/name 的按原始 index 回填合成值(对齐 streaming.rs 的 + // tool_call_{idx}/unknown_tool)——空 id 会破坏 Claude 的 tool_use_id ↔ + // tool_result 回程 + let tool_calls: Vec = tool_calls + .into_iter() + .filter(|(_, tc)| { + tc["id"].as_str().is_some_and(|s| !s.is_empty()) + || tc["function"]["name"] + .as_str() + .is_some_and(|s| !s.is_empty()) + || tc["function"]["arguments"] + .as_str() + .is_some_and(|s| !s.is_empty()) + }) + .map(|(index, mut tc)| { + if tc["id"].as_str().is_none_or(str::is_empty) { + tc["id"] = json!(format!("tool_call_{index}")); + } + if tc["function"]["name"].as_str().is_none_or(str::is_empty) { + tc["function"]["name"] = json!("unknown_tool"); + } + tc + }) + .collect(); + + let mut message = serde_json::Map::new(); + message.insert("role".to_string(), json!("assistant")); + message.insert("content".to_string(), json!(content)); + if !reasoning_content.is_empty() { + message.insert("reasoning_content".to_string(), json!(reasoning_content)); + } + if !tool_calls.is_empty() { + message.insert("tool_calls".to_string(), Value::Array(tool_calls)); + } + + // 上游未回传有效 id 时合成 UUID:留 null/"" 会让下游 dedup_request_id 退化为 + // 常量 "session:" 全局碰撞,INSERT OR REPLACE 静默覆盖前序 usage 行、少计成本(C9)。 + let id = if envelope_value_meaningful(&id) { + id + } else { + json!(uuid::Uuid::new_v4().to_string()) + }; + + let mut response = json!({ + "id": id, + "object": "chat.completion", + "created": created, + "model": model, + "choices": [{ + "index": 0, + "message": Value::Object(message), + "finish_reason": finish_reason, + }], + }); + if !usage.is_null() { + response["usage"] = usage; + } + Ok(response) +} + +/// envelope 字段是否"有意义":过滤 null、空串与数值 0(含浮点 0.0——Azure +/// content-filter 前置块的占位值),避免占位值抢先冻结 id/model/created。 +fn envelope_value_meaningful(v: &Value) -> bool { + match v { + Value::Null => false, + Value::String(s) => !s.is_empty(), + Value::Number(n) => n.as_f64() != Some(0.0), + _ => true, + } +} + +/// 合并单条 tool_calls 增量到按 index 聚合的 BTreeMap:OpenAI 流式把 id/name 放 +/// 首个增量、arguments 分片下发,按 delta.index 定位目标;缺 index 时退到所在数组 +/// 中的位置(message 形态的完整 tool_calls 常不带 index,按 0 会互相覆盖)。 +fn merge_tool_call_delta( + tool_calls: &mut std::collections::BTreeMap, + delta: &Value, + fallback_index: usize, +) { + let index = delta + .get("index") + .and_then(|i| i.as_u64()) + .map(|i| i as usize) + .unwrap_or(fallback_index); + let target = tool_calls.entry(index).or_insert_with(|| { + json!({ + "id": "", + "type": "function", + "function": {"name": "", "arguments": ""} + }) + }); + if let Some(v) = delta + .get("id") + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + { + target["id"] = json!(v); + } + if let Some(func) = delta.get("function") { + if let Some(name) = func + .get("name") + .and_then(|v| v.as_str()) + .filter(|s| !s.is_empty()) + { + target["function"]["name"] = json!(name); + } + // arguments:string 直接拼接;object/array 序列化后拼接——非流 message + // 快照常把 arguments 作对象回传(OpenAI 兼容偏差),只认 string 会丢参数 + // 致工具空输入执行(C16) + match func.get("arguments") { + Some(Value::String(args)) => { + if let Some(existing) = target["function"]["arguments"].as_str() { + target["function"]["arguments"] = json!(format!("{existing}{args}")); + } + } + Some(v @ (Value::Object(_) | Value::Array(_))) => { + let serialized = serde_json::to_string(v).unwrap_or_default(); + if let Some(existing) = target["function"]["arguments"].as_str() { + target["function"]["arguments"] = json!(format!("{existing}{serialized}")); + } + } + _ => {} + } + } +} + // ============================================================================ // 使用量记录(保留用于 Claude 转换逻辑) // ============================================================================ @@ -1479,11 +1997,535 @@ async fn log_usage( #[cfg(test)] mod tests { use super::{ - codex_proxy_error_json, responses_sse_to_response_value, - should_use_claude_transform_streaming, + body_looks_like_sse, body_snippet, chat_sse_to_response_value, codex_proxy_error_json, + responses_sse_to_response_value, should_use_claude_transform_streaming, transform, + upstream_body_parse_error, }; use crate::proxy::ProxyError; + #[test] + fn body_looks_like_sse_detects_unlabeled_sse_prefixes() { + assert!(body_looks_like_sse("data: {\"id\":\"1\"}\n\n")); + assert!(body_looks_like_sse("event: message\ndata: {}\n\n")); + // SSE 规范的另两种字段行也可能打头 + assert!(body_looks_like_sse("id: 1\ndata: {}\n\n")); + assert!(body_looks_like_sse("retry: 3000\ndata: {}\n\n")); + // OpenRouter 会在流前发注释行 + assert!(body_looks_like_sse( + ": OPENROUTER PROCESSING\n\ndata: {}\n\n" + )); + // BOM + 前导空白 + assert!(body_looks_like_sse("\u{feff}\n data: {}\n\n")); + // HTML 拦截页与普通文本不应误判为 SSE + assert!(!body_looks_like_sse("blocked")); + assert!(!body_looks_like_sse("Bad Gateway")); + assert!(!body_looks_like_sse("")); + } + + #[test] + fn upstream_body_parse_error_carries_field_diagnostics() { + let mut headers = axum::http::HeaderMap::new(); + headers.insert("content-type", "text/html".parse().unwrap()); + headers.insert("content-encoding", "gzip".parse().unwrap()); + let parse_err = serde_json::from_str::("").unwrap_err(); + + let err = upstream_body_parse_error( + "Failed to parse upstream response", + &parse_err, + &headers, + "\nblocked", + ); + + match err { + ProxyError::TransformError(msg) => { + assert!(msg.contains("content-type: text/html"), "{msg}"); + assert!(msg.contains("content-encoding: gzip"), "{msg}"); + assert!(msg.contains("\\nblocked"), "{msg}"); + } + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn upstream_body_parse_error_marks_missing_headers() { + let headers = axum::http::HeaderMap::new(); + let parse_err = serde_json::from_str::("data:").unwrap_err(); + + let err = upstream_body_parse_error("x", &parse_err, &headers, "data: oops"); + + match err { + ProxyError::TransformError(msg) => { + assert!(msg.contains("content-type: "), "{msg}"); + assert!(msg.contains("content-encoding: "), "{msg}"); + } + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn chat_sse_to_response_value_collects_reasoning_alias() { + // OpenRouter/Kimi 用 reasoning(字符串),部分网关用对象形态 + let sse = "data: {\"id\":\"c1\",\"model\":\"kimi-k2.6\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning\":\"think\"},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning\":{\"content\":\"ing\"},\"content\":\"ok\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!( + response["choices"][0]["message"]["reasoning_content"], + "thinking" + ); + assert_eq!(response["choices"][0]["message"]["content"], "ok"); + } + + #[test] + fn chat_sse_to_response_value_collects_reasoning_details() { + // MiMo/OpenRouter 等只发 reasoning_details(数组形态)的 provider, + // 经公共提取器兜底,不能丢思考内容 + let sse = "data: {\"id\":\"c1\",\"model\":\"mimo\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_details\":[{\"type\":\"reasoning.text\",\"text\":\"think\"}]},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_details\":[{\"type\":\"reasoning.text\",\"text\":\"ing\"}],\"content\":\"ok\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!( + response["choices"][0]["message"]["reasoning_content"], + "thinking" + ); + assert_eq!(response["choices"][0]["message"]["content"], "ok"); + } + + #[test] + fn responses_sse_to_response_value_handles_missing_trailing_blank_line() { + // 错标 SSE 兜底/非规范上游:最后的 response.completed 后没有空行分隔 + let sse = "event: response.completed\n\ +data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_tail\",\"status\":\"completed\",\"model\":\"gpt-5.4\",\"output\":[],\"usage\":{\"input_tokens\":3,\"output_tokens\":1}}}\n"; + + let response = responses_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["id"], "resp_tail"); + } + + #[test] + fn responses_sse_to_response_value_ignores_truncated_trailing_block() { + // 截断的残余尾块不能破坏已聚合好的完整响应(codex_oauth 路径复用本函数) + let sse = "event: response.completed\n\ +data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_ok\",\"status\":\"completed\",\"model\":\"gpt-5.4\",\"output\":[],\"usage\":{\"input_tokens\":3,\"output_tokens\":1}}}\n\ +\n\ +event: response.extra\n\ +data: {\"type\":\"resp"; + + let response = responses_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["id"], "resp_ok"); + } + + #[test] + fn chat_sse_to_response_value_skips_azure_placeholder_envelope() { + // Azure content-filter 前置块带 ""/0 占位,不能冻结 envelope 字段 + let sse = "data: {\"id\":\"\",\"model\":\"\",\"created\":0,\"object\":\"\",\"choices\":[],\"prompt_filter_results\":[]}\n\n\ +data: {\"id\":\"chatcmpl-real\",\"model\":\"gpt-5.4\",\"created\":42,\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["id"], "chatcmpl-real"); + assert_eq!(response["model"], "gpt-5.4"); + assert_eq!(response["created"], 42); + } + + #[test] + fn chat_sse_to_response_value_tolerates_null_error_field() { + // one-api 系网关每个 chunk 都带 "error": null,不能误判为上游错误 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"error\":null,\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + } + + #[test] + fn chat_sse_to_response_value_first_finish_reason_wins() { + // kimi-k2.6 等会在 tool_use 后再发带 finish_reason 的尾块, + // 尾块 "stop" 不能覆盖先到的 "tool_calls"(对齐 streaming.rs first-wins) + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"function\":{\"name\":\"f\",\"arguments\":\"{}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["finish_reason"], "tool_calls"); + } + + #[test] + fn chat_sse_to_response_value_unwraps_message_shaped_fake_stream() { + // 假流式中转把完整 chat.completion 包成单个 SSE 事件(message 而非 delta) + let sse = "data: {\"id\":\"c1\",\"object\":\"chat.completion\",\"model\":\"m\",\"choices\":[{\"index\":0,\"message\":{\"role\":\"assistant\",\"content\":\"full answer\"},\"finish_reason\":\"stop\"}]}\n\n\ +data: [DONE]\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["message"]["content"], "full answer"); + assert_eq!(response["choices"][0]["finish_reason"], "stop"); + } + + #[test] + fn chat_sse_to_response_value_message_snapshot_overrides_deltas() { + // 混合形态:先发增量再发完整 message 快照时,快照覆盖增量(防双计) + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"par\"},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"message\":{\"role\":\"assistant\",\"content\":\"full\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["message"]["content"], "full"); + } + + #[test] + fn chat_sse_to_response_value_backfills_sparse_tool_call_ids() { + // index 空洞的空壳被丢弃;缺 id 的按原始 index 回填 tool_call_{idx} + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":1,\"function\":{\"name\":\"f2\",\"arguments\":\"{}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + let tool_calls = response["choices"][0]["message"]["tool_calls"] + .as_array() + .unwrap(); + assert_eq!(tool_calls.len(), 1, "index 0 的空壳应被丢弃"); + assert_eq!(tool_calls[0]["id"], "tool_call_1"); + assert_eq!(tool_calls[0]["function"]["name"], "f2"); + } + + #[test] + fn chat_sse_to_response_value_strips_bom_before_parsing() { + // 嗅探器接受 BOM,块解析也必须剥掉它,否则首个 data 行静默丢失 + let sse = "\u{feff}data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + } + + #[test] + fn body_snippet_sanitizes_controls_and_truncates() { + assert_eq!( + body_snippet("\r\nblocked\u{0}", 120), + "\\nblocked\u{FFFD}" + ); + let long = "a".repeat(200); + let snippet = body_snippet(&long, 120); + assert_eq!(snippet.chars().count(), 121); // 120 个字符 + 省略号 + assert!(snippet.ends_with('…')); + } + + #[test] + fn chat_sse_to_response_value_aggregates_text_finish_reason_and_usage() { + let sse = "data: {\"id\":\"chatcmpl-1\",\"object\":\"chat.completion.chunk\",\"created\":123,\"model\":\"gpt-5.4\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"Hel\"},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"chatcmpl-1\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"lo\"},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"chatcmpl-1\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":10,\"completion_tokens\":2,\"total_tokens\":12}}\n\n\ +data: [DONE]\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["id"], "chatcmpl-1"); + assert_eq!(response["object"], "chat.completion"); + assert_eq!(response["model"], "gpt-5.4"); + assert_eq!(response["choices"][0]["message"]["role"], "assistant"); + assert_eq!(response["choices"][0]["message"]["content"], "Hello"); + assert_eq!(response["choices"][0]["finish_reason"], "stop"); + assert_eq!(response["usage"]["prompt_tokens"], 10); + } + + #[test] + fn chat_sse_to_response_value_merges_tool_call_argument_fragments() { + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"id\":\"call_1\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\",\"arguments\":\"\"}}]},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"{\\\"city\\\":\"}}]},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":0,\"function\":{\"arguments\":\"\\\"SF\\\"}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n\ +data: [DONE]\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + let tool_call = &response["choices"][0]["message"]["tool_calls"][0]; + assert_eq!(tool_call["id"], "call_1"); + assert_eq!(tool_call["function"]["name"], "get_weather"); + assert_eq!(tool_call["function"]["arguments"], "{\"city\":\"SF\"}"); + assert_eq!(response["choices"][0]["finish_reason"], "tool_calls"); + } + + #[test] + fn chat_sse_to_response_value_collects_reasoning_content() { + let sse = "data: {\"id\":\"c1\",\"model\":\"deepseek-r2\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\"think\"},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{\"reasoning_content\":\"ing\",\"content\":\"ok\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!( + response["choices"][0]["message"]["reasoning_content"], + "thinking" + ); + assert_eq!(response["choices"][0]["message"]["content"], "ok"); + } + + #[test] + fn chat_sse_to_response_value_handles_missing_trailing_blank_line() { + // 非规范上游/半截流:最后一个事件后没有空行分隔 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + } + + #[test] + fn chat_sse_to_response_value_handles_crlf_delimiters() { + // 真实 HTTP SSE 按规范使用 \r\n\r\n 分隔事件 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":null}]}\r\n\ +\r\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}]}\r\n\ +\r\n\ +data: [DONE]\r\n\ +\r\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + assert_eq!(response["choices"][0]["finish_reason"], "stop"); + } + + #[test] + fn chat_sse_to_response_value_propagates_upstream_error_event() { + let sse = "data: {\"error\":{\"message\":\"rate limited by gateway\",\"code\":429}}\n\n"; + + let err = chat_sse_to_response_value(sse).unwrap_err(); + match err { + ProxyError::TransformError(msg) => assert!(msg.contains("rate limited by gateway")), + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn chat_sse_to_response_value_rejects_truncated_stream() { + // 只有内容增量、无 finish_reason 也无 [DONE]:close-delimited 截断不可 + // 在字节层检测,必须按截断报错而非静默返回半截内容 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"par\"},\"finish_reason\":null}]}\n\n"; + + let err = chat_sse_to_response_value(sse).unwrap_err(); + match err { + ProxyError::TransformError(msg) => assert!(msg.contains("truncated")), + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn chat_sse_to_response_value_accepts_done_marker_without_finish_reason() { + // 非规范上游可能不发 finish_reason 但正常收尾 [DONE]:视为完成 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":null}]}\n\n\ +data: [DONE]\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + assert_eq!( + response["choices"][0]["finish_reason"], + serde_json::Value::Null + ); + } + + #[test] + fn chat_sse_to_response_value_rejects_stream_without_chunks() { + let err = chat_sse_to_response_value(": keepalive\n\ndata: [DONE]\n\n").unwrap_err(); + match err { + ProxyError::TransformError(msg) => { + assert!(msg.contains("No chat completion choices")) + } + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn chat_sse_to_response_value_rejects_choiceless_stream_despite_done() { + // metadata/usage-only chunk + [DONE]、全程无 choice payload: + // 不能凭 [DONE] 包装成空内容假成功(saw_choice 必须以 choice 为证据) + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[],\"usage\":{\"prompt_tokens\":1,\"completion_tokens\":0,\"total_tokens\":1}}\n\n\ +data: [DONE]\n\n"; + + let err = chat_sse_to_response_value(sse).unwrap_err(); + match err { + ProxyError::TransformError(msg) => { + assert!(msg.contains("No chat completion choices"), "{msg}") + } + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn chat_sse_to_response_value_huge_tool_call_index_does_not_oom() { + // C1:上游可控的巨大 index 不得 densify 数组(旧实现会 OOM 整个进程); + // BTreeMap 只占一个槽,且原始 index 用于回填合成 id + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"tool_calls\":[{\"index\":4000000000,\"function\":{\"name\":\"f\",\"arguments\":\"{}\"}}]},\"finish_reason\":\"tool_calls\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + let tool_calls = response["choices"][0]["message"]["tool_calls"] + .as_array() + .unwrap(); + assert_eq!(tool_calls.len(), 1); + assert_eq!(tool_calls[0]["id"], "tool_call_4000000000"); + assert_eq!(tool_calls[0]["function"]["name"], "f"); + } + + #[test] + fn chat_sse_to_response_value_empty_delta_falls_back_to_message_snapshot() { + // C3:同一 choice 同时带空 delta:{} 与完整 message 快照——不能因 delta 键 + // 存在就短路到空 delta、丢掉 message 内容(finish_reason 还会击穿守卫) + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{},\"message\":{\"role\":\"assistant\",\"content\":\"full answer\"},\"finish_reason\":\"stop\"}]}\n\n\ +data: [DONE]\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + assert_eq!(response["choices"][0]["message"]["content"], "full answer"); + assert_eq!(response["choices"][0]["finish_reason"], "stop"); + } + + #[test] + fn chat_sse_to_response_value_empty_delta_scaffold_does_not_wipe_real_content() { + // C3 反向陷阱:每个 chunk 都带真内容 delta + 空 message 壳时,不能让空 + // message 触发 clear 抹掉累计内容(delta 非空则优先 delta,不走快照覆盖) + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"message\":{},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"c1\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\" there\"},\"message\":{},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + assert_eq!(response["choices"][0]["message"]["content"], "hi there"); + } + + #[test] + fn chat_sse_to_response_value_object_form_tool_arguments_preserved() { + // C16:message 快照里 arguments 作对象回传时序列化保留,不能丢成空输入 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"message\":{\"role\":\"assistant\",\"tool_calls\":[{\"id\":\"call_1\",\"type\":\"function\",\"function\":{\"name\":\"get_weather\",\"arguments\":{\"city\":\"SF\"}}}]},\"finish_reason\":\"tool_calls\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + let args = response["choices"][0]["message"]["tool_calls"][0]["function"]["arguments"] + .as_str() + .unwrap(); + let parsed: serde_json::Value = serde_json::from_str(args).unwrap(); + assert_eq!(parsed["city"], "SF"); + } + + #[test] + fn chat_sse_to_response_value_collects_refusal() { + // C15:delta.refusal 字符串并入可见内容,避免拒绝响应变空消息假成功 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"refusal\":\"I can't help with that.\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + assert_eq!( + response["choices"][0]["message"]["content"], + "I can't help with that." + ); + } + + #[test] + fn chat_sse_to_response_value_maps_legacy_function_call() { + // C17:legacy function_call → 单个 tool_call,避免 finish_reason + // function_call 映射成 tool_use 却零工具块卡死 agent + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"message\":{\"role\":\"assistant\",\"content\":null,\"function_call\":{\"name\":\"get_weather\",\"arguments\":\"{\\\"city\\\":\\\"SF\\\"}\"}},\"finish_reason\":\"function_call\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + let tc = &response["choices"][0]["message"]["tool_calls"][0]; + assert_eq!(tc["function"]["name"], "get_weather"); + assert_eq!(tc["function"]["arguments"], "{\"city\":\"SF\"}"); + } + + #[test] + fn chat_sse_to_response_value_event_error_fails_even_after_complete_choice() { + // C18:event:error(data 无 error 键)即便跟在完整 choice 后也判失败, + // 不能伪装成成功 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"partial\"},\"finish_reason\":\"stop\"}]}\n\n\ +event: error\n\ +data: {\"message\":\"insufficient_user_quota\",\"code\":429}\n\n"; + + let err = chat_sse_to_response_value(sse).unwrap_err(); + match err { + ProxyError::TransformError(msg) => { + assert!(msg.contains("insufficient_user_quota"), "{msg}") + } + other => panic!("expected TransformError, got {other:?}"), + } + } + + #[test] + fn chat_sse_to_response_value_tolerates_empty_error_placeholder() { + // C12:error 为空对象 / 空消息等占位形状不得误杀成功流 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"error\":{},\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + } + + #[test] + fn chat_sse_to_response_value_tolerates_truncated_residual_after_complete() { + // C2:完整 finish_reason 块后尾块被掐断(半截 JSON),不能误杀已完整的聚合 + let sse = "data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n\ +data: {\"usage\":{\"prompt_to"; + + let response = chat_sse_to_response_value(sse).unwrap(); + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + } + + #[test] + fn chat_sse_to_response_value_float_zero_does_not_freeze_envelope() { + // C14:浮点 0.0 占位的 created 不得冻结 envelope,真值应能覆盖 + let sse = "data: {\"id\":\"\",\"model\":\"\",\"created\":0.0,\"choices\":[]}\n\n\ +data: {\"id\":\"chatcmpl-real\",\"model\":\"m\",\"created\":42,\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + assert_eq!(response["created"], 42); + assert_eq!(response["id"], "chatcmpl-real"); + } + + #[test] + fn chat_sse_to_response_value_synthesizes_id_when_absent() { + // C9:上游无 id 时合成非空唯一 id,避免下游 dedup 退化成常量碰撞覆盖 + let sse = "data: {\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let r1 = chat_sse_to_response_value(sse).unwrap(); + let r2 = chat_sse_to_response_value(sse).unwrap(); + let id1 = r1["id"].as_str().unwrap(); + let id2 = r2["id"].as_str().unwrap(); + assert!(!id1.is_empty()); + assert_ne!(id1, id2, "两次无 id 聚合应产出不同 id 以避免 dedup 碰撞"); + } + + #[test] + fn chat_sse_to_response_value_accepts_indented_data_lines() { + // C4:行首缩进的 data 行(嗅探器宽容接受)也应能被聚合,不静默丢失 + let sse = " data: {\"id\":\"c1\",\"model\":\"m\",\"choices\":[{\"index\":0,\"delta\":{\"content\":\"hi\"},\"finish_reason\":\"stop\"}]}\n\n"; + + let response = chat_sse_to_response_value(sse).unwrap(); + assert_eq!(response["choices"][0]["message"]["content"], "hi"); + } + + #[test] + fn responses_sse_completed_then_trailing_failed_keeps_success() { + // C8:已拿到 response.completed 后,残余里的完整 response.failed 不得翻车 + // (codex_oauth 聚合路径复用本函数,此前该尾块被忽略=成功) + let sse = "event: response.completed\n\ +data: {\"type\":\"response.completed\",\"response\":{\"id\":\"resp_ok\",\"status\":\"completed\",\"model\":\"gpt-5.4\",\"output\":[]}}\n\n\ +event: response.failed\n\ +data: {\"type\":\"response.failed\",\"response\":{\"error\":{\"message\":\"boom\"}}}\n"; + + let response = responses_sse_to_response_value(sse).unwrap(); + assert_eq!(response["id"], "resp_ok"); + } + + #[test] + fn aggregated_chat_sse_round_trips_through_openai_to_anthropic() { + // 全链路:错标 Content-Type 的 SSE 体 → 聚合 → 既有非流转换器 → Anthropic JSON + let sse = "data: {\"id\":\"chatcmpl-9\",\"created\":1,\"model\":\"gpt-5.4\",\"choices\":[{\"index\":0,\"delta\":{\"role\":\"assistant\",\"content\":\"Hi\"},\"finish_reason\":null}]}\n\n\ +data: {\"id\":\"chatcmpl-9\",\"choices\":[{\"index\":0,\"delta\":{},\"finish_reason\":\"stop\"}],\"usage\":{\"prompt_tokens\":4,\"completion_tokens\":1,\"total_tokens\":5}}\n\n\ +data: [DONE]\n\n"; + + let aggregated = chat_sse_to_response_value(sse).unwrap(); + let anthropic = transform::openai_to_anthropic(aggregated).unwrap(); + + assert_eq!(anthropic["model"], "gpt-5.4"); + assert_eq!(anthropic["content"][0]["type"], "text"); + assert_eq!(anthropic["content"][0]["text"], "Hi"); + assert_eq!(anthropic["stop_reason"], "end_turn"); + } + #[test] fn codex_oauth_responses_force_streaming_even_if_client_sent_false() { assert!(should_use_claude_transform_streaming( diff --git a/src-tauri/src/proxy/response_processor.rs b/src-tauri/src/proxy/response_processor.rs index ae8fde260..59d525b1b 100644 --- a/src-tauri/src/proxy/response_processor.rs +++ b/src-tauri/src/proxy/response_processor.rs @@ -35,28 +35,41 @@ use tokio::sync::Mutex; /// 根据 content-encoding 解压响应体字节 /// /// reqwest 自动解压已禁用(为了透传 accept-encoding),需要手动解压。 -fn decompress_body(content_encoding: &str, body: &[u8]) -> Result, std::io::Error> { +/// 返回 `Ok(None)` 表示编码不受支持、原样透传——此时调用方必须保留 +/// content-encoding 头,否则下游(诊断/客户端)会把压缩字节误当明文。 +fn decompress_body(content_encoding: &str, body: &[u8]) -> Result>, std::io::Error> { match content_encoding { "gzip" | "x-gzip" => { let mut decoder = flate2::read::GzDecoder::new(body); let mut decompressed = Vec::new(); decoder.read_to_end(&mut decompressed)?; - Ok(decompressed) + Ok(Some(decompressed)) } "deflate" => { - let mut decoder = flate2::read::DeflateDecoder::new(body); + // RFC 9110: deflate 指 zlib 包裹格式;但部分上游发 raw deflate 流。 + // 先按规范尝试 zlib,失败再回退 raw —— 否则合规上游必然解压失败, + // 原始压缩字节会被 fail-open 透传给 JSON 解析(#2234 形态 C 之一)。 let mut decompressed = Vec::new(); - decoder.read_to_end(&mut decompressed)?; - Ok(decompressed) + let mut zlib = flate2::read::ZlibDecoder::new(body); + match zlib.read_to_end(&mut decompressed) { + Ok(_) => Ok(Some(decompressed)), + Err(zlib_err) => { + log::debug!("deflate 按 zlib 解压失败({zlib_err}),回退 raw deflate"); + let mut decompressed = Vec::new(); + let mut raw = flate2::read::DeflateDecoder::new(body); + raw.read_to_end(&mut decompressed)?; + Ok(Some(decompressed)) + } + } } "br" => { let mut decompressed = Vec::new(); brotli::BrotliDecompress(&mut std::io::Cursor::new(body), &mut decompressed)?; - Ok(decompressed) + Ok(Some(decompressed)) } _ => { log::warn!("未知的 content-encoding: {content_encoding},跳过解压"); - Ok(body.to_vec()) + Ok(None) } } } @@ -150,10 +163,13 @@ pub(crate) async fn read_decoded_body( if let Some(encoding) = get_content_encoding(&headers) { log::debug!("[{tag}] 解压非流式响应: content-encoding={encoding}"); match decompress_body(&encoding, &raw_bytes) { - Ok(decompressed) => { + Ok(Some(decompressed)) => { body_bytes = Bytes::from(decompressed); decoded = true; } + // 不支持的编码:原样透传且保留 content-encoding 头, + // 让下游诊断/客户端知道这仍是压缩字节 + Ok(None) => {} Err(e) => { log::warn!("[{tag}] 解压失败 ({encoding}): {e},使用原始数据"); } @@ -862,6 +878,40 @@ mod tests { use std::sync::Arc; use tokio::sync::RwLock; + #[test] + fn decompress_body_deflate_handles_zlib_wrapped_per_rfc9110() { + // RFC 9110 规范的 deflate = zlib 包裹格式(合规上游发的就是这个) + let payload = br#"{"ok":true}"#; + let mut encoder = + flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default()); + std::io::Write::write_all(&mut encoder, payload).unwrap(); + let compressed = encoder.finish().unwrap(); + + let decompressed = decompress_body("deflate", &compressed).unwrap().unwrap(); + assert_eq!(decompressed, payload); + } + + #[test] + fn decompress_body_deflate_falls_back_to_raw_stream() { + // 部分上游违规发 raw deflate 流,保持兼容 + let payload = br#"{"ok":true}"#; + let mut encoder = + flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default()); + std::io::Write::write_all(&mut encoder, payload).unwrap(); + let compressed = encoder.finish().unwrap(); + + let decompressed = decompress_body("deflate", &compressed).unwrap().unwrap(); + assert_eq!(decompressed, payload); + } + + #[test] + fn decompress_body_unknown_encoding_returns_none_to_keep_headers() { + // 未知编码必须返回 None(而非伪装成"已解码"),否则 content-encoding + // 头被剥掉,下游诊断会把压缩字节误报成明文 + let result = decompress_body("zstd", b"\x28\xb5\x2f\xfd").unwrap(); + assert!(result.is_none()); + } + #[test] fn test_strip_sse_field_accepts_optional_space() { assert_eq!(