refactor: standardize dataTag processing across response translators

- Unified `dataTag` initialization by removing spaces after `data:`.
- Replaced manual slicing with `bytes.TrimSpace` for consistent and robust handling of JSON payloads.
This commit is contained in:
Luis Pater
2025-09-21 11:16:03 +08:00
parent 83a1fa618d
commit e5a6fd2d4f
11 changed files with 38 additions and 51 deletions

View File

@@ -554,7 +554,7 @@ func (c *GeminiCLIClient) SendRawMessageStream(ctx context.Context, modelName st
rawJSON, _ = sjson.SetBytes(rawJSON, "project", c.GetProjectID()) rawJSON, _ = sjson.SetBytes(rawJSON, "project", c.GetProjectID())
rawJSON, _ = sjson.SetBytes(rawJSON, "model", modelName) rawJSON, _ = sjson.SetBytes(rawJSON, "model", modelName)
dataTag := []byte("data: ") dataTag := []byte("data:")
errChan := make(chan *interfaces.ErrorMessage) errChan := make(chan *interfaces.ErrorMessage)
dataChan := make(chan []byte) dataChan := make(chan []byte)
// log.Debugf(string(rawJSON)) // log.Debugf(string(rawJSON))
@@ -619,7 +619,7 @@ func (c *GeminiCLIClient) SendRawMessageStream(ctx context.Context, modelName st
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[6:], &param) lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), &param)
for i := 0; i < len(lines); i++ { for i := 0; i < len(lines); i++ {
dataChan <- []byte(lines[i]) dataChan <- []byte(lines[i])
} }
@@ -630,7 +630,7 @@ func (c *GeminiCLIClient) SendRawMessageStream(ctx context.Context, modelName st
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
dataChan <- line[6:] dataChan <- bytes.TrimSpace(line[5:])
} }
c.AddAPIResponseData(ctx, line) c.AddAPIResponseData(ctx, line)
} }

View File

@@ -298,7 +298,7 @@ func (c *GeminiClient) SendRawMessageStream(ctx context.Context, modelName strin
handlerType := handler.HandlerType() handlerType := handler.HandlerType()
rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true) rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true)
dataTag := []byte("data: ") dataTag := []byte("data:")
errChan := make(chan *interfaces.ErrorMessage) errChan := make(chan *interfaces.ErrorMessage)
dataChan := make(chan []byte) dataChan := make(chan []byte)
// log.Debugf(string(rawJSON)) // log.Debugf(string(rawJSON))
@@ -342,7 +342,7 @@ func (c *GeminiClient) SendRawMessageStream(ctx context.Context, modelName strin
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[6:], &param) lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), &param)
for i := 0; i < len(lines); i++ { for i := 0; i < len(lines); i++ {
dataChan <- []byte(lines[i]) dataChan <- []byte(lines[i])
} }
@@ -353,7 +353,7 @@ func (c *GeminiClient) SendRawMessageStream(ctx context.Context, modelName strin
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
dataChan <- line[6:] dataChan <- bytes.TrimSpace(line[5:])
} }
c.AddAPIResponseData(ctx, line) c.AddAPIResponseData(ctx, line)
} }

View File

@@ -291,9 +291,8 @@ func (c *OpenAICompatibilityClient) SendRawMessageStream(ctx context.Context, mo
handlerType := handler.HandlerType() handlerType := handler.HandlerType()
rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true) rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true)
dataTag := []byte("data: ") dataTag := []byte("data:")
dataUglyTag := []byte("data:") // Some APIs providers don't add space after "data:", fuck for them all doneTag := []byte("[DONE]")
doneTag := []byte("data: [DONE]")
errChan := make(chan *interfaces.ErrorMessage) errChan := make(chan *interfaces.ErrorMessage)
dataChan := make(chan []byte) dataChan := make(chan []byte)
// log.Debugf(string(rawJSON)) // log.Debugf(string(rawJSON))
@@ -332,19 +331,10 @@ func (c *OpenAICompatibilityClient) SendRawMessageStream(ctx context.Context, mo
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
if bytes.Equal(line, doneTag) { if bytes.Equal(bytes.TrimSpace(line[5:]), doneTag) {
break break
} }
lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[6:], &param) lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), &param)
for i := 0; i < len(lines); i++ {
c.AddAPIResponseData(ctx, line)
dataChan <- []byte(lines[i])
}
} else if bytes.HasPrefix(line, dataUglyTag) {
if bytes.Equal(line, doneTag) {
break
}
lines := translator.Response(handlerType, c.Type(), newCtx, modelName, originalRequestRawJSON, rawJSON, line[5:], &param)
for i := 0; i < len(lines); i++ { for i := 0; i < len(lines); i++ {
c.AddAPIResponseData(ctx, line) c.AddAPIResponseData(ctx, line)
dataChan <- []byte(lines[i]) dataChan <- []byte(lines[i])
@@ -356,13 +346,10 @@ func (c *OpenAICompatibilityClient) SendRawMessageStream(ctx context.Context, mo
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
if bytes.Equal(line, doneTag) { if bytes.Equal(bytes.TrimSpace(line[5:]), doneTag) {
break break
} }
c.AddAPIResponseData(newCtx, line[6:]) c.AddAPIResponseData(newCtx, bytes.TrimSpace(line[5:]))
dataChan <- line[6:]
} else if bytes.HasPrefix(line, dataUglyTag) {
c.AddAPIResponseData(newCtx, line[5:])
dataChan <- line[5:] dataChan <- line[5:]
} }
} }

View File

@@ -215,8 +215,8 @@ func (c *QwenClient) SendRawMessageStream(ctx context.Context, modelName string,
handlerType := handler.HandlerType() handlerType := handler.HandlerType()
rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true) rawJSON = translator.Request(handlerType, c.Type(), modelName, rawJSON, true)
dataTag := []byte("data: ") dataTag := []byte("data:")
doneTag := []byte("data: [DONE]") doneTag := []byte("[DONE]")
errChan := make(chan *interfaces.ErrorMessage) errChan := make(chan *interfaces.ErrorMessage)
dataChan := make(chan []byte) dataChan := make(chan []byte)
@@ -264,7 +264,7 @@ func (c *QwenClient) SendRawMessageStream(ctx context.Context, modelName string,
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
lines := translator.Response(handlerType, c.Type(), ctx, modelName, originalRequestRawJSON, rawJSON, line[6:], &param) lines := translator.Response(handlerType, c.Type(), ctx, modelName, originalRequestRawJSON, rawJSON, bytes.TrimSpace(line[5:]), &param)
for i := 0; i < len(lines); i++ { for i := 0; i < len(lines); i++ {
dataChan <- []byte(lines[i]) dataChan <- []byte(lines[i])
} }
@@ -274,9 +274,9 @@ func (c *QwenClient) SendRawMessageStream(ctx context.Context, modelName string,
} else { } else {
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if !bytes.HasPrefix(line, doneTag) { if bytes.HasPrefix(line, dataTag) {
if bytes.HasPrefix(line, dataTag) { if !bytes.Equal(bytes.TrimSpace(line[5:]), doneTag) {
dataChan <- line[6:] dataChan <- bytes.TrimSpace(line[5:])
} }
} }
c.AddAPIResponseData(ctx, line) c.AddAPIResponseData(ctx, line)

View File

@@ -17,7 +17,7 @@ import (
) )
var ( var (
dataTag = []byte("data: ") dataTag = []byte("data:")
) )
// ConvertAnthropicResponseToGeminiParams holds parameters for response conversion // ConvertAnthropicResponseToGeminiParams holds parameters for response conversion
@@ -64,7 +64,7 @@ func ConvertClaudeResponseToGemini(_ context.Context, modelName string, original
if !bytes.HasPrefix(rawJSON, dataTag) { if !bytes.HasPrefix(rawJSON, dataTag) {
return []string{} return []string{}
} }
rawJSON = rawJSON[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
root := gjson.ParseBytes(rawJSON) root := gjson.ParseBytes(rawJSON)
eventType := root.Get("type").String() eventType := root.Get("type").String()
@@ -336,7 +336,7 @@ func ConvertClaudeResponseToGeminiNonStream(_ context.Context, modelName string,
line := scanner.Bytes() line := scanner.Bytes()
// log.Debug(string(line)) // log.Debug(string(line))
if bytes.HasPrefix(line, dataTag) { if bytes.HasPrefix(line, dataTag) {
jsonData := line[6:] jsonData := bytes.TrimSpace(line[5:])
streamingEvents = append(streamingEvents, jsonData) streamingEvents = append(streamingEvents, jsonData)
} }
} }

View File

@@ -18,7 +18,7 @@ import (
) )
var ( var (
dataTag = []byte("data: ") dataTag = []byte("data:")
) )
// ConvertAnthropicResponseToOpenAIParams holds parameters for response conversion // ConvertAnthropicResponseToOpenAIParams holds parameters for response conversion
@@ -62,7 +62,7 @@ func ConvertClaudeResponseToOpenAI(_ context.Context, modelName string, original
if !bytes.HasPrefix(rawJSON, dataTag) { if !bytes.HasPrefix(rawJSON, dataTag) {
return []string{} return []string{}
} }
rawJSON = rawJSON[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
root := gjson.ParseBytes(rawJSON) root := gjson.ParseBytes(rawJSON)
eventType := root.Get("type").String() eventType := root.Get("type").String()
@@ -289,7 +289,7 @@ func ConvertClaudeResponseToOpenAINonStream(_ context.Context, _ string, origina
if !bytes.HasPrefix(line, dataTag) { if !bytes.HasPrefix(line, dataTag) {
continue continue
} }
chunks = append(chunks, line[6:]) chunks = append(chunks, bytes.TrimSpace(rawJSON[5:]))
} }
// Base OpenAI non-streaming response template // Base OpenAI non-streaming response template

View File

@@ -34,7 +34,7 @@ type claudeToResponsesState struct {
ReasoningIndex int ReasoningIndex int
} }
var dataTag = []byte("data: ") var dataTag = []byte("data:")
func emitEvent(event string, payload string) string { func emitEvent(event string, payload string) string {
return fmt.Sprintf("event: %s\ndata: %s\n\n", event, payload) return fmt.Sprintf("event: %s\ndata: %s\n\n", event, payload)
@@ -51,7 +51,7 @@ func ConvertClaudeResponseToOpenAIResponses(ctx context.Context, modelName strin
if !bytes.HasPrefix(rawJSON, dataTag) { if !bytes.HasPrefix(rawJSON, dataTag) {
return []string{} return []string{}
} }
rawJSON = rawJSON[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
root := gjson.ParseBytes(rawJSON) root := gjson.ParseBytes(rawJSON)
ev := root.Get("type").String() ev := root.Get("type").String()
var out []string var out []string

View File

@@ -16,7 +16,7 @@ import (
) )
var ( var (
dataTag = []byte("data: ") dataTag = []byte("data:")
) )
// ConvertCodexResponseToClaude performs sophisticated streaming response format conversion. // ConvertCodexResponseToClaude performs sophisticated streaming response format conversion.
@@ -45,7 +45,7 @@ func ConvertCodexResponseToClaude(_ context.Context, _ string, originalRequestRa
if !bytes.HasPrefix(rawJSON, dataTag) { if !bytes.HasPrefix(rawJSON, dataTag) {
return []string{} return []string{}
} }
rawJSON = rawJSON[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
output := "" output := ""
rootResult := gjson.ParseBytes(rawJSON) rootResult := gjson.ParseBytes(rawJSON)

View File

@@ -16,7 +16,7 @@ import (
) )
var ( var (
dataTag = []byte("data: ") dataTag = []byte("data:")
) )
// ConvertCodexResponseToGeminiParams holds parameters for response conversion. // ConvertCodexResponseToGeminiParams holds parameters for response conversion.
@@ -53,7 +53,7 @@ func ConvertCodexResponseToGemini(_ context.Context, modelName string, originalR
if !bytes.HasPrefix(rawJSON, dataTag) { if !bytes.HasPrefix(rawJSON, dataTag) {
return []string{} return []string{}
} }
rawJSON = rawJSON[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
rootResult := gjson.ParseBytes(rawJSON) rootResult := gjson.ParseBytes(rawJSON)
typeResult := rootResult.Get("type") typeResult := rootResult.Get("type")
@@ -161,7 +161,7 @@ func ConvertCodexResponseToGeminiNonStream(_ context.Context, modelName string,
if !bytes.HasPrefix(line, dataTag) { if !bytes.HasPrefix(line, dataTag) {
continue continue
} }
rawJSON = line[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
rootResult := gjson.ParseBytes(rawJSON) rootResult := gjson.ParseBytes(rawJSON)

View File

@@ -16,7 +16,7 @@ import (
) )
var ( var (
dataTag = []byte("data: ") dataTag = []byte("data:")
) )
// ConvertCliToOpenAIParams holds parameters for response conversion. // ConvertCliToOpenAIParams holds parameters for response conversion.
@@ -54,7 +54,7 @@ func ConvertCodexResponseToOpenAI(_ context.Context, modelName string, originalR
if !bytes.HasPrefix(rawJSON, dataTag) { if !bytes.HasPrefix(rawJSON, dataTag) {
return []string{} return []string{}
} }
rawJSON = rawJSON[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
// Initialize the OpenAI SSE template. // Initialize the OpenAI SSE template.
template := `{"id":"","object":"chat.completion.chunk","created":12345,"model":"model","choices":[{"index":0,"delta":{"role":null,"content":null,"reasoning_content":null,"tool_calls":null},"finish_reason":null,"native_finish_reason":null}]}` template := `{"id":"","object":"chat.completion.chunk","created":12345,"model":"model","choices":[{"index":0,"delta":{"role":null,"content":null,"reasoning_content":null,"tool_calls":null},"finish_reason":null,"native_finish_reason":null}]}`
@@ -175,7 +175,7 @@ func ConvertCodexResponseToOpenAINonStream(_ context.Context, _ string, original
if !bytes.HasPrefix(line, dataTag) { if !bytes.HasPrefix(line, dataTag) {
continue continue
} }
rawJSON = line[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
rootResult := gjson.ParseBytes(rawJSON) rootResult := gjson.ParseBytes(rawJSON)
// Verify this is a response.completed event // Verify this is a response.completed event

View File

@@ -13,8 +13,8 @@ import (
// ConvertCodexResponseToOpenAIResponses converts OpenAI Chat Completions streaming chunks // ConvertCodexResponseToOpenAIResponses converts OpenAI Chat Completions streaming chunks
// to OpenAI Responses SSE events (response.*). // to OpenAI Responses SSE events (response.*).
func ConvertCodexResponseToOpenAIResponses(ctx context.Context, modelName string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, param *any) []string { func ConvertCodexResponseToOpenAIResponses(ctx context.Context, modelName string, originalRequestRawJSON, requestRawJSON, rawJSON []byte, param *any) []string {
if bytes.HasPrefix(rawJSON, []byte("data: ")) { if bytes.HasPrefix(rawJSON, []byte("data:")) {
rawJSON = rawJSON[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
if typeResult := gjson.GetBytes(rawJSON, "type"); typeResult.Exists() { if typeResult := gjson.GetBytes(rawJSON, "type"); typeResult.Exists() {
typeStr := typeResult.String() typeStr := typeResult.String()
if typeStr == "response.created" || typeStr == "response.in_progress" || typeStr == "response.completed" { if typeStr == "response.created" || typeStr == "response.in_progress" || typeStr == "response.completed" {
@@ -32,14 +32,14 @@ func ConvertCodexResponseToOpenAIResponsesNonStream(_ context.Context, modelName
scanner := bufio.NewScanner(bytes.NewReader(rawJSON)) scanner := bufio.NewScanner(bytes.NewReader(rawJSON))
buffer := make([]byte, 10240*1024) buffer := make([]byte, 10240*1024)
scanner.Buffer(buffer, 10240*1024) scanner.Buffer(buffer, 10240*1024)
dataTag := []byte("data: ") dataTag := []byte("data:")
for scanner.Scan() { for scanner.Scan() {
line := scanner.Bytes() line := scanner.Bytes()
if !bytes.HasPrefix(line, dataTag) { if !bytes.HasPrefix(line, dataTag) {
continue continue
} }
rawJSON = line[6:] rawJSON = bytes.TrimSpace(rawJSON[5:])
rootResult := gjson.ParseBytes(rawJSON) rootResult := gjson.ParseBytes(rawJSON)
// Verify this is a response.completed event // Verify this is a response.completed event