Compare commits

..

12 Commits

Author SHA1 Message Date
Luis Pater
4eb1e6093f feat(handlers): add test to verify no retries after partial stream response
Introduce `TestExecuteStreamWithAuthManager_DoesNotRetryAfterFirstByte` to validate that stream executions do not retry after receiving partial responses. Implement `payloadThenErrorStreamExecutor` for test coverage of this behavior.
2026-01-29 17:30:48 +08:00
Luis Pater
189a066807 Merge pull request #1296 from router-for-me/log
fix(api): update amp module only on config changes
2026-01-29 17:27:52 +08:00
hkfires
d0bada7a43 fix(config): prune oauth-model-alias when preserving config 2026-01-29 14:06:52 +08:00
hkfires
8510fc313e fix(api): update amp module only on config changes 2026-01-29 09:28:49 +08:00
Luis Pater
9e5b1d24e8 Merge pull request #1276 from router-for-me/thinking
feat(thinking): enable thinking toggle for qwen3 and deepseek models
2026-01-28 11:16:54 +08:00
Luis Pater
a7dae6ad52 Merge remote-tracking branch 'origin/dev' into dev 2026-01-28 10:59:00 +08:00
Luis Pater
e93e05ae25 refactor: consolidate channel send logic with context-safe handlers
Optimize channel operations by introducing reusable context-aware send functions (`send` and `sendErr`) across `wsrelay`, `handlers`, and `cliproxy`. Ensure graceful handling of canceled contexts during stream operations.
2026-01-28 10:58:35 +08:00
hkfires
c8c27325dc feat(thinking): enable thinking toggle for qwen3 and deepseek models
Fix #1245
2026-01-28 09:54:05 +08:00
hkfires
c3b6f3918c chore(git): stop ignoring .idea and data directories 2026-01-28 09:52:44 +08:00
Luis Pater
bbb55a8ab4 Merge pull request #1170 from BianBianY/main
feat: optimization enable/disable auth files
2026-01-28 09:34:35 +08:00
Yang Bian
f7bfa8a05c Merge branch 'upstream-main' 2026-01-24 16:28:08 +08:00
Yang Bian
c8620d1633 feat: optimization enable/disable auth files 2026-01-23 18:03:09 +08:00
12 changed files with 261 additions and 37 deletions

View File

@@ -12,6 +12,7 @@ import (
"net/http"
"os"
"path/filepath"
"reflect"
"strings"
"sync"
"sync/atomic"
@@ -990,14 +991,17 @@ func (s *Server) UpdateClients(cfg *config.Config) {
s.mgmt.SetAuthManager(s.handlers.AuthManager)
}
// Notify Amp module of config changes (for model mapping hot-reload)
if s.ampModule != nil {
log.Debugf("triggering amp module config update")
if err := s.ampModule.OnConfigUpdated(cfg); err != nil {
log.Errorf("failed to update Amp module config: %v", err)
// Notify Amp module only when Amp config has changed.
ampConfigChanged := oldCfg == nil || !reflect.DeepEqual(oldCfg.AmpCode, cfg.AmpCode)
if ampConfigChanged {
if s.ampModule != nil {
log.Debugf("triggering amp module config update")
if err := s.ampModule.OnConfigUpdated(cfg); err != nil {
log.Errorf("failed to update Amp module config: %v", err)
}
} else {
log.Warnf("amp module is nil, skipping config update")
}
} else {
log.Warnf("amp module is nil, skipping config update")
}
// Count client sources from configuration and auth store.

View File

@@ -923,6 +923,7 @@ func SaveConfigPreserveComments(configFile string, cfg *Config) error {
removeLegacyGenerativeLanguageKeys(original.Content[0])
pruneMappingToGeneratedKeys(original.Content[0], generated.Content[0], "oauth-excluded-models")
pruneMappingToGeneratedKeys(original.Content[0], generated.Content[0], "oauth-model-alias")
// Merge generated into original in-place, preserving comments/order of existing nodes.
mergeMappingPreserve(original.Content[0], generated.Content[0])

View File

@@ -784,7 +784,7 @@ func GetIFlowModels() []*ModelInfo {
{ID: "qwen3-coder-plus", DisplayName: "Qwen3-Coder-Plus", Description: "Qwen3 Coder Plus code generation", Created: 1753228800},
{ID: "qwen3-max", DisplayName: "Qwen3-Max", Description: "Qwen3 flagship model", Created: 1758672000},
{ID: "qwen3-vl-plus", DisplayName: "Qwen3-VL-Plus", Description: "Qwen3 multimodal vision-language", Created: 1758672000},
{ID: "qwen3-max-preview", DisplayName: "Qwen3-Max-Preview", Description: "Qwen3 Max preview build", Created: 1757030400},
{ID: "qwen3-max-preview", DisplayName: "Qwen3-Max-Preview", Description: "Qwen3 Max preview build", Created: 1757030400, Thinking: iFlowThinkingSupport},
{ID: "kimi-k2-0905", DisplayName: "Kimi-K2-Instruct-0905", Description: "Moonshot Kimi K2 instruct 0905", Created: 1757030400},
{ID: "glm-4.6", DisplayName: "GLM-4.6", Description: "Zhipu GLM 4.6 general model", Created: 1759190400, Thinking: iFlowThinkingSupport},
{ID: "glm-4.7", DisplayName: "GLM-4.7", Description: "Zhipu GLM 4.7 general model", Created: 1766448000, Thinking: iFlowThinkingSupport},
@@ -792,8 +792,8 @@ func GetIFlowModels() []*ModelInfo {
{ID: "kimi-k2-thinking", DisplayName: "Kimi-K2-Thinking", Description: "Moonshot Kimi K2 thinking model", Created: 1762387200},
{ID: "deepseek-v3.2-chat", DisplayName: "DeepSeek-V3.2", Description: "DeepSeek V3.2 Chat", Created: 1764576000},
{ID: "deepseek-v3.2-reasoner", DisplayName: "DeepSeek-V3.2", Description: "DeepSeek V3.2 Reasoner", Created: 1764576000},
{ID: "deepseek-v3.2", DisplayName: "DeepSeek-V3.2-Exp", Description: "DeepSeek V3.2 experimental", Created: 1759104000},
{ID: "deepseek-v3.1", DisplayName: "DeepSeek-V3.1-Terminus", Description: "DeepSeek V3.1 Terminus", Created: 1756339200},
{ID: "deepseek-v3.2", DisplayName: "DeepSeek-V3.2-Exp", Description: "DeepSeek V3.2 experimental", Created: 1759104000, Thinking: iFlowThinkingSupport},
{ID: "deepseek-v3.1", DisplayName: "DeepSeek-V3.1-Terminus", Description: "DeepSeek V3.1 Terminus", Created: 1756339200, Thinking: iFlowThinkingSupport},
{ID: "deepseek-r1", DisplayName: "DeepSeek-R1", Description: "DeepSeek reasoning model R1", Created: 1737331200},
{ID: "deepseek-v3", DisplayName: "DeepSeek-V3-671B", Description: "DeepSeek V3 671B", Created: 1734307200},
{ID: "qwen3-32b", DisplayName: "Qwen3-32B", Description: "Qwen3 32B", Created: 1747094400},

View File

@@ -1,7 +1,7 @@
// Package iflow implements thinking configuration for iFlow models (GLM, MiniMax).
// Package iflow implements thinking configuration for iFlow models.
//
// iFlow models use boolean toggle semantics:
// - GLM models: chat_template_kwargs.enable_thinking (boolean)
// - Models using chat_template_kwargs.enable_thinking (boolean toggle)
// - MiniMax models: reasoning_split (boolean)
//
// Level values are converted to boolean: none=false, all others=true
@@ -20,6 +20,7 @@ import (
// Applier implements thinking.ProviderApplier for iFlow models.
//
// iFlow-specific behavior:
// - enable_thinking toggle models: enable_thinking boolean
// - GLM models: enable_thinking boolean + clear_thinking=false
// - MiniMax models: reasoning_split boolean
// - Level to boolean: none=false, others=true
@@ -61,8 +62,8 @@ func (a *Applier) Apply(body []byte, config thinking.ThinkingConfig, modelInfo *
return body, nil
}
if isGLMModel(modelInfo.ID) {
return applyGLM(body, config), nil
if isEnableThinkingModel(modelInfo.ID) {
return applyEnableThinking(body, config, isGLMModel(modelInfo.ID)), nil
}
if isMiniMaxModel(modelInfo.ID) {
@@ -97,7 +98,8 @@ func configToBoolean(config thinking.ThinkingConfig) bool {
}
}
// applyGLM applies thinking configuration for GLM models.
// applyEnableThinking applies thinking configuration for models that use
// chat_template_kwargs.enable_thinking format.
//
// Output format when enabled:
//
@@ -107,9 +109,8 @@ func configToBoolean(config thinking.ThinkingConfig) bool {
//
// {"chat_template_kwargs": {"enable_thinking": false}}
//
// Note: clear_thinking is only set when thinking is enabled, to preserve
// thinking output in the response.
func applyGLM(body []byte, config thinking.ThinkingConfig) []byte {
// Note: clear_thinking is only set for GLM models when thinking is enabled.
func applyEnableThinking(body []byte, config thinking.ThinkingConfig, setClearThinking bool) []byte {
enableThinking := configToBoolean(config)
if len(body) == 0 || !gjson.ValidBytes(body) {
@@ -118,8 +119,11 @@ func applyGLM(body []byte, config thinking.ThinkingConfig) []byte {
result, _ := sjson.SetBytes(body, "chat_template_kwargs.enable_thinking", enableThinking)
// clear_thinking is a GLM-only knob, strip it for other models.
result, _ = sjson.DeleteBytes(result, "chat_template_kwargs.clear_thinking")
// clear_thinking only needed when thinking is enabled
if enableThinking {
if enableThinking && setClearThinking {
result, _ = sjson.SetBytes(result, "chat_template_kwargs.clear_thinking", false)
}
@@ -143,8 +147,21 @@ func applyMiniMax(body []byte, config thinking.ThinkingConfig) []byte {
return result
}
// isEnableThinkingModel determines if the model uses chat_template_kwargs.enable_thinking format.
func isEnableThinkingModel(modelID string) bool {
if isGLMModel(modelID) {
return true
}
id := strings.ToLower(modelID)
switch id {
case "qwen3-max-preview", "deepseek-v3.2", "deepseek-v3.1":
return true
default:
return false
}
}
// isGLMModel determines if the model is a GLM series model.
// GLM models use chat_template_kwargs.enable_thinking format.
func isGLMModel(modelID string) bool {
return strings.HasPrefix(strings.ToLower(modelID), "glm")
}

View File

@@ -86,12 +86,19 @@ func (s *FileSynthesizer) Synthesize(ctx *SynthesisContext) ([]*coreauth.Auth, e
}
}
disabled, _ := metadata["disabled"].(bool)
status := coreauth.StatusActive
if disabled {
status = coreauth.StatusDisabled
}
a := &coreauth.Auth{
ID: id,
Provider: provider,
Label: label,
Prefix: prefix,
Status: coreauth.StatusActive,
Status: status,
Disabled: disabled,
Attributes: map[string]string{
"source": full,
"path": full,

View File

@@ -124,32 +124,47 @@ func (m *Manager) Stream(ctx context.Context, provider string, req *HTTPRequest)
out := make(chan StreamEvent)
go func() {
defer close(out)
send := func(ev StreamEvent) bool {
if ctx == nil {
out <- ev
return true
}
select {
case <-ctx.Done():
return false
case out <- ev:
return true
}
}
for {
select {
case <-ctx.Done():
out <- StreamEvent{Err: ctx.Err()}
return
case msg, ok := <-respCh:
if !ok {
out <- StreamEvent{Err: errors.New("wsrelay: stream closed")}
_ = send(StreamEvent{Err: errors.New("wsrelay: stream closed")})
return
}
switch msg.Type {
case MessageTypeStreamStart:
resp := decodeResponse(msg.Payload)
out <- StreamEvent{Type: MessageTypeStreamStart, Status: resp.Status, Headers: resp.Headers}
if okSend := send(StreamEvent{Type: MessageTypeStreamStart, Status: resp.Status, Headers: resp.Headers}); !okSend {
return
}
case MessageTypeStreamChunk:
chunk := decodeChunk(msg.Payload)
out <- StreamEvent{Type: MessageTypeStreamChunk, Payload: chunk}
if okSend := send(StreamEvent{Type: MessageTypeStreamChunk, Payload: chunk}); !okSend {
return
}
case MessageTypeStreamEnd:
out <- StreamEvent{Type: MessageTypeStreamEnd}
_ = send(StreamEvent{Type: MessageTypeStreamEnd})
return
case MessageTypeError:
out <- StreamEvent{Type: MessageTypeError, Err: decodeError(msg.Payload)}
_ = send(StreamEvent{Type: MessageTypeError, Err: decodeError(msg.Payload)})
return
case MessageTypeHTTPResp:
resp := decodeResponse(msg.Payload)
out <- StreamEvent{Type: MessageTypeHTTPResp, Status: resp.Status, Headers: resp.Headers, Payload: resp.Body}
_ = send(StreamEvent{Type: MessageTypeHTTPResp, Status: resp.Status, Headers: resp.Headers, Payload: resp.Body})
return
default:
}

View File

@@ -506,6 +506,32 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
bootstrapRetries := 0
maxBootstrapRetries := StreamingBootstrapRetries(h.Cfg)
sendErr := func(msg *interfaces.ErrorMessage) bool {
if ctx == nil {
errChan <- msg
return true
}
select {
case <-ctx.Done():
return false
case errChan <- msg:
return true
}
}
sendData := func(chunk []byte) bool {
if ctx == nil {
dataChan <- chunk
return true
}
select {
case <-ctx.Done():
return false
case dataChan <- chunk:
return true
}
}
bootstrapEligible := func(err error) bool {
status := statusFromError(err)
if status == 0 {
@@ -565,12 +591,14 @@ func (h *BaseAPIHandler) ExecuteStreamWithAuthManager(ctx context.Context, handl
addon = hdr.Clone()
}
}
errChan <- &interfaces.ErrorMessage{StatusCode: status, Error: streamErr, Addon: addon}
_ = sendErr(&interfaces.ErrorMessage{StatusCode: status, Error: streamErr, Addon: addon})
return
}
if len(chunk.Payload) > 0 {
sentPayload = true
dataChan <- cloneBytes(chunk.Payload)
if okSendData := sendData(cloneBytes(chunk.Payload)); !okSendData {
return
}
}
}
}

View File

@@ -70,6 +70,58 @@ func (e *failOnceStreamExecutor) Calls() int {
return e.calls
}
type payloadThenErrorStreamExecutor struct {
mu sync.Mutex
calls int
}
func (e *payloadThenErrorStreamExecutor) Identifier() string { return "codex" }
func (e *payloadThenErrorStreamExecutor) Execute(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "Execute not implemented"}
}
func (e *payloadThenErrorStreamExecutor) ExecuteStream(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (<-chan coreexecutor.StreamChunk, error) {
e.mu.Lock()
e.calls++
e.mu.Unlock()
ch := make(chan coreexecutor.StreamChunk, 2)
ch <- coreexecutor.StreamChunk{Payload: []byte("partial")}
ch <- coreexecutor.StreamChunk{
Err: &coreauth.Error{
Code: "upstream_closed",
Message: "upstream closed",
Retryable: false,
HTTPStatus: http.StatusBadGateway,
},
}
close(ch)
return ch, nil
}
func (e *payloadThenErrorStreamExecutor) Refresh(ctx context.Context, auth *coreauth.Auth) (*coreauth.Auth, error) {
return auth, nil
}
func (e *payloadThenErrorStreamExecutor) CountTokens(context.Context, *coreauth.Auth, coreexecutor.Request, coreexecutor.Options) (coreexecutor.Response, error) {
return coreexecutor.Response{}, &coreauth.Error{Code: "not_implemented", Message: "CountTokens not implemented"}
}
func (e *payloadThenErrorStreamExecutor) HttpRequest(ctx context.Context, auth *coreauth.Auth, req *http.Request) (*http.Response, error) {
return nil, &coreauth.Error{
Code: "not_implemented",
Message: "HttpRequest not implemented",
HTTPStatus: http.StatusNotImplemented,
}
}
func (e *payloadThenErrorStreamExecutor) Calls() int {
e.mu.Lock()
defer e.mu.Unlock()
return e.calls
}
func TestExecuteStreamWithAuthManager_RetriesBeforeFirstByte(t *testing.T) {
executor := &failOnceStreamExecutor{}
manager := coreauth.NewManager(nil, nil, nil)
@@ -130,3 +182,73 @@ func TestExecuteStreamWithAuthManager_RetriesBeforeFirstByte(t *testing.T) {
t.Fatalf("expected 2 stream attempts, got %d", executor.Calls())
}
}
func TestExecuteStreamWithAuthManager_DoesNotRetryAfterFirstByte(t *testing.T) {
executor := &payloadThenErrorStreamExecutor{}
manager := coreauth.NewManager(nil, nil, nil)
manager.RegisterExecutor(executor)
auth1 := &coreauth.Auth{
ID: "auth1",
Provider: "codex",
Status: coreauth.StatusActive,
Metadata: map[string]any{"email": "test1@example.com"},
}
if _, err := manager.Register(context.Background(), auth1); err != nil {
t.Fatalf("manager.Register(auth1): %v", err)
}
auth2 := &coreauth.Auth{
ID: "auth2",
Provider: "codex",
Status: coreauth.StatusActive,
Metadata: map[string]any{"email": "test2@example.com"},
}
if _, err := manager.Register(context.Background(), auth2); err != nil {
t.Fatalf("manager.Register(auth2): %v", err)
}
registry.GetGlobalRegistry().RegisterClient(auth1.ID, auth1.Provider, []*registry.ModelInfo{{ID: "test-model"}})
registry.GetGlobalRegistry().RegisterClient(auth2.ID, auth2.Provider, []*registry.ModelInfo{{ID: "test-model"}})
t.Cleanup(func() {
registry.GetGlobalRegistry().UnregisterClient(auth1.ID)
registry.GetGlobalRegistry().UnregisterClient(auth2.ID)
})
handler := NewBaseAPIHandlers(&sdkconfig.SDKConfig{
Streaming: sdkconfig.StreamingConfig{
BootstrapRetries: 1,
},
}, manager)
dataChan, errChan := handler.ExecuteStreamWithAuthManager(context.Background(), "openai", "test-model", []byte(`{"model":"test-model"}`), "")
if dataChan == nil || errChan == nil {
t.Fatalf("expected non-nil channels")
}
var got []byte
for chunk := range dataChan {
got = append(got, chunk...)
}
var gotErr error
var gotStatus int
for msg := range errChan {
if msg != nil && msg.Error != nil {
gotErr = msg.Error
gotStatus = msg.StatusCode
}
}
if string(got) != "partial" {
t.Fatalf("expected payload partial, got %q", string(got))
}
if gotErr == nil {
t.Fatalf("expected terminal error, got nil")
}
if gotStatus != http.StatusBadGateway {
t.Fatalf("expected status %d, got %d", http.StatusBadGateway, gotStatus)
}
if executor.Calls() != 1 {
t.Fatalf("expected 1 stream attempt, got %d", executor.Calls())
}
}

View File

@@ -68,6 +68,7 @@ func (s *FileTokenStore) Save(ctx context.Context, auth *cliproxyauth.Auth) (str
return "", err
}
case auth.Metadata != nil:
auth.Metadata["disabled"] = auth.Disabled
raw, errMarshal := json.Marshal(auth.Metadata)
if errMarshal != nil {
return "", fmt.Errorf("auth filestore: marshal metadata failed: %w", errMarshal)
@@ -214,12 +215,18 @@ func (s *FileTokenStore) readAuthFile(path, baseDir string) (*cliproxyauth.Auth,
return nil, fmt.Errorf("stat file: %w", err)
}
id := s.idFor(path, baseDir)
disabled, _ := metadata["disabled"].(bool)
status := cliproxyauth.StatusActive
if disabled {
status = cliproxyauth.StatusDisabled
}
auth := &cliproxyauth.Auth{
ID: id,
Provider: provider,
FileName: id,
Label: s.labelFor(metadata),
Status: cliproxyauth.StatusActive,
Status: status,
Disabled: disabled,
Attributes: map[string]string{"path": path},
Metadata: metadata,
CreatedAt: info.ModTime(),

View File

@@ -718,6 +718,7 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
go func(streamCtx context.Context, streamAuth *Auth, streamProvider string, streamChunks <-chan cliproxyexecutor.StreamChunk) {
defer close(out)
var failed bool
forward := true
for chunk := range streamChunks {
if chunk.Err != nil && !failed {
failed = true
@@ -728,7 +729,18 @@ func (m *Manager) executeStreamMixedOnce(ctx context.Context, providers []string
}
m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: false, Error: rerr})
}
out <- chunk
if !forward {
continue
}
if streamCtx == nil {
out <- chunk
continue
}
select {
case <-streamCtx.Done():
forward = false
case out <- chunk:
}
}
if !failed {
m.MarkResult(streamCtx, Result{AuthID: streamAuth.ID, Provider: streamProvider, Model: routeModel, Success: true})

View File

@@ -681,6 +681,10 @@ func (s *Service) registerModelsForAuth(a *coreauth.Auth) {
if a == nil || a.ID == "" {
return
}
if a.Disabled {
GlobalModelRegistry().UnregisterClient(a.ID)
return
}
authKind := strings.ToLower(strings.TrimSpace(a.Attributes["auth_kind"]))
if authKind == "" {
if kind, _ := a.AccountInfo(); strings.EqualFold(kind, "api_key") {

View File

@@ -2,6 +2,7 @@ package test
import (
"fmt"
"strings"
"testing"
"time"
@@ -2778,12 +2779,18 @@ func runThinkingTests(t *testing.T, cases []thinkingTestCase) {
// Verify clear_thinking for iFlow GLM models when enable_thinking=true
if tc.to == "iflow" && tc.expectField == "chat_template_kwargs.enable_thinking" && tc.expectValue == "true" {
baseModel := thinking.ParseSuffix(tc.model).ModelName
isGLM := strings.HasPrefix(strings.ToLower(baseModel), "glm")
ctVal := gjson.GetBytes(body, "chat_template_kwargs.clear_thinking")
if !ctVal.Exists() {
t.Fatalf("expected clear_thinking field not found for GLM model, body=%s", string(body))
}
if ctVal.Bool() != false {
t.Fatalf("clear_thinking: expected false, got %v, body=%s", ctVal.Bool(), string(body))
if isGLM {
if !ctVal.Exists() {
t.Fatalf("expected clear_thinking field not found for GLM model, body=%s", string(body))
}
if ctVal.Bool() != false {
t.Fatalf("clear_thinking: expected false, got %v, body=%s", ctVal.Bool(), string(body))
}
} else if ctVal.Exists() {
t.Fatalf("expected no clear_thinking field for non-GLM enable_thinking model, body=%s", string(body))
}
}
})