From 49c8ec69d01e5aad244a33523e7233637ea2be8a Mon Sep 17 00:00:00 2001 From: canxin121 Date: Mon, 23 Feb 2026 12:52:25 +0800 Subject: [PATCH 1/4] fix(openai): emit valid responses stream error chunks When /v1/responses streaming fails after headers are sent, we now emit a type=error chunk instead of an HTTP-style {error:{...}} payload, preventing AI SDK chunk validation errors. --- .../openai/openai_responses_handlers.go | 4 +- ...ai_responses_handlers_stream_error_test.go | 43 +++++++ .../handlers/openai_responses_stream_error.go | 119 ++++++++++++++++++ .../openai_responses_stream_error_test.go | 48 +++++++ 4 files changed, 212 insertions(+), 2 deletions(-) create mode 100644 sdk/api/handlers/openai/openai_responses_handlers_stream_error_test.go create mode 100644 sdk/api/handlers/openai_responses_stream_error.go create mode 100644 sdk/api/handlers/openai_responses_stream_error_test.go diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index 1cd7e04f..3bca75f9 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -265,8 +265,8 @@ func (h *OpenAIResponsesAPIHandler) forwardResponsesStream(c *gin.Context, flush if errMsg.Error != nil && errMsg.Error.Error() != "" { errText = errMsg.Error.Error() } - body := handlers.BuildErrorResponseBody(status, errText) - _, _ = fmt.Fprintf(c.Writer, "\nevent: error\ndata: %s\n\n", string(body)) + chunk := handlers.BuildOpenAIResponsesStreamErrorChunk(status, errText, 0) + _, _ = fmt.Fprintf(c.Writer, "\nevent: error\ndata: %s\n\n", string(chunk)) }, WriteDone: func() { _, _ = c.Writer.Write([]byte("\n")) diff --git a/sdk/api/handlers/openai/openai_responses_handlers_stream_error_test.go b/sdk/api/handlers/openai/openai_responses_handlers_stream_error_test.go new file mode 100644 index 00000000..dce73807 --- /dev/null +++ 
b/sdk/api/handlers/openai/openai_responses_handlers_stream_error_test.go @@ -0,0 +1,43 @@ +package openai + +import ( + "errors" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/gin-gonic/gin" + "github.com/router-for-me/CLIProxyAPI/v6/internal/interfaces" + "github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers" + sdkconfig "github.com/router-for-me/CLIProxyAPI/v6/sdk/config" +) + +func TestForwardResponsesStreamTerminalErrorUsesResponsesErrorChunk(t *testing.T) { + gin.SetMode(gin.TestMode) + base := handlers.NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, nil) + h := NewOpenAIResponsesAPIHandler(base) + + recorder := httptest.NewRecorder() + c, _ := gin.CreateTestContext(recorder) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses", nil) + + flusher, ok := c.Writer.(http.Flusher) + if !ok { + t.Fatalf("expected gin writer to implement http.Flusher") + } + + data := make(chan []byte) + errs := make(chan *interfaces.ErrorMessage, 1) + errs <- &interfaces.ErrorMessage{StatusCode: http.StatusInternalServerError, Error: errors.New("unexpected EOF")} + close(errs) + + h.forwardResponsesStream(c, flusher, func(error) {}, data, errs) + body := recorder.Body.String() + if !strings.Contains(body, `"type":"error"`) { + t.Fatalf("expected responses error chunk, got: %q", body) + } + if strings.Contains(body, `"error":{`) { + t.Fatalf("expected streaming error chunk (top-level type), got HTTP error body: %q", body) + } +} diff --git a/sdk/api/handlers/openai_responses_stream_error.go b/sdk/api/handlers/openai_responses_stream_error.go new file mode 100644 index 00000000..e7760bd0 --- /dev/null +++ b/sdk/api/handlers/openai_responses_stream_error.go @@ -0,0 +1,119 @@ +package handlers + +import ( + "encoding/json" + "fmt" + "net/http" + "strings" +) + +type openAIResponsesStreamErrorChunk struct { + Type string `json:"type"` + Code string `json:"code"` + Message string `json:"message"` + SequenceNumber int `json:"sequence_number"` +} + 
+func openAIResponsesStreamErrorCode(status int) string { + switch status { + case http.StatusUnauthorized: + return "invalid_api_key" + case http.StatusForbidden: + return "insufficient_quota" + case http.StatusTooManyRequests: + return "rate_limit_exceeded" + case http.StatusNotFound: + return "model_not_found" + case http.StatusRequestTimeout: + return "request_timeout" + default: + if status >= http.StatusInternalServerError { + return "internal_server_error" + } + if status >= http.StatusBadRequest { + return "invalid_request_error" + } + return "unknown_error" + } +} + +// BuildOpenAIResponsesStreamErrorChunk builds an OpenAI Responses streaming error chunk. +// +// Important: OpenAI's HTTP error bodies are shaped like {"error":{...}}; those are valid for +// non-streaming responses, but streaming clients validate SSE `data:` payloads against a union +// of chunks that requires a top-level `type` field. +func BuildOpenAIResponsesStreamErrorChunk(status int, errText string, sequenceNumber int) []byte { + if status <= 0 { + status = http.StatusInternalServerError + } + if sequenceNumber < 0 { + sequenceNumber = 0 + } + + message := strings.TrimSpace(errText) + if message == "" { + message = http.StatusText(status) + } + + code := openAIResponsesStreamErrorCode(status) + + trimmed := strings.TrimSpace(errText) + if trimmed != "" && json.Valid([]byte(trimmed)) { + var payload map[string]any + if err := json.Unmarshal([]byte(trimmed), &payload); err == nil { + if t, ok := payload["type"].(string); ok && strings.TrimSpace(t) == "error" { + if m, ok := payload["message"].(string); ok && strings.TrimSpace(m) != "" { + message = strings.TrimSpace(m) + } + if v, ok := payload["code"]; ok && v != nil { + if c, ok := v.(string); ok && strings.TrimSpace(c) != "" { + code = strings.TrimSpace(c) + } else { + code = strings.TrimSpace(fmt.Sprint(v)) + } + } + if v, ok := payload["sequence_number"].(float64); ok && sequenceNumber == 0 { + sequenceNumber = int(v) + } + } + if 
e, ok := payload["error"].(map[string]any); ok { + if m, ok := e["message"].(string); ok && strings.TrimSpace(m) != "" { + message = strings.TrimSpace(m) + } + if v, ok := e["code"]; ok && v != nil { + if c, ok := v.(string); ok && strings.TrimSpace(c) != "" { + code = strings.TrimSpace(c) + } else { + code = strings.TrimSpace(fmt.Sprint(v)) + } + } + } + } + } + + if strings.TrimSpace(code) == "" { + code = "unknown_error" + } + + data, err := json.Marshal(openAIResponsesStreamErrorChunk{ + Type: "error", + Code: code, + Message: message, + SequenceNumber: sequenceNumber, + }) + if err == nil { + return data + } + + // Extremely defensive fallback. + data, _ = json.Marshal(openAIResponsesStreamErrorChunk{ + Type: "error", + Code: "internal_server_error", + Message: message, + SequenceNumber: sequenceNumber, + }) + if len(data) > 0 { + return data + } + return []byte(`{"type":"error","code":"internal_server_error","message":"internal error","sequence_number":0}`) +} diff --git a/sdk/api/handlers/openai_responses_stream_error_test.go b/sdk/api/handlers/openai_responses_stream_error_test.go new file mode 100644 index 00000000..90b2c667 --- /dev/null +++ b/sdk/api/handlers/openai_responses_stream_error_test.go @@ -0,0 +1,48 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "testing" +) + +func TestBuildOpenAIResponsesStreamErrorChunk(t *testing.T) { + chunk := BuildOpenAIResponsesStreamErrorChunk(http.StatusInternalServerError, "unexpected EOF", 0) + var payload map[string]any + if err := json.Unmarshal(chunk, &payload); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if payload["type"] != "error" { + t.Fatalf("type = %v, want %q", payload["type"], "error") + } + if payload["code"] != "internal_server_error" { + t.Fatalf("code = %v, want %q", payload["code"], "internal_server_error") + } + if payload["message"] != "unexpected EOF" { + t.Fatalf("message = %v, want %q", payload["message"], "unexpected EOF") + } + if payload["sequence_number"] != 
float64(0) { + t.Fatalf("sequence_number = %v, want %v", payload["sequence_number"], 0) + } +} + +func TestBuildOpenAIResponsesStreamErrorChunkExtractsHTTPErrorBody(t *testing.T) { + chunk := BuildOpenAIResponsesStreamErrorChunk( + http.StatusInternalServerError, + `{"error":{"message":"oops","type":"server_error","code":"internal_server_error"}}`, + 0, + ) + var payload map[string]any + if err := json.Unmarshal(chunk, &payload); err != nil { + t.Fatalf("unmarshal: %v", err) + } + if payload["type"] != "error" { + t.Fatalf("type = %v, want %q", payload["type"], "error") + } + if payload["code"] != "internal_server_error" { + t.Fatalf("code = %v, want %q", payload["code"], "internal_server_error") + } + if payload["message"] != "oops" { + t.Fatalf("message = %v, want %q", payload["message"], "oops") + } +} From 5382764d8a61519d6b8440eef99484c7ef4a6bc8 Mon Sep 17 00:00:00 2001 From: canxin121 Date: Mon, 23 Feb 2026 13:22:06 +0800 Subject: [PATCH 2/4] fix(responses): include model and usage in translated streams Ensure response.created and response.completed chunks produced by the OpenAI/Gemini/Claude translators always include required fields (response.model and response.usage) so clients validating Responses SSE do not fail schema validation. 
--- .../claude_openai-responses_response.go | 20 +++--- .../claude_openai-responses_response_test.go | 67 +++++++++++++++++++ .../gemini_openai-responses_response.go | 30 +++++---- .../gemini_openai-responses_response_test.go | 31 +++++++++ .../openai_openai-responses_response.go | 23 +++---- .../openai_openai-responses_response_test.go | 61 +++++++++++++++++ 6 files changed, 196 insertions(+), 36 deletions(-) create mode 100644 internal/translator/claude/openai/responses/claude_openai-responses_response_test.go create mode 100644 internal/translator/openai/openai/responses/openai_openai-responses_response_test.go diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response.go b/internal/translator/claude/openai/responses/claude_openai-responses_response.go index e77b09e1..56965fdc 100644 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response.go +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response.go @@ -109,6 +109,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin created, _ = sjson.Set(created, "sequence_number", nextSeq()) created, _ = sjson.Set(created, "response.id", st.ResponseID) created, _ = sjson.Set(created, "response.created_at", st.CreatedAt) + created, _ = sjson.Set(created, "response.model", modelName) out = append(out, emitEvent("response.created", created)) // response.in_progress inprog := `{"type":"response.in_progress","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress"}}` @@ -412,19 +413,14 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin if st.ReasoningBuf.Len() > 0 { reasoningTokens = int64(st.ReasoningBuf.Len() / 4) } - usagePresent := st.UsageSeen || reasoningTokens > 0 - if usagePresent { - completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.InputTokens) - completed, _ = sjson.Set(completed, 
"response.usage.input_tokens_details.cached_tokens", 0) - completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.OutputTokens) - if reasoningTokens > 0 { - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", reasoningTokens) - } - total := st.InputTokens + st.OutputTokens - if total > 0 || st.UsageSeen { - completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) - } + completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.InputTokens) + completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", 0) + completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.OutputTokens) + if reasoningTokens > 0 { + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", reasoningTokens) } + total := st.InputTokens + st.OutputTokens + completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) out = append(out, emitEvent("response.completed", completed)) } diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go b/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go new file mode 100644 index 00000000..27b25f9d --- /dev/null +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go @@ -0,0 +1,67 @@ +package responses + +import ( + "context" + "strings" + "testing" + + "github.com/tidwall/gjson" +) + +func parseSSEEvent(t *testing.T, chunk string) (string, gjson.Result) { + t.Helper() + + lines := strings.Split(chunk, "\n") + if len(lines) < 2 { + t.Fatalf("unexpected SSE chunk: %q", chunk) + } + + event := strings.TrimSpace(strings.TrimPrefix(lines[0], "event:")) + dataLine := strings.TrimSpace(strings.TrimPrefix(lines[1], "data:")) + if !gjson.Valid(dataLine) { + t.Fatalf("invalid SSE data JSON: %q", dataLine) + } + return event, gjson.Parse(dataLine) +} + +func 
TestConvertClaudeResponseToOpenAIResponses_CreatedHasModelAndCompletedHasUsage(t *testing.T) { + in := []string{ + `data: {"type":"message_start","message":{"id":"msg_1"}}`, + `data: {"type":"message_stop"}`, + } + + var param any + var out []string + for _, line := range in { + out = append(out, ConvertClaudeResponseToOpenAIResponses(context.Background(), "test-model", nil, nil, []byte(line), ¶m)...) + } + + gotCreated := false + gotCompleted := false + createdModel := "" + for _, chunk := range out { + ev, data := parseSSEEvent(t, chunk) + switch ev { + case "response.created": + gotCreated = true + createdModel = data.Get("response.model").String() + case "response.completed": + gotCompleted = true + if !data.Get("response.usage.input_tokens").Exists() { + t.Fatalf("response.completed missing usage.input_tokens: %s", data.Raw) + } + if !data.Get("response.usage.output_tokens").Exists() { + t.Fatalf("response.completed missing usage.output_tokens: %s", data.Raw) + } + } + } + if !gotCreated { + t.Fatalf("missing response.created event") + } + if createdModel != "test-model" { + t.Fatalf("unexpected response.created model: got %q", createdModel) + } + if !gotCompleted { + t.Fatalf("missing response.completed event") + } +} diff --git a/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go b/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go index 985897fa..a19bf8ca 100644 --- a/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go +++ b/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go @@ -212,6 +212,7 @@ func ConvertGeminiResponseToOpenAIResponses(_ context.Context, modelName string, created, _ = sjson.Set(created, "sequence_number", nextSeq()) created, _ = sjson.Set(created, "response.id", st.ResponseID) created, _ = sjson.Set(created, "response.created_at", st.CreatedAt) + created, _ = sjson.Set(created, "response.model", modelName) out = append(out, 
emitEvent("response.created", created)) inprog := `{"type":"response.in_progress","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress"}}` @@ -529,31 +530,36 @@ func ConvertGeminiResponseToOpenAIResponses(_ context.Context, modelName string, completed, _ = sjson.SetRaw(completed, "response.output", gjson.Get(outputsWrapper, "arr").Raw) } - // usage mapping + input := int64(0) + cached := int64(0) + output := int64(0) + reasoning := int64(0) + total := int64(0) if um := root.Get("usageMetadata"); um.Exists() { // input tokens = prompt + thoughts - input := um.Get("promptTokenCount").Int() + um.Get("thoughtsTokenCount").Int() - completed, _ = sjson.Set(completed, "response.usage.input_tokens", input) + input = um.Get("promptTokenCount").Int() + um.Get("thoughtsTokenCount").Int() // cached token details: align with OpenAI "cached_tokens" semantics. - completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", um.Get("cachedContentTokenCount").Int()) + cached = um.Get("cachedContentTokenCount").Int() // output tokens if v := um.Get("candidatesTokenCount"); v.Exists() { - completed, _ = sjson.Set(completed, "response.usage.output_tokens", v.Int()) - } else { - completed, _ = sjson.Set(completed, "response.usage.output_tokens", 0) + output = v.Int() } if v := um.Get("thoughtsTokenCount"); v.Exists() { - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", v.Int()) - } else { - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", 0) + reasoning = v.Int() } if v := um.Get("totalTokenCount"); v.Exists() { - completed, _ = sjson.Set(completed, "response.usage.total_tokens", v.Int()) + total = v.Int() } else { - completed, _ = sjson.Set(completed, "response.usage.total_tokens", 0) + total = input + output } } + completed, _ = sjson.Set(completed, "response.usage.input_tokens", input) + completed, _ = sjson.Set(completed, 
"response.usage.input_tokens_details.cached_tokens", cached) + completed, _ = sjson.Set(completed, "response.usage.output_tokens", output) + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", reasoning) + completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) + out = append(out, emitEvent("response.completed", completed)) } diff --git a/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go b/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go index 9899c594..d0e01160 100644 --- a/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go +++ b/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go @@ -53,6 +53,7 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin textDone string messageText string responseID string + createdModel string instructions string cachedTokens int64 @@ -68,6 +69,8 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin for i, chunk := range out { ev, data := parseSSEEvent(t, chunk) switch ev { + case "response.created": + createdModel = data.Get("response.model").String() case "response.output_text.done": gotTextDone = true if posTextDone == -1 { @@ -132,6 +135,9 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin if responseID != "resp_req_vrtx_1" { t.Fatalf("unexpected response id: got %q", responseID) } + if createdModel != "test-model" { + t.Fatalf("unexpected response.created model: got %q", createdModel) + } if instructions != "test instructions" { t.Fatalf("unexpected instructions echo: got %q", instructions) } @@ -153,6 +159,31 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin } } +func TestConvertGeminiResponseToOpenAIResponses_CompletedAlwaysHasUsage(t *testing.T) { + in := `data: 
{"response":{"candidates":[{"content":{"role":"model","parts":[{"text":"hi"}]},"finishReason":"STOP"}],"modelVersion":"test-model","responseId":"req_no_usage"},"traceId":"t1"}` + + var param any + out := ConvertGeminiResponseToOpenAIResponses(context.Background(), "test-model", nil, nil, []byte(in), ¶m) + + gotCompleted := false + for _, chunk := range out { + ev, data := parseSSEEvent(t, chunk) + if ev != "response.completed" { + continue + } + gotCompleted = true + if !data.Get("response.usage.input_tokens").Exists() { + t.Fatalf("response.completed missing usage.input_tokens: %s", data.Raw) + } + if !data.Get("response.usage.output_tokens").Exists() { + t.Fatalf("response.completed missing usage.output_tokens: %s", data.Raw) + } + } + if !gotCompleted { + t.Fatalf("missing response.completed event") + } +} + func TestConvertGeminiResponseToOpenAIResponses_ReasoningEncryptedContent(t *testing.T) { sig := "RXE0RENrZ0lDeEFDR0FJcVFOZDdjUzlleGFuRktRdFcvSzNyZ2MvWDNCcDQ4RmxSbGxOWUlOVU5kR1l1UHMrMGdkMVp0Vkg3ekdKU0g4YVljc2JjN3lNK0FrdGpTNUdqamI4T3Z0VVNETzdQd3pmcFhUOGl3U3hXUEJvTVFRQ09mWTFyMEtTWGZxUUlJakFqdmFGWk83RW1XRlBKckJVOVpkYzdDKw==" in := []string{ diff --git a/internal/translator/openai/openai/responses/openai_openai-responses_response.go b/internal/translator/openai/openai/responses/openai_openai-responses_response.go index 15152852..5e669ec2 100644 --- a/internal/translator/openai/openai/responses/openai_openai-responses_response.go +++ b/internal/translator/openai/openai/responses/openai_openai-responses_response.go @@ -153,6 +153,7 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, created, _ = sjson.Set(created, "sequence_number", nextSeq()) created, _ = sjson.Set(created, "response.id", st.ResponseID) created, _ = sjson.Set(created, "response.created_at", st.Created) + created, _ = sjson.Set(created, "response.model", modelName) out = append(out, emitRespEvent("response.created", created)) inprog := 
`{"type":"response.in_progress","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress"}}` @@ -578,19 +579,17 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, if gjson.Get(outputsWrapper, "arr.#").Int() > 0 { completed, _ = sjson.SetRaw(completed, "response.output", gjson.Get(outputsWrapper, "arr").Raw) } - if st.UsageSeen { - completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.PromptTokens) - completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", st.CachedTokens) - completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.CompletionTokens) - if st.ReasoningTokens > 0 { - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", st.ReasoningTokens) - } - total := st.TotalTokens - if total == 0 { - total = st.PromptTokens + st.CompletionTokens - } - completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) + completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.PromptTokens) + completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", st.CachedTokens) + completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.CompletionTokens) + if st.ReasoningTokens > 0 { + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", st.ReasoningTokens) } + total := st.TotalTokens + if total == 0 { + total = st.PromptTokens + st.CompletionTokens + } + completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) out = append(out, emitRespEvent("response.completed", completed)) } diff --git a/internal/translator/openai/openai/responses/openai_openai-responses_response_test.go b/internal/translator/openai/openai/responses/openai_openai-responses_response_test.go new file mode 100644 index 00000000..2275d487 --- /dev/null +++ 
b/internal/translator/openai/openai/responses/openai_openai-responses_response_test.go @@ -0,0 +1,61 @@ +package responses + +import ( + "context" + "strings" + "testing" + + "github.com/tidwall/gjson" +) + +func parseSSEEvent(t *testing.T, chunk string) (string, gjson.Result) { + t.Helper() + + lines := strings.Split(chunk, "\n") + if len(lines) < 2 { + t.Fatalf("unexpected SSE chunk: %q", chunk) + } + + event := strings.TrimSpace(strings.TrimPrefix(lines[0], "event:")) + dataLine := strings.TrimSpace(strings.TrimPrefix(lines[1], "data:")) + if !gjson.Valid(dataLine) { + t.Fatalf("invalid SSE data JSON: %q", dataLine) + } + return event, gjson.Parse(dataLine) +} + +func TestConvertOpenAIChatCompletionsResponseToOpenAIResponses_CreatedHasModelAndCompletedHasUsage(t *testing.T) { + in := `data: {"id":"chatcmpl-1","object":"chat.completion.chunk","created":1700000000,"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}` + + var param any + out := ConvertOpenAIChatCompletionsResponseToOpenAIResponses(context.Background(), "test-model", nil, nil, []byte(in), ¶m) + + gotCreated := false + gotCompleted := false + createdModel := "" + for _, chunk := range out { + ev, data := parseSSEEvent(t, chunk) + switch ev { + case "response.created": + gotCreated = true + createdModel = data.Get("response.model").String() + case "response.completed": + gotCompleted = true + if !data.Get("response.usage.input_tokens").Exists() { + t.Fatalf("response.completed missing usage.input_tokens: %s", data.Raw) + } + if !data.Get("response.usage.output_tokens").Exists() { + t.Fatalf("response.completed missing usage.output_tokens: %s", data.Raw) + } + } + } + if !gotCreated { + t.Fatalf("missing response.created event") + } + if createdModel != "test-model" { + t.Fatalf("unexpected response.created model: got %q", createdModel) + } + if !gotCompleted { + t.Fatalf("missing response.completed event") + } +} From eb7571936c041b4cfae500c0fd5814ca7acd8500 Mon Sep 17 00:00:00 2001 From: 
canxin121 Date: Mon, 23 Feb 2026 13:30:43 +0800 Subject: [PATCH 3/4] revert: translator changes (path guard) CI blocks PRs that modify internal/translator. Revert translator edits and keep only the /v1/responses streaming error-chunk fix; file an issue for translator conformance work. --- .../claude_openai-responses_response.go | 20 +++--- .../claude_openai-responses_response_test.go | 67 ------------------- .../gemini_openai-responses_response.go | 30 ++++----- .../gemini_openai-responses_response_test.go | 31 --------- .../openai_openai-responses_response.go | 23 ++++--- .../openai_openai-responses_response_test.go | 61 ----------------- 6 files changed, 36 insertions(+), 196 deletions(-) delete mode 100644 internal/translator/claude/openai/responses/claude_openai-responses_response_test.go delete mode 100644 internal/translator/openai/openai/responses/openai_openai-responses_response_test.go diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response.go b/internal/translator/claude/openai/responses/claude_openai-responses_response.go index 56965fdc..e77b09e1 100644 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response.go +++ b/internal/translator/claude/openai/responses/claude_openai-responses_response.go @@ -109,7 +109,6 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin created, _ = sjson.Set(created, "sequence_number", nextSeq()) created, _ = sjson.Set(created, "response.id", st.ResponseID) created, _ = sjson.Set(created, "response.created_at", st.CreatedAt) - created, _ = sjson.Set(created, "response.model", modelName) out = append(out, emitEvent("response.created", created)) // response.in_progress inprog := `{"type":"response.in_progress","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress"}}` @@ -413,14 +412,19 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin if st.ReasoningBuf.Len() > 
0 { reasoningTokens = int64(st.ReasoningBuf.Len() / 4) } - completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.InputTokens) - completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", 0) - completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.OutputTokens) - if reasoningTokens > 0 { - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", reasoningTokens) + usagePresent := st.UsageSeen || reasoningTokens > 0 + if usagePresent { + completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.InputTokens) + completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", 0) + completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.OutputTokens) + if reasoningTokens > 0 { + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", reasoningTokens) + } + total := st.InputTokens + st.OutputTokens + if total > 0 || st.UsageSeen { + completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) + } } - total := st.InputTokens + st.OutputTokens - completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) out = append(out, emitEvent("response.completed", completed)) } diff --git a/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go b/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go deleted file mode 100644 index 27b25f9d..00000000 --- a/internal/translator/claude/openai/responses/claude_openai-responses_response_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package responses - -import ( - "context" - "strings" - "testing" - - "github.com/tidwall/gjson" -) - -func parseSSEEvent(t *testing.T, chunk string) (string, gjson.Result) { - t.Helper() - - lines := strings.Split(chunk, "\n") - if len(lines) < 2 { - t.Fatalf("unexpected SSE chunk: %q", chunk) - } - - event := 
strings.TrimSpace(strings.TrimPrefix(lines[0], "event:")) - dataLine := strings.TrimSpace(strings.TrimPrefix(lines[1], "data:")) - if !gjson.Valid(dataLine) { - t.Fatalf("invalid SSE data JSON: %q", dataLine) - } - return event, gjson.Parse(dataLine) -} - -func TestConvertClaudeResponseToOpenAIResponses_CreatedHasModelAndCompletedHasUsage(t *testing.T) { - in := []string{ - `data: {"type":"message_start","message":{"id":"msg_1"}}`, - `data: {"type":"message_stop"}`, - } - - var param any - var out []string - for _, line := range in { - out = append(out, ConvertClaudeResponseToOpenAIResponses(context.Background(), "test-model", nil, nil, []byte(line), ¶m)...) - } - - gotCreated := false - gotCompleted := false - createdModel := "" - for _, chunk := range out { - ev, data := parseSSEEvent(t, chunk) - switch ev { - case "response.created": - gotCreated = true - createdModel = data.Get("response.model").String() - case "response.completed": - gotCompleted = true - if !data.Get("response.usage.input_tokens").Exists() { - t.Fatalf("response.completed missing usage.input_tokens: %s", data.Raw) - } - if !data.Get("response.usage.output_tokens").Exists() { - t.Fatalf("response.completed missing usage.output_tokens: %s", data.Raw) - } - } - } - if !gotCreated { - t.Fatalf("missing response.created event") - } - if createdModel != "test-model" { - t.Fatalf("unexpected response.created model: got %q", createdModel) - } - if !gotCompleted { - t.Fatalf("missing response.completed event") - } -} diff --git a/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go b/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go index a19bf8ca..985897fa 100644 --- a/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go +++ b/internal/translator/gemini/openai/responses/gemini_openai-responses_response.go @@ -212,7 +212,6 @@ func ConvertGeminiResponseToOpenAIResponses(_ context.Context, modelName string, created, _ = 
sjson.Set(created, "sequence_number", nextSeq()) created, _ = sjson.Set(created, "response.id", st.ResponseID) created, _ = sjson.Set(created, "response.created_at", st.CreatedAt) - created, _ = sjson.Set(created, "response.model", modelName) out = append(out, emitEvent("response.created", created)) inprog := `{"type":"response.in_progress","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress"}}` @@ -530,36 +529,31 @@ func ConvertGeminiResponseToOpenAIResponses(_ context.Context, modelName string, completed, _ = sjson.SetRaw(completed, "response.output", gjson.Get(outputsWrapper, "arr").Raw) } - input := int64(0) - cached := int64(0) - output := int64(0) - reasoning := int64(0) - total := int64(0) + // usage mapping if um := root.Get("usageMetadata"); um.Exists() { // input tokens = prompt + thoughts - input = um.Get("promptTokenCount").Int() + um.Get("thoughtsTokenCount").Int() + input := um.Get("promptTokenCount").Int() + um.Get("thoughtsTokenCount").Int() + completed, _ = sjson.Set(completed, "response.usage.input_tokens", input) // cached token details: align with OpenAI "cached_tokens" semantics. 
- cached = um.Get("cachedContentTokenCount").Int() + completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", um.Get("cachedContentTokenCount").Int()) // output tokens if v := um.Get("candidatesTokenCount"); v.Exists() { - output = v.Int() + completed, _ = sjson.Set(completed, "response.usage.output_tokens", v.Int()) + } else { + completed, _ = sjson.Set(completed, "response.usage.output_tokens", 0) } if v := um.Get("thoughtsTokenCount"); v.Exists() { - reasoning = v.Int() + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", v.Int()) + } else { + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", 0) } if v := um.Get("totalTokenCount"); v.Exists() { - total = v.Int() + completed, _ = sjson.Set(completed, "response.usage.total_tokens", v.Int()) } else { - total = input + output + completed, _ = sjson.Set(completed, "response.usage.total_tokens", 0) } } - completed, _ = sjson.Set(completed, "response.usage.input_tokens", input) - completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", cached) - completed, _ = sjson.Set(completed, "response.usage.output_tokens", output) - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", reasoning) - completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) - out = append(out, emitEvent("response.completed", completed)) } diff --git a/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go b/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go index d0e01160..9899c594 100644 --- a/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go +++ b/internal/translator/gemini/openai/responses/gemini_openai-responses_response_test.go @@ -53,7 +53,6 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin textDone string 
messageText string responseID string - createdModel string instructions string cachedTokens int64 @@ -69,8 +68,6 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin for i, chunk := range out { ev, data := parseSSEEvent(t, chunk) switch ev { - case "response.created": - createdModel = data.Get("response.model").String() case "response.output_text.done": gotTextDone = true if posTextDone == -1 { @@ -135,9 +132,6 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin if responseID != "resp_req_vrtx_1" { t.Fatalf("unexpected response id: got %q", responseID) } - if createdModel != "test-model" { - t.Fatalf("unexpected response.created model: got %q", createdModel) - } if instructions != "test instructions" { t.Fatalf("unexpected instructions echo: got %q", instructions) } @@ -159,31 +153,6 @@ func TestConvertGeminiResponseToOpenAIResponses_UnwrapAndAggregateText(t *testin } } -func TestConvertGeminiResponseToOpenAIResponses_CompletedAlwaysHasUsage(t *testing.T) { - in := `data: {"response":{"candidates":[{"content":{"role":"model","parts":[{"text":"hi"}]},"finishReason":"STOP"}],"modelVersion":"test-model","responseId":"req_no_usage"},"traceId":"t1"}` - - var param any - out := ConvertGeminiResponseToOpenAIResponses(context.Background(), "test-model", nil, nil, []byte(in), &param) - - gotCompleted := false - for _, chunk := range out { - ev, data := parseSSEEvent(t, chunk) - if ev != "response.completed" { - continue - } - gotCompleted = true - if !data.Get("response.usage.input_tokens").Exists() { - t.Fatalf("response.completed missing usage.input_tokens: %s", data.Raw) - } - if !data.Get("response.usage.output_tokens").Exists() { - t.Fatalf("response.completed missing usage.output_tokens: %s", data.Raw) - } - } - if !gotCompleted { - t.Fatalf("missing response.completed event") - } -} - func TestConvertGeminiResponseToOpenAIResponses_ReasoningEncryptedContent(t *testing.T) { sig =
"RXE0RENrZ0lDeEFDR0FJcVFOZDdjUzlleGFuRktRdFcvSzNyZ2MvWDNCcDQ4RmxSbGxOWUlOVU5kR1l1UHMrMGdkMVp0Vkg3ekdKU0g4YVljc2JjN3lNK0FrdGpTNUdqamI4T3Z0VVNETzdQd3pmcFhUOGl3U3hXUEJvTVFRQ09mWTFyMEtTWGZxUUlJakFqdmFGWk83RW1XRlBKckJVOVpkYzdDKw==" in := []string{ diff --git a/internal/translator/openai/openai/responses/openai_openai-responses_response.go b/internal/translator/openai/openai/responses/openai_openai-responses_response.go index 5e669ec2..15152852 100644 --- a/internal/translator/openai/openai/responses/openai_openai-responses_response.go +++ b/internal/translator/openai/openai/responses/openai_openai-responses_response.go @@ -153,7 +153,6 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, created, _ = sjson.Set(created, "sequence_number", nextSeq()) created, _ = sjson.Set(created, "response.id", st.ResponseID) created, _ = sjson.Set(created, "response.created_at", st.Created) - created, _ = sjson.Set(created, "response.model", modelName) out = append(out, emitRespEvent("response.created", created)) inprog := `{"type":"response.in_progress","sequence_number":0,"response":{"id":"","object":"response","created_at":0,"status":"in_progress"}}` @@ -579,17 +578,19 @@ func ConvertOpenAIChatCompletionsResponseToOpenAIResponses(ctx context.Context, if gjson.Get(outputsWrapper, "arr.#").Int() > 0 { completed, _ = sjson.SetRaw(completed, "response.output", gjson.Get(outputsWrapper, "arr").Raw) } - completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.PromptTokens) - completed, _ = sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", st.CachedTokens) - completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.CompletionTokens) - if st.ReasoningTokens > 0 { - completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", st.ReasoningTokens) + if st.UsageSeen { + completed, _ = sjson.Set(completed, "response.usage.input_tokens", st.PromptTokens) + completed, _ = 
sjson.Set(completed, "response.usage.input_tokens_details.cached_tokens", st.CachedTokens) + completed, _ = sjson.Set(completed, "response.usage.output_tokens", st.CompletionTokens) + if st.ReasoningTokens > 0 { + completed, _ = sjson.Set(completed, "response.usage.output_tokens_details.reasoning_tokens", st.ReasoningTokens) + } + total := st.TotalTokens + if total == 0 { + total = st.PromptTokens + st.CompletionTokens + } + completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) } - total := st.TotalTokens - if total == 0 { - total = st.PromptTokens + st.CompletionTokens - } - completed, _ = sjson.Set(completed, "response.usage.total_tokens", total) out = append(out, emitRespEvent("response.completed", completed)) } diff --git a/internal/translator/openai/openai/responses/openai_openai-responses_response_test.go b/internal/translator/openai/openai/responses/openai_openai-responses_response_test.go deleted file mode 100644 index 2275d487..00000000 --- a/internal/translator/openai/openai/responses/openai_openai-responses_response_test.go +++ /dev/null @@ -1,61 +0,0 @@ -package responses - -import ( - "context" - "strings" - "testing" - - "github.com/tidwall/gjson" -) - -func parseSSEEvent(t *testing.T, chunk string) (string, gjson.Result) { - t.Helper() - - lines := strings.Split(chunk, "\n") - if len(lines) < 2 { - t.Fatalf("unexpected SSE chunk: %q", chunk) - } - - event := strings.TrimSpace(strings.TrimPrefix(lines[0], "event:")) - dataLine := strings.TrimSpace(strings.TrimPrefix(lines[1], "data:")) - if !gjson.Valid(dataLine) { - t.Fatalf("invalid SSE data JSON: %q", dataLine) - } - return event, gjson.Parse(dataLine) -} - -func TestConvertOpenAIChatCompletionsResponseToOpenAIResponses_CreatedHasModelAndCompletedHasUsage(t *testing.T) { - in := `data: {"id":"chatcmpl-1","object":"chat.completion.chunk","created":1700000000,"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}` - - var param any - out := 
ConvertOpenAIChatCompletionsResponseToOpenAIResponses(context.Background(), "test-model", nil, nil, []byte(in), &param) - - gotCreated := false - gotCompleted := false - createdModel := "" - for _, chunk := range out { - ev, data := parseSSEEvent(t, chunk) - switch ev { - case "response.created": - gotCreated = true - createdModel = data.Get("response.model").String() - case "response.completed": - gotCompleted = true - if !data.Get("response.usage.input_tokens").Exists() { - t.Fatalf("response.completed missing usage.input_tokens: %s", data.Raw) - } - if !data.Get("response.usage.output_tokens").Exists() { - t.Fatalf("response.completed missing usage.output_tokens: %s", data.Raw) - } - } - } - if !gotCreated { - t.Fatalf("missing response.created event") - } - if createdModel != "test-model" { - t.Fatalf("unexpected response.created model: got %q", createdModel) - } - if !gotCompleted { - t.Fatalf("missing response.completed event") - } -} From acf483c9e6cd5af8b91f2b670d67575bac99628e Mon Sep 17 00:00:00 2001 From: canxin121 <canxin121@outlook.com> Date: Tue, 24 Feb 2026 01:42:54 +0800 Subject: [PATCH 4/4] fix(responses): reject invalid SSE data JSON Guard the openai-response streaming path against truncated/invalid SSE data payloads by validating data: JSON before forwarding; surface a 502 terminal error instead of letting clients crash with JSON parse errors.
--- sdk/api/handlers/handlers.go | 35 ++++++++ .../handlers_stream_bootstrap_test.go | 83 +++++++++++++++++++ 2 files changed, 118 insertions(+) diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 68859853..0e490e32 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -716,6 +716,12 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl return } if len(chunk.Payload) > 0 { + if handlerType == "openai-response" { + if err := validateSSEDataJSON(chunk.Payload); err != nil { + _ = sendErr(&interfaces.ErrorMessage{StatusCode: http.StatusBadGateway, Error: err}) + return + } + } sentPayload = true if okSendData := sendData(cloneBytes(chunk.Payload)); !okSendData { return @@ -727,6 +733,35 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl return dataChan, upstreamHeaders, errChan } +func validateSSEDataJSON(chunk []byte) error { + for _, line := range bytes.Split(chunk, []byte("\n")) { + line = bytes.TrimSpace(line) + if len(line) == 0 { + continue + } + if !bytes.HasPrefix(line, []byte("data:")) { + continue + } + data := bytes.TrimSpace(line[5:]) + if len(data) == 0 { + continue + } + if bytes.Equal(data, []byte("[DONE]")) { + continue + } + if json.Valid(data) { + continue + } + const max = 512 + preview := data + if len(preview) > max { + preview = preview[:max] + } + return fmt.Errorf("invalid SSE data JSON (len=%d): %q", len(data), preview) + } + return nil +} + func statusFromError(err error) int { if err == nil { return 0 diff --git a/sdk/api/handlers/handlers_stream_bootstrap_test.go b/sdk/api/handlers/handlers_stream_bootstrap_test.go index ba9dcac5..b08e3a99 100644 --- a/sdk/api/handlers/handlers_stream_bootstrap_test.go +++ b/sdk/api/handlers/handlers_stream_bootstrap_test.go @@ -134,6 +134,37 @@ type authAwareStreamExecutor struct { authIDs []string } +type invalidJSONStreamExecutor struct{} + +func (e *invalidJSONStreamExecutor) Identifier() 
string { return "codex" } + +func (e *invalidJSONStreamExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) { + return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"} +} + +func (e *invalidJSONStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (*coreexecutor.StreamResult, error) { + ch := make(chan coreexecutor.StreamChunk, 1) + ch <- coreexecutor.StreamChunk{Payload: []byte("event: response.completed\ndata: {\"type\"")} + close(ch) + return &coreexecutor.StreamResult{Chunks: ch}, nil +} + +func (e *invalidJSONStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) { + return auth, nil +} + +func (e *invalidJSONStreamExecutor) CountTokens(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) { + return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "CountTokens not implemented"} +} + +func (e *invalidJSONStreamExecutor) HttpRequest(ctx context.Context, auth *coreauth.Auth, req *http.Request) (*http.Response, error) { + return nil, &coreauth.Error{ + Code: "not_implemented", + Message: "HttpRequest not implemented", + HTTPStatus: http.StatusNotImplemented, + } +} + func (e *authAwareStreamExecutor) Identifier() string { return "codex" } func (e *authAwareStreamExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) { @@ -524,3 +555,55 @@ func TestExecuteStreamWithAuthManager_SelectedAuthCallbackReceivesAuthID(t *test t.Fatalf("selectedAuthID = %q, want %q", selectedAuthID, "auth2") } } + +func TestExecuteStreamWithAuthManager_ValidatesOpenAIResponsesStreamDataJSON(t *testing.T) { + executor := &invalidJSONStreamExecutor{} + manager := coreauth.NewManager(nil, nil, nil) + 
manager.RegisterExecutor(executor) + + auth1 := &coreauth.Auth{ + ID: "auth1", + Provider: "codex", + Status: coreauth.StatusActive, + Metadata: map[string]any{"email": "test1@example.com"}, + } + if _, err := manager.Register(context.Background(), auth1); err != nil { + t.Fatalf("manager.Register(auth1): %v", err) + } + + registry.GetGlobalRegistry().RegisterClient(auth1.ID, auth1.Provider, []*registry.ModelInfo{{ID: "test-model"}}) + t.Cleanup(func() { + registry.GetGlobalRegistry().UnregisterClient(auth1.ID) + }) + + handler := NewBaseAPIHandlers(&sdkconfig.SDKConfig{}, manager) + dataChan, _, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai-response", "test-model", []byte(`{"model":"test-model"}`), "") + if dataChan == nil || errChan == nil { + t.Fatalf("expected non-nil channels") + } + + var got []byte + for chunk := range dataChan { + got = append(got, chunk...) + } + if len(got) != 0 { + t.Fatalf("expected empty payload, got %q", string(got)) + } + + gotErr := false + for msg := range errChan { + if msg == nil { + continue + } + if msg.StatusCode != http.StatusBadGateway { + t.Fatalf("expected status %d, got %d", http.StatusBadGateway, msg.StatusCode) + } + if msg.Error == nil { + t.Fatalf("expected error") + } + gotErr = true + } + if !gotErr { + t.Fatalf("expected terminal error") + } +}