Refactor API handlers to streamline response handling

- Replaced channel-based handling in `SendMessage` flow with direct synchronous execution.
- Introduced `hasFirstResponse` flag to manage keep-alive signals in streaming handler.
- Simplified error handling and removed redundant code for enhanced readability and maintainability.
This commit is contained in:
Luis Pater
2025-07-05 04:10:00 +08:00
parent 512f2d5247
commit e73f165070

View File

@@ -244,43 +244,19 @@ func (h *APIHandlers) handleNonStreamingResponse(c *gin.Context, rawJson []byte)
log.Debugf("Request use account: %s, project id: %s", cliClient.GetEmail(), cliClient.GetProjectID()) log.Debugf("Request use account: %s, project id: %s", cliClient.GetEmail(), cliClient.GetProjectID())
respChan := make(chan []byte)
errChan := make(chan *client.ErrorMessage)
go func() {
resp, err := cliClient.SendMessage(cliCtx, rawJson, modelName, contents, tools) resp, err := cliClient.SendMessage(cliCtx, rawJson, modelName, contents, tools)
if err != nil { if err != nil {
errChan <- err c.Status(err.StatusCode)
} else { _, _ = fmt.Fprint(c.Writer, err.Error.Error())
respChan <- resp flusher.Flush()
}
}()
for {
select {
case <-c.Request.Context().Done():
if c.Request.Context().Err().Error() == "context canceled" {
log.Debugf("Client disconnected: %v", c.Request.Context().Err())
cliCancel() cliCancel()
return } else {
} openAIFormat := translator.ConvertCliToOpenAINonStream(resp)
case respBody := <-respChan:
openAIFormat := translator.ConvertCliToOpenAINonStream(respBody)
if openAIFormat != "" { if openAIFormat != "" {
_, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", openAIFormat) _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", openAIFormat)
flusher.Flush() flusher.Flush()
} }
cliCancel() cliCancel()
return
case err := <-errChan:
c.Status(err.StatusCode)
_, _ = fmt.Fprint(c.Writer, err.Error.Error())
flusher.Flush()
cliCancel()
return
case <-time.After(500 * time.Millisecond):
_, _ = c.Writer.Write([]byte("\n"))
flusher.Flush()
}
} }
} }
@@ -345,6 +321,7 @@ func (h *APIHandlers) handleStreamingResponse(c *gin.Context, rawJson []byte) {
log.Debugf("Request use account: %s, project id: %s", cliClient.GetEmail(), cliClient.GetProjectID()) log.Debugf("Request use account: %s, project id: %s", cliClient.GetEmail(), cliClient.GetProjectID())
// Send the message and receive response chunks and errors via channels. // Send the message and receive response chunks and errors via channels.
respChan, errChan := cliClient.SendMessageStream(cliCtx, rawJson, modelName, contents, tools) respChan, errChan := cliClient.SendMessageStream(cliCtx, rawJson, modelName, contents, tools)
hasFirstResponse := false
for { for {
select { select {
// Handle client disconnection. // Handle client disconnection.
@@ -364,6 +341,7 @@ func (h *APIHandlers) handleStreamingResponse(c *gin.Context, rawJson []byte) {
return return
} else { } else {
// Convert the chunk to OpenAI format and send it to the client. // Convert the chunk to OpenAI format and send it to the client.
hasFirstResponse = true
openAIFormat := translator.ConvertCliToOpenAI(chunk) openAIFormat := translator.ConvertCliToOpenAI(chunk)
if openAIFormat != "" { if openAIFormat != "" {
_, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", openAIFormat) _, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", openAIFormat)
@@ -381,8 +359,10 @@ func (h *APIHandlers) handleStreamingResponse(c *gin.Context, rawJson []byte) {
} }
// Send a keep-alive signal to the client. // Send a keep-alive signal to the client.
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
if hasFirstResponse {
_, _ = c.Writer.Write([]byte(": CLI-PROXY-API PROCESSING\n\n")) _, _ = c.Writer.Write([]byte(": CLI-PROXY-API PROCESSING\n\n"))
flusher.Flush() flusher.Flush()
} }
} }
}
} }