fix(claude): track message_start event in streaming response

Add a `MessageStarted` flag to `ConvertOpenAIResponseToAnthropicParams` to ensure the `message_start` event is emitted only once during streaming.
Refactor response handling to detect streaming mode via the `stream` field instead of the `object` type, simplifying the branching logic.
Update the streaming conversion to set `MessageStarted` after sending the `message_start` event, preventing duplicate starts.
These changes improve correctness of streaming response handling for Claude integration.
This commit is contained in:
Luis Pater
2025-10-16 03:54:48 +08:00
parent 15981aa412
commit 5ab0854b5b

View File

@@ -37,6 +37,8 @@ type ConvertOpenAIResponseToAnthropicParams struct {
ContentBlocksStopped bool ContentBlocksStopped bool
// Track if message_delta has been sent // Track if message_delta has been sent
MessageDeltaSent bool MessageDeltaSent bool
// Track if message_start has been sent
MessageStarted bool
} }
// ToolCallAccumulator holds the state for accumulating tool call data // ToolCallAccumulator holds the state for accumulating tool call data
@@ -84,20 +86,12 @@ func ConvertOpenAIResponseToClaude(_ context.Context, _ string, originalRequestR
return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams)) return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams))
} }
root := gjson.ParseBytes(rawJSON) streamResult := gjson.GetBytes(originalRequestRawJSON, "stream")
if !streamResult.Exists() || (streamResult.Exists() && streamResult.Type == gjson.False) {
// Check if this is a streaming chunk or non-streaming response
objectType := root.Get("object").String()
if objectType == "chat.completion.chunk" {
// Handle streaming response
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
} else if objectType == "chat.completion" {
// Handle non-streaming response
return convertOpenAINonStreamingToAnthropic(rawJSON) return convertOpenAINonStreamingToAnthropic(rawJSON)
} else {
return convertOpenAIStreamingChunkToAnthropic(rawJSON, (*param).(*ConvertOpenAIResponseToAnthropicParams))
} }
return []string{}
} }
// convertOpenAIStreamingChunkToAnthropic converts OpenAI streaming chunk to Anthropic streaming events // convertOpenAIStreamingChunkToAnthropic converts OpenAI streaming chunk to Anthropic streaming events
@@ -118,7 +112,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
// Check if this is the first chunk (has role) // Check if this is the first chunk (has role)
if delta := root.Get("choices.0.delta"); delta.Exists() { if delta := root.Get("choices.0.delta"); delta.Exists() {
if role := delta.Get("role"); role.Exists() && role.String() == "assistant" { if role := delta.Get("role"); role.Exists() && role.String() == "assistant" && !param.MessageStarted {
// Send message_start event // Send message_start event
messageStart := map[string]interface{}{ messageStart := map[string]interface{}{
"type": "message_start", "type": "message_start",
@@ -138,6 +132,7 @@ func convertOpenAIStreamingChunkToAnthropic(rawJSON []byte, param *ConvertOpenAI
} }
messageStartJSON, _ := json.Marshal(messageStart) messageStartJSON, _ := json.Marshal(messageStart)
results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n") results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n")
param.MessageStarted = true
// Don't send content_block_start for text here - wait for actual content // Don't send content_block_start for text here - wait for actual content
} }