mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-03 13:00:52 +08:00
Merge pull request #498 from teeverc/fix/claude-streaming-flush
fix(claude): flush Claude SSE chunks immediately
This commit is contained in:
@@ -7,7 +7,6 @@
|
|||||||
package claude
|
package claude
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
|
||||||
"bytes"
|
"bytes"
|
||||||
"compress/gzip"
|
"compress/gzip"
|
||||||
"context"
|
"context"
|
||||||
@@ -219,52 +218,24 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
|
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
|
||||||
// v6.1: Intelligent Buffered Streamer strategy
|
// OpenAI-style stream forwarding: write each SSE chunk and flush immediately.
|
||||||
// Enhanced buffering with larger buffer size (16KB) and longer flush interval (120ms).
|
// This guarantees clients see incremental output even for small responses.
|
||||||
// Smart flush only when buffer is sufficiently filled (≥50%), dramatically reducing
|
|
||||||
// flush frequency from ~12.5Hz to ~5-8Hz while maintaining low latency.
|
|
||||||
writer := bufio.NewWriterSize(c.Writer, 16*1024) // 4KB → 16KB
|
|
||||||
ticker := time.NewTicker(120 * time.Millisecond) // 80ms → 120ms
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
var chunkIdx int
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-c.Request.Context().Done():
|
case <-c.Request.Context().Done():
|
||||||
// Context cancelled, flush any remaining data before exit
|
|
||||||
_ = writer.Flush()
|
|
||||||
cancel(c.Request.Context().Err())
|
cancel(c.Request.Context().Err())
|
||||||
return
|
return
|
||||||
|
|
||||||
case <-ticker.C:
|
|
||||||
// Smart flush: only flush when buffer has sufficient data (≥50% full)
|
|
||||||
// This reduces flush frequency while ensuring data flows naturally
|
|
||||||
buffered := writer.Buffered()
|
|
||||||
if buffered >= 8*1024 { // At least 8KB (50% of 16KB buffer)
|
|
||||||
if err := writer.Flush(); err != nil {
|
|
||||||
// Error flushing, cancel and return
|
|
||||||
cancel(err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
flusher.Flush() // Also flush the underlying http.ResponseWriter
|
|
||||||
}
|
|
||||||
|
|
||||||
case chunk, ok := <-data:
|
case chunk, ok := <-data:
|
||||||
if !ok {
|
if !ok {
|
||||||
// Stream ended, flush remaining data
|
flusher.Flush()
|
||||||
_ = writer.Flush()
|
|
||||||
cancel(nil)
|
cancel(nil)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Forward the complete SSE event block directly (already formatted by the translator).
|
|
||||||
// The translator returns a complete SSE-compliant event block, including event:, data:, and separators.
|
|
||||||
// The handler just needs to forward it without reassembly.
|
|
||||||
if len(chunk) > 0 {
|
if len(chunk) > 0 {
|
||||||
_, _ = writer.Write(chunk)
|
_, _ = c.Writer.Write(chunk)
|
||||||
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
chunkIdx++
|
|
||||||
|
|
||||||
case errMsg, ok := <-errs:
|
case errMsg, ok := <-errs:
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -276,21 +247,20 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
|
|||||||
status = errMsg.StatusCode
|
status = errMsg.StatusCode
|
||||||
}
|
}
|
||||||
c.Status(status)
|
c.Status(status)
|
||||||
|
|
||||||
// An error occurred: emit as a proper SSE error event
|
// An error occurred: emit as a proper SSE error event
|
||||||
errorBytes, _ := json.Marshal(h.toClaudeError(errMsg))
|
errorBytes, _ := json.Marshal(h.toClaudeError(errMsg))
|
||||||
_, _ = writer.WriteString("event: error\n")
|
_, _ = fmt.Fprintf(c.Writer, "event: error\ndata: %s\n\n", errorBytes)
|
||||||
_, _ = writer.WriteString("data: ")
|
|
||||||
_, _ = writer.Write(errorBytes)
|
|
||||||
_, _ = writer.WriteString("\n\n")
|
|
||||||
_ = writer.Flush()
|
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
|
|
||||||
var execErr error
|
var execErr error
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
execErr = errMsg.Error
|
execErr = errMsg.Error
|
||||||
}
|
}
|
||||||
cancel(execErr)
|
cancel(execErr)
|
||||||
return
|
return
|
||||||
|
case <-time.After(500 * time.Millisecond):
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user