Compare commits

...

5 Commits

Author SHA1 Message Date
Luis Pater
6b23e2da74 feat(claude): add Claude 4.5 Haiku model definition 2025-10-16 04:53:07 +08:00
Luis Pater
5ab0854b5b fix(claude): track message_start event in streaming response
Add a `MessageStarted` flag to `ConvertOpenAIResponseToAnthropicParams` to ensure the `message_start` event is emitted only once during streaming.
Refactor response handling to detect streaming mode via the `stream` field instead of the `object` type, simplifying the branching logic.
Update the streaming conversion to set `MessageStarted` after sending the `message_start` event, preventing duplicate starts.
These changes improve correctness of streaming response handling for Claude integration.
2025-10-16 03:54:48 +08:00
Adamcf
15981aa412 fix: add Claude→Claude passthrough to prevent SSE event fragmentation
When from==to (Claude→Claude scenario), directly forward SSE stream
line-by-line without invoking TranslateStream. This preserves the
multi-line SSE event structure (event:/data:/blank) and prevents
JSON parsing errors caused by event fragmentation.

Resolves: JSON parsing error when using Claude Code streaming responses

fix: correct SSE event formatting in Handler layer

Remove duplicate newline additions (\n\n) that were breaking SSE event format.
The Executor layer already provides properly formatted SSE chunks with correct
line endings, so the Handler should forward them as-is without modification.

Changes:
- Remove redundant \n\n addition after each chunk
- Add len(chunk) > 0 check before writing
- Format error messages as proper SSE events (event: error\ndata: {...}\n\n)
- Add chunkIdx counter for future debugging needs

This fixes JSON parsing errors caused by malformed SSE event streams.

fix: update comments for clarity in SSE event forwarding
2025-10-15 22:13:44 +08:00
Luis Pater
ac4f52c532 Merge pull request #127 from router-for-me/usage
fix(server): snapshot config with YAML to handle in-place mutations
2025-10-15 21:39:44 +08:00
hkfires
84fa497169 fix(server): snapshot config with YAML to handle in-place mutations
- Add oldConfigYaml to store previous config snapshot
- Rebuild oldCfg from YAML in UpdateClients for reliable change detection
- Initialize and refresh snapshot on startup and after updates
- Prevents change detection bugs when Management API mutates cfg in place
- Import gopkg.in/yaml.v3
2025-10-15 18:26:23 +08:00
5 changed files with 119 additions and 25 deletions

View File

@@ -32,6 +32,7 @@ import (
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers/openai"
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v3"
)
const oauthCallbackSuccessHTML = `<html><head><meta charset="utf-8"><title>Authentication successful</title><script>setTimeout(function(){window.close();},5000);</script></head><body><h1>Authentication successful!</h1><p>You can close this window.</p><p>This window will close automatically in 5 seconds.</p></body></html>`
@@ -116,6 +117,10 @@ type Server struct {
// cfg holds the current server configuration.
cfg *config.Config
// oldConfigYaml stores a YAML snapshot of the previous configuration for change detection.
// This prevents issues when the config object is modified in place by Management API.
oldConfigYaml []byte
// accessManager handles request authentication providers.
accessManager *sdkaccess.Manager
@@ -220,6 +225,8 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk
currentPath: wd,
envManagementSecret: envManagementSecret,
}
// Save initial YAML snapshot
s.oldConfigYaml, _ = yaml.Marshal(cfg)
s.applyAccessConfig(nil, cfg)
// Initialize management handler
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
@@ -654,7 +661,11 @@ func (s *Server) applyAccessConfig(oldCfg, newCfg *config.Config) {
// - clients: The new slice of AI service clients
// - cfg: The new application configuration
func (s *Server) UpdateClients(cfg *config.Config) {
oldCfg := s.cfg
// Reconstruct old config from YAML snapshot to avoid reference sharing issues
var oldCfg *config.Config
if len(s.oldConfigYaml) > 0 {
_ = yaml.Unmarshal(s.oldConfigYaml, &oldCfg)
}
// Update request logger enabled state if it has changed
previousRequestLog := false
@@ -735,6 +746,8 @@ func (s *Server) UpdateClients(cfg *config.Config) {
s.applyAccessConfig(oldCfg, cfg)
s.cfg = cfg
// Save YAML snapshot for next comparison
s.oldConfigYaml, _ = yaml.Marshal(cfg)
s.handlers.UpdateClients(&cfg.SDKConfig)
if !cfg.RemoteManagement.DisableControlPanel {

View File

@@ -8,6 +8,15 @@ import "time"
// GetClaudeModels returns the standard Claude model definitions
func GetClaudeModels() []*ModelInfo {
return []*ModelInfo{
{
ID: "claude-haiku-4-5-20251001",
Object: "model",
Created: 1759276800, // 2025-10-01
OwnedBy: "anthropic",
Type: "claude",
DisplayName: "Claude 4.5 Haiku",
},
{
ID: "claude-sonnet-4-5-20250929",
Object: "model",

View File

@@ -143,6 +143,31 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
go func() {
defer close(out)
defer func() { _ = resp.Body.Close() }()
// If from == to (Claude → Claude), directly forward the SSE stream without translation
if from == to {
scanner := bufio.NewScanner(resp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)
for scanner.Scan() {
line := scanner.Bytes()
appendAPIResponseChunk(ctx, e.cfg, line)
if detail, ok := parseClaudeStreamUsage(line); ok {
reporter.publish(ctx, detail)
}
// Forward the line as-is to preserve SSE format
cloned := make([]byte, len(line)+1)
copy(cloned, line)
cloned[len(line)] = '\n'
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
}
if err = scanner.Err(); err != nil {
out <- cliproxyexecutor.StreamChunk{Err: err}
}
return
}
// For other formats, use translation
scanner := bufio.NewScanner(resp.Body)
buf := make([]byte, 20_971_520)
scanner.Buffer(buf, 20_971_520)

View File

@@ -37,6 +37,8 @@ type ConvertOpenAIResponseToAnthropicParams struct {
ContentBlocksStopped bool
// Track if message_delta has been sent
MessageDeltaSent bool
// Track if message_start has been sent
MessageStarted bool
}
// ToolCallAccumulator holds the state for accumulating tool call data
@@ -84,20 +86,12 @@ func ConvertOpenAIResponseToClaude(_ context.Context, _ string, originalRequestR
return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams))
}
root := gjson.ParseBytes(rawJSON)
// Check if this is a streaming chunk or non-streaming response
objectType := root.Get("object").String()
if objectType == "chat.completion.chunk" {
// Handle streaming response
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
} else if objectType == "chat.completion" {
// Handle non-streaming response
streamResult := gjson.GetBytes(originalRequestRawJSON, "stream")
if !streamResult.Exists() || (streamResult.Exists() && streamResult.Type == gjson.False) {
return convertOpenAINonStreamingToAnthropic(rawJSON)
} else {
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
}
return []string{}
}
// convertOpenAIStreamingChunkToAnthropic converts OpenAI streaming chunk to Anthropic streaming events
@@ -118,7 +112,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
// Check if this is the first chunk (has role)
if delta := root.Get("choices.0.delta"); delta.Exists() {
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" {
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" && !param.MessageStarted {
// Send message_start event
messageStart := map[string]interface{}{
"type": "message_start",
@@ -138,6 +132,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
}
messageStartJSON, _ := json.Marshal(messageStart)
results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n")
param.MessageStarted = true
// Don't send content_block_start for text here - wait for actual content
}

View File

@@ -7,8 +7,9 @@
package claude
import (
"bytes"
"bufio"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
@@ -197,33 +198,65 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
}
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
// v6.1: Intelligent Buffered Streamer strategy
// Enhanced buffering with larger buffer size (16KB) and longer flush interval (120ms).
// Smart flush only when buffer is sufficiently filled (≥50%), dramatically reducing
// flush frequency from ~12.5Hz to ~5-8Hz while maintaining low latency.
writer := bufio.NewWriterSize(c.Writer, 16*1024) // 4KB → 16KB
ticker := time.NewTicker(120 * time.Millisecond) // 80ms → 120ms
defer ticker.Stop()
var chunkIdx int
for {
select {
case <-c.Request.Context().Done():
// Context cancelled, flush any remaining data before exit
_ = writer.Flush()
cancel(c.Request.Context().Err())
return
case <-ticker.C:
// Smart flush: only flush when buffer has sufficient data (≥50% full)
// This reduces flush frequency while ensuring data flows naturally
buffered := writer.Buffered()
if buffered >= 8*1024 { // At least 8KB (50% of 16KB buffer)
if err := writer.Flush(); err != nil {
// Error flushing, cancel and return
cancel(err)
return
}
flusher.Flush() // Also flush the underlying http.ResponseWriter
}
case chunk, ok := <-data:
if !ok {
flusher.Flush()
// Stream ended, flush remaining data
_ = writer.Flush()
cancel(nil)
return
}
if bytes.HasPrefix(chunk, []byte("event:")) {
_, _ = c.Writer.Write([]byte("\n"))
// Forward the complete SSE event block directly (already formatted by the translator).
// The translator returns a complete SSE-compliant event block, including event:, data:, and separators.
// The handler just needs to forward it without reassembly.
if len(chunk) > 0 {
_, _ = writer.Write(chunk)
}
chunkIdx++
_, _ = c.Writer.Write(chunk)
_, _ = c.Writer.Write([]byte("\n"))
flusher.Flush()
case errMsg, ok := <-errs:
if !ok {
continue
}
if errMsg != nil {
h.WriteErrorResponse(c, errMsg)
flusher.Flush()
// An error occurred: emit as a proper SSE error event
errorBytes, _ := json.Marshal(h.toClaudeError(errMsg))
_, _ = writer.WriteString("event: error\n")
_, _ = writer.WriteString("data: ")
_, _ = writer.Write(errorBytes)
_, _ = writer.WriteString("\n\n")
_ = writer.Flush()
}
var execErr error
if errMsg != nil {
@@ -231,7 +264,26 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
}
cancel(execErr)
return
case <-time.After(500 * time.Millisecond):
}
}
}
type claudeErrorDetail struct {
Type string `json:"type"`
Message string `json:"message"`
}
type claudeErrorResponse struct {
Type string `json:"type"`
Error claudeErrorDetail `json:"error"`
}
func (h *ClaudeCodeAPIHandler) toClaudeError(msg *interfaces.ErrorMessage) claudeErrorResponse {
return claudeErrorResponse{
Type: "error",
Error: claudeErrorDetail{
Type: "api_error",
Message: msg.Error.Error(),
},
}
}