From 1215c635a0e7ba72a8e94864d83f4def3480be73 Mon Sep 17 00:00:00 2001 From: teeverc <72298507+teeverc@users.noreply.github.com> Date: Fri, 12 Dec 2025 00:14:19 -0800 Subject: [PATCH] fix: flush Claude SSE chunks immediately to match OpenAI behavior - Write each SSE chunk directly to c.Writer and flush immediately - Remove buffered writer and ticker-based flushing that caused delayed output - Add 500ms timeout case for consistency with OpenAI/Gemini handlers - Clean up unused bufio import This fixes the 'not streaming' issue where small responses were held in the buffer until timeout/threshold was reached. Amp-Thread-ID: https://ampcode.com/threads/T-019b1186-164e-740c-96ab-856f64ee6bee Co-authored-by: Amp --- sdk/api/handlers/claude/code_handlers.go | 51 ++++++------------------ 1 file changed, 12 insertions(+), 39 deletions(-) diff --git a/sdk/api/handlers/claude/code_handlers.go b/sdk/api/handlers/claude/code_handlers.go index 8a57a0cc..5cca651a 100644 --- a/sdk/api/handlers/claude/code_handlers.go +++ b/sdk/api/handlers/claude/code_handlers.go @@ -7,7 +7,6 @@ package claude import ( - "bufio" "bytes" "compress/gzip" "context" @@ -219,52 +218,24 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [ } func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) { - // v6.1: Intelligent Buffered Streamer strategy - // Enhanced buffering with larger buffer size (16KB) and longer flush interval (120ms). - // Smart flush only when buffer is sufficiently filled (≥50%), dramatically reducing - // flush frequency from ~12.5Hz to ~5-8Hz while maintaining low latency. - writer := bufio.NewWriterSize(c.Writer, 16*1024) // 4KB → 16KB - ticker := time.NewTicker(120 * time.Millisecond) // 80ms → 120ms - defer ticker.Stop() - - var chunkIdx int - + // OpenAI-style stream forwarding: write each SSE chunk and flush immediately. + // This guarantees clients see incremental output even for small responses. for { select { case <-c.Request.Context().Done(): - // Context cancelled, flush any remaining data before exit - _ = writer.Flush() cancel(c.Request.Context().Err()) return - case <-ticker.C: - // Smart flush: only flush when buffer has sufficient data (≥50% full) - // This reduces flush frequency while ensuring data flows naturally - buffered := writer.Buffered() - if buffered >= 8*1024 { // At least 8KB (50% of 16KB buffer) - if err := writer.Flush(); err != nil { - // Error flushing, cancel and return - cancel(err) - return - } - flusher.Flush() // Also flush the underlying http.ResponseWriter - } - case chunk, ok := <-data: if !ok { - // Stream ended, flush remaining data - _ = writer.Flush() + flusher.Flush() cancel(nil) return } - - // Forward the complete SSE event block directly (already formatted by the translator). - // The translator returns a complete SSE-compliant event block, including event:, data:, and separators. - // The handler just needs to forward it without reassembly. if len(chunk) > 0 { - _, _ = writer.Write(chunk) + _, _ = c.Writer.Write(chunk) + flusher.Flush() } - chunkIdx++ case errMsg, ok := <-errs: if !ok { @@ -276,21 +247,23 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http. status = errMsg.StatusCode } c.Status(status) + // An error occurred: emit as a proper SSE error event errorBytes, _ := json.Marshal(h.toClaudeError(errMsg)) - _, _ = writer.WriteString("event: error\n") - _, _ = writer.WriteString("data: ") - _, _ = writer.Write(errorBytes) - _, _ = writer.WriteString("\n\n") - _ = writer.Flush() + _, _ = c.Writer.Write([]byte("event: error\n")) + _, _ = c.Writer.Write([]byte("data: ")) + _, _ = c.Writer.Write(errorBytes) + _, _ = c.Writer.Write([]byte("\n\n")) flusher.Flush() } + var execErr error if errMsg != nil { execErr = errMsg.Error } cancel(execErr) return + case <-time.After(500 * time.Millisecond): } } }