mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-28 10:16:07 +08:00
feat: passthrough upstream response headers to clients
CPA previously stripped ALL response headers from upstream AI provider APIs, preventing clients from seeing rate-limit info, request IDs, server-timing and other useful headers. Changes: - Add Headers field to Response and StreamResult structs - Add FilterUpstreamHeaders helper (hop-by-hop + security denylist) - Add WriteUpstreamHeaders helper (respects CPA-set headers) - ExecuteWithAuthManager/ExecuteCountWithAuthManager now return headers - ExecuteStreamWithAuthManager returns headers from initial connection - All 11 provider executors populate Response.Headers - All handler call sites write filtered upstream headers before response Filtered headers (not forwarded): - RFC 7230 hop-by-hop: Connection, Transfer-Encoding, Keep-Alive, etc. - Security: Set-Cookie - CPA-managed: Content-Length, Content-Encoding
This commit is contained in:
@@ -159,13 +159,13 @@ func (MyExecutor) CountTokens(context.Context, *coreauth.Auth, clipexec.Request,
|
|||||||
return clipexec.Response{}, errors.New("count tokens not implemented")
|
return clipexec.Response{}, errors.New("count tokens not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (MyExecutor) ExecuteStream(ctx context.Context, a *coreauth.Auth, req clipexec.Request, opts clipexec.Options) (<-chan clipexec.StreamChunk, error) {
|
func (MyExecutor) ExecuteStream(ctx context.Context, a *coreauth.Auth, req clipexec.Request, opts clipexec.Options) (*clipexec.StreamResult, error) {
|
||||||
ch := make(chan clipexec.StreamChunk, 1)
|
ch := make(chan clipexec.StreamChunk, 1)
|
||||||
go func() {
|
go func() {
|
||||||
defer close(ch)
|
defer close(ch)
|
||||||
ch <- clipexec.StreamChunk{Payload: []byte("data: {\"ok\":true}\n\n")}
|
ch <- clipexec.StreamChunk{Payload: []byte("data: {\"ok\":true}\n\n")}
|
||||||
}()
|
}()
|
||||||
return ch, nil
|
return &clipexec.StreamResult{Chunks: ch}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (MyExecutor) Refresh(ctx context.Context, a *coreauth.Auth) (*coreauth.Auth, error) {
|
func (MyExecutor) Refresh(ctx context.Context, a *coreauth.Auth) (*coreauth.Auth, error) {
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ func (EchoExecutor) Execute(context.Context, *coreauth.Auth, clipexec.Request, c
|
|||||||
return clipexec.Response{}, errors.New("echo executor: Execute not implemented")
|
return clipexec.Response{}, errors.New("echo executor: Execute not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (EchoExecutor) ExecuteStream(context.Context, *coreauth.Auth, clipexec.Request, clipexec.Options) (<-chan clipexec.StreamChunk, error) {
|
func (EchoExecutor) ExecuteStream(context.Context, *coreauth.Auth, clipexec.Request, clipexec.Options) (*clipexec.StreamResult, error) {
|
||||||
return nil, errors.New("echo executor: ExecuteStream not implemented")
|
return nil, errors.New("echo executor: ExecuteStream not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -164,12 +164,12 @@ func (e *AIStudioExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth,
|
|||||||
reporter.publish(ctx, parseGeminiUsage(wsResp.Body))
|
reporter.publish(ctx, parseGeminiUsage(wsResp.Body))
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, wsResp.Body, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, body.toFormat, opts.SourceFormat, req.Model, opts.OriginalRequest, translatedReq, wsResp.Body, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: ensureColonSpacedJSON([]byte(out))}
|
resp = cliproxyexecutor.Response{Payload: ensureColonSpacedJSON([]byte(out)), Headers: wsResp.Headers.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming request to the AI Studio API.
|
// ExecuteStream performs a streaming request to the AI Studio API.
|
||||||
func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -254,7 +254,6 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
|||||||
return nil, statusErr{code: firstEvent.Status, msg: body.String()}
|
return nil, statusErr{code: firstEvent.Status, msg: body.String()}
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func(first wsrelay.StreamEvent) {
|
go func(first wsrelay.StreamEvent) {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
var param any
|
var param any
|
||||||
@@ -318,7 +317,7 @@ func (e *AIStudioExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}(firstEvent)
|
}(firstEvent)
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: firstEvent.Headers.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountTokens counts tokens for the given request using the AI Studio API.
|
// CountTokens counts tokens for the given request using the AI Studio API.
|
||||||
|
|||||||
@@ -232,7 +232,7 @@ attemptLoop:
|
|||||||
reporter.publish(ctx, parseAntigravityUsage(bodyBytes))
|
reporter.publish(ctx, parseAntigravityUsage(bodyBytes))
|
||||||
var param any
|
var param any
|
||||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bodyBytes, ¶m)
|
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, bodyBytes, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
resp = cliproxyexecutor.Response{Payload: []byte(converted), Headers: httpResp.Header.Clone()}
|
||||||
reporter.ensurePublished(ctx)
|
reporter.ensurePublished(ctx)
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
@@ -436,7 +436,7 @@ attemptLoop:
|
|||||||
reporter.publish(ctx, parseAntigravityUsage(resp.Payload))
|
reporter.publish(ctx, parseAntigravityUsage(resp.Payload))
|
||||||
var param any
|
var param any
|
||||||
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, resp.Payload, ¶m)
|
converted := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, resp.Payload, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(converted)}
|
resp = cliproxyexecutor.Response{Payload: []byte(converted), Headers: httpResp.Header.Clone()}
|
||||||
reporter.ensurePublished(ctx)
|
reporter.ensurePublished(ctx)
|
||||||
|
|
||||||
return resp, nil
|
return resp, nil
|
||||||
@@ -645,7 +645,7 @@ func (e *AntigravityExecutor) convertStreamToNonStream(stream []byte) []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming request to the Antigravity API.
|
// ExecuteStream performs a streaming request to the Antigravity API.
|
||||||
func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *AntigravityExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -775,7 +775,6 @@ attemptLoop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func(resp *http.Response) {
|
go func(resp *http.Response) {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -820,7 +819,7 @@ attemptLoop:
|
|||||||
reporter.ensurePublished(ctx)
|
reporter.ensurePublished(ctx)
|
||||||
}
|
}
|
||||||
}(httpResp)
|
}(httpResp)
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
switch {
|
switch {
|
||||||
@@ -968,7 +967,7 @@ func (e *AntigravityExecutor) CountTokens(ctx context.Context, auth *cliproxyaut
|
|||||||
if httpResp.StatusCode >= http.StatusOK && httpResp.StatusCode < http.StatusMultipleChoices {
|
if httpResp.StatusCode >= http.StatusOK && httpResp.StatusCode < http.StatusMultipleChoices {
|
||||||
count := gjson.GetBytes(bodyBytes, "totalTokens").Int()
|
count := gjson.GetBytes(bodyBytes, "totalTokens").Int()
|
||||||
translated := sdktranslator.TranslateTokenCount(respCtx, to, from, count, bodyBytes)
|
translated := sdktranslator.TranslateTokenCount(respCtx, to, from, count, bodyBytes)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(translated)}, nil
|
return cliproxyexecutor.Response{Payload: []byte(translated), Headers: httpResp.Header.Clone()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
lastStatus = httpResp.StatusCode
|
lastStatus = httpResp.StatusCode
|
||||||
|
|||||||
@@ -222,11 +222,11 @@ func (e *ClaudeExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
data,
|
data,
|
||||||
¶m,
|
¶m,
|
||||||
)
|
)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -329,7 +329,6 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -398,7 +397,7 @@ func (e *ClaudeExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
@@ -487,7 +486,7 @@ func (e *ClaudeExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
count := gjson.GetBytes(data, "input_tokens").Int()
|
count := gjson.GetBytes(data, "input_tokens").Int()
|
||||||
out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
|
out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
return cliproxyexecutor.Response{Payload: []byte(out), Headers: resp.Header.Clone()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *ClaudeExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
|
func (e *ClaudeExecutor) Refresh(ctx context.Context, auth *cliproxyauth.Auth) (*cliproxyauth.Auth, error) {
|
||||||
|
|||||||
@@ -183,7 +183,7 @@ func (e *CodexExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
|||||||
|
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, line, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, line, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
|
err = statusErr{code: 408, msg: "stream error: stream disconnected before completion: stream closed before response.completed"}
|
||||||
@@ -273,11 +273,11 @@ func (e *CodexExecutor) executeCompact(ctx context.Context, auth *cliproxyauth.A
|
|||||||
reporter.ensurePublished(ctx)
|
reporter.ensurePublished(ctx)
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, originalPayload, body, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusBadRequest, msg: "streaming not supported for /responses/compact"}
|
return nil, statusErr{code: http.StatusBadRequest, msg: "streaming not supported for /responses/compact"}
|
||||||
}
|
}
|
||||||
@@ -362,7 +362,6 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -397,7 +396,7 @@ func (e *CodexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *CodexExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -225,7 +225,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
|||||||
reporter.publish(ctx, parseGeminiCLIUsage(data))
|
reporter.publish(ctx, parseGeminiCLIUsage(data))
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, opts.OriginalRequest, payload, data, ¶m)
|
out := sdktranslator.TranslateNonStream(respCtx, to, from, attemptModel, opts.OriginalRequest, payload, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -256,7 +256,7 @@ func (e *GeminiCLIExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming request to the Gemini CLI API.
|
// ExecuteStream performs a streaming request to the Gemini CLI API.
|
||||||
func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -382,7 +382,6 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func(resp *http.Response, reqBody []byte, attemptModel string) {
|
go func(resp *http.Response, reqBody []byte, attemptModel string) {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -441,7 +440,7 @@ func (e *GeminiCLIExecutor) ExecuteStream(ctx context.Context, auth *cliproxyaut
|
|||||||
}
|
}
|
||||||
}(httpResp, append([]byte(nil), payload...), attemptModel)
|
}(httpResp, append([]byte(nil), payload...), attemptModel)
|
||||||
|
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(lastBody) > 0 {
|
if len(lastBody) > 0 {
|
||||||
@@ -546,7 +545,7 @@ func (e *GeminiCLIExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.
|
|||||||
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
if resp.StatusCode >= 200 && resp.StatusCode < 300 {
|
||||||
count := gjson.GetBytes(data, "totalTokens").Int()
|
count := gjson.GetBytes(data, "totalTokens").Int()
|
||||||
translated := sdktranslator.TranslateTokenCount(respCtx, to, from, count, data)
|
translated := sdktranslator.TranslateTokenCount(respCtx, to, from, count, data)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(translated)}, nil
|
return cliproxyexecutor.Response{Payload: []byte(translated), Headers: resp.Header.Clone()}, nil
|
||||||
}
|
}
|
||||||
lastStatus = resp.StatusCode
|
lastStatus = resp.StatusCode
|
||||||
lastBody = append([]byte(nil), data...)
|
lastBody = append([]byte(nil), data...)
|
||||||
|
|||||||
@@ -205,12 +205,12 @@ func (e *GeminiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, r
|
|||||||
reporter.publish(ctx, parseGeminiUsage(data))
|
reporter.publish(ctx, parseGeminiUsage(data))
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming request to the Gemini API.
|
// ExecuteStream performs a streaming request to the Gemini API.
|
||||||
func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -298,7 +298,6 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -335,7 +334,7 @@ func (e *GeminiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.A
|
|||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountTokens counts tokens for the given request using the Gemini API.
|
// CountTokens counts tokens for the given request using the Gemini API.
|
||||||
@@ -416,7 +415,7 @@ func (e *GeminiExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
|
|
||||||
count := gjson.GetBytes(data, "totalTokens").Int()
|
count := gjson.GetBytes(data, "totalTokens").Int()
|
||||||
translated := sdktranslator.TranslateTokenCount(respCtx, to, from, count, data)
|
translated := sdktranslator.TranslateTokenCount(respCtx, to, from, count, data)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(translated)}, nil
|
return cliproxyexecutor.Response{Payload: []byte(translated), Headers: resp.Header.Clone()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Refresh refreshes the authentication credentials (no-op for Gemini API key).
|
// Refresh refreshes the authentication credentials (no-op for Gemini API key).
|
||||||
|
|||||||
@@ -253,7 +253,7 @@ func (e *GeminiVertexExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming request to the Vertex AI API.
|
// ExecuteStream performs a streaming request to the Vertex AI API.
|
||||||
func (e *GeminiVertexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *GeminiVertexExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -419,7 +419,7 @@ func (e *GeminiVertexExecutor) executeWithServiceAccount(ctx context.Context, au
|
|||||||
to := sdktranslator.FromString("gemini")
|
to := sdktranslator.FromString("gemini")
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -524,12 +524,12 @@ func (e *GeminiVertexExecutor) executeWithAPIKey(ctx context.Context, auth *clip
|
|||||||
reporter.publish(ctx, parseGeminiUsage(data))
|
reporter.publish(ctx, parseGeminiUsage(data))
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// executeStreamWithServiceAccount handles streaming authentication using service account credentials.
|
// executeStreamWithServiceAccount handles streaming authentication using service account credentials.
|
||||||
func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, projectID, location string, saJSON []byte) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, projectID, location string, saJSON []byte) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
||||||
|
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
||||||
@@ -618,7 +618,6 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte
|
|||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -650,11 +649,11 @@ func (e *GeminiVertexExecutor) executeStreamWithServiceAccount(ctx context.Conte
|
|||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// executeStreamWithAPIKey handles streaming authentication using API key credentials.
|
// executeStreamWithAPIKey handles streaming authentication using API key credentials.
|
||||||
func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, apiKey, baseURL string) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options, apiKey, baseURL string) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
||||||
|
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
||||||
@@ -743,7 +742,6 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth
|
|||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -775,7 +773,7 @@ func (e *GeminiVertexExecutor) executeStreamWithAPIKey(ctx context.Context, auth
|
|||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// countTokensWithServiceAccount counts tokens using service account credentials.
|
// countTokensWithServiceAccount counts tokens using service account credentials.
|
||||||
@@ -859,7 +857,7 @@ func (e *GeminiVertexExecutor) countTokensWithServiceAccount(ctx context.Context
|
|||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
count := gjson.GetBytes(data, "totalTokens").Int()
|
count := gjson.GetBytes(data, "totalTokens").Int()
|
||||||
out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
|
out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
return cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// countTokensWithAPIKey handles token counting using API key credentials.
|
// countTokensWithAPIKey handles token counting using API key credentials.
|
||||||
@@ -943,7 +941,7 @@ func (e *GeminiVertexExecutor) countTokensWithAPIKey(ctx context.Context, auth *
|
|||||||
appendAPIResponseChunk(ctx, e.cfg, data)
|
appendAPIResponseChunk(ctx, e.cfg, data)
|
||||||
count := gjson.GetBytes(data, "totalTokens").Int()
|
count := gjson.GetBytes(data, "totalTokens").Int()
|
||||||
out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
|
out := sdktranslator.TranslateTokenCount(ctx, to, from, count, data)
|
||||||
return cliproxyexecutor.Response{Payload: []byte(out)}, nil
|
return cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// vertexCreds extracts project, location and raw service account JSON from auth metadata.
|
// vertexCreds extracts project, location and raw service account JSON from auth metadata.
|
||||||
|
|||||||
@@ -169,12 +169,12 @@ func (e *IFlowExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, re
|
|||||||
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
|
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
|
||||||
// the original model name in the response for client compatibility.
|
// the original model name in the response for client compatibility.
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming chat completion request.
|
// ExecuteStream performs a streaming chat completion request.
|
||||||
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -262,7 +262,6 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
}
|
}
|
||||||
|
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -294,7 +293,7 @@ func (e *IFlowExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Au
|
|||||||
reporter.ensurePublished(ctx)
|
reporter.ensurePublished(ctx)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *IFlowExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *IFlowExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -161,12 +161,12 @@ func (e *KimiExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
|||||||
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
|
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
|
||||||
// the original model name in the response for client compatibility.
|
// the original model name in the response for client compatibility.
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStream performs a streaming chat completion request to Kimi.
|
// ExecuteStream performs a streaming chat completion request to Kimi.
|
||||||
func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
from := opts.SourceFormat
|
from := opts.SourceFormat
|
||||||
if from.String() == "claude" {
|
if from.String() == "claude" {
|
||||||
auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL
|
auth.Attributes["base_url"] = kimiauth.KimiAPIBaseURL
|
||||||
@@ -253,7 +253,6 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -285,7 +284,7 @@ func (e *KimiExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountTokens estimates token count for Kimi requests.
|
// CountTokens estimates token count for Kimi requests.
|
||||||
|
|||||||
@@ -172,11 +172,11 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
|
|||||||
// Translate response back to source format when needed
|
// Translate response back to source format when needed
|
||||||
var param any
|
var param any
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, body, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, translated, body, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
baseModel := thinking.ParseSuffix(req.Model).ModelName
|
||||||
|
|
||||||
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
reporter := newUsageReporter(ctx, e.Identifier(), baseModel, auth)
|
||||||
@@ -258,7 +258,6 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -298,7 +297,7 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
|
|||||||
// Ensure we record the request if no usage chunk was ever seen
|
// Ensure we record the request if no usage chunk was ever seen
|
||||||
reporter.ensurePublished(ctx)
|
reporter.ensurePublished(ctx)
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *OpenAICompatExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -150,11 +150,11 @@ func (e *QwenExecutor) Execute(ctx context.Context, auth *cliproxyauth.Auth, req
|
|||||||
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
|
// Note: TranslateNonStream uses req.Model (original with suffix) to preserve
|
||||||
// the original model name in the response for client compatibility.
|
// the original model name in the response for client compatibility.
|
||||||
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
out := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, opts.OriginalRequest, body, data, ¶m)
|
||||||
resp = cliproxyexecutor.Response{Payload: []byte(out)}
|
resp = cliproxyexecutor.Response{Payload: []byte(out), Headers: httpResp.Header.Clone()}
|
||||||
return resp, nil
|
return resp, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (stream <-chan cliproxyexecutor.StreamChunk, err error) {
|
func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (_ *cliproxyexecutor.StreamResult, err error) {
|
||||||
if opts.Alt == "responses/compact" {
|
if opts.Alt == "responses/compact" {
|
||||||
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
return nil, statusErr{code: http.StatusNotImplemented, msg: "/responses/compact not supported"}
|
||||||
}
|
}
|
||||||
@@ -236,7 +236,6 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
out := make(chan cliproxyexecutor.StreamChunk)
|
out := make(chan cliproxyexecutor.StreamChunk)
|
||||||
stream = out
|
|
||||||
go func() {
|
go func() {
|
||||||
defer close(out)
|
defer close(out)
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -268,7 +267,7 @@ func (e *QwenExecutor) ExecuteStream(ctx context.Context, auth *cliproxyauth.Aut
|
|||||||
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
out <- cliproxyexecutor.StreamChunk{Err: errScan}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return stream, nil
|
return &cliproxyexecutor.StreamResult{Headers: httpResp.Header.Clone(), Chunks: out}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
func (e *QwenExecutor) CountTokens(ctx context.Context, auth *cliproxyauth.Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) {
|
||||||
|
|||||||
@@ -112,12 +112,13 @@ func (h *ClaudeCodeAPIHandler) ClaudeCountTokens(c *gin.Context) {
|
|||||||
|
|
||||||
modelName := gjson.GetBytes(rawJSON, "model").String()
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
||||||
|
|
||||||
resp, errMsg := h.ExecuteCountWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
resp, upstreamHeaders, errMsg := h.ExecuteCountWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
@@ -165,7 +166,7 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO
|
|||||||
|
|
||||||
modelName := gjson.GetBytes(rawJSON, "model").String()
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
||||||
|
|
||||||
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
||||||
stopKeepAlive()
|
stopKeepAlive()
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
@@ -194,6 +195,7 @@ func (h *ClaudeCodeAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSO
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
@@ -225,7 +227,7 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
|
|||||||
// This allows proper cleanup and cancellation of ongoing requests
|
// This allows proper cleanup and cancellation of ongoing requests
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
|
|
||||||
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
||||||
setSSEHeaders := func() {
|
setSSEHeaders := func() {
|
||||||
c.Header("Content-Type", "text/event-stream")
|
c.Header("Content-Type", "text/event-stream")
|
||||||
c.Header("Cache-Control", "no-cache")
|
c.Header("Cache-Control", "no-cache")
|
||||||
@@ -257,6 +259,7 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
|
|||||||
if !ok {
|
if !ok {
|
||||||
// Stream closed without data? Send DONE or just headers.
|
// Stream closed without data? Send DONE or just headers.
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
cliCancel(nil)
|
cliCancel(nil)
|
||||||
return
|
return
|
||||||
@@ -264,6 +267,7 @@ func (h *ClaudeCodeAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON [
|
|||||||
|
|
||||||
// Success! Set headers now.
|
// Success! Set headers now.
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
|
|
||||||
// Write the first chunk
|
// Write the first chunk
|
||||||
if len(chunk) > 0 {
|
if len(chunk) > 0 {
|
||||||
|
|||||||
@@ -159,7 +159,8 @@ func (h *GeminiCLIAPIHandler) handleInternalStreamGenerateContent(c *gin.Context
|
|||||||
modelName := modelResult.String()
|
modelName := modelResult.String()
|
||||||
|
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
h.forwardCLIStream(c, flusher, "", func(err error) { cliCancel(err) }, dataChan, errChan)
|
h.forwardCLIStream(c, flusher, "", func(err error) { cliCancel(err) }, dataChan, errChan)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -172,12 +173,13 @@ func (h *GeminiCLIAPIHandler) handleInternalGenerateContent(c *gin.Context, rawJ
|
|||||||
modelName := modelResult.String()
|
modelName := modelResult.String()
|
||||||
|
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -188,7 +188,7 @@ func (h *GeminiAPIHandler) handleStreamGenerateContent(c *gin.Context, modelName
|
|||||||
}
|
}
|
||||||
|
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
||||||
|
|
||||||
setSSEHeaders := func() {
|
setSSEHeaders := func() {
|
||||||
c.Header("Content-Type", "text/event-stream")
|
c.Header("Content-Type", "text/event-stream")
|
||||||
@@ -223,6 +223,7 @@ func (h *GeminiAPIHandler) handleStreamGenerateContent(c *gin.Context, modelName
|
|||||||
if alt == "" {
|
if alt == "" {
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
cliCancel(nil)
|
cliCancel(nil)
|
||||||
return
|
return
|
||||||
@@ -232,6 +233,7 @@ func (h *GeminiAPIHandler) handleStreamGenerateContent(c *gin.Context, modelName
|
|||||||
if alt == "" {
|
if alt == "" {
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
|
|
||||||
// Write first chunk
|
// Write first chunk
|
||||||
if alt == "" {
|
if alt == "" {
|
||||||
@@ -262,12 +264,13 @@ func (h *GeminiAPIHandler) handleCountTokens(c *gin.Context, modelName string, r
|
|||||||
c.Header("Content-Type", "application/json")
|
c.Header("Content-Type", "application/json")
|
||||||
alt := h.GetAlt(c)
|
alt := h.GetAlt(c)
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
resp, errMsg := h.ExecuteCountWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
resp, upstreamHeaders, errMsg := h.ExecuteCountWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
@@ -286,13 +289,14 @@ func (h *GeminiAPIHandler) handleGenerateContent(c *gin.Context, modelName strin
|
|||||||
alt := h.GetAlt(c)
|
alt := h.GetAlt(c)
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
||||||
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, alt)
|
||||||
stopKeepAlive()
|
stopKeepAlive()
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -370,10 +370,10 @@ func appendAPIResponse(c *gin.Context, data []byte) {
|
|||||||
|
|
||||||
// ExecuteWithAuthManager executes a non-streaming request via the core auth manager.
|
// ExecuteWithAuthManager executes a non-streaming request via the core auth manager.
|
||||||
// This path is the only supported execution route.
|
// This path is the only supported execution route.
|
||||||
func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, *interfaces.ErrorMessage) {
|
func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, http.Header, *interfaces.ErrorMessage) {
|
||||||
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
|
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
return nil, errMsg
|
return nil, nil, errMsg
|
||||||
}
|
}
|
||||||
reqMeta := requestExecutionMetadata(ctx)
|
reqMeta := requestExecutionMetadata(ctx)
|
||||||
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
|
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
|
||||||
@@ -406,17 +406,17 @@ func (h *BaseAPIHandler) ExecuteWithAuthManager(ctx context.Context, handlerType
|
|||||||
addon = hdr.Clone()
|
addon = hdr.Clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
|
return nil, nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
|
||||||
}
|
}
|
||||||
return resp.Payload, nil
|
return resp.Payload, FilterUpstreamHeaders(resp.Headers), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteCountWithAuthManager executes a non-streaming request via the core auth manager.
|
// ExecuteCountWithAuthManager executes a non-streaming request via the core auth manager.
|
||||||
// This path is the only supported execution route.
|
// This path is the only supported execution route.
|
||||||
func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, *interfaces.ErrorMessage) {
|
func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) ([]byte, http.Header, *interfaces.ErrorMessage) {
|
||||||
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
|
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
return nil, errMsg
|
return nil, nil, errMsg
|
||||||
}
|
}
|
||||||
reqMeta := requestExecutionMetadata(ctx)
|
reqMeta := requestExecutionMetadata(ctx)
|
||||||
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
|
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
|
||||||
@@ -449,20 +449,21 @@ func (h *BaseAPIHandler) ExecuteCountWithAuthManager(ctx context.Context, handle
|
|||||||
addon = hdr.Clone()
|
addon = hdr.Clone()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
|
return nil, nil, &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
|
||||||
}
|
}
|
||||||
return resp.Payload, nil
|
return resp.Payload, FilterUpstreamHeaders(resp.Headers), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteStreamWithAuthManager executes a streaming request via the core auth manager.
|
// ExecuteStreamWithAuthManager executes a streaming request via the core auth manager.
|
||||||
// This path is the only supported execution route.
|
// This path is the only supported execution route.
|
||||||
func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) (<-chan []byte, <-chan *interfaces.ErrorMessage) {
|
// The returned http.Header carries upstream response headers captured before streaming begins.
|
||||||
|
func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handlerType, modelName string, rawJSON []byte, alt string) (<-chan []byte, http.Header, <-chan *interfaces.ErrorMessage) {
|
||||||
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
|
providers, normalizedModel, errMsg := h.getRequestDetails(modelName)
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
errChan := make(chan *interfaces.ErrorMessage, 1)
|
errChan := make(chan *interfaces.ErrorMessage, 1)
|
||||||
errChan <- errMsg
|
errChan <- errMsg
|
||||||
close(errChan)
|
close(errChan)
|
||||||
return nil, errChan
|
return nil, nil, errChan
|
||||||
}
|
}
|
||||||
reqMeta := requestExecutionMetadata(ctx)
|
reqMeta := requestExecutionMetadata(ctx)
|
||||||
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
|
reqMeta[coreexecutor.RequestedModelMetadataKey] = normalizedModel
|
||||||
@@ -481,7 +482,7 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
|
|||||||
SourceFormat: sdktranslator.FromString(handlerType),
|
SourceFormat: sdktranslator.FromString(handlerType),
|
||||||
}
|
}
|
||||||
opts.Metadata = reqMeta
|
opts.Metadata = reqMeta
|
||||||
chunks, err := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
|
streamResult, err := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errChan := make(chan *interfaces.ErrorMessage, 1)
|
errChan := make(chan *interfaces.ErrorMessage, 1)
|
||||||
status := http.StatusInternalServerError
|
status := http.StatusInternalServerError
|
||||||
@@ -498,8 +499,11 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
|
|||||||
}
|
}
|
||||||
errChan <- &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
|
errChan <- &interfaces.ErrorMessage{StatusCode: status, Error: err, Addon: addon}
|
||||||
close(errChan)
|
close(errChan)
|
||||||
return nil, errChan
|
return nil, nil, errChan
|
||||||
}
|
}
|
||||||
|
// Capture upstream headers from the initial connection synchronously before the goroutine starts.
|
||||||
|
upstreamHeaders := FilterUpstreamHeaders(streamResult.Headers)
|
||||||
|
chunks := streamResult.Chunks
|
||||||
dataChan := make(chan []byte)
|
dataChan := make(chan []byte)
|
||||||
errChan := make(chan *interfaces.ErrorMessage, 1)
|
errChan := make(chan *interfaces.ErrorMessage, 1)
|
||||||
go func() {
|
go func() {
|
||||||
@@ -573,9 +577,9 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
|
|||||||
if !sentPayload {
|
if !sentPayload {
|
||||||
if bootstrapRetries < maxBootstrapRetries && bootstrapEligible(streamErr) {
|
if bootstrapRetries < maxBootstrapRetries && bootstrapEligible(streamErr) {
|
||||||
bootstrapRetries++
|
bootstrapRetries++
|
||||||
retryChunks, retryErr := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
|
retryResult, retryErr := h.AuthManager.ExecuteStream(ctx, providers, req, opts)
|
||||||
if retryErr == nil {
|
if retryErr == nil {
|
||||||
chunks = retryChunks
|
chunks = retryResult.Chunks
|
||||||
continue outer
|
continue outer
|
||||||
}
|
}
|
||||||
streamErr = retryErr
|
streamErr = retryErr
|
||||||
@@ -606,7 +610,7 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
return dataChan, errChan
|
return dataChan, upstreamHeaders, errChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func statusFromError(err error) int {
|
func statusFromError(err error) int {
|
||||||
|
|||||||
@@ -23,7 +23,7 @@ func (e *failOnceStreamExecutor) Execute(context.Context, *coreauth.Auth, coreex
|
|||||||
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"}
|
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *failOnceStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (<-chan coreexecutor.StreamChunk, error) {
|
func (e *failOnceStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (*coreexecutor.StreamResult, error) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
e.calls++
|
e.calls++
|
||||||
call := e.calls
|
call := e.calls
|
||||||
@@ -40,12 +40,12 @@ func (e *failOnceStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth,
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
close(ch)
|
close(ch)
|
||||||
return ch, nil
|
return &coreexecutor.StreamResult{Chunks: ch}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
ch <- coreexecutor.StreamChunk{Payload: []byte("ok")}
|
ch <- coreexecutor.StreamChunk{Payload: []byte("ok")}
|
||||||
close(ch)
|
close(ch)
|
||||||
return ch, nil
|
return &coreexecutor.StreamResult{Chunks: ch}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *failOnceStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
|
func (e *failOnceStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
|
||||||
@@ -81,7 +81,7 @@ func (e *payloadThenErrorStreamExecutor) Execute(context.Context, *coreauth.Auth
|
|||||||
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"}
|
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *payloadThenErrorStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (<-chan coreexecutor.StreamChunk, error) {
|
func (e *payloadThenErrorStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (*coreexecutor.StreamResult, error) {
|
||||||
e.mu.Lock()
|
e.mu.Lock()
|
||||||
e.calls++
|
e.calls++
|
||||||
e.mu.Unlock()
|
e.mu.Unlock()
|
||||||
@@ -97,7 +97,7 @@ func (e *payloadThenErrorStreamExecutor) ExecuteStream(context.Context, *coreaut
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
close(ch)
|
close(ch)
|
||||||
return ch, nil
|
return &coreexecutor.StreamResult{Chunks: ch}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *payloadThenErrorStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
|
func (e *payloadThenErrorStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
|
||||||
@@ -159,7 +159,7 @@ func TestExecuteStreamWithAuthManager_RetriesBeforeFirstByte(t *testing.T) {
|
|||||||
BootstrapRetries: 1,
|
BootstrapRetries: 1,
|
||||||
},
|
},
|
||||||
}, manager)
|
}, manager)
|
||||||
dataChan, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai", "test-model", []byte(`{"model":"test-model"}`), "")
|
dataChan, _, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai", "test-model", []byte(`{"model":"test-model"}`), "")
|
||||||
if dataChan == nil || errChan == nil {
|
if dataChan == nil || errChan == nil {
|
||||||
t.Fatalf("expected non-nil channels")
|
t.Fatalf("expected non-nil channels")
|
||||||
}
|
}
|
||||||
@@ -220,7 +220,7 @@ func TestExecuteStreamWithAuthManager_DoesNotRetryAfterFirstByte(t *testing.T) {
|
|||||||
BootstrapRetries: 1,
|
BootstrapRetries: 1,
|
||||||
},
|
},
|
||||||
}, manager)
|
}, manager)
|
||||||
dataChan, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai", "test-model", []byte(`{"model":"test-model"}`), "")
|
dataChan, _, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai", "test-model", []byte(`{"model":"test-model"}`), "")
|
||||||
if dataChan == nil || errChan == nil {
|
if dataChan == nil || errChan == nil {
|
||||||
t.Fatalf("expected non-nil channels")
|
t.Fatalf("expected non-nil channels")
|
||||||
}
|
}
|
||||||
|
|||||||
58
sdk/api/handlers/header_filter.go
Normal file
58
sdk/api/handlers/header_filter.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package handlers
|
||||||
|
|
||||||
|
import "net/http"
|
||||||
|
|
||||||
|
// hopByHopHeaders lists RFC 7230 Section 6.1 hop-by-hop headers that MUST NOT
|
||||||
|
// be forwarded by proxies, plus security-sensitive headers that should not leak.
|
||||||
|
var hopByHopHeaders = map[string]struct{}{
|
||||||
|
// RFC 7230 hop-by-hop
|
||||||
|
"Connection": {},
|
||||||
|
"Keep-Alive": {},
|
||||||
|
"Proxy-Authenticate": {},
|
||||||
|
"Proxy-Authorization": {},
|
||||||
|
"Te": {},
|
||||||
|
"Trailer": {},
|
||||||
|
"Transfer-Encoding": {},
|
||||||
|
"Upgrade": {},
|
||||||
|
// Security-sensitive
|
||||||
|
"Set-Cookie": {},
|
||||||
|
// CPA-managed (set by handlers, not upstream)
|
||||||
|
"Content-Length": {},
|
||||||
|
"Content-Encoding": {},
|
||||||
|
}
|
||||||
|
|
||||||
|
// FilterUpstreamHeaders returns a copy of src with hop-by-hop and security-sensitive
|
||||||
|
// headers removed. Returns nil if src is nil or empty after filtering.
|
||||||
|
func FilterUpstreamHeaders(src http.Header) http.Header {
|
||||||
|
if src == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
dst := make(http.Header)
|
||||||
|
for key, values := range src {
|
||||||
|
if _, blocked := hopByHopHeaders[http.CanonicalHeaderKey(key)]; blocked {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
dst[key] = values
|
||||||
|
}
|
||||||
|
if len(dst) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return dst
|
||||||
|
}
|
||||||
|
|
||||||
|
// WriteUpstreamHeaders writes filtered upstream headers to the gin response writer.
|
||||||
|
// Headers already set by CPA (e.g., Content-Type) are NOT overwritten.
|
||||||
|
func WriteUpstreamHeaders(dst http.Header, src http.Header) {
|
||||||
|
if src == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
for key, values := range src {
|
||||||
|
// Don't overwrite headers already set by CPA handlers
|
||||||
|
if dst.Get(key) != "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for _, v := range values {
|
||||||
|
dst.Add(key, v)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -425,12 +425,13 @@ func (h *OpenAIAPIHandler) handleNonStreamingResponse(c *gin.Context, rawJSON []
|
|||||||
|
|
||||||
modelName := gjson.GetBytes(rawJSON, "model").String()
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, h.GetAlt(c))
|
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, h.GetAlt(c))
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
@@ -457,7 +458,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt
|
|||||||
|
|
||||||
modelName := gjson.GetBytes(rawJSON, "model").String()
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, h.GetAlt(c))
|
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, h.GetAlt(c))
|
||||||
|
|
||||||
setSSEHeaders := func() {
|
setSSEHeaders := func() {
|
||||||
c.Header("Content-Type", "text/event-stream")
|
c.Header("Content-Type", "text/event-stream")
|
||||||
@@ -490,6 +491,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt
|
|||||||
if !ok {
|
if !ok {
|
||||||
// Stream closed without data? Send DONE or just headers.
|
// Stream closed without data? Send DONE or just headers.
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
|
_, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
cliCancel(nil)
|
cliCancel(nil)
|
||||||
@@ -498,6 +500,7 @@ func (h *OpenAIAPIHandler) handleStreamingResponse(c *gin.Context, rawJSON []byt
|
|||||||
|
|
||||||
// Success! Commit to streaming headers.
|
// Success! Commit to streaming headers.
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
|
|
||||||
_, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(chunk))
|
_, _ = fmt.Fprintf(c.Writer, "data: %s\n\n", string(chunk))
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
@@ -525,13 +528,14 @@ func (h *OpenAIAPIHandler) handleCompletionsNonStreamingResponse(c *gin.Context,
|
|||||||
modelName := gjson.GetBytes(chatCompletionsJSON, "model").String()
|
modelName := gjson.GetBytes(chatCompletionsJSON, "model").String()
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
||||||
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "")
|
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "")
|
||||||
stopKeepAlive()
|
stopKeepAlive()
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
completionsResp := convertChatCompletionsResponseToCompletions(resp)
|
completionsResp := convertChatCompletionsResponseToCompletions(resp)
|
||||||
_, _ = c.Writer.Write(completionsResp)
|
_, _ = c.Writer.Write(completionsResp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
@@ -562,7 +566,7 @@ func (h *OpenAIAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, ra
|
|||||||
|
|
||||||
modelName := gjson.GetBytes(chatCompletionsJSON, "model").String()
|
modelName := gjson.GetBytes(chatCompletionsJSON, "model").String()
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "")
|
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, chatCompletionsJSON, "")
|
||||||
|
|
||||||
setSSEHeaders := func() {
|
setSSEHeaders := func() {
|
||||||
c.Header("Content-Type", "text/event-stream")
|
c.Header("Content-Type", "text/event-stream")
|
||||||
@@ -593,6 +597,7 @@ func (h *OpenAIAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, ra
|
|||||||
case chunk, ok := <-dataChan:
|
case chunk, ok := <-dataChan:
|
||||||
if !ok {
|
if !ok {
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
|
_, _ = fmt.Fprintf(c.Writer, "data: [DONE]\n\n")
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
cliCancel(nil)
|
cliCancel(nil)
|
||||||
@@ -601,6 +606,7 @@ func (h *OpenAIAPIHandler) handleCompletionsStreamingResponse(c *gin.Context, ra
|
|||||||
|
|
||||||
// Success! Set headers.
|
// Success! Set headers.
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
|
|
||||||
// Write the first chunk
|
// Write the first chunk
|
||||||
converted := convertChatCompletionsStreamChunkToCompletions(chunk)
|
converted := convertChatCompletionsStreamChunkToCompletions(chunk)
|
||||||
|
|||||||
@@ -31,7 +31,7 @@ func (e *compactCaptureExecutor) Execute(ctx context.Context, auth *coreauth.Aut
|
|||||||
return coreexecutor.Response{Payload: []byte(`{"ok":true}`)}, nil
|
return coreexecutor.Response{Payload: []byte(`{"ok":true}`)}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *compactCaptureExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (<-chan coreexecutor.StreamChunk, error) {
|
func (e *compactCaptureExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (*coreexecutor.StreamResult, error) {
|
||||||
return nil, errors.New("not implemented")
|
return nil, errors.New("not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -124,13 +124,14 @@ func (h *OpenAIResponsesAPIHandler) Compact(c *gin.Context) {
|
|||||||
modelName := gjson.GetBytes(rawJSON, "model").String()
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
||||||
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "responses/compact")
|
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "responses/compact")
|
||||||
stopKeepAlive()
|
stopKeepAlive()
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
@@ -149,13 +150,14 @@ func (h *OpenAIResponsesAPIHandler) handleNonStreamingResponse(c *gin.Context, r
|
|||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
stopKeepAlive := h.StartNonStreamingKeepAlive(c, cliCtx)
|
||||||
|
|
||||||
resp, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
resp, upstreamHeaders, errMsg := h.ExecuteWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
||||||
stopKeepAlive()
|
stopKeepAlive()
|
||||||
if errMsg != nil {
|
if errMsg != nil {
|
||||||
h.WriteErrorResponse(c, errMsg)
|
h.WriteErrorResponse(c, errMsg)
|
||||||
cliCancel(errMsg.Error)
|
cliCancel(errMsg.Error)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write(resp)
|
_, _ = c.Writer.Write(resp)
|
||||||
cliCancel()
|
cliCancel()
|
||||||
}
|
}
|
||||||
@@ -183,7 +185,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponse(c *gin.Context, rawJ
|
|||||||
// New core execution path
|
// New core execution path
|
||||||
modelName := gjson.GetBytes(rawJSON, "model").String()
|
modelName := gjson.GetBytes(rawJSON, "model").String()
|
||||||
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
cliCtx, cliCancel := h.GetContextWithCancel(h, c, context.Background())
|
||||||
dataChan, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
dataChan, upstreamHeaders, errChan := h.ExecuteStreamWithAuthManager(cliCtx, h.HandlerType(), modelName, rawJSON, "")
|
||||||
|
|
||||||
setSSEHeaders := func() {
|
setSSEHeaders := func() {
|
||||||
c.Header("Content-Type", "text/event-stream")
|
c.Header("Content-Type", "text/event-stream")
|
||||||
@@ -216,6 +218,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponse(c *gin.Context, rawJ
|
|||||||
if !ok {
|
if !ok {
|
||||||
// Stream closed without data? Send headers and done.
|
// Stream closed without data? Send headers and done.
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
_, _ = c.Writer.Write([]byte("\n"))
|
_, _ = c.Writer.Write([]byte("\n"))
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
cliCancel(nil)
|
cliCancel(nil)
|
||||||
@@ -224,6 +227,7 @@ func (h *OpenAIResponsesAPIHandler) handleStreamingResponse(c *gin.Context, rawJ
|
|||||||
|
|
||||||
// Success! Set headers.
|
// Success! Set headers.
|
||||||
setSSEHeaders()
|
setSSEHeaders()
|
||||||
|
handlers.WriteUpstreamHeaders(c.Writer.Header(), upstreamHeaders)
|
||||||
|
|
||||||
// Write first chunk logic (matching forwardResponsesStream)
|
// Write first chunk logic (matching forwardResponsesStream)
|
||||||
if bytes.HasPrefix(chunk, []byte("event:")) {
|
if bytes.HasPrefix(chunk, []byte("event:")) {
|
||||||
|
|||||||
@@ -30,8 +30,9 @@ type ProviderExecutor interface {
|
|||||||
Identifier() string
|
Identifier() string
|
||||||
// Execute handles non-streaming execution and returns the provider response payload.
|
// Execute handles non-streaming execution and returns the provider response payload.
|
||||||
Execute(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error)
|
Execute(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error)
|
||||||
// ExecuteStream handles streaming execution and returns a channel of provider chunks.
|
// ExecuteStream handles streaming execution and returns a StreamResult containing
|
||||||
ExecuteStream(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error)
|
// upstream headers and a channel of provider chunks.
|
||||||
|
ExecuteStream(ctx context.Context, auth *Auth, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error)
|
||||||
// Refresh attempts to refresh provider credentials and returns the updated auth state.
|
// Refresh attempts to refresh provider credentials and returns the updated auth state.
|
||||||
Refresh(ctx context.Context, auth *Auth) (*Auth, error)
|
Refresh(ctx context.Context, auth *Auth) (*Auth, error)
|
||||||
// CountTokens returns the token count for the given request.
|
// CountTokens returns the token count for the given request.
|
||||||
@@ -533,7 +534,7 @@ func (m *Manager) ExecuteCount(ctx context.Context, providers []string, req clip
|
|||||||
|
|
||||||
// ExecuteStream performs a streaming execution using the configured selector and executor.
|
// ExecuteStream performs a streaming execution using the configured selector and executor.
|
||||||
// It supports multiple providers for the same model and round-robins the starting provider per model.
|
// It supports multiple providers for the same model and round-robins the starting provider per model.
|
||||||
func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) {
|
||||||
normalized := m.normalizeProviders(providers)
|
normalized := m.normalizeProviders(providers)
|
||||||
if len(normalized) == 0 {
|
if len(normalized) == 0 {
|
||||||
return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"}
|
return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"}
|
||||||
@@ -543,9 +544,9 @@ func (m *Manager) ExecuteStream(ctx context.Context, providers []string, req cli
|
|||||||
|
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for attempt := 0; ; attempt++ {
|
for attempt := 0; ; attempt++ {
|
||||||
chunks, errStream := m.executeStreamMixedOnce(ctx, normalized, req, opts)
|
result, errStream := m.executeStreamMixedOnce(ctx, normalized, req, opts)
|
||||||
if errStream == nil {
|
if errStream == nil {
|
||||||
return chunks, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
lastErr = errStream
|
lastErr = errStream
|
||||||
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, normalized, req.Model, maxWait)
|
wait, shouldRetry := m.shouldRetryAfterError(errStream, attempt, normalized, req.Model, maxWait)
|
||||||
@@ -672,7 +673,7 @@ func (m *Manager) executeCountMixedOnce(ctx context.Context, providers []string,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) {
|
func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (*cliproxyexecutor.StreamResult, error) {
|
||||||
if len(providers) == 0 {
|
if len(providers) == 0 {
|
||||||
return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"}
|
return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"}
|
||||||
}
|
}
|
||||||
@@ -702,7 +703,7 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
|
|||||||
execReq.Model = rewriteModelForAuth(routeModel, auth)
|
execReq.Model = rewriteModelForAuth(routeModel, auth)
|
||||||
execReq.Model = m.applyOAuthModelAlias(auth, execReq.Model)
|
execReq.Model = m.applyOAuthModelAlias(auth, execReq.Model)
|
||||||
execReq.Model = m.applyAPIKeyModelAlias(auth, execReq.Model)
|
execReq.Model = m.applyAPIKeyModelAlias(auth, execReq.Model)
|
||||||
chunks, errStream := executor.ExecuteStream(execCtx, auth, execReq, opts)
|
streamResult, errStream := executor.ExecuteStream(execCtx, auth, execReq, opts)
|
||||||
if errStream != nil {
|
if errStream != nil {
|
||||||
if errCtx := execCtx.Err(); errCtx != nil {
|
if errCtx := execCtx.Err(); errCtx != nil {
|
||||||
return nil, errCtx
|
return nil, errCtx
|
||||||
@@ -750,8 +751,11 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
|
|||||||
if !failed {
|
if !failed {
|
||||||
m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: true})
|
m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: true})
|
||||||
}
|
}
|
||||||
}(execCtx, auth.Clone(), provider, chunks)
|
}(execCtx, auth.Clone(), provider, streamResult.Chunks)
|
||||||
return out, nil
|
return &cliproxyexecutor.StreamResult{
|
||||||
|
Headers: streamResult.Headers,
|
||||||
|
Chunks: out,
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -46,6 +46,8 @@ type Response struct {
|
|||||||
Payload []byte
|
Payload []byte
|
||||||
// Metadata exposes optional structured data for translators.
|
// Metadata exposes optional structured data for translators.
|
||||||
Metadata map[string]any
|
Metadata map[string]any
|
||||||
|
// Headers carries upstream HTTP response headers for passthrough to clients.
|
||||||
|
Headers http.Header
|
||||||
}
|
}
|
||||||
|
|
||||||
// StreamChunk represents a single streaming payload unit emitted by provider executors.
|
// StreamChunk represents a single streaming payload unit emitted by provider executors.
|
||||||
@@ -56,6 +58,15 @@ type StreamChunk struct {
|
|||||||
Err error
|
Err error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StreamResult wraps the streaming response, providing both the chunk channel
|
||||||
|
// and the upstream HTTP response headers captured before streaming begins.
|
||||||
|
type StreamResult struct {
|
||||||
|
// Headers carries upstream HTTP response headers from the initial connection.
|
||||||
|
Headers http.Header
|
||||||
|
// Chunks is the channel of streaming payload units.
|
||||||
|
Chunks <-chan StreamChunk
|
||||||
|
}
|
||||||
|
|
||||||
// StatusError represents an error that carries an HTTP-like status code.
|
// StatusError represents an error that carries an HTTP-like status code.
|
||||||
// Provider executors should implement this when possible to enable
|
// Provider executors should implement this when possible to enable
|
||||||
// better auth state updates on failures (e.g., 401/402/429).
|
// better auth state updates on failures (e.g., 401/402/429).
|
||||||
|
|||||||
Reference in New Issue
Block a user