mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-02 12:30:50 +08:00
Compare commits
5 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b23e2da74 | ||
|
|
5ab0854b5b | ||
|
|
15981aa412 | ||
|
|
ac4f52c532 | ||
|
|
84fa497169 |
@@ -32,6 +32,7 @@ import (
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/api/handlers/openai"
|
||||
"github.com/router-for-me/CLIProxyAPI/v6/sdk/cliproxy/auth"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
|
||||
const oauthCallbackSuccessHTML = `<html><head><meta charset="utf-8"><title>Authentication successful</title><script>setTimeout(function(){window.close();},5000);</script></head><body><h1>Authentication successful!</h1><p>You can close this window.</p><p>This window will close automatically in 5 seconds.</p></body></html>`
|
||||
@@ -116,6 +117,10 @@ type Server struct {
|
||||
// cfg holds the current server configuration.
|
||||
cfg *config.Config
|
||||
|
||||
// oldConfigYaml stores a YAML snapshot of the previous configuration for change detection.
|
||||
// This prevents issues when the config object is modified in place by Management API.
|
||||
oldConfigYaml []byte
|
||||
|
||||
// accessManager handles request authentication providers.
|
||||
accessManager *sdkaccess.Manager
|
||||
|
||||
@@ -220,6 +225,8 @@ func NewServer(cfg *config.Config, authManager *auth.Manager, accessManager *sdk
|
||||
currentPath: wd,
|
||||
envManagementSecret: envManagementSecret,
|
||||
}
|
||||
// Save initial YAML snapshot
|
||||
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||
s.applyAccessConfig(nil, cfg)
|
||||
// Initialize management handler
|
||||
s.mgmt = managementHandlers.NewHandler(cfg, configFilePath, authManager)
|
||||
@@ -654,7 +661,11 @@ func (s *Server) applyAccessConfig(oldCfg, newCfg *config.Config) {
|
||||
// - clients: The new slice of AI service clients
|
||||
// - cfg: The new application configuration
|
||||
func (s *Server) UpdateClients(cfg *config.Config) {
|
||||
oldCfg := s.cfg
|
||||
// Reconstruct old config from YAML snapshot to avoid reference sharing issues
|
||||
var oldCfg *config.Config
|
||||
if len(s.oldConfigYaml) > 0 {
|
||||
_ = yaml.Unmarshal(s.oldConfigYaml, &oldCfg)
|
||||
}
|
||||
|
||||
// Update request logger enabled state if it has changed
|
||||
previousRequestLog := false
|
||||
@@ -735,6 +746,8 @@ func (s *Server) UpdateClients(cfg *config.Config) {
|
||||
|
||||
s.applyAccessConfig(oldCfg, cfg)
|
||||
s.cfg = cfg
|
||||
// Save YAML snapshot for next comparison
|
||||
s.oldConfigYaml, _ = yaml.Marshal(cfg)
|
||||
s.handlers.UpdateClients(&cfg.SDKConfig)
|
||||
|
||||
if !cfg.RemoteManagement.DisableControlPanel {
|
||||
|
||||
@@ -8,6 +8,15 @@ import "time"
|
||||
// GetClaudeModels returns the standard Claude model definitions
|
||||
func GetClaudeModels() []*ModelInfo {
|
||||
return []*ModelInfo{
|
||||
|
||||
{
|
||||
ID: "claude-haiku-4-5-20251001",
|
||||
Object: "model",
|
||||
Created: 1759276800, // 2025-10-01
|
||||
OwnedBy: "anthropic",
|
||||
Type: "claude",
|
||||
DisplayName: "Claude 4.5 Haiku",
|
||||
},
|
||||
{
|
||||
ID: "claude-sonnet-4-5-20250929",
|
||||
Object: "model",
|
||||
|
||||
@@ -143,6 +143,31 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
||||
go func() {
|
||||
defer close(out)
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
// If from == to (Claude → Claude), directly forward the SSE stream without translation
|
||||
if from == to {
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
buf := make([]byte, 20_971_520)
|
||||
scanner.Buffer(buf, 20_971_520)
|
||||
for scanner.Scan() {
|
||||
line := scanner.Bytes()
|
||||
appendAPIResponseChunk(ctx, e.cfg, line)
|
||||
if detail, ok := parseClaudeStreamUsage(line); ok {
|
||||
reporter.publish(ctx, detail)
|
||||
}
|
||||
// Forward the line as-is to preserve SSE format
|
||||
cloned := make([]byte, len(line)+1)
|
||||
copy(cloned, line)
|
||||
cloned[len(line)] = '\n'
|
||||
out <- cliproxyexecutor.StreamChunk{Payload: cloned}
|
||||
}
|
||||
if err = scanner.Err(); err != nil {
|
||||
out <- cliproxyexecutor.StreamChunk{Err: err}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// For other formats, use translation
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
buf := make([]byte, 20_971_520)
|
||||
scanner.Buffer(buf, 20_971_520)
|
||||
|
||||
@@ -37,6 +37,8 @@ type ConvertOpenAIResponseToAnthropicParams struct {
|
||||
ContentBlocksStopped bool
|
||||
// Track if message_delta has been sent
|
||||
MessageDeltaSent bool
|
||||
// Track if message_start has been sent
|
||||
MessageStarted bool
|
||||
}
|
||||
|
||||
// ToolCallAccumulator holds the state for accumulating tool call data
|
||||
@@ -84,20 +86,12 @@ func ConvertOpenAIResponseToClaude(_ context.Context, _ string, originalRequestR
|
||||
return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||
}
|
||||
|
||||
root := gjson.ParseBytes(rawJSON)
|
||||
|
||||
// Check if this is a streaming chunk or non-streaming response
|
||||
objectType := root.Get("object").String()
|
||||
|
||||
if objectType == "chat.completion.chunk" {
|
||||
// Handle streaming response
|
||||
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||
} else if objectType == "chat.completion" {
|
||||
// Handle non-streaming response
|
||||
streamResult := gjson.GetBytes(originalRequestRawJSON, "stream")
|
||||
if !streamResult.Exists() || (streamResult.Exists() && streamResult.Type == gjson.False) {
|
||||
return convertOpenAINonStreamingToAnthropic(rawJSON)
|
||||
} else {
|
||||
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
|
||||
}
|
||||
|
||||
return []string{}
|
||||
}
|
||||
|
||||
// convertOpenAIStreamingChunkToAnthropic converts OpenAI streaming chunk to Anthropic streaming events
|
||||
@@ -118,7 +112,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
|
||||
// Check if this is the first chunk (has role)
|
||||
if delta := root.Get("choices.0.delta"); delta.Exists() {
|
||||
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" {
|
||||
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" && !param.MessageStarted {
|
||||
// Send message_start event
|
||||
messageStart := map[string]interface{}{
|
||||
"type": "message_start",
|
||||
@@ -138,6 +132,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
|
||||
}
|
||||
messageStartJSON, _ := json.Marshal(messageStart)
|
||||
results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n")
|
||||
param.MessageStarted = true
|
||||
|
||||
// Don't send content_block_start for text here - wait for actual content
|
||||
}
|
||||
|
||||
@@ -7,8 +7,9 @@
|
||||
package claude
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"bufio"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"time"
|
||||
@@ -197,33 +198,65 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
|
||||
}
|
||||
|
||||
func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.Flusher, cancel func(error), data <-chan []byte, errs <-chan *interfaces.ErrorMessage) {
|
||||
// v6.1: Intelligent Buffered Streamer strategy
|
||||
// Enhanced buffering with larger buffer size (16KB) and longer flush interval (120ms).
|
||||
// Smart flush only when buffer is sufficiently filled (≥50%), dramatically reducing
|
||||
// flush frequency from ~12.5Hz to ~5-8Hz while maintaining low latency.
|
||||
writer := bufio.NewWriterSize(c.Writer, 16*1024) // 4KB → 16KB
|
||||
ticker := time.NewTicker(120 * time.Millisecond) // 80ms → 120ms
|
||||
defer ticker.Stop()
|
||||
|
||||
var chunkIdx int
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-c.Request.Context().Done():
|
||||
// Context cancelled, flush any remaining data before exit
|
||||
_ = writer.Flush()
|
||||
cancel(c.Request.Context().Err())
|
||||
return
|
||||
|
||||
case <-ticker.C:
|
||||
// Smart flush: only flush when buffer has sufficient data (≥50% full)
|
||||
// This reduces flush frequency while ensuring data flows naturally
|
||||
buffered := writer.Buffered()
|
||||
if buffered >= 8*1024 { // At least 8KB (50% of 16KB buffer)
|
||||
if err := writer.Flush(); err != nil {
|
||||
// Error flushing, cancel and return
|
||||
cancel(err)
|
||||
return
|
||||
}
|
||||
flusher.Flush() // Also flush the underlying http.ResponseWriter
|
||||
}
|
||||
|
||||
case chunk, ok := <-data:
|
||||
if !ok {
|
||||
flusher.Flush()
|
||||
// Stream ended, flush remaining data
|
||||
_ = writer.Flush()
|
||||
cancel(nil)
|
||||
return
|
||||
}
|
||||
|
||||
if bytes.HasPrefix(chunk, []byte("event:")) {
|
||||
_, _ = c.Writer.Write([]byte("\n"))
|
||||
// Forward the complete SSE event block directly (already formatted by the translator).
|
||||
// The translator returns a complete SSE-compliant event block, including event:, data:, and separators.
|
||||
// The handler just needs to forward it without reassembly.
|
||||
if len(chunk) > 0 {
|
||||
_, _ = writer.Write(chunk)
|
||||
}
|
||||
chunkIdx++
|
||||
|
||||
_, _ = c.Writer.Write(chunk)
|
||||
_, _ = c.Writer.Write([]byte("\n"))
|
||||
|
||||
flusher.Flush()
|
||||
case errMsg, ok := <-errs:
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
if errMsg != nil {
|
||||
h.WriteErrorResponse(c, errMsg)
|
||||
flusher.Flush()
|
||||
// An error occurred: emit as a proper SSE error event
|
||||
errorBytes, _ := json.Marshal(h.toClaudeError(errMsg))
|
||||
_, _ = writer.WriteString("event: error\n")
|
||||
_, _ = writer.WriteString("data: ")
|
||||
_, _ = writer.Write(errorBytes)
|
||||
_, _ = writer.WriteString("\n\n")
|
||||
_ = writer.Flush()
|
||||
}
|
||||
var execErr error
|
||||
if errMsg != nil {
|
||||
@@ -231,7 +264,26 @@ func (h *ClaudeCodeAPIHandler) forwardClaudeStream(c *gin.Context, flusher http.
|
||||
}
|
||||
cancel(execErr)
|
||||
return
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type claudeErrorDetail struct {
|
||||
Type string `json:"type"`
|
||||
Message string `json:"message"`
|
||||
}
|
||||
|
||||
type claudeErrorResponse struct {
|
||||
Type string `json:"type"`
|
||||
Error claudeErrorDetail `json:"error"`
|
||||
}
|
||||
|
||||
func (h *ClaudeCodeAPIHandler) toClaudeError(msg *interfaces.ErrorMessage) claudeErrorResponse {
|
||||
return claudeErrorResponse{
|
||||
Type: "error",
|
||||
Error: claudeErrorDetail{
|
||||
Type: "api_error",
|
||||
Message: msg.Error.Error(),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user