diff --git a/config.example.yaml b/config.example.yaml index bf813433..a0eacc14 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -77,6 +77,9 @@ routing: # When true, enable authentication for the WebSocket API (/v1/ws). ws-auth: false +# When true, emit blank lines every 5s for non-streaming responses to prevent idle timeouts. +nonstream-keepalive: false + # Streaming behavior (SSE keep-alives + safe bootstrap retries). # streaming: # keepalive-seconds: 15 # Default: 0 (disabled). <= 0 disables keep-alives. diff --git a/internal/config/sdk_config.go b/internal/config/sdk_config.go index 596cbb2c..4ce35d6a 100644 --- a/internal/config/sdk_config.go +++ b/internal/config/sdk_config.go @@ -25,6 +25,9 @@ type SDKConfig struct { // Streaming configures server-side streaming behavior (keep-alives and safe bootstrap retries). Streaming StreamingConfig `yaml:"streaming" json:"streaming"` + + // NonStreamKeepAlive enables emitting blank lines every 5 seconds for non-streaming responses. + NonStreamKeepAlive bool `yaml:"nonstream-keepalive" json:"nonstream-keepalive"` } // StreamingConfig holds server streaming behavior configuration. diff --git a/internal/watcher/diff/config_diff.go b/internal/watcher/diff/config_diff.go index e24fc893..e21d910c 100644 --- a/internal/watcher/diff/config_diff.go +++ b/internal/watcher/diff/config_diff.go @@ -54,6 +54,9 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string { if oldCfg.ForceModelPrefix != newCfg.ForceModelPrefix { changes = append(changes, fmt.Sprintf("force-model-prefix: %t -> %t", oldCfg.ForceModelPrefix, newCfg.ForceModelPrefix)) } + if oldCfg.NonStreamKeepAlive != newCfg.NonStreamKeepAlive { + changes = append(changes, fmt.Sprintf("nonstream-keepalive: %t -> %t", oldCfg.NonStreamKeepAlive, newCfg.NonStreamKeepAlive)) + } // Quota-exceeded behavior if oldCfg.QuotaExceeded.SwitchProject != newCfg.QuotaExceeded.SwitchProject { diff --git a/internal/watcher/diff/config_diff_test.go b/internal/watcher/diff/config_diff_test.go index 6848f1d5..6606c776 100644 --- a/internal/watcher/diff/config_diff_test.go +++ b/internal/watcher/diff/config_diff_test.go @@ -231,10 +231,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { AmpCode: config.AmpCode{UpstreamAPIKey: "keep", RestrictManagementToLocalhost: false}, RemoteManagement: config.RemoteManagement{DisableControlPanel: false, PanelGitHubRepository: "old/repo", SecretKey: "keep"}, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: false, - ProxyURL: "http://old-proxy", - APIKeys: []string{"key-1"}, - ForceModelPrefix: false, + RequestLog: false, + ProxyURL: "http://old-proxy", + APIKeys: []string{"key-1"}, + ForceModelPrefix: false, + NonStreamKeepAlive: false, }, } newCfg := &config.Config{ @@ -267,10 +268,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { SecretKey: "", }, SDKConfig: sdkconfig.SDKConfig{ - RequestLog: true, - ProxyURL: "http://new-proxy", - APIKeys: []string{" key-1 ", "key-2"}, - ForceModelPrefix: true, + RequestLog: true, + ProxyURL: "http://new-proxy", + APIKeys: []string{" key-1 ", "key-2"}, + ForceModelPrefix: true, + NonStreamKeepAlive: true, }, } @@ -285,6 +287,7 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) { expectContains(t, details, "proxy-url: http://old-proxy -> http://new-proxy") expectContains(t, details, "ws-auth: false -> true") expectContains(t, details, "force-model-prefix: false -> true") + expectContains(t, details, "nonstream-keepalive: false -> true") expectContains(t, details, "quota-exceeded.switch-project: false -> true") expectContains(t, details, "quota-exceeded.switch-preview-model: false -> true") expectContains(t, details, "api-keys count: 1 -> 2") diff --git a/sdk/api/handlers/claude/code_handlers.go b/sdk/api/handlers/claude/code_handlers.go index 6554cc9a..30ff228d 100644 --- a/sdk/api/handlers/claude/code_handlers.go +++ b/sdk/api/handlers/claude/code_handlers.go @@ -146,10 +146,12 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO c.Header("Content-Type", "application/json") alt := h.GetAlt(c) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) modelName := gjson.GetBytes(rawJSON, "model").String() resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt) + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) @@ -159,13 +161,18 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO // Decompress gzipped responses - Claude API sometimes returns gzip without Content-Encoding header // This fixes title generation and other non-streaming responses that arrive compressed if len(resp) >= 2 && resp[0] == 0x1f && resp[1] == 0x8b { - gzReader, err := gzip.NewReader(bytes.NewReader(resp)) - if err != nil { - log.Warnf("failed to decompress gzipped Claude response: %v", err) + gzReader, errGzip := gzip.NewReader(bytes.NewReader(resp)) + if errGzip != nil { + log.Warnf("failed to decompress gzipped Claude response: %v", errGzip) } else { - defer gzReader.Close() - if decompressed, err := io.ReadAll(gzReader); err != nil { - log.Warnf("failed to read decompressed Claude response: %v", err) + defer func() { + if errClose := gzReader.Close(); errClose != nil { + log.Warnf("failed to close Claude gzip reader: %v", errClose) + } + }() + decompressed, errRead := io.ReadAll(gzReader) + if errRead != nil { + log.Warnf("failed to read decompressed Claude response: %v", errRead) } else { resp = decompressed } diff --git a/sdk/api/handlers/gemini/gemini_handlers.go b/sdk/api/handlers/gemini/gemini_handlers.go index 2b17a9f2..f2bdb058 100644 --- a/sdk/api/handlers/gemini/gemini_handlers.go +++ b/sdk/api/handlers/gemini/gemini_handlers.go @@ -336,7 +336,9 @@ func (h *GeminiAPIHandler) handleGenerateContent(c *gin.Context, modelName strin c.Header("Content-Type", "application/json") alt := h.GetAlt(c) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt) + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) diff --git a/sdk/api/handlers/handlers.go b/sdk/api/handlers/handlers.go index 5a24c63a..6239c20b 100644 --- a/sdk/api/handlers/handlers.go +++ b/sdk/api/handlers/handlers.go @@ -9,6 +9,7 @@ import ( "fmt" "net/http" "strings" + "sync" "time" "github.com/gin-gonic/gin" @@ -48,6 +49,7 @@ const idempotencyKeyMetadataKey = "idempotency_key" const ( defaultStreamingKeepAliveSeconds = 0 defaultStreamingBootstrapRetries = 0 + nonStreamingKeepAliveInterval = 5 * time.Second ) // BuildErrorResponseBody builds an OpenAI-compatible JSON error response body. @@ -293,6 +295,52 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c * } } +// StartNonStreamingKeepAlive emits blank lines every 5 seconds while waiting for a non-streaming response. +// It returns a stop function that must be called before writing the final response. +func (h *BaseAPIHandler) StartNonStreamingKeepAlive(c *gin.Context, ctx context.Context) func() { + if h == nil || h.Cfg == nil || !h.Cfg.NonStreamKeepAlive { + return func() {} + } + if c == nil { + return func() {} + } + flusher, ok := c.Writer.(http.Flusher) + if !ok { + return func() {} + } + if ctx == nil { + ctx = context.Background() + } + + stopChan := make(chan struct{}) + var stopOnce sync.Once + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(nonStreamingKeepAliveInterval) + defer ticker.Stop() + for { + select { + case <-stopChan: + return + case <-ctx.Done(): + return + case <-ticker.C: + _, _ = c.Writer.Write([]byte("\n")) + flusher.Flush() + } + } + }() + + return func() { + stopOnce.Do(func() { + close(stopChan) + }) + wg.Wait() + } +} + // appendAPIResponse preserves any previously captured API response and appends new data. func appendAPIResponse(c *gin.Context, data []byte) { if c == nil || len(data) == 0 { diff --git a/sdk/api/handlers/openai/openai_handlers.go b/sdk/api/handlers/openai/openai_handlers.go index 65936be7..09471ce1 100644 --- a/sdk/api/handlers/openai/openai_handlers.go +++ b/sdk/api/handlers/openai/openai_handlers.go @@ -524,7 +524,9 @@ func (h *OpenAIAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context, modelName := gjson.GetBytes(chatCompletionsJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "") + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) cliCancel(errMsg.Error) diff --git a/sdk/api/handlers/openai/openai_responses_handlers.go b/sdk/api/handlers/openai/openai_responses_handlers.go index b6d7c8f2..31099f81 100644 --- a/sdk/api/handlers/openai/openai_responses_handlers.go +++ b/sdk/api/handlers/openai/openai_responses_handlers.go @@ -103,20 +103,17 @@ func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponse(c *gin.Context, r modelName := gjson.GetBytes(rawJSON, "model").String() cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) - defer func() { - cliCancel() - }() + stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "") + stopKeepAlive() if errMsg != nil { h.WriteErrorResponse(c, errMsg) + cliCancel(errMsg.Error) return } _, _ = c.Writer.Write(resp) - return - - // no legacy fallback - + cliCancel() } // handleStreamingResponse handles streaming responses for Gemini models.