diff --git a/sdk/api/handlers/claude/code_handlers.go b/sdk/api/handlers/claude/code_handlers.go index bdf7c9c7..6554cc9a 100644 --- a/sdk/api/handlers/claude/code_handlers.go +++ b/sdk/api/handlers/claude/code_handlers.go @@ -212,39 +212,47 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [ } // Peek at the first chunk to determine success or failure before setting headers - select { - case <-c.Request.Context().Done(): - cliCancel(c.Request.Context().Err()) - return - case errMsg := <-errChan: - // Upstream failed immediately. Return proper error status and JSON. - h.WriteErrorResponse(c, errMsg) - if errMsg != nil { - cliCancel(errMsg.Error) - } else { - cliCancel(nil) - } - return - case chunk, ok := <-dataChan: - if !ok { - // Stream closed without data? Send DONE or just headers. + for { + select { + case <-c.Request.Context().Done(): + cliCancel(c.Request.Context().Err()) + return + case errMsg, ok := <-errChan: + if !ok { + // Err channel closed cleanly; wait for data channel. + errChan = nil + continue + } + // Upstream failed immediately. Return proper error status and JSON. + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + case chunk, ok := <-dataChan: + if !ok { + // Stream closed without data? Send DONE or just headers. + setSSEHeaders() + flusher.Flush() + cliCancel(nil) + return + } + + // Success! Set headers now. setSSEHeaders() - flusher.Flush() - cliCancel(nil) + + // Write the first chunk + if len(chunk) > 0 { + _, _ = c.Writer.Write(chunk) + flusher.Flush() + } + + // Continue streaming the rest + h.forwardClaudeStream(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan) return } - - // Success! Set headers now. - setSSEHeaders() - - // Write the first chunk - if len(chunk) > 0 { - _, _ = c.Writer.Write(chunk) - flusher.Flush() - } - - // Continue streaming the rest - h.forwardClaudeStream(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan) } } diff --git a/sdk/api/handlers/gemini/gemini_handlers.go b/sdk/api/handlers/gemini/gemini_handlers.go index baf68aac..2b17a9f2 100644 --- a/sdk/api/handlers/gemini/gemini_handlers.go +++ b/sdk/api/handlers/gemini/gemini_handlers.go @@ -249,47 +249,55 @@ func (h *GeminiAPIHandler) handleStreamGenerateContent(c *gin.Context, modelName } // Peek at the first chunk - select { - case <-c.Request.Context().Done(): - cliCancel(c.Request.Context().Err()) - return - case errMsg := <-errChan: - // Upstream failed immediately. Return proper error status and JSON. - h.WriteErrorResponse(c, errMsg) - if errMsg != nil { - cliCancel(errMsg.Error) - } else { - cliCancel(nil) - } - return - case chunk, ok := <-dataChan: - if !ok { - // Closed without data + for { + select { + case <-c.Request.Context().Done(): + cliCancel(c.Request.Context().Err()) + return + case errMsg, ok := <-errChan: + if !ok { + // Err channel closed cleanly; wait for data channel. + errChan = nil + continue + } + // Upstream failed immediately. Return proper error status and JSON. + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + case chunk, ok := <-dataChan: + if !ok { + // Closed without data + if alt == "" { + setSSEHeaders() + } + flusher.Flush() + cliCancel(nil) + return + } + + // Success! Set headers. if alt == "" { setSSEHeaders() } + + // Write first chunk + if alt == "" { + _, _ = c.Writer.Write([]byte("data: ")) + _, _ = c.Writer.Write(chunk) + _, _ = c.Writer.Write([]byte("\n\n")) + } else { + _, _ = c.Writer.Write(chunk) + } flusher.Flush() - cliCancel(nil) + + // Continue + h.forwardGeminiStream(c, flusher, alt, func(err error) { cliCancel(err) }, dataChan, errChan) return } - - // Success! Set headers. - if alt == "" { - setSSEHeaders() - } - - // Write first chunk - if alt == "" { - _, _ = c.Writer.Write([]byte("data: ")) - _, _ = c.Writer.Write(chunk) - _, _ = c.Writer.Write([]byte("\n\n")) - } else { - _, _ = c.Writer.Write(chunk) - } - flusher.Flush() - - // Continue - h.forwardGeminiStream(c, flusher, alt, func(err error) { cliCancel(err) }, dataChan, errChan) } } diff --git a/sdk/api/handlers/openai/openai_handlers.go b/sdk/api/handlers/openai/openai_handlers.go index d5962ea7..65936be7 100644 --- a/sdk/api/handlers/openai/openai_handlers.go +++ b/sdk/api/handlers/openai/openai_handlers.go @@ -467,37 +467,45 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt } // Peek at the first chunk to determine success or failure before setting headers - select { - case <-c.Request.Context().Done(): - cliCancel(c.Request.Context().Err()) - return - case errMsg := <-errChan: - // Upstream failed immediately. Return proper error status and JSON. - h.WriteErrorResponse(c, errMsg) - if errMsg != nil { - cliCancel(errMsg.Error) - } else { - cliCancel(nil) - } - return - case chunk, ok := <-dataChan: - if !ok { - // Stream closed without data? Send DONE or just headers. + for { + select { + case <-c.Request.Context().Done(): + cliCancel(c.Request.Context().Err()) + return + case errMsg, ok := <-errChan: + if !ok { + // Err channel closed cleanly; wait for data channel. + errChan = nil + continue + } + // Upstream failed immediately. Return proper error status and JSON. + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + case chunk, ok := <-dataChan: + if !ok { + // Stream closed without data? Send DONE or just headers. + setSSEHeaders() + _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") + flusher.Flush() + cliCancel(nil) + return + } + + // Success! Commit to streaming headers. setSSEHeaders() - _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") + + _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(chunk)) flusher.Flush() - cliCancel(nil) + + // Continue streaming the rest + h.handleStreamResult(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan) return } - - // Success! Commit to streaming headers. - setSSEHeaders() - - _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(chunk)) - flusher.Flush() - - // Continue streaming the rest - h.handleStreamResult(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan) } } @@ -562,69 +570,77 @@ func (h *OpenAIAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, ra } // Peek at the first chunk - select { - case <-c.Request.Context().Done(): - cliCancel(c.Request.Context().Err()) - return - case errMsg := <-errChan: - h.WriteErrorResponse(c, errMsg) - if errMsg != nil { - cliCancel(errMsg.Error) - } else { - cliCancel(nil) - } - return - case chunk, ok := <-dataChan: - if !ok { - setSSEHeaders() - _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") - flusher.Flush() - cliCancel(nil) + for { + select { + case <-c.Request.Context().Done(): + cliCancel(c.Request.Context().Err()) return - } + case errMsg, ok := <-errChan: + if !ok { + // Err channel closed cleanly; wait for data channel. + errChan = nil + continue + } + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + case chunk, ok := <-dataChan: + if !ok { + setSSEHeaders() + _, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n") + flusher.Flush() + cliCancel(nil) + return + } - // Success! Set headers. - setSSEHeaders() + // Success! Set headers. + setSSEHeaders() - // Write the first chunk - converted := convertChatCompletionsStreamChunkToCompletions(chunk) - if converted != nil { - _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(converted)) - flusher.Flush() - } + // Write the first chunk + converted := convertChatCompletionsStreamChunkToCompletions(chunk) + if converted != nil { + _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(converted)) + flusher.Flush() + } - done := make(chan struct{}) - var doneOnce sync.Once - stop := func() { doneOnce.Do(func() { close(done) }) } + done := make(chan struct{}) + var doneOnce sync.Once + stop := func() { doneOnce.Do(func() { close(done) }) } - convertedChan := make(chan []byte) - go func() { - defer close(convertedChan) - for { - select { - case <-done: - return - case chunk, ok := <-dataChan: - if !ok { - return - } - converted := convertChatCompletionsStreamChunkToCompletions(chunk) - if converted == nil { - continue - } + convertedChan := make(chan []byte) + go func() { + defer close(convertedChan) + for { select { case <-done: return - case convertedChan <- converted: + case chunk, ok := <-dataChan: + if !ok { + return + } + converted := convertChatCompletionsStreamChunkToCompletions(chunk) + if converted == nil { + continue + } + select { + case <-done: + return + case convertedChan <- converted: + } } } - } - }() + }() - h.handleStreamResult(c, flusher, func(err error) { - stop() - cliCancel(err) - }, convertedChan, errChan) + h.handleStreamResult(c, flusher, func(err error) { + stop() + cliCancel(err) + }, convertedChan, errChan) + return + } } } func (h *OpenAIAPIHandler) handleStreamResult(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) { diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index dd63deeb..b6d7c8f2 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -152,42 +152,50 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponse(c *gin.Context, rawJ } // Peek at the first chunk - select { - case <-c.Request.Context().Done(): - cliCancel(c.Request.Context().Err()) - return - case errMsg := <-errChan: - // Upstream failed immediately. Return proper error status and JSON. - h.WriteErrorResponse(c, errMsg) - if errMsg != nil { - cliCancel(errMsg.Error) - } else { - cliCancel(nil) - } - return - case chunk, ok := <-dataChan: - if !ok { - // Stream closed without data? Send headers and done. + for { + select { + case <-c.Request.Context().Done(): + cliCancel(c.Request.Context().Err()) + return + case errMsg, ok := <-errChan: + if !ok { + // Err channel closed cleanly; wait for data channel. + errChan = nil + continue + } + // Upstream failed immediately. Return proper error status and JSON. + h.WriteErrorResponse(c, errMsg) + if errMsg != nil { + cliCancel(errMsg.Error) + } else { + cliCancel(nil) + } + return + case chunk, ok := <-dataChan: + if !ok { + // Stream closed without data? Send headers and done. + setSSEHeaders() + _, _ = c.Writer.Write([]byte("\n")) + flusher.Flush() + cliCancel(nil) + return + } + + // Success! Set headers. setSSEHeaders() + + // Write first chunk logic (matching forwardResponsesStream) + if bytes.HasPrefix(chunk, []byte("event:")) { + _, _ = c.Writer.Write([]byte("\n")) + } + _, _ = c.Writer.Write(chunk) _, _ = c.Writer.Write([]byte("\n")) flusher.Flush() - cliCancel(nil) + + // Continue + h.forwardResponsesStream(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan) return } - - // Success! Set headers. - setSSEHeaders() - - // Write first chunk logic (matching forwardResponsesStream) - if bytes.HasPrefix(chunk, []byte("event:")) { - _, _ = c.Writer.Write([]byte("\n")) - } - _, _ = c.Writer.Write(chunk) - _, _ = c.Writer.Write([]byte("\n")) - flusher.Flush() - - // Continue - h.forwardResponsesStream(c, flusher, func(err error) { cliCancel(err) }, dataChan, errChan) } }