feat(gemini): Implement pseudo-streaming and improve context reuse

This commit introduces two major enhancements to the Gemini Web client to improve user experience and conversation continuity.

First, it implements a pseudo-streaming mechanism for non-code mode. The Gemini Web API returns the full response at once in this mode, leading to a poor user experience with a long wait for output. This change splits the full response into smaller chunks and sends them with an 80ms delay, simulating a real-time streaming effect.

Second, the conversation context reuse logic is now more robust. A fallback mechanism has been added to reuse conversation metadata when a clear continuation context is detected (e.g., a user replies to an assistant's turn). This improves conversational flow. Metadata lookups have also been improved to check both the canonical model key and its alias for better compatibility.
This commit is contained in:
hkfires
2025-09-18 11:22:56 +08:00
parent 580ec737d3
commit 1b2e3dc7af

View File

@@ -263,8 +263,32 @@ func (c *GeminiWebClient) prepareChat(ctx context.Context, modelName string, raw
useMsgs = remaining
}
} else {
meta = nil
useMsgs = cleaned
// Fallback: only when there is clear continuation context.
// Require at least two messages and the previous turn is assistant.
if len(cleaned) >= 2 && strings.EqualFold(cleaned[len(cleaned)-2].Role, "assistant") {
// Prefer canonical (underlying) model key; fall back to alias key for backward-compatibility.
keyUnderlying := geminiWeb.AccountMetaKey(c.GetEmail(), res.underlying)
keyAlias := geminiWeb.AccountMetaKey(c.GetEmail(), modelName)
c.convMutex.RLock()
fallbackMeta := c.convStore[keyUnderlying]
if len(fallbackMeta) == 0 {
fallbackMeta = c.convStore[keyAlias]
}
c.convMutex.RUnlock()
if len(fallbackMeta) > 0 {
meta = fallbackMeta
// Only send the newest user message as continuation.
useMsgs = []geminiWeb.RoleText{cleaned[len(cleaned)-1]}
res.reuse = true
} else {
meta = nil
useMsgs = cleaned
}
} else {
// No safe continuation context detected; do not reuse metadata.
meta = nil
useMsgs = cleaned
}
}
res.tagged = geminiWeb.NeedRoleTags(useMsgs)
if res.reuse && len(useMsgs) == 1 {
@@ -290,9 +314,16 @@ func (c *GeminiWebClient) prepareChat(ctx context.Context, modelName string, raw
}
res.metaLen = len(meta)
} else {
key := geminiWeb.AccountMetaKey(c.GetEmail(), modelName)
// Context reuse disabled: use account-level metadata if present.
// Check both canonical model and alias for compatibility.
keyUnderlying := geminiWeb.AccountMetaKey(c.GetEmail(), res.underlying)
keyAlias := geminiWeb.AccountMetaKey(c.GetEmail(), modelName)
c.convMutex.RLock()
meta = c.convStore[key]
if v, ok := c.convStore[keyUnderlying]; ok && len(v) > 0 {
meta = v
} else {
meta = c.convStore[keyAlias]
}
c.convMutex.RUnlock()
useMsgs = cleaned
res.tagged = geminiWeb.NeedRoleTags(useMsgs)
@@ -448,6 +479,38 @@ func (c *GeminiWebClient) SendRawMessageStream(ctx context.Context, modelName st
root := gjson.ParseBytes(gemBytes)
parts := root.Get("candidates.0.content.parts")
if parts.Exists() && parts.IsArray() {
// Non code-mode: perform pseudo streaming by splitting text into small chunks
if !c.cfg.GeminiWeb.CodeMode {
chunkSize := 40
fr := strings.ToUpper(root.Get("candidates.0.finishReason").String())
units := make([][]byte, 0, 16)
units = append(units, buildPseudoUnits(gemBytes, true, chunkSize, false)...)
other := buildPseudoUnits(gemBytes, false, chunkSize, false)
if len(other) > 0 && fr != "" {
if updated, err := sjson.SetBytes(other[len(other)-1], "candidates.0.finishReason", fr); err == nil {
other[len(other)-1] = updated
}
}
units = append(units, other...)
for _, u := range units {
lines := translator.Response(prep.handlerType, c.Type(), newCtx, modelName, original, prep.translatedRaw, u, &param)
for _, l := range lines {
if l != "" {
dataChan <- []byte(l)
// 80ms interval between pseudo chunks
time.Sleep(80 * time.Millisecond)
}
}
}
// translator-level done signal
done := translator.Response(prep.handlerType, c.Type(), newCtx, modelName, original, prep.translatedRaw, []byte("[DONE]"), &param)
for _, l := range done {
if l != "" {
dataChan <- []byte(l)
}
}
return
}
var thoughtArr, otherArr strings.Builder
thoughtCount := 0
thoughtArr.WriteByte('[')
@@ -485,6 +548,10 @@ func (c *GeminiWebClient) SendRawMessageStream(ctx context.Context, modelName st
for _, l := range lines {
if l != "" {
dataChan <- []byte(l)
// Apply 80ms delay between pseudo chunks in non-code mode
if !c.cfg.GeminiWeb.CodeMode {
time.Sleep(80 * time.Millisecond)
}
}
}
} else {
@@ -492,6 +559,10 @@ func (c *GeminiWebClient) SendRawMessageStream(ctx context.Context, modelName st
for _, l := range lines {
if l != "" {
dataChan <- []byte(l)
// Apply 80ms delay between pseudo chunks in non-code mode
if !c.cfg.GeminiWeb.CodeMode {
time.Sleep(80 * time.Millisecond)
}
}
}
}
@@ -504,6 +575,10 @@ func (c *GeminiWebClient) SendRawMessageStream(ctx context.Context, modelName st
for _, l := range lines {
if l != "" {
dataChan <- []byte(l)
// Apply 80ms delay between pseudo chunks in non-code mode
if !c.cfg.GeminiWeb.CodeMode {
time.Sleep(80 * time.Millisecond)
}
}
}
} else {
@@ -511,6 +586,10 @@ func (c *GeminiWebClient) SendRawMessageStream(ctx context.Context, modelName st
for _, l := range lines {
if l != "" {
dataChan <- []byte(l)
// Apply 80ms delay between pseudo chunks in non-code mode
if !c.cfg.GeminiWeb.CodeMode {
time.Sleep(80 * time.Millisecond)
}
}
}
}
@@ -537,6 +616,10 @@ func (c *GeminiWebClient) SendRawMessageStream(ctx context.Context, modelName st
for _, l := range lines {
if l != "" {
dataChan <- []byte(l)
// Apply 80ms delay between pseudo chunks in non-code mode
if !c.cfg.GeminiWeb.CodeMode {
time.Sleep(80 * time.Millisecond)
}
}
}
}
@@ -589,7 +672,8 @@ func (c *GeminiWebClient) handleSendSuccess(ctx context.Context, prep *chatPrep,
if output != nil {
metaAfter := prep.chat.Metadata()
if len(metaAfter) > 0 {
key := geminiWeb.AccountMetaKey(c.GetEmail(), modelName)
// Store under canonical (underlying) model key for stability across aliases.
key := geminiWeb.AccountMetaKey(c.GetEmail(), prep.underlying)
c.convMutex.Lock()
c.convStore[key] = metaAfter
snapshot := c.convStore
@@ -743,8 +827,6 @@ func (c *GeminiWebClient) startCookiePersist() {
if c.gwc != nil && c.gwc.Cookies != nil {
if err := c.SaveTokenToFile(); err != nil {
log.Errorf("Failed to persist cookies sidecar for %s: %v", c.GetEmail(), err)
} else {
log.Debugf("Persisted cookies sidecar for %s", c.GetEmail())
}
}
}
@@ -770,6 +852,153 @@ func (c *GeminiWebClient) GetRequestMutex() *sync.Mutex { return nil }
func (c *GeminiWebClient) RefreshTokens(ctx context.Context) error { return c.Init() }
// runeChunks splits a string into rune-safe chunks of roughly the given size.
// It preserves UTF-8 boundaries to avoid breaking characters mid-sequence.
func runeChunks(s string, size int) []string {
if size <= 0 || len(s) == 0 {
return []string{s}
}
var chunks []string
var b strings.Builder
count := 0
for _, r := range s {
b.WriteRune(r)
count++
if count >= size {
chunks = append(chunks, b.String())
b.Reset()
count = 0
}
}
if b.Len() > 0 {
chunks = append(chunks, b.String())
}
if len(chunks) == 0 {
return []string{""}
}
return chunks
}
// splitCodeBlocks splits text by triple backtick code fences, marking code blocks.
type textBlock struct {
text string
isCode bool
}
func splitCodeBlocks(s string) []textBlock {
var blocks []textBlock
for {
start := strings.Index(s, "```")
if start == -1 {
if s != "" {
blocks = append(blocks, textBlock{text: s, isCode: false})
}
break
}
// prepend plain text before code block
if start > 0 {
blocks = append(blocks, textBlock{text: s[:start], isCode: false})
}
s = s[start+3:]
end := strings.Index(s, "```")
if end == -1 {
// unmatched fence, treat rest as code
blocks = append(blocks, textBlock{text: s, isCode: true})
break
}
code := s[:end]
blocks = append(blocks, textBlock{text: code, isCode: true})
s = s[end+3:]
}
return blocks
}
// buildPseudoUnits constructs a series of Gemini JSON payloads that each contain
// a small portion of the original response's parts. When thoughtOnly is true,
// it chunks only reasoning text; otherwise it chunks visible text and forwards
// functionCall parts as separate units. All generated units have finishReason removed.
func buildPseudoUnits(gemBytes []byte, thoughtOnly bool, chunkSize int, _ bool) [][]byte {
base := gemBytes
base, _ = sjson.DeleteBytes(base, "candidates.0.finishReason")
setParts := func(partsRaw string) []byte {
s, _ := sjson.SetRawBytes(base, "candidates.0.content.parts", []byte(partsRaw))
return s
}
parts := gjson.GetBytes(gemBytes, "candidates.0.content.parts")
var units [][]byte
if thoughtOnly {
var buf strings.Builder
parts.ForEach(func(_, p gjson.Result) bool {
if p.Get("thought").Bool() {
if t := p.Get("text"); t.Exists() {
buf.WriteString(t.String())
}
}
return true
})
if buf.Len() > 0 {
// Chunk by runes to preserve exact formatting (including newlines)
segs := runeChunks(buf.String(), chunkSize)
for _, piece := range segs {
obj := map[string]any{"text": piece, "thought": true}
arr, _ := json.Marshal([]map[string]any{obj})
units = append(units, setParts(string(arr)))
}
}
return units
}
// Non-thought: chunk visible text semantically and forward functionCall parts in order
flushText := func(sb *strings.Builder) {
if sb.Len() == 0 {
return
}
s := sb.String()
// Preserve code fences as whole blocks; otherwise chunk by runes
blocks := splitCodeBlocks(s)
for _, blk := range blocks {
if blk.isCode {
obj := map[string]any{"text": "```" + blk.text + "```"}
arr, _ := json.Marshal([]map[string]any{obj})
units = append(units, setParts(string(arr)))
continue
}
for _, piece := range runeChunks(blk.text, chunkSize) {
if piece == "" {
continue
}
obj := map[string]any{"text": piece}
arr, _ := json.Marshal([]map[string]any{obj})
units = append(units, setParts(string(arr)))
}
}
sb.Reset()
}
var textBuf strings.Builder
parts.ForEach(func(_, p gjson.Result) bool {
if p.Get("thought").Bool() {
return true
}
if fc := p.Get("functionCall"); fc.Exists() {
flushText(&textBuf)
units = append(units, setParts("["+fc.Raw+"]"))
return true
}
if t := p.Get("text"); t.Exists() {
textBuf.WriteString(t.String())
return true
}
// Unknown part: forward as its own unit
flushText(&textBuf)
units = append(units, setParts("["+p.Raw+"]"))
return true
})
flushText(&textBuf)
return units
}
func (c *GeminiWebClient) backgroundInitRetry() {
backoffs := []time.Duration{5 * time.Second, 10 * time.Second, 30 * time.Second, 1 * time.Minute, 2 * time.Minute, 5 * time.Minute}
i := 0