Merge PR #1626 into codex/pr-1626

This commit is contained in:
Luis Pater
2026-02-19 04:49:14 +08:00
24 changed files with 192 additions and 107 deletions

View File

@@ -461,10 +461,10 @@ func appendAPIResponse(c *gin.Context, data []byte) {
// ExecuteWithAuthManager executes a non-streaming request via the core auth manager.
// This path is the only supported execution route.
func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, *interfaces.ErrorMessage) {
func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, http.Header, *interfaces.ErrorMessage) {
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
if errMsg != nil {
return nil, errMsg
return nil, nil, errMsg
}
reqMeta := requestExecutionMetadata(ctx)
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
@@ -497,17 +497,17 @@ func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType
addon = hdr.Clone()
}
}
return nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
return nil, nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
}
return resp.Payload, nil
return resp.Payload, FilterUpstreamHeaders(resp.Headers), nil
}
// ExecuteCountWithAuthManager executes a non-streaming request via the core auth manager.
// This path is the only supported execution route.
func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, *interfaces.ErrorMessage) {
func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, http.Header, *interfaces.ErrorMessage) {
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
if errMsg != nil {
return nil, errMsg
return nil, nil, errMsg
}
reqMeta := requestExecutionMetadata(ctx)
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
@@ -540,20 +540,21 @@ func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handle
addon = hdr.Clone()
}
}
return nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
return nil, nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
}
return resp.Payload, nil
return resp.Payload, FilterUpstreamHeaders(resp.Headers), nil
}
// ExecuteStreamWithAuthManager executes a streaming request via the core auth manager.
// This path is the only supported execution route.
func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) (<-chan []byte, <-chan *interfaces.ErrorMessage) {
// The returned http.Header carries upstream response headers captured before streaming begins.
func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) (<-chan []byte, http.Header, <-chan *interfaces.ErrorMessage) {
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
if errMsg != nil {
errChan := make(chan *interfaces.ErrorMessage, 1)
errChan <- errMsg
close(errChan)
return nil, errChan
return nil, nil, errChan
}
reqMeta := requestExecutionMetadata(ctx)
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
@@ -572,7 +573,7 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
SourceFormat: sdktranslator.FromString(handlerType),
}
opts.Metadata = reqMeta
chunks, err := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
streamResult, err := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
if err != nil {
errChan := make(chan *interfaces.ErrorMessage, 1)
status := http.StatusInternalServerError
@@ -589,8 +590,11 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
}
errChan <- &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
close(errChan)
return nil, errChan
return nil, nil, errChan
}
// Capture upstream headers from the initial connection synchronously before the goroutine starts.
upstreamHeaders := FilterUpstreamHeaders(streamResult.Headers)
chunks := streamResult.Chunks
dataChan := make(chan []byte)
errChan := make(chan *interfaces.ErrorMessage, 1)
go func() {
@@ -664,9 +668,9 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
if !sentPayload {
if bootstrapRetries < maxBootstrapRetries && bootstrapEligible(streamErr) {
bootstrapRetries++
retryChunks, retryErr := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
retryResult, retryErr := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
if retryErr == nil {
chunks = retryChunks
chunks = retryResult.Chunks
continue outer
}
streamErr = retryErr
@@ -697,7 +701,7 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
}
}
}()
return dataChan, errChan
return dataChan, upstreamHeaders, errChan
}
func statusFromError(err error) int {