**feat(executor): enhance WebSocket error handling and metadata logging**

- Added handling for stream closure before start with appropriate error recording.
- Improved metadata logging for non-OK HTTP status codes in WebSocket responses.
- Consolidated event processing logic with `processEvent` for better error handling and payload management.
- Refactored stream initialization to include the first event handling for smoother execution flow.
This commit is contained in:
Luis Pater
2025-11-22 11:18:13 +08:00
parent 7757210af6
commit d291eb9489

View File

@@ -129,18 +129,60 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
recordAPIResponseError(ctx, e.cfg, err) recordAPIResponseError(ctx, e.cfg, err)
return nil, err return nil, err
} }
firstEvent, ok := <-wsStream
if !ok {
err = fmt.Errorf("wsrelay: stream closed before start")
recordAPIResponseError(ctx, e.cfg, err)
return nil, err
}
if firstEvent.Status > 0 && firstEvent.Status != http.StatusOK {
metadataLogged := false
if firstEvent.Status > 0 {
recordAPIResponseMetadata(ctx, e.cfg, firstEvent.Status, firstEvent.Headers.Clone())
metadataLogged = true
}
var body bytes.Buffer
if len(firstEvent.Payload) > 0 {
appendAPIResponseChunk(ctx, e.cfg, bytes.Clone(firstEvent.Payload))
body.Write(firstEvent.Payload)
}
if firstEvent.Type == wsrelay.MessageTypeStreamEnd {
return nil, statusErr{code: firstEvent.Status, msg: body.String()}
}
for event := range wsStream {
if event.Err != nil {
recordAPIResponseError(ctx, e.cfg, event.Err)
if body.Len() == 0 {
body.WriteString(event.Err.Error())
}
break
}
if !metadataLogged && event.Status > 0 {
recordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone())
metadataLogged = true
}
if len(event.Payload) > 0 {
appendAPIResponseChunk(ctx, e.cfg, bytes.Clone(event.Payload))
body.Write(event.Payload)
}
if event.Type == wsrelay.MessageTypeStreamEnd {
break
}
}
return nil, statusErr{code: firstEvent.Status, msg: body.String()}
}
out := make(chan cliproxyexecutor.StreamChunk) out := make(chan cliproxyexecutor.StreamChunk)
stream = out stream = out
go func() { go func(first wsrelay.StreamEvent) {
defer close(out) defer close(out)
var param any var param any
metadataLogged := false metadataLogged := false
for event := range wsStream { processEvent := func(event wsrelay.StreamEvent) bool {
if event.Err != nil { if event.Err != nil {
recordAPIResponseError(ctx, e.cfg, event.Err) recordAPIResponseError(ctx, e.cfg, event.Err)
reporter.publishFailure(ctx) reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)} out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}
return return false
} }
switch event.Type { switch event.Type {
case wsrelay.MessageTypeStreamStart: case wsrelay.MessageTypeStreamStart:
@@ -162,7 +204,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
break break
} }
case wsrelay.MessageTypeStreamEnd: case wsrelay.MessageTypeStreamEnd:
return return false
case wsrelay.MessageTypeHTTPResp: case wsrelay.MessageTypeHTTPResp:
if !metadataLogged && event.Status > 0 { if !metadataLogged && event.Status > 0 {
recordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone()) recordAPIResponseMetadata(ctx, e.cfg, event.Status, event.Headers.Clone())
@@ -176,15 +218,24 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON([]byte(lines[i]))} out <- cliproxyexecutor.StreamChunk{Payload: ensureColonSpacedJSON([]byte(lines[i]))}
} }
reporter.publish(ctx, parseGeminiUsage(event.Payload)) reporter.publish(ctx, parseGeminiUsage(event.Payload))
return return false
case wsrelay.MessageTypeError: case wsrelay.MessageTypeError:
recordAPIResponseError(ctx, e.cfg, event.Err) recordAPIResponseError(ctx, e.cfg, event.Err)
reporter.publishFailure(ctx) reporter.publishFailure(ctx)
out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)} out <- cliproxyexecutor.StreamChunk{Err: fmt.Errorf("wsrelay: %v", event.Err)}
return false
}
return true
}
if !processEvent(first) {
return
}
for event := range wsStream {
if !processEvent(event) {
return return
} }
} }
}() }(firstEvent)
return stream, nil return stream, nil
} }