feat(api): add non-streaming keep-alive support for idle timeout prevention

- Introduced `StartNonStreamingKeepAlive` to emit periodic blank lines during non-streaming responses.
- Added `nonstream-keepalive` configuration option in `SDKConfig`.
- Updated handlers to utilize `StartNonStreamingKeepAlive` and ensure proper cleanup.
- Extended config diff and tests to include `nonstream-keepalive` changes.
This commit is contained in:
Luis Pater
2026-01-13 02:36:07 +08:00
parent 21ac161b21
commit b1b379ea18
9 changed files with 89 additions and 21 deletions

View File

@@ -77,6 +77,9 @@ routing:
# When true, enable authentication for the WebSocket API (/v1/ws). # When true, enable authentication for the WebSocket API (/v1/ws).
ws-auth: false ws-auth: false
# When true, emit blank lines every 5s for non-streaming responses to prevent idle timeouts.
nonstream-keepalive: false
# Streaming behavior (SSE keep-alives + safe bootstrap retries). # Streaming behavior (SSE keep-alives + safe bootstrap retries).
# streaming: # streaming:
# keepalive-seconds: 15 # Default: 0 (disabled). <= 0 disables keep-alives. # keepalive-seconds: 15 # Default: 0 (disabled). <= 0 disables keep-alives.

View File

@@ -25,6 +25,9 @@ type SDKConfig struct {
// Streaming configures server-side streaming behavior (keep-alives and safe bootstrap retries). // Streaming configures server-side streaming behavior (keep-alives and safe bootstrap retries).
Streaming StreamingConfig `yaml:"streaming" json:"streaming"` Streaming StreamingConfig `yaml:"streaming" json:"streaming"`
// NonStreamKeepAlive enables emitting blank lines every 5 seconds for non-streaming responses.
NonStreamKeepAlive bool `yaml:"nonstream-keepalive" json:"nonstream-keepalive"`
} }
// StreamingConfig holds server streaming behavior configuration. // StreamingConfig holds server streaming behavior configuration.

View File

@@ -54,6 +54,9 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string {
if oldCfg.ForceModelPrefix != newCfg.ForceModelPrefix { if oldCfg.ForceModelPrefix != newCfg.ForceModelPrefix {
changes = append(changes, fmt.Sprintf("force-model-prefix: %t -> %t", oldCfg.ForceModelPrefix, newCfg.ForceModelPrefix)) changes = append(changes, fmt.Sprintf("force-model-prefix: %t -> %t", oldCfg.ForceModelPrefix, newCfg.ForceModelPrefix))
} }
if oldCfg.NonStreamKeepAlive != newCfg.NonStreamKeepAlive {
changes = append(changes, fmt.Sprintf("nonstream-keepalive: %t -> %t", oldCfg.NonStreamKeepAlive, newCfg.NonStreamKeepAlive))
}
// Quota-exceeded behavior // Quota-exceeded behavior
if oldCfg.QuotaExceeded.SwitchProject != newCfg.QuotaExceeded.SwitchProject { if oldCfg.QuotaExceeded.SwitchProject != newCfg.QuotaExceeded.SwitchProject {

View File

@@ -231,10 +231,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) {
AmpCode: config.AmpCode{UpstreamAPIKey: "keep", RestrictManagementToLocalhost: false}, AmpCode: config.AmpCode{UpstreamAPIKey: "keep", RestrictManagementToLocalhost: false},
RemoteManagement: config.RemoteManagement{DisableControlPanel: false, PanelGitHubRepository: "old/repo", SecretKey: "keep"}, RemoteManagement: config.RemoteManagement{DisableControlPanel: false, PanelGitHubRepository: "old/repo", SecretKey: "keep"},
SDKConfig: sdkconfig.SDKConfig{ SDKConfig: sdkconfig.SDKConfig{
RequestLog: false, RequestLog: false,
ProxyURL: "http://old-proxy", ProxyURL: "http://old-proxy",
APIKeys: []string{"key-1"}, APIKeys: []string{"key-1"},
ForceModelPrefix: false, ForceModelPrefix: false,
NonStreamKeepAlive: false,
}, },
} }
newCfg := &config.Config{ newCfg := &config.Config{
@@ -267,10 +268,11 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) {
SecretKey: "", SecretKey: "",
}, },
SDKConfig: sdkconfig.SDKConfig{ SDKConfig: sdkconfig.SDKConfig{
RequestLog: true, RequestLog: true,
ProxyURL: "http://new-proxy", ProxyURL: "http://new-proxy",
APIKeys: []string{" key-1 ", "key-2"}, APIKeys: []string{" key-1 ", "key-2"},
ForceModelPrefix: true, ForceModelPrefix: true,
NonStreamKeepAlive: true,
}, },
} }
@@ -285,6 +287,7 @@ func TestBuildConfigChangeDetails_FlagsAndKeys(t *testing.T) {
expectContains(t, details, "proxy-url: http://old-proxy -> http://new-proxy") expectContains(t, details, "proxy-url: http://old-proxy -> http://new-proxy")
expectContains(t, details, "ws-auth: false -> true") expectContains(t, details, "ws-auth: false -> true")
expectContains(t, details, "force-model-prefix: false -> true") expectContains(t, details, "force-model-prefix: false -> true")
expectContains(t, details, "nonstream-keepalive: false -> true")
expectContains(t, details, "quota-exceeded.switch-project: false -> true") expectContains(t, details, "quota-exceeded.switch-project: false -> true")
expectContains(t, details, "quota-exceeded.switch-preview-model: false -> true") expectContains(t, details, "quota-exceeded.switch-preview-model: false -> true")
expectContains(t, details, "api-keys count: 1 -> 2") expectContains(t, details, "api-keys count: 1 -> 2")

View File

@@ -146,10 +146,12 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO
c.Header("Content-Type", "application/json") c.Header("Content-Type", "application/json")
alt := h.GetAlt(c) alt := h.GetAlt(c)
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
modelName := gjson.GetBytes(rawJSON, "model").String() modelName := gjson.GetBytes(rawJSON, "model").String()
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
stopKeepAlive()
if errMsg != nil { if errMsg != nil {
h.WriteErrorResponse(c, errMsg) h.WriteErrorResponse(c, errMsg)
cliCancel(errMsg.Error) cliCancel(errMsg.Error)
@@ -159,13 +161,18 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO
// Decompress gzipped responses - Claude API sometimes returns gzip without Content-Encoding header // Decompress gzipped responses - Claude API sometimes returns gzip without Content-Encoding header
// This fixes title generation and other non-streaming responses that arrive compressed // This fixes title generation and other non-streaming responses that arrive compressed
if len(resp) >= 2 && resp[0] == 0x1f && resp[1] == 0x8b { if len(resp) >= 2 && resp[0] == 0x1f && resp[1] == 0x8b {
gzReader, err := gzip.NewReader(bytes.NewReader(resp)) gzReader, errGzip := gzip.NewReader(bytes.NewReader(resp))
if err != nil { if errGzip != nil {
log.Warnf("failed to decompress gzipped Claude response: %v", err) log.Warnf("failed to decompress gzipped Claude response: %v", errGzip)
} else { } else {
defer gzReader.Close() defer func() {
if decompressed, err := io.ReadAll(gzReader); err != nil { if errClose := gzReader.Close(); errClose != nil {
log.Warnf("failed to read decompressed Claude response: %v", err) log.Warnf("failed to close Claude gzip reader: %v", errClose)
}
}()
decompressed, errRead := io.ReadAll(gzReader)
if errRead != nil {
log.Warnf("failed to read decompressed Claude response: %v", errRead)
} else { } else {
resp = decompressed resp = decompressed
} }

View File

@@ -336,7 +336,9 @@ func (h *GeminiAPIHandler) handleGenerateContent(c *gin.Context, modelName strin
c.Header("Content-Type", "application/json") c.Header("Content-Type", "application/json")
alt := h.GetAlt(c) alt := h.GetAlt(c)
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt) resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
stopKeepAlive()
if errMsg != nil { if errMsg != nil {
h.WriteErrorResponse(c, errMsg) h.WriteErrorResponse(c, errMsg)
cliCancel(errMsg.Error) cliCancel(errMsg.Error)

View File

@@ -9,6 +9,7 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"strings" "strings"
"sync"
"time" "time"
"github.com/gin-gonic/gin" "github.com/gin-gonic/gin"
@@ -48,6 +49,7 @@ const idempotencyKeyMetadataKey = "idempotency_key"
const ( const (
defaultStreamingKeepAliveSeconds = 0 defaultStreamingKeepAliveSeconds = 0
defaultStreamingBootstrapRetries = 0 defaultStreamingBootstrapRetries = 0
nonStreamingKeepAliveInterval = 5 * time.Second
) )
// BuildErrorResponseBody builds an OpenAI-compatible JSON error response body. // BuildErrorResponseBody builds an OpenAI-compatible JSON error response body.
@@ -293,6 +295,52 @@ func (h *BaseAPIHandler) GetContextWithCancel(handler interfaces.APIHandler, c *
} }
} }
// StartNonStreamingKeepAlive emits blank lines every 5 seconds while waiting for a non-streaming response.
// It returns a stop function that must be called before writing the final response.
func (h *BaseAPIHandler) StartNonStreamingKeepAlive(c *gin.Context, ctx context.Context) func() {
if h == nil || h.Cfg == nil || !h.Cfg.NonStreamKeepAlive {
return func() {}
}
if c == nil {
return func() {}
}
flusher, ok := c.Writer.(http.Flusher)
if !ok {
return func() {}
}
if ctx == nil {
ctx = context.Background()
}
stopChan := make(chan struct{})
var stopOnce sync.Once
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ticker := time.NewTicker(nonStreamingKeepAliveInterval)
defer ticker.Stop()
for {
select {
case <-stopChan:
return
case <-ctx.Done():
return
case <-ticker.C:
_, _ = c.Writer.Write([]byte("\n"))
flusher.Flush()
}
}
}()
return func() {
stopOnce.Do(func() {
close(stopChan)
})
wg.Wait()
}
}
// appendAPIResponse preserves any previously captured API response and appends new data. // appendAPIResponse preserves any previously captured API response and appends new data.
func appendAPIResponse(c *gin.Context, data []byte) { func appendAPIResponse(c *gin.Context, data []byte) {
if c == nil || len(data) == 0 { if c == nil || len(data) == 0 {

View File

@@ -524,7 +524,9 @@ func (h *OpenAIAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context,
modelName := gjson.GetBytes(chatCompletionsJSON, "model").String() modelName := gjson.GetBytes(chatCompletionsJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "") resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "")
stopKeepAlive()
if errMsg != nil { if errMsg != nil {
h.WriteErrorResponse(c, errMsg) h.WriteErrorResponse(c, errMsg)
cliCancel(errMsg.Error) cliCancel(errMsg.Error)

View File

@@ -103,20 +103,17 @@ func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponse(c *gin.Context, r
modelName := gjson.GetBytes(rawJSON, "model").String() modelName := gjson.GetBytes(rawJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background()) cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
defer func() { stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
cliCancel()
}()
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "") resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
stopKeepAlive()
if errMsg != nil { if errMsg != nil {
h.WriteErrorResponse(c, errMsg) h.WriteErrorResponse(c, errMsg)
cliCancel(errMsg.Error)
return return
} }
_, _ = c.Writer.Write(resp) _, _ = c.Writer.Write(resp)
return cliCancel()
// no legacy fallback
} }
// handleStreamingResponse handles streaming responses for Gemini models. // handleStreamingResponse handles streaming responses for Gemini models.