// Package openai provides HTTP handlers for OpenAI API endpoints.
// This package implements the OpenAI-compatible API interface, including model listing
// and chat completion functionality. It supports both streaming and non-streaming responses,
// and manages a pool of clients to interact with backend services.
// The handlers translate OpenAI API requests to the appropriate backend format and
// convert responses back to OpenAI-compatible format.
package openai

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/luispater/CLIProxyAPI/internal/api/handlers"
	. "github.com/luispater/CLIProxyAPI/internal/constant"
	"github.com/luispater/CLIProxyAPI/internal/interfaces"
	log "github.com/sirupsen/logrus"
	"github.com/tidwall/gjson"
)

// OpenAIAPIHandler contains the handlers for OpenAI API endpoints.
// It holds a pool of clients to interact with the backend service.
type OpenAIAPIHandler struct {
	*handlers.BaseAPIHandler
}

// NewOpenAIAPIHandler creates a new OpenAI API handlers instance.
// It takes a BaseAPIHandler instance as input and returns an OpenAIAPIHandler.
//
// Parameters:
//   - apiHandlers: The base API handlers instance
//
// Returns:
//   - *OpenAIAPIHandler: A new OpenAI API handlers instance
func NewOpenAIAPIHandler(apiHandlers *handlers.BaseAPIHandler) *OpenAIAPIHandler {
	return &OpenAIAPIHandler{
		BaseAPIHandler: apiHandlers,
	}
}

// HandlerType returns the identifier for this handler implementation.
func (h *OpenAIAPIHandler) HandlerType() string {
	return OPENAI
}

// Models returns the OpenAI-compatible model metadata supported by this handler.
func (h *OpenAIAPIHandler) Models() []map[string]any {
	return []map[string]any{
		{
			"id": "gemini-2.5-pro",
			"object": "model",
			"version": "2.5",
			"name": "Gemini 2.5 Pro",
			"description": "Stable release (June 17th, 2025) of Gemini 2.5 Pro",
			"context_length": 1_048_576,
			"max_completion_tokens": 65_536,
			"supported_parameters": []string{
				"tools",
				"temperature",
				"top_p",
				"top_k",
			},
			"temperature": 1,
			"topP": 0.95,
			"topK": 64,
			"maxTemperature": 2,
			"thinking": true,
		},
		{
			"id": "gemini-2.5-flash",
			"object": "model",
			"version": "001",
			"name": "Gemini 2.5 Flash",
			"description": "Stable version of Gemini 2.5 Flash, our mid-size multimodal model that supports up to 1 million tokens, released in June of 2025.",
			"context_length": 1_048_576,
			"max_completion_tokens": 65_536,
			"supported_parameters": []string{
				"tools",
				"temperature",
				"top_p",
				"top_k",
			},
			"temperature": 1,
			"topP": 0.95,
			"topK": 64,
			"maxTemperature": 2,
			"thinking": true,
		},
		{
			"id": "gpt-5",
			"object": "model",
			"version": "gpt-5-2025-08-07",
			"name": "GPT 5",
			"description": "Stable version of GPT 5, the best model for coding and agentic tasks across domains.",
			"context_length": 400_000,
			"max_completion_tokens": 128_000,
			"supported_parameters": []string{
				"tools",
			},
			"temperature": 1,
			"topP": 0.95,
			"topK": 64,
			"maxTemperature": 2,
			"thinking": true,
		},
		{
			"id": "claude-opus-4-1-20250805",
			"object": "model",
			"version": "claude-opus-4-1-20250805",
			"name": "Claude Opus 4.1",
			"description": "Anthropic's most capable model.",
			"context_length": 200_000,
			"max_completion_tokens": 32_000,
			"supported_parameters": []string{
				"tools",
			},
			"temperature": 1,
			"topP": 0.95,
			"topK": 64,
			"maxTemperature": 2,
			"thinking": true,
		},
	}
}

// OpenAIModels handles the /v1/models endpoint.
// It returns a hardcoded list of available AI models with their capabilities
// and specifications in OpenAI-compatible format.
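//
// A minimal sketch of how this handler might be registered with a gin router
// (illustrative only; "base" is assumed to be an already initialized
// *handlers.BaseAPIHandler, and the HTTP verbs follow the usual OpenAI
// conventions rather than being taken from this file):
//
//	h := NewOpenAIAPIHandler(base)
//	router := gin.Default()
//	router.GET("/v1/models", h.OpenAIModels)
//	router.POST("/v1/chat/completions", h.ChatCompletions)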
func (h *OpenAIAPIHandler) OpenAIModels(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{
		"data": h.Models(),
	})
}

// ChatCompletions handles the /v1/chat/completions endpoint.
// It determines whether the request asks for a streaming or non-streaming
// response and dispatches to the corresponding handler.
//
// Parameters:
//   - c: The Gin context containing the HTTP request and response
func (h *OpenAIAPIHandler) ChatCompletions(c *gin.Context) {
	rawJSON, err := c.GetRawData()
	// If data retrieval fails, return a 400 Bad Request error.
	if err != nil {
		c.JSON(http.StatusBadRequest, handlers.ErrorResponse{
			Error: handlers.ErrorDetail{
				Message: fmt.Sprintf("Invalid request: %v", err),
				Type:    "invalid_request_error",
			},
		})
		return
	}

	// Check if the client requested a streaming response.
	streamResult := gjson.GetBytes(rawJSON, "stream")
	if streamResult.Type == gjson.True {
		h.handleStreamingResponse(c, rawJSON)
	} else {
		h.handleNonStreamingResponse(c, rawJSON)
	}
}

// handleNonStreamingResponse handles non-streaming chat completion responses.
// It selects a client from the pool, sends the request, and aggregates the
// response before sending it back to the client in OpenAI format.
//
// Parameters:
//   - c: The Gin context containing the HTTP request and response
//   - rawJSON: The raw JSON bytes of the OpenAI-compatible request
func (h *OpenAIAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON []byte) {
	c.Header("Content-Type", "application/json")
	modelName := gjson.GetBytes(rawJSON, "model").String()
	cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())

	var cliClient interfaces.Client
	defer func() {
		// Ensure the client's request mutex is unlocked on function exit.
		if cliClient != nil {
			cliClient.GetRequestMutex().Unlock()
		}
	}()

	retryCount := 0
	for retryCount <= h.Cfg.RequestRetry {
		var errorResponse *interfaces.ErrorMessage
		cliClient, errorResponse = h.GetClient(modelName)
		if errorResponse != nil {
			c.Status(errorResponse.StatusCode)
			_, _ = fmt.Fprint(c.Writer, errorResponse.Error.Error())
			cliCancel()
			return
		}

		resp, err := cliClient.SendRawMessage(cliCtx, modelName, rawJSON, "")
		if err != nil {
			switch err.StatusCode {
			case 429:
				if h.Cfg.QuotaExceeded.SwitchProject {
					log.Debugf("quota exceeded, switch client")
					continue // Restart the client selection process
				}
			case 403, 408, 500, 502, 503, 504:
				log.Debugf("http status code %d, switch client", err.StatusCode)
				retryCount++
				continue
			default:
				// Forward other errors directly to the client
				c.Status(err.StatusCode)
				_, _ = c.Writer.Write([]byte(err.Error.Error()))
				cliCancel(err.Error)
			}
			break
		} else {
			_, _ = c.Writer.Write(resp)
			cliCancel(resp)
			break
		}
	}
}

// handleStreamingResponse handles streaming chat completion responses.
// It establishes a streaming connection with the backend service and forwards
// the response chunks to the client in real time using Server-Sent Events.
//
// Parameters:
//   - c: The Gin context containing the HTTP request and response
//   - rawJSON: The raw JSON bytes of the OpenAI-compatible request
func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byte) {
	c.Header("Content-Type", "text/event-stream")
	c.Header("Cache-Control", "no-cache")
	c.Header("Connection", "keep-alive")
	c.Header("Access-Control-Allow-Origin", "*")

	// Get the http.Flusher interface to manually flush the response.
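	// Server-Sent Events only work if each frame reaches the client as soon as
	// it is written; without http.Flusher the data may sit in the response
	// buffer until the handler returns, which would defeat streaming.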
	flusher, ok := c.Writer.(http.Flusher)
	if !ok {
		c.JSON(http.StatusInternalServerError, handlers.ErrorResponse{
			Error: handlers.ErrorDetail{
				Message: "Streaming not supported",
				Type:    "server_error",
			},
		})
		return
	}

	modelName := gjson.GetBytes(rawJSON, "model").String()
	cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())

	var cliClient interfaces.Client
	defer func() {
		// Ensure the client's mutex is unlocked on function exit.
		if cliClient != nil {
			cliClient.GetRequestMutex().Unlock()
		}
	}()

	retryCount := 0
outLoop:
	for retryCount <= h.Cfg.RequestRetry {
		var errorResponse *interfaces.ErrorMessage
		cliClient, errorResponse = h.GetClient(modelName)
		if errorResponse != nil {
			c.Status(errorResponse.StatusCode)
			_, _ = fmt.Fprint(c.Writer, errorResponse.Error.Error())
			flusher.Flush()
			cliCancel()
			return
		}

		// Send the message and receive response chunks and errors via channels.
		respChan, errChan := cliClient.SendRawMessageStream(cliCtx, modelName, rawJSON, "")
		for {
			select {
			// Handle client disconnection.
			case <-c.Request.Context().Done():
				if c.Request.Context().Err().Error() == "context canceled" {
					log.Debugf("Client disconnected: %v", c.Request.Context().Err())
					cliCancel() // Cancel the backend request.
					return
				}
			// Process incoming response chunks.
			case chunk, okStream := <-respChan:
				if !okStream {
					// Stream is closed, send the final [DONE] message.
					_, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
					flusher.Flush()
					cliCancel()
					return
				}
				_, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(chunk))
				flusher.Flush()
			// Handle errors from the backend.
			case err, okError := <-errChan:
				if okError {
					switch err.StatusCode {
					case 429:
						if h.Cfg.QuotaExceeded.SwitchProject {
							log.Debugf("quota exceeded, switch client")
							continue outLoop // Restart the client selection process
						}
					case 403, 408, 500, 502, 503, 504:
						log.Debugf("http status code %d, switch client", err.StatusCode)
						retryCount++
						continue outLoop
					default:
						// Forward other errors directly to the client
						c.Status(err.StatusCode)
						_, _ = fmt.Fprint(c.Writer, err.Error.Error())
						flusher.Flush()
						cliCancel(err.Error)
					}
					return
				}
			// Periodically wake the select so the loop can re-check for client
			// disconnection and new data instead of blocking indefinitely.
			case <-time.After(500 * time.Millisecond):
			}
		}
	}
}
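
// The streaming handler above emits OpenAI-style Server-Sent Events: each
// backend chunk is forwarded verbatim as a "data: ..." frame, and the stream
// ends with "data: [DONE]". A minimal client-side consumer sketch, shown only
// for illustration (the listen address and request body are assumptions, not
// taken from this package):
//
//	resp, err := http.Post("http://localhost:8080/v1/chat/completions", "application/json",
//		strings.NewReader(`{"model":"gemini-2.5-pro","stream":true,"messages":[{"role":"user","content":"hi"}]}`))
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer func() { _ = resp.Body.Close() }()
//	scanner := bufio.NewScanner(resp.Body)
//	for scanner.Scan() {
//		line := scanner.Text()
//		if !strings.HasPrefix(line, "data: ") {
//			continue // skip the blank separator lines between frames
//		}
//		payload := strings.TrimPrefix(line, "data: ")
//		if payload == "[DONE]" {
//			break
//		}
//		fmt.Println(payload) // one JSON chunk per frame
//	}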