diff --git a/sdk/cliproxy/auth/conductor.go b/sdk/cliproxy/auth/conductor.go index a24d111d..a6987190 100644 --- a/sdk/cliproxy/auth/conductor.go +++ b/sdk/cliproxy/auth/conductor.go @@ -732,173 +732,6 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string } } -func (m *Manager) executeWithProvider(ctx context.Context, provider string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { - if provider == "" { - return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "provider identifier is empty"} - } - routeModel := req.Model - opts = ensureRequestedModelMetadata(opts, routeModel) - tried := make(map[string]struct{}) - var lastErr error - for { - auth, executor, errPick := m.pickNext(ctx, provider, routeModel, opts, tried) - if errPick != nil { - if lastErr != nil { - return cliproxyexecutor.Response{}, lastErr - } - return cliproxyexecutor.Response{}, errPick - } - - entry := logEntryWithRequestID(ctx) - debugLogAuthSelection(entry, auth, provider, req.Model) - - tried[auth.ID] = struct{}{} - execCtx := ctx - if rt := m.roundTripperFor(auth); rt != nil { - execCtx = context.WithValue(execCtx, roundTripperContextKey{}, rt) - execCtx = context.WithValue(execCtx, "cliproxy.roundtripper", rt) - } - execReq := req - execReq.Model = rewriteModelForAuth(routeModel, auth) - execReq.Model = m.applyOAuthModelAlias(auth, execReq.Model) - execReq.Model = m.applyAPIKeyModelAlias(auth, execReq.Model) - resp, errExec := executor.Execute(execCtx, auth, execReq, opts) - result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: errExec == nil} - if errExec != nil { - result.Error = &Error{Message: errExec.Error()} - var se cliproxyexecutor.StatusError - if errors.As(errExec, &se) && se != nil { - result.Error.HTTPStatus = se.StatusCode() - } - if ra := retryAfterFromError(errExec); ra != nil { - result.RetryAfter = ra - } - m.MarkResult(execCtx, result) - lastErr = errExec - continue - } - m.MarkResult(execCtx, result) - return resp, nil - } -} - -func (m *Manager) executeCountWithProvider(ctx context.Context, provider string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (cliproxyexecutor.Response, error) { - if provider == "" { - return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "provider identifier is empty"} - } - routeModel := req.Model - opts = ensureRequestedModelMetadata(opts, routeModel) - tried := make(map[string]struct{}) - var lastErr error - for { - auth, executor, errPick := m.pickNext(ctx, provider, routeModel, opts, tried) - if errPick != nil { - if lastErr != nil { - return cliproxyexecutor.Response{}, lastErr - } - return cliproxyexecutor.Response{}, errPick - } - - entry := logEntryWithRequestID(ctx) - debugLogAuthSelection(entry, auth, provider, req.Model) - - tried[auth.ID] = struct{}{} - execCtx := ctx - if rt := m.roundTripperFor(auth); rt != nil { - execCtx = context.WithValue(execCtx, roundTripperContextKey{}, rt) - execCtx = context.WithValue(execCtx, "cliproxy.roundtripper", rt) - } - execReq := req - execReq.Model = rewriteModelForAuth(routeModel, auth) - execReq.Model = m.applyOAuthModelAlias(auth, execReq.Model) - execReq.Model = m.applyAPIKeyModelAlias(auth, execReq.Model) - resp, errExec := executor.CountTokens(execCtx, auth, execReq, opts) - result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: errExec == nil} - if errExec != nil { - result.Error = &Error{Message: errExec.Error()} - var se cliproxyexecutor.StatusError - if errors.As(errExec, &se) && se != nil { - result.Error.HTTPStatus = se.StatusCode() - } - if ra := retryAfterFromError(errExec); ra != nil { - result.RetryAfter = ra - } - m.MarkResult(execCtx, result) - lastErr = errExec - continue - } - m.MarkResult(execCtx, result) - return resp, nil - } -} - -func (m *Manager) executeStreamWithProvider(ctx context.Context, provider string, req cliproxyexecutor.Request, opts cliproxyexecutor.Options) (<-chan cliproxyexecutor.StreamChunk, error) { - if provider == "" { - return nil, &Error{Code: "provider_not_found", Message: "provider identifier is empty"} - } - routeModel := req.Model - opts = ensureRequestedModelMetadata(opts, routeModel) - tried := make(map[string]struct{}) - var lastErr error - for { - auth, executor, errPick := m.pickNext(ctx, provider, routeModel, opts, tried) - if errPick != nil { - if lastErr != nil { - return nil, lastErr - } - return nil, errPick - } - - entry := logEntryWithRequestID(ctx) - debugLogAuthSelection(entry, auth, provider, req.Model) - - tried[auth.ID] = struct{}{} - execCtx := ctx - if rt := m.roundTripperFor(auth); rt != nil { - execCtx = context.WithValue(execCtx, roundTripperContextKey{}, rt) - execCtx = context.WithValue(execCtx, "cliproxy.roundtripper", rt) - } - execReq := req - execReq.Model = rewriteModelForAuth(routeModel, auth) - execReq.Model = m.applyOAuthModelAlias(auth, execReq.Model) - execReq.Model = m.applyAPIKeyModelAlias(auth, execReq.Model) - chunks, errStream := executor.ExecuteStream(execCtx, auth, execReq, opts) - if errStream != nil { - rerr := &Error{Message: errStream.Error()} - var se cliproxyexecutor.StatusError - if errors.As(errStream, &se) && se != nil { - rerr.HTTPStatus = se.StatusCode() - } - result := Result{AuthID: auth.ID, Provider: provider, Model: routeModel, Success: false, Error: rerr} - result.RetryAfter = retryAfterFromError(errStream) - m.MarkResult(execCtx, result) - lastErr = errStream - continue - } - out := make(chan cliproxyexecutor.StreamChunk) - go func(streamCtx context.Context, streamAuth *Auth, streamProvider string, streamChunks <-chan cliproxyexecutor.StreamChunk) { - defer close(out) - var failed bool - for chunk := range streamChunks { - if chunk.Err != nil && !failed { - failed = true - rerr := &Error{Message: chunk.Err.Error()} - var se cliproxyexecutor.StatusError - if errors.As(chunk.Err, &se) && se != nil { - rerr.HTTPStatus = se.StatusCode() - } - m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: false, Error: rerr}) - } - out <- chunk - } - if !failed { - m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: true}) - } - }(execCtx, auth.Clone(), provider, chunks) - return out, nil - } -} - func ensureRequestedModelMetadata(opts cliproxyexecutor.Options, requestedModel string) cliproxyexecutor.Options { requestedModel = strings.TrimSpace(requestedModel) if requestedModel == "" { @@ -1185,35 +1018,6 @@ func (m *Manager) normalizeProviders(providers []string) []string { return result } -// rotateProviders returns a rotated view of the providers list starting from the -// current offset for the model, and atomically increments the offset for the next call. -// This ensures concurrent requests get different starting providers. -func (m *Manager) rotateProviders(model string, providers []string) []string { - if len(providers) == 0 { - return nil - } - - // Atomic read-and-increment: get current offset and advance cursor in one lock - m.mu.Lock() - offset := m.providerOffsets[model] - m.providerOffsets[model] = (offset + 1) % len(providers) - m.mu.Unlock() - - if len(providers) > 0 { - offset %= len(providers) - } - if offset < 0 { - offset = 0 - } - if offset == 0 { - return providers - } - rotated := make([]string, 0, len(providers)) - rotated = append(rotated, providers[offset:]...) - rotated = append(rotated, providers[:offset]...) - return rotated -} - func (m *Manager) retrySettings() (int, time.Duration) { if m == nil { return 0, 0 @@ -1295,42 +1099,6 @@ func waitForCooldown(ctx context.Context, wait time.Duration) error { } } -func (m *Manager) executeProvidersOnce(ctx context.Context, providers []string, fn func(context.Context, string) (cliproxyexecutor.Response, error)) (cliproxyexecutor.Response, error) { - if len(providers) == 0 { - return cliproxyexecutor.Response{}, &Error{Code: "provider_not_found", Message: "no provider supplied"} - } - var lastErr error - for _, provider := range providers { - resp, errExec := fn(ctx, provider) - if errExec == nil { - return resp, nil - } - lastErr = errExec - } - if lastErr != nil { - return cliproxyexecutor.Response{}, lastErr - } - return cliproxyexecutor.Response{}, &Error{Code: "auth_not_found", Message: "no auth available"} -} - -func (m *Manager) executeStreamProvidersOnce(ctx context.Context, providers []string, fn func(context.Context, string) (<-chan cliproxyexecutor.StreamChunk, error)) (<-chan cliproxyexecutor.StreamChunk, error) { - if len(providers) == 0 { - return nil, &Error{Code: "provider_not_found", Message: "no provider supplied"} - } - var lastErr error - for _, provider := range providers { - chunks, errExec := fn(ctx, provider) - if errExec == nil { - return chunks, nil - } - lastErr = errExec - } - if lastErr != nil { - return nil, lastErr - } - return nil, &Error{Code: "auth_not_found", Message: "no auth available"} -} - // MarkResult records an execution result and notifies hooks. func (m *Manager) MarkResult(ctx context.Context, result Result) { if result.AuthID == "" {