Compare commits

...

5 Commits

Author SHA1 Message Date
Luis Pater
c2b2c9eafe Update README documentation to clarify auth-dir configuration for Windows users
- Added a note for setting `auth-dir` on Windows systems in both English and Chinese README files.
- Improved descriptions for existing configuration options.
2025-08-29 09:52:10 +08:00
Luis Pater
09b9d3b3fa Unlock mutex before returning error in handlers.go to prevent deadlocks 2025-08-29 09:46:12 +08:00
Luis Pater
e9e0016a63 Fix some bugs. 2025-08-29 04:05:08 +08:00
Luis Pater
3704dae342 Add nil-check for GetRequestMutex across handlers to prevent potential panics
- Updated all handlers to safely unlock the request mutex only if it's non-nil.
- Enhanced mutex locking and unlocking logic to avoid runtime errors.
- Improved robustness of resource cleanup across clients.

Add `GetRequestMutex` method for synchronization across clients

- Introduced a new `GetRequestMutex` method in OpenAICompatibilityClient, CodexClient, GeminiCLIClient, GeminiClient, and QwenClient for request synchronization.
- Ensures only one request is processed at a time to manage quotas effectively.
2025-08-29 00:23:37 +08:00
Luis Pater
bea5f97cbf Add /v1/completions endpoint with OpenAI compatibility
- Implemented `/v1/completions` endpoint mirroring OpenAI's completions API specification.
- Added conversion functions to translate between completions and chat completions formats.
- Introduced streaming and non-streaming response handling for completions requests.
- Updated `server.go` to register the new endpoint and include it in the API's metadata.
2025-08-28 00:30:46 +08:00
15 changed files with 592 additions and 62 deletions

View File

@@ -239,28 +239,28 @@ The server uses a YAML configuration file (`config.yaml`) located in the project
### Configuration Options
| Parameter | Type | Default | Description |
|-----------------------------------------|----------|--------------------|---------------------------------------------------------------------------------------------------------|
| `port` | integer | 8317 | The port number on which the server will listen. |
| `auth-dir` | string | "~/.cli-proxy-api" | Directory where authentication tokens are stored. Supports using `~` for the home directory. |
| `proxy-url` | string | "" | Proxy URL. Supports socks5/http/https protocols. Example: socks5://user:pass@192.168.1.1:1080/ |
| `request-retry` | integer | 0 | Number of times to retry a request. Retries will occur if the HTTP response code is 403, 408, 500, 502, 503, or 504. |
| `quota-exceeded` | object | {} | Configuration for handling quota exceeded. |
| `quota-exceeded.switch-project` | boolean | true | Whether to automatically switch to another project when a quota is exceeded. |
| `quota-exceeded.switch-preview-model` | boolean | true | Whether to automatically switch to a preview model when a quota is exceeded. |
| `debug` | boolean | false | Enable debug mode for verbose logging. |
| `api-keys` | string[] | [] | List of API keys that can be used to authenticate requests. |
| `generative-language-api-key` | string[] | [] | List of Generative Language API keys. |
| `claude-api-key` | object | {} | List of Claude API keys. |
| `claude-api-key.api-key` | string | "" | Claude API key. |
| `claude-api-key.base-url` | string | "" | Custom Claude API endpoint, if you use a third-party API endpoint. |
| `openai-compatibility` | object[] | [] | Upstream OpenAI-compatible providers configuration (name, base-url, api-keys, models). |
| `openai-compatibility.*.name` | string | "" | The name of the provider. It will be used in the user agent and other places. |
| `openai-compatibility.*.base-url` | string | "" | The base URL of the provider. |
| `openai-compatibility.*.api-keys` | string[] | [] | The API keys for the provider. Add multiple keys if needed. Omit if unauthenticated access is allowed. |
| `openai-compatibility.*.models` | object[] | [] | The actual model name. |
| `openai-compatibility.*.models.*.name` | string | "" | The models supported by the provider. |
| `openai-compatibility.*.models.*.alias` | string | "" | The alias used in the API. |
| Parameter | Type | Default | Description |
|-----------------------------------------|----------|--------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| `port` | integer | 8317 | The port number on which the server will listen. |
| `auth-dir` | string | "~/.cli-proxy-api" | Directory where authentication tokens are stored. Supports using `~` for the home directory. If you use Windows, please set the directory like this: `C:/cli-proxy-api/` |
| `proxy-url` | string | "" | Proxy URL. Supports socks5/http/https protocols. Example: socks5://user:pass@192.168.1.1:1080/ |
| `request-retry` | integer | 0 | Number of times to retry a request. Retries will occur if the HTTP response code is 403, 408, 500, 502, 503, or 504. |
| `quota-exceeded` | object | {} | Configuration for handling quota exceeded. |
| `quota-exceeded.switch-project` | boolean | true | Whether to automatically switch to another project when a quota is exceeded. |
| `quota-exceeded.switch-preview-model` | boolean | true | Whether to automatically switch to a preview model when a quota is exceeded. |
| `debug` | boolean | false | Enable debug mode for verbose logging. |
| `api-keys` | string[] | [] | List of API keys that can be used to authenticate requests. |
| `generative-language-api-key` | string[] | [] | List of Generative Language API keys. |
| `claude-api-key` | object | {} | List of Claude API keys. |
| `claude-api-key.api-key` | string | "" | Claude API key. |
| `claude-api-key.base-url` | string | "" | Custom Claude API endpoint, if you use a third-party API endpoint. |
| `openai-compatibility` | object[] | [] | Upstream OpenAI-compatible providers configuration (name, base-url, api-keys, models). |
| `openai-compatibility.*.name` | string | "" | The name of the provider. It will be used in the user agent and other places. |
| `openai-compatibility.*.base-url` | string | "" | The base URL of the provider. |
| `openai-compatibility.*.api-keys` | string[] | [] | The API keys for the provider. Add multiple keys if needed. Omit if unauthenticated access is allowed. |
| `openai-compatibility.*.models` | object[] | [] | The actual model name. |
| `openai-compatibility.*.models.*.name` | string | "" | The models supported by the provider. |
| `openai-compatibility.*.models.*.alias` | string | "" | The alias used in the API. |
### Example Configuration File
@@ -268,7 +268,7 @@ The server uses a YAML configuration file (`config.yaml`) located in the project
# Server port
port: 8317
# Authentication directory (supports ~ for home directory)
# Authentication directory (supports ~ for home directory). If you use Windows, please set the directory like this: `C:/cli-proxy-api/`
auth-dir: "~/.cli-proxy-api"
# Enable debug logging

View File

@@ -238,28 +238,28 @@ console.log(await claudeResponse.json());
### 配置选项
| 参数 | 类型 | 默认值 | 描述 |
|---------------------------------------|----------|--------------------|---------------------------------------------------------------------------------------------|
| `port` | integer | 8317 | 服务器将监听的端口号。 |
| `auth-dir` | string | "~/.cli-proxy-api" | 存储身份验证令牌的目录。支持使用 `~` 来表示主目录。 |
| `proxy-url` | string | "" | 代理URL。支持socks5/http/https协议。例如socks5://user:pass@192.168.1.1:1080/ |
| `request-retry` | integer | 0 | 请求重试次数。如果HTTP响应码为403、408、500、502、503或504将会触发重试。 |
| `quota-exceeded` | object | {} | 用于处理配额超限的配置。 |
| `quota-exceeded.switch-project` | boolean | true | 当配额超限时,是否自动切换到另一个项目。 |
| `quota-exceeded.switch-preview-model` | boolean | true | 当配额超限时,是否自动切换到预览模型。 |
| `debug` | boolean | false | 启用调试模式以获取详细日志。 |
| `api-keys` | string[] | [] | 可用于验证请求的API密钥列表。 |
| `generative-language-api-key` | string[] | [] | 生成式语言API密钥列表。 |
| `claude-api-key` | object | {} | Claude API密钥列表。 |
| `claude-api-key.api-key` | string | "" | Claude API密钥。 |
| `claude-api-key.base-url` | string | "" | 自定义的Claude API端点如果您使用第三方的API端点。 |
| `openai-compatibility` | object[] | [] | 上游OpenAI兼容提供商的配置名称、基础URL、API密钥、模型 |
| `openai-compatibility.*.name` | string | "" | 提供商的名称。它将被用于用户代理User Agent和其他地方。 |
| `openai-compatibility.*.base-url` | string | "" | 提供商的基础URL。 |
| `openai-compatibility.*.api-keys` | string[] | [] | 提供商的API密钥。如果需要可以添加多个密钥。如果允许未经身份验证的访问则可以省略。 |
| `openai-compatibility.*.models` | object[] | [] | 实际的模型名称。 |
| `openai-compatibility.*.models.*.name` | string | "" | 提供商支持的模型。 |
| `openai-compatibility.*.models.*.alias` | string | "" | 在API中使用的别名。 |
| 参数 | 类型 | 默认值 | 描述 |
|-----------------------------------------|----------|--------------------|---------------------------------------------------------------------|
| `port` | integer | 8317 | 服务器将监听的端口号。 |
| `auth-dir` | string | "~/.cli-proxy-api" | 存储身份验证令牌的目录。支持使用 `~` 来表示主目录。如果你使用Windows建议设置成`C:/cli-proxy-api/`。 |
| `proxy-url` | string | "" | 代理URL。支持socks5/http/https协议。例如socks5://user:pass@192.168.1.1:1080/ |
| `request-retry` | integer | 0 | 请求重试次数。如果HTTP响应码为403、408、500、502、503或504将会触发重试。 |
| `quota-exceeded` | object | {} | 用于处理配额超限的配置。 |
| `quota-exceeded.switch-project` | boolean | true | 当配额超限时,是否自动切换到另一个项目。 |
| `quota-exceeded.switch-preview-model` | boolean | true | 当配额超限时,是否自动切换到预览模型。 |
| `debug` | boolean | false | 启用调试模式以获取详细日志。 |
| `api-keys` | string[] | [] | 可用于验证请求的API密钥列表。 |
| `generative-language-api-key` | string[] | [] | 生成式语言API密钥列表。 |
| `claude-api-key` | object | {} | Claude API密钥列表。 |
| `claude-api-key.api-key` | string | "" | Claude API密钥。 |
| `claude-api-key.base-url` | string | "" | 自定义的Claude API端点如果您使用第三方的API端点。 |
| `openai-compatibility` | object[] | [] | 上游OpenAI兼容提供商的配置名称、基础URL、API密钥、模型。 |
| `openai-compatibility.*.name` | string | "" | 提供商的名称。它将被用于用户代理User Agent和其他地方。 |
| `openai-compatibility.*.base-url` | string | "" | 提供商的基础URL。 |
| `openai-compatibility.*.api-keys` | string[] | [] | 提供商的API密钥。如果需要可以添加多个密钥。如果允许未经身份验证的访问则可以省略。 |
| `openai-compatibility.*.models` | object[] | [] | 实际的模型名称。 |
| `openai-compatibility.*.models.*.name` | string | "" | 提供商支持的模型。 |
| `openai-compatibility.*.models.*.alias` | string | "" | 在API中使用的别名。 |
### 配置文件示例
@@ -267,7 +267,7 @@ console.log(await claudeResponse.json());
# 服务器端口
port: 8317
# 身份验证目录(支持 ~ 表示主目录)
# 身份验证目录(支持 ~ 表示主目录)。如果你使用Windows建议设置成`C:/cli-proxy-api/`。
auth-dir: "~/.cli-proxy-api"
# 启用调试日志

View File

@@ -17,6 +17,7 @@ import (
. "github.com/luispater/CLIProxyAPI/internal/constant"
"github.com/luispater/CLIProxyAPI/internal/interfaces"
"github.com/luispater/CLIProxyAPI/internal/registry"
"github.com/luispater/CLIProxyAPI/internal/util"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
)
@@ -133,7 +134,9 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
// Ensure the client's mutex is unlocked on function exit.
// This prevents deadlocks and ensures proper resource cleanup
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
retryCount := 0
@@ -191,7 +194,7 @@ outLoop:
continue outLoop // Restart the client selection process
}
case 403, 408, 500, 502, 503, 504:
log.Debugf("http status code %d, switch client", errInfo.StatusCode)
log.Debugf("http status code %d, switch client, %s", errInfo.StatusCode, util.HideAPIKey(cliClient.GetEmail()))
retryCount++
continue outLoop
default:

View File

@@ -163,7 +163,9 @@ func (h *GeminiCLIAPIHandler) handleInternalStreamGenerateContent(c *gin.Context
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -244,7 +246,9 @@ func (h *GeminiCLIAPIHandler) handleInternalGenerateContent(c *gin.Context, rawJ
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -214,7 +214,9 @@ func (h *GeminiAPIHandler) handleStreamGenerateContent(c *gin.Context, modelName
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -303,7 +305,9 @@ func (h *GeminiAPIHandler) handleCountTokens(c *gin.Context, modelName string, r
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -354,7 +358,9 @@ func (h *GeminiAPIHandler) handleGenerateContent(c *gin.Context, modelName strin
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()

View File

@@ -102,18 +102,19 @@ func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool)
}
}
// Lock the mutex to update the last used client index
h.Mutex.Lock()
if _, hasKey := h.LastUsedClientIndex[modelName]; !hasKey {
h.LastUsedClientIndex[modelName] = 0
}
if len(clients) == 0 {
h.Mutex.Unlock()
return nil, &interfaces.ErrorMessage{StatusCode: 500, Error: fmt.Errorf("no clients available")}
}
var cliClient interfaces.Client
// Lock the mutex to update the last used client index
h.Mutex.Lock()
startIndex := h.LastUsedClientIndex[modelName]
if (len(isGenerateContent) > 0 && isGenerateContent[0]) || len(isGenerateContent) == 0 {
currentIndex := (startIndex + 1) % len(clients)
@@ -157,14 +158,20 @@ func (h *BaseAPIHandler) GetClient(modelName string, isGenerateContent ...bool)
locked := false
for i := 0; i < len(reorderedClients); i++ {
cliClient = reorderedClients[i]
if cliClient.GetRequestMutex().TryLock() {
if mutex := cliClient.GetRequestMutex(); mutex != nil {
if mutex.TryLock() {
locked = true
break
}
} else {
locked = true
break
}
}
if !locked {
cliClient = clients[0]
cliClient.GetRequestMutex().Lock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Lock()
}
}
return cliClient, nil

View File

@@ -8,6 +8,7 @@ package openai
import (
"context"
"encoding/json"
"fmt"
"net/http"
"time"
@@ -19,6 +20,7 @@ import (
"github.com/luispater/CLIProxyAPI/internal/registry"
log "github.com/sirupsen/logrus"
"github.com/tidwall/gjson"
"github.com/tidwall/sjson"
)
// OpenAIAPIHandler contains the handlers for OpenAI API endpoints.
@@ -92,6 +94,276 @@ func (h *OpenAIAPIHandler) ChatCompletions(c *gin.Context) {
}
// Completions handles the /v1/completions endpoint.
// It determines whether the request is for a streaming or non-streaming response
// and calls the appropriate handler based on the model provider.
// This endpoint follows the OpenAI completions API specification.
//
// Parameters:
// - c: The Gin context containing the HTTP request and response
func (h *OpenAIAPIHandler) Completions(c *gin.Context) {
rawJSON, err := c.GetRawData()
// If data retrieval fails, return a 400 Bad Request error.
if err != nil {
c.JSON(http.StatusBadRequest, handlers.ErrorResponse{
Error: handlers.ErrorDetail{
Message: fmt.Sprintf("Invalid request: %v", err),
Type: "invalid_request_error",
},
})
return
}
// Check if the client requested a streaming response.
streamResult := gjson.GetBytes(rawJSON, "stream")
if streamResult.Type == gjson.True {
h.handleCompletionsStreamingResponse(c, rawJSON)
} else {
h.handleCompletionsNonStreamingResponse(c, rawJSON)
}
}
// convertCompletionsRequestToChatCompletions converts OpenAI completions API request to chat completions format.
// This allows the completions endpoint to use the existing chat completions infrastructure.
//
// Parameters:
// - rawJSON: The raw JSON bytes of the completions request
//
// Returns:
// - []byte: The converted chat completions request
func convertCompletionsRequestToChatCompletions(rawJSON []byte) []byte {
root := gjson.ParseBytes(rawJSON)
// Extract prompt from completions request
prompt := root.Get("prompt").String()
if prompt == "" {
prompt = "Complete this:"
}
// Create chat completions structure
out := `{"model":"","messages":[{"role":"user","content":""}]}`
// Set model
if model := root.Get("model"); model.Exists() {
out, _ = sjson.Set(out, "model", model.String())
}
// Set the prompt as user message content
out, _ = sjson.Set(out, "messages.0.content", prompt)
// Copy other parameters from completions to chat completions
if maxTokens := root.Get("max_tokens"); maxTokens.Exists() {
out, _ = sjson.Set(out, "max_tokens", maxTokens.Int())
}
if temperature := root.Get("temperature"); temperature.Exists() {
out, _ = sjson.Set(out, "temperature", temperature.Float())
}
if topP := root.Get("top_p"); topP.Exists() {
out, _ = sjson.Set(out, "top_p", topP.Float())
}
if frequencyPenalty := root.Get("frequency_penalty"); frequencyPenalty.Exists() {
out, _ = sjson.Set(out, "frequency_penalty", frequencyPenalty.Float())
}
if presencePenalty := root.Get("presence_penalty"); presencePenalty.Exists() {
out, _ = sjson.Set(out, "presence_penalty", presencePenalty.Float())
}
if stop := root.Get("stop"); stop.Exists() {
out, _ = sjson.SetRaw(out, "stop", stop.Raw)
}
if stream := root.Get("stream"); stream.Exists() {
out, _ = sjson.Set(out, "stream", stream.Bool())
}
if logprobs := root.Get("logprobs"); logprobs.Exists() {
out, _ = sjson.Set(out, "logprobs", logprobs.Bool())
}
if topLogprobs := root.Get("top_logprobs"); topLogprobs.Exists() {
out, _ = sjson.Set(out, "top_logprobs", topLogprobs.Int())
}
if echo := root.Get("echo"); echo.Exists() {
out, _ = sjson.Set(out, "echo", echo.Bool())
}
return []byte(out)
}
// convertChatCompletionsResponseToCompletions converts chat completions API response back to completions format.
// This ensures the completions endpoint returns data in the expected format.
//
// Parameters:
// - rawJSON: The raw JSON bytes of the chat completions response
//
// Returns:
// - []byte: The converted completions response
func convertChatCompletionsResponseToCompletions(rawJSON []byte) []byte {
root := gjson.ParseBytes(rawJSON)
// Base completions response structure
out := `{"id":"","object":"text_completion","created":0,"model":"","choices":[]}`
// Copy basic fields
if id := root.Get("id"); id.Exists() {
out, _ = sjson.Set(out, "id", id.String())
}
if created := root.Get("created"); created.Exists() {
out, _ = sjson.Set(out, "created", created.Int())
}
if model := root.Get("model"); model.Exists() {
out, _ = sjson.Set(out, "model", model.String())
}
if usage := root.Get("usage"); usage.Exists() {
out, _ = sjson.SetRaw(out, "usage", usage.Raw)
}
// Convert choices from chat completions to completions format
var choices []interface{}
if chatChoices := root.Get("choices"); chatChoices.Exists() && chatChoices.IsArray() {
chatChoices.ForEach(func(_, choice gjson.Result) bool {
completionsChoice := map[string]interface{}{
"index": choice.Get("index").Int(),
}
// Extract text content from message.content
if message := choice.Get("message"); message.Exists() {
if content := message.Get("content"); content.Exists() {
completionsChoice["text"] = content.String()
}
} else if delta := choice.Get("delta"); delta.Exists() {
// For streaming responses, use delta.content
if content := delta.Get("content"); content.Exists() {
completionsChoice["text"] = content.String()
}
}
// Copy finish_reason
if finishReason := choice.Get("finish_reason"); finishReason.Exists() {
completionsChoice["finish_reason"] = finishReason.String()
}
// Copy logprobs if present
if logprobs := choice.Get("logprobs"); logprobs.Exists() {
completionsChoice["logprobs"] = logprobs.Value()
}
choices = append(choices, completionsChoice)
return true
})
}
if len(choices) > 0 {
choicesJSON, _ := json.Marshal(choices)
out, _ = sjson.SetRaw(out, "choices", string(choicesJSON))
}
return []byte(out)
}
// convertChatCompletionsStreamChunkToCompletions converts a streaming chat completions chunk to completions format.
// This handles the real-time conversion of streaming response chunks and filters out empty text responses.
//
// Parameters:
// - chunkData: The raw JSON bytes of a single chat completions stream chunk
//
// Returns:
// - []byte: The converted completions stream chunk, or nil if should be filtered out
func convertChatCompletionsStreamChunkToCompletions(chunkData []byte) []byte {
root := gjson.ParseBytes(chunkData)
// Check if this chunk has any meaningful content
hasContent := false
if chatChoices := root.Get("choices"); chatChoices.Exists() && chatChoices.IsArray() {
chatChoices.ForEach(func(_, choice gjson.Result) bool {
// Check if delta has content or finish_reason
if delta := choice.Get("delta"); delta.Exists() {
if content := delta.Get("content"); content.Exists() && content.String() != "" {
hasContent = true
return false // Break out of forEach
}
}
// Also check for finish_reason to ensure we don't skip final chunks
if finishReason := choice.Get("finish_reason"); finishReason.Exists() && finishReason.String() != "" && finishReason.String() != "null" {
hasContent = true
return false // Break out of forEach
}
return true
})
}
// If no meaningful content, return nil to indicate this chunk should be skipped
if !hasContent {
return nil
}
// Base completions stream response structure
out := `{"id":"","object":"text_completion","created":0,"model":"","choices":[]}`
// Copy basic fields
if id := root.Get("id"); id.Exists() {
out, _ = sjson.Set(out, "id", id.String())
}
if created := root.Get("created"); created.Exists() {
out, _ = sjson.Set(out, "created", created.Int())
}
if model := root.Get("model"); model.Exists() {
out, _ = sjson.Set(out, "model", model.String())
}
// Convert choices from chat completions delta to completions format
var choices []interface{}
if chatChoices := root.Get("choices"); chatChoices.Exists() && chatChoices.IsArray() {
chatChoices.ForEach(func(_, choice gjson.Result) bool {
completionsChoice := map[string]interface{}{
"index": choice.Get("index").Int(),
}
// Extract text content from delta.content
if delta := choice.Get("delta"); delta.Exists() {
if content := delta.Get("content"); content.Exists() && content.String() != "" {
completionsChoice["text"] = content.String()
} else {
completionsChoice["text"] = ""
}
} else {
completionsChoice["text"] = ""
}
// Copy finish_reason
if finishReason := choice.Get("finish_reason"); finishReason.Exists() && finishReason.String() != "null" {
completionsChoice["finish_reason"] = finishReason.String()
}
// Copy logprobs if present
if logprobs := choice.Get("logprobs"); logprobs.Exists() {
completionsChoice["logprobs"] = logprobs.Value()
}
choices = append(choices, completionsChoice)
return true
})
}
if len(choices) > 0 {
choicesJSON, _ := json.Marshal(choices)
out, _ = sjson.SetRaw(out, "choices", string(choicesJSON))
}
return []byte(out)
}
// handleNonStreamingResponse handles non-streaming chat completion responses
// for Gemini models. It selects a client from the pool, sends the request, and
// aggregates the response before sending it back to the client in OpenAI format.
@@ -108,7 +380,9 @@ func (h *OpenAIAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON []
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -182,7 +456,9 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
cliClient.GetRequestMutex().Unlock()
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
@@ -251,3 +527,181 @@ outLoop:
}
}
}
// handleCompletionsNonStreamingResponse handles non-streaming completions responses.
// It converts completions request to chat completions format, sends to backend,
// then converts the response back to completions format before sending to client.
//
// Parameters:
// - c: The Gin context containing the HTTP request and response
// - rawJSON: The raw JSON bytes of the OpenAI-compatible completions request
func (h *OpenAIAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context, rawJSON []byte) {
c.Header("Content-Type", "application/json")
// Convert completions request to chat completions format
chatCompletionsJSON := convertCompletionsRequestToChatCompletions(rawJSON)
modelName := gjson.GetBytes(chatCompletionsJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
var cliClient interfaces.Client
defer func() {
if cliClient != nil {
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
retryCount := 0
for retryCount <= h.Cfg.RequestRetry {
var errorResponse *interfaces.ErrorMessage
cliClient, errorResponse = h.GetClient(modelName)
if errorResponse != nil {
c.Status(errorResponse.StatusCode)
_, _ = fmt.Fprint(c.Writer, errorResponse.Error.Error())
cliCancel()
return
}
// Send the converted chat completions request
resp, err := cliClient.SendRawMessage(cliCtx, modelName, chatCompletionsJSON, "")
if err != nil {
switch err.StatusCode {
case 429:
if h.Cfg.QuotaExceeded.SwitchProject {
log.Debugf("quota exceeded, switch client")
continue // Restart the client selection process
}
case 403, 408, 500, 502, 503, 504:
log.Debugf("http status code %d, switch client", err.StatusCode)
retryCount++
continue
default:
// Forward other errors directly to the client
c.Status(err.StatusCode)
_, _ = c.Writer.Write([]byte(err.Error.Error()))
cliCancel(err.Error)
}
break
} else {
// Convert chat completions response back to completions format
completionsResp := convertChatCompletionsResponseToCompletions(resp)
_, _ = c.Writer.Write(completionsResp)
cliCancel(completionsResp)
break
}
}
}
// handleCompletionsStreamingResponse handles streaming completions responses.
// It converts completions request to chat completions format, streams from backend,
// then converts each response chunk back to completions format before sending to client.
//
// Parameters:
// - c: The Gin context containing the HTTP request and response
// - rawJSON: The raw JSON bytes of the OpenAI-compatible completions request
func (h *OpenAIAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, rawJSON []byte) {
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
// Get the http.Flusher interface to manually flush the response.
flusher, ok := c.Writer.(http.Flusher)
if !ok {
c.JSON(http.StatusInternalServerError, handlers.ErrorResponse{
Error: handlers.ErrorDetail{
Message: "Streaming not supported",
Type: "server_error",
},
})
return
}
// Convert completions request to chat completions format
chatCompletionsJSON := convertCompletionsRequestToChatCompletions(rawJSON)
modelName := gjson.GetBytes(chatCompletionsJSON, "model").String()
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
var cliClient interfaces.Client
defer func() {
// Ensure the client's mutex is unlocked on function exit.
if cliClient != nil {
if mutex := cliClient.GetRequestMutex(); mutex != nil {
mutex.Unlock()
}
}
}()
retryCount := 0
outLoop:
for retryCount <= h.Cfg.RequestRetry {
var errorResponse *interfaces.ErrorMessage
cliClient, errorResponse = h.GetClient(modelName)
if errorResponse != nil {
c.Status(errorResponse.StatusCode)
_, _ = fmt.Fprint(c.Writer, errorResponse.Error.Error())
flusher.Flush()
cliCancel()
return
}
// Send the converted chat completions request and receive response chunks
respChan, errChan := cliClient.SendRawMessageStream(cliCtx, modelName, chatCompletionsJSON, "")
for {
select {
// Handle client disconnection.
case <-c.Request.Context().Done():
if c.Request.Context().Err().Error() == "context canceled" {
log.Debugf("client disconnected: %v", c.Request.Context().Err())
cliCancel() // Cancel the backend request.
return
}
// Process incoming response chunks.
case chunk, okStream := <-respChan:
if !okStream {
// Stream is closed, send the final [DONE] message.
_, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
flusher.Flush()
cliCancel()
return
}
// Convert chat completions chunk to completions chunk format
completionsChunk := convertChatCompletionsStreamChunkToCompletions(chunk)
// Skip this chunk if it has no meaningful content (empty text)
if completionsChunk != nil {
_, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(completionsChunk))
flusher.Flush()
}
// Handle errors from the backend.
case err, okError := <-errChan:
if okError {
switch err.StatusCode {
case 429:
if h.Cfg.QuotaExceeded.SwitchProject {
log.Debugf("quota exceeded, switch client")
continue outLoop // Restart the client selection process
}
case 403, 408, 500, 502, 503, 504:
log.Debugf("http status code %d, switch client", err.StatusCode)
retryCount++
continue outLoop
default:
// Forward other errors directly to the client
c.Status(err.StatusCode)
_, _ = fmt.Fprint(c.Writer, err.Error.Error())
flusher.Flush()
cliCancel(err.Error)
}
return
}
// Send a keep-alive signal to the client.
case <-time.After(500 * time.Millisecond):
}
}
}
}

View File

@@ -104,6 +104,7 @@ func (s *Server) setupRoutes() {
{
v1.GET("/models", s.unifiedModelsHandler(openaiHandlers, claudeCodeHandlers))
v1.POST("/chat/completions", openaiHandlers.ChatCompletions)
v1.POST("/completions", openaiHandlers.Completions)
v1.POST("/messages", claudeCodeHandlers.ClaudeMessages)
}
@@ -123,6 +124,7 @@ func (s *Server) setupRoutes() {
"version": "1.0.0",
"endpoints": []string{
"POST /v1/chat/completions",
"POST /v1/completions",
"GET /v1/models",
},
})

View File

@@ -535,7 +535,7 @@ func (c *ClaudeClient) GetEmail() string {
if ts, ok := c.tokenStorage.(*claude.ClaudeTokenStorage); ok {
return ts.Email
} else {
return ""
return c.cfg.ClaudeKey[c.apiKeyIndex].APIKey
}
}
@@ -557,3 +557,12 @@ func (c *ClaudeClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *ClaudeClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -430,3 +430,12 @@ func (c *CodexClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *CodexClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -851,3 +851,12 @@ func (c *GeminiCLIClient) GetUserAgent() string {
// return fmt.Sprintf("GeminiCLI/%s (%s; %s)", pluginVersion, runtime.GOOS, runtime.GOARCH)
return "google-api-nodejs-client/9.15.1"
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *GeminiCLIClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -425,3 +425,12 @@ func (c *GeminiClient) GetUserAgent() string {
// return fmt.Sprintf("GeminiCLI/%s (%s; %s)", pluginVersion, runtime.GOOS, runtime.GOARCH)
return "google-api-nodejs-client/9.15.1"
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *GeminiClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -390,3 +390,12 @@ func (c *OpenAICompatibilityClient) RefreshTokens(ctx context.Context) error {
// API keys don't need refreshing
return nil
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *OpenAICompatibilityClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -432,3 +432,12 @@ func (c *QwenClient) IsModelQuotaExceeded(model string) bool {
}
return false
}
// GetRequestMutex returns the mutex used to synchronize requests for this client.
// This ensures that only one request is processed at a time for quota management.
//
// Returns:
// - *sync.Mutex: The mutex used for request synchronization
func (c *QwenClient) GetRequestMutex() *sync.Mutex {
return nil
}

View File

@@ -185,7 +185,7 @@ func (r *ModelRegistry) ClearModelQuotaExceeded(clientID, modelID string) {
if registration, exists := r.models[modelID]; exists {
delete(registration.QuotaExceededClients, clientID)
log.Debugf("Cleared quota exceeded status for model %s and client %s", modelID, clientID)
// log.Debugf("Cleared quota exceeded status for model %s and client %s", modelID, clientID)
}
}