From e73f16507012ef26c2a28a49a1c3c9161ebf0b74 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Sat, 5 Jul 2025 04:10:00 +0800 Subject: [PATCH] Refactor API handlers to streamline response handling - Replaced channel-based handling in `SendMessage` flow with direct synchronous execution. - Introduced `hasFirstResponse` flag to manage keep-alive signals in streaming handler. - Simplified error handling and removed redundant code for enhanced readability and maintainability. --- internal/api/handlers.go | 54 +++++++++++++--------------------------- 1 file changed, 17 insertions(+), 37 deletions(-) diff --git a/internal/api/handlers.go b/internal/api/handlers.go index cf28f010..5ca23ad9 100644 --- a/internal/api/handlers.go +++ b/internal/api/handlers.go @@ -244,43 +244,19 @@ func (h *APIHandlers) handleNonStreamingResponse(c *gin.Context, rawJson []byte) log.Debugf("Request use account: %s, project id: %s", cliClient.GetEmail(), cliClient.GetProjectID()) - respChan := make(chan []byte) - errChan := make(chan *client.ErrorMessage) - go func() { - resp, err := cliClient.SendMessage(cliCtx, rawJson, modelName, contents, tools) - if err != nil { - errChan <- err - } else { - respChan <- resp - } - }() - - for { - select { - case <-c.Request.Context().Done(): - if c.Request.Context().Err().Error() == "context canceled" { - log.Debugf("Client disconnected: %v", c.Request.Context().Err()) - cliCancel() - return - } - case respBody := <-respChan: - openAIFormat := translator.ConvertCliToOpenAINonStream(respBody) - if openAIFormat != "" { - _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", openAIFormat) - flusher.Flush() - } - cliCancel() - return - case err := <-errChan: - c.Status(err.StatusCode) - _, _ = fmt.Fprint(c.Writer, err.Error.Error()) - flusher.Flush() - cliCancel() - return - case <-time.After(500 * time.Millisecond): - _, _ = c.Writer.Write([]byte("\n")) + resp, err := cliClient.SendMessage(cliCtx, rawJson, modelName, contents, tools) + if err != nil { + c.Status(err.StatusCode) + _, _ = fmt.Fprint(c.Writer, err.Error.Error()) + flusher.Flush() + cliCancel() + } else { + openAIFormat := translator.ConvertCliToOpenAINonStream(resp) + if openAIFormat != "" { + _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", openAIFormat) flusher.Flush() } + cliCancel() } } @@ -345,6 +321,7 @@ func (h *APIHandlers) handleStreamingResponse(c *gin.Context, rawJson []byte) { log.Debugf("Request use account: %s, project id: %s", cliClient.GetEmail(), cliClient.GetProjectID()) // Send the message and receive response chunks and errors via channels. respChan, errChan := cliClient.SendMessageStream(cliCtx, rawJson, modelName, contents, tools) + hasFirstResponse := false for { select { // Handle client disconnection. @@ -364,6 +341,7 @@ func (h *APIHandlers) handleStreamingResponse(c *gin.Context, rawJson []byte) { return } else { // Convert the chunk to OpenAI format and send it to the client. + hasFirstResponse = true openAIFormat := translator.ConvertCliToOpenAI(chunk) if openAIFormat != "" { _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", openAIFormat) @@ -381,8 +359,10 @@ func (h *APIHandlers) handleStreamingResponse(c *gin.Context, rawJson []byte) { } // Send a keep-alive signal to the client. case <-time.After(500 * time.Millisecond): - _, _ = c.Writer.Write([]byte(": CLI-PROXY-API PROCESSING\n\n")) - flusher.Flush() + if hasFirstResponse { + _, _ = c.Writer.Write([]byte(": CLI-PROXY-API PROCESSING\n\n")) + flusher.Flush() + } } } }