fix(proxy): aggregate mislabeled SSE bodies in transform fallback (#2234)

The Claude/Codex format-transform non-stream branch returned an opaque 422
"Failed to parse upstream response" whenever a 2xx upstream body was not
valid JSON. The common case: MaaS gateways force-stream a stream:false
request and return an SSE body with a non-SSE Content-Type, defeating the
header-only is_sse() check.

On serde failure, sniff the body for SSE and aggregate the chunks into a
single JSON document, then run the existing converter so clients still
receive a valid non-stream response.

- chat_sse_to_response_value: aggregate chat.completion.chunk SSE
  (content / reasoning / refusal / tool_calls / legacy function_call),
  tool_calls index-keyed via BTreeMap to avoid unbounded densification,
  first-wins finish_reason, message-snapshot override, completeness and
  error-event guards; synthesize an id when the upstream omits one
- responses_sse_to_response_value: process the residual trailing block,
  tolerating truncation and skipping it once a completed event was seen
- enrich remaining parse failures with content-type / content-encoding /
  body-snippet diagnostics
- deflate: try zlib (RFC 9110) before raw; keep the content-encoding
  header for unsupported encodings
- gate zero-usage rows on the Claude transform path
This commit is contained in:
Jason
2026-06-11 08:29:21 +08:00
Unverified
parent e776160912
commit a6d718d0fc
2 changed files with 1118 additions and 26 deletions
File diff suppressed because it is too large Load Diff
+58 -8
View File
@@ -35,28 +35,41 @@ use tokio::sync::Mutex;
/// Decompress the response body bytes according to `content-encoding`.
///
/// reqwest's automatic decompression is disabled (so that accept-encoding can be passed through), so decompression is done manually here.
fn decompress_body(content_encoding: &str, body: &[u8]) -> Result<Vec<u8>, std::io::Error> {
/// Returns `Ok(None)` when the encoding is unsupported and the body is passed through as-is; in that case the caller must keep the
/// content-encoding header, otherwise downstream (diagnostics / clients) would mistake the compressed bytes for plaintext.
fn decompress_body(content_encoding: &str, body: &[u8]) -> Result<Option<Vec<u8>>, std::io::Error> {
match content_encoding {
"gzip" | "x-gzip" => {
let mut decoder = flate2::read::GzDecoder::new(body);
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
Ok(Some(decompressed))
}
"deflate" => {
let mut decoder = flate2::read::DeflateDecoder::new(body);
// RFC 9110: "deflate" means the zlib-wrapped format, but some upstreams send a raw deflate stream.
// Try zlib first as the spec requires and fall back to raw on failure; otherwise a compliant upstream would always fail to decompress,
// and the raw compressed bytes would be passed fail-open to JSON parsing (one of the shape-C cases of #2234).
let mut decompressed = Vec::new();
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
let mut zlib = flate2::read::ZlibDecoder::new(body);
match zlib.read_to_end(&mut decompressed) {
Ok(_) => Ok(Some(decompressed)),
Err(zlib_err) => {
log::debug!("deflate 按 zlib 解压失败({zlib_err}),回退 raw deflate");
let mut decompressed = Vec::new();
let mut raw = flate2::read::DeflateDecoder::new(body);
raw.read_to_end(&mut decompressed)?;
Ok(Some(decompressed))
}
}
}
"br" => {
let mut decompressed = Vec::new();
brotli::BrotliDecompress(&mut std::io::Cursor::new(body), &mut decompressed)?;
Ok(decompressed)
Ok(Some(decompressed))
}
_ => {
log::warn!("未知的 content-encoding: {content_encoding},跳过解压");
Ok(body.to_vec())
Ok(None)
}
}
}
@@ -150,10 +163,13 @@ pub(crate) async fn read_decoded_body(
if let Some(encoding) = get_content_encoding(&headers) {
log::debug!("[{tag}] 解压非流式响应: content-encoding={encoding}");
match decompress_body(&encoding, &raw_bytes) {
Ok(decompressed) => {
Ok(Some(decompressed)) => {
body_bytes = Bytes::from(decompressed);
decoded = true;
}
// 不支持的编码:原样透传且保留 content-encoding 头,
// 让下游诊断/客户端知道这仍是压缩字节
Ok(None) => {}
Err(e) => {
log::warn!("[{tag}] 解压失败 ({encoding}): {e},使用原始数据");
}
@@ -862,6 +878,40 @@ mod tests {
use std::sync::Arc;
use tokio::sync::RwLock;
#[test]
fn decompress_body_deflate_handles_zlib_wrapped_per_rfc9110() {
    use std::io::Write as _;

    // Per RFC 9110 the "deflate" coding is the zlib-wrapped format, which is
    // exactly what a spec-compliant upstream puts on the wire.
    let expected = br#"{"ok":true}"#;

    let mut zlib_writer =
        flate2::write::ZlibEncoder::new(Vec::new(), flate2::Compression::default());
    zlib_writer.write_all(expected).unwrap();
    let wire_bytes = zlib_writer.finish().unwrap();

    // Outer unwrap: no I/O error; inner unwrap: the encoding was recognised.
    let decoded = decompress_body("deflate", &wire_bytes)
        .unwrap()
        .unwrap();
    assert_eq!(decoded, expected);
}
#[test]
fn decompress_body_deflate_falls_back_to_raw_stream() {
    use std::io::Write as _;

    // Some upstreams violate the spec and send a raw deflate stream under
    // "deflate"; that form must keep working for compatibility.
    let expected = br#"{"ok":true}"#;

    let mut raw_writer =
        flate2::write::DeflateEncoder::new(Vec::new(), flate2::Compression::default());
    raw_writer.write_all(expected).unwrap();
    let wire_bytes = raw_writer.finish().unwrap();

    // Outer unwrap: no I/O error; inner unwrap: the encoding was recognised.
    let decoded = decompress_body("deflate", &wire_bytes)
        .unwrap()
        .unwrap();
    assert_eq!(decoded, expected);
}
#[test]
fn decompress_body_unknown_encoding_returns_none_to_keep_headers() {
    // An unrecognised encoding must yield `None` rather than pose as
    // "already decoded"; otherwise the content-encoding header gets stripped
    // and downstream diagnostics misreport compressed bytes as plaintext.
    let opaque_body = b"\x28\xb5\x2f\xfd";
    let outcome = decompress_body("zstd", opaque_body).unwrap();
    assert!(outcome.is_none());
}
#[test]
fn test_strip_sse_field_accepts_optional_space() {
assert_eq!(