diff --git a/packages/coding-agent/CHANGELOG.md b/packages/coding-agent/CHANGELOG.md index 1a64aecba..c7ae064a3 100644 --- a/packages/coding-agent/CHANGELOG.md +++ b/packages/coding-agent/CHANGELOG.md @@ -4,7 +4,6 @@ ### Fixed -- Fixed RPC mode to respect stdout backpressure while streaming events, avoiding `ENOBUFS` crashes when clients drain stdout slowly ([#4897](https://github.com/earendil-works/pi/issues/4897)). - Fixed managed npm extension updates to avoid package managers installing or resolving pi host packages as peer dependencies ([#4907](https://github.com/earendil-works/pi/issues/4907)). ## [0.75.5] - 2026-05-23 diff --git a/packages/coding-agent/docs/sdk.md b/packages/coding-agent/docs/sdk.md index a6d8974ee..8377185e3 100644 --- a/packages/coding-agent/docs/sdk.md +++ b/packages/coding-agent/docs/sdk.md @@ -81,7 +81,7 @@ interface AgentSession { followUp(text: string): Promise; // Subscribe to events (returns unsubscribe function) - subscribe(listener: (event: AgentSessionEvent) => void | Promise): () => void; + subscribe(listener: (event: AgentSessionEvent) => void): () => void; // Session info sessionFile: string | undefined; @@ -191,7 +191,7 @@ interface PromptOptions { images?: ImageContent[]; streamingBehavior?: "steer" | "followUp"; source?: InputSource; - preflightResult?: (success: boolean) => void | Promise; + preflightResult?: (success: boolean) => void; } ``` diff --git a/packages/coding-agent/src/core/agent-session.ts b/packages/coding-agent/src/core/agent-session.ts index 2c0b126ad..cf13594b8 100644 --- a/packages/coding-agent/src/core/agent-session.ts +++ b/packages/coding-agent/src/core/agent-session.ts @@ -147,7 +147,7 @@ export type AgentSessionEvent = | { type: "auto_retry_end"; success: boolean; attempt: number; finalError?: string }; /** Listener function for agent session events */ -export type AgentSessionEventListener = (event: AgentSessionEvent) => void | Promise; +export type AgentSessionEventListener = (event: AgentSessionEvent) => void; // ============================================================================ // Types @@ -202,7 +202,7 @@ export interface PromptOptions { /** Source of input for extension input event handlers. Defaults to "interactive". */ source?: InputSource; /** Internal hook used by RPC mode to observe prompt preflight acceptance or rejection. */ - preflightResult?: (success: boolean) => void | Promise; + preflightResult?: (success: boolean) => void; } /** Result from cycleModel() */ @@ -448,18 +448,14 @@ export class AgentSession { // ========================================================================= /** Emit an event to all listeners */ - private async _emit(event: AgentSessionEvent): Promise { + private _emit(event: AgentSessionEvent): void { for (const l of this._eventListeners) { - await l(event); + l(event); } } - private _emitDetached(event: AgentSessionEvent): void { - void this._emit(event); - } - - private async _emitQueueUpdate(): Promise { - await this._emit({ + private _emitQueueUpdate(): void { + this._emit({ type: "queue_update", steering: [...this._steeringMessages], followUp: [...this._followUpMessages], @@ -481,13 +477,13 @@ export class AgentSession { const steeringIndex = this._steeringMessages.indexOf(messageText); if (steeringIndex !== -1) { this._steeringMessages.splice(steeringIndex, 1); - await this._emitQueueUpdate(); + this._emitQueueUpdate(); } else { // Check follow-up queue const followUpIndex = this._followUpMessages.indexOf(messageText); if (followUpIndex !== -1) { this._followUpMessages.splice(followUpIndex, 1); - await this._emitQueueUpdate(); + this._emitQueueUpdate(); } } } @@ -497,9 +493,7 @@ export class AgentSession { await this._emitExtensionEvent(event); // Notify all listeners - await this._emit( - event.type === "agent_end" ? { ...event, willRetry: this._willRetryAfterAgentEnd(event) } : event, - ); + this._emit(event.type === "agent_end" ? { ...event, willRetry: this._willRetryAfterAgentEnd(event) } : event); // Handle session persistence if (event.type === "message_end") { @@ -534,7 +528,7 @@ export class AgentSession { // Reset retry counter immediately on successful assistant response // This prevents accumulation across multiple LLM calls within a turn if (assistantMsg.stopReason !== "error" && this._retryAttempt > 0) { - await this._emit({ + this._emit({ type: "auto_retry_end", success: true, attempt: this._retryAttempt, @@ -944,7 +938,7 @@ export class AgentSession { } if (msg.stopReason === "error" && this._retryAttempt > 0) { - await this._emit({ + this._emit({ type: "auto_retry_end", success: false, attempt: this._retryAttempt, @@ -977,7 +971,7 @@ export class AgentSession { const handled = await this._tryExecuteExtensionCommand(text); if (handled) { // Extension command executed, no prompt to send - await preflightResult?.(true); + preflightResult?.(true); return; } } @@ -992,7 +986,7 @@ export class AgentSession { options?.source ?? "interactive", ); if (inputResult.action === "handled") { - await preflightResult?.(true); + preflightResult?.(true); return; } if (inputResult.action === "transform") { @@ -1020,7 +1014,7 @@ export class AgentSession { } else { await this._queueSteer(expandedText, currentImages); } - await preflightResult?.(true); + preflightResult?.(true); return; } @@ -1105,7 +1099,7 @@ export class AgentSession { this.agent.state.systemPrompt = this._baseSystemPrompt; } } catch (error) { - await preflightResult?.(false); + preflightResult?.(false); throw error; } @@ -1113,7 +1107,7 @@ export class AgentSession { return; } - await preflightResult?.(true); + preflightResult?.(true); await this._runAgentPrompt(messages); } @@ -1223,7 +1217,7 @@ export class AgentSession { */ private async _queueSteer(text: string, images?: ImageContent[]): Promise { this._steeringMessages.push(text); - await this._emitQueueUpdate(); + this._emitQueueUpdate(); const content: (TextContent | ImageContent)[] = [{ type: "text", text }]; if (images) { content.push(...images); @@ -1240,7 +1234,7 @@ export class AgentSession { */ private async _queueFollowUp(text: string, images?: ImageContent[]): Promise { this._followUpMessages.push(text); - await this._emitQueueUpdate(); + this._emitQueueUpdate(); const content: (TextContent | ImageContent)[] = [{ type: "text", text }]; if (images) { content.push(...images); @@ -1309,8 +1303,8 @@ export class AgentSession { message.display, message.details, ); - await this._emit({ type: "message_start", message: appMessage }); - await this._emit({ type: "message_end", message: appMessage }); + this._emit({ type: "message_start", message: appMessage }); + this._emit({ type: "message_end", message: appMessage }); } } @@ -1365,7 +1359,7 @@ export class AgentSession { this._steeringMessages = []; this._followUpMessages = []; this.agent.clearAllQueues(); - void this._emitQueueUpdate(); + this._emitQueueUpdate(); return { steering, followUp }; } @@ -1528,7 +1522,7 @@ export class AgentSession { if (this.supportsThinking() || effectiveLevel !== "off") { this.settingsManager.setDefaultThinkingLevel(effectiveLevel); } - this._emitDetached({ type: "thinking_level_changed", level: effectiveLevel }); + this._emit({ type: "thinking_level_changed", level: effectiveLevel }); void this._extensionRunner.emit({ type: "thinking_level_select", level: effectiveLevel, @@ -1618,7 +1612,7 @@ export class AgentSession { this._disconnectFromAgent(); await this.abort(); this._compactionAbortController = new AbortController(); - await this._emit({ type: "compaction_start", reason: "manual" }); + this._emit({ type: "compaction_start", reason: "manual" }); try { if (!this.model) { @@ -1719,7 +1713,7 @@ export class AgentSession { tokensBefore, details, }; - await this._emit({ + this._emit({ type: "compaction_end", reason: "manual", result: compactionResult, @@ -1730,7 +1724,7 @@ export class AgentSession { } catch (error) { const message = error instanceof Error ? error.message : String(error); const aborted = message === "Compaction cancelled" || (error instanceof Error && error.name === "AbortError"); - await this._emit({ + this._emit({ type: "compaction_end", reason: "manual", result: undefined, @@ -1800,7 +1794,7 @@ export class AgentSession { // Case 1: Overflow - LLM returned context overflow error if (sameModel && isContextOverflow(assistantMessage, contextWindow)) { if (this._overflowRecoveryAttempted) { - await this._emit({ + this._emit({ type: "compaction_end", reason: "overflow", result: undefined, @@ -1857,12 +1851,12 @@ export class AgentSession { private async _runAutoCompaction(reason: "overflow" | "threshold", willRetry: boolean): Promise { const settings = this.settingsManager.getCompactionSettings(); - await this._emit({ type: "compaction_start", reason }); + this._emit({ type: "compaction_start", reason }); this._autoCompactionAbortController = new AbortController(); try { if (!this.model) { - await this._emit({ + this._emit({ type: "compaction_end", reason, result: undefined, @@ -1877,7 +1871,7 @@ export class AgentSession { if (this.agent.streamFn === streamSimple) { const authResult = await this._modelRegistry.getApiKeyAndHeaders(this.model); if (!authResult.ok || !authResult.apiKey) { - await this._emit({ + this._emit({ type: "compaction_end", reason, result: undefined, @@ -1896,7 +1890,7 @@ export class AgentSession { const preparation = prepareCompaction(pathEntries, settings); if (!preparation) { - await this._emit({ + this._emit({ type: "compaction_end", reason, result: undefined, @@ -1919,7 +1913,7 @@ export class AgentSession { })) as SessionBeforeCompactResult | undefined; if (extensionResult?.cancel) { - await this._emit({ + this._emit({ type: "compaction_end", reason, result: undefined, @@ -1965,7 +1959,7 @@ export class AgentSession { } if (this._autoCompactionAbortController.signal.aborted) { - await this._emit({ + this._emit({ type: "compaction_end", reason, result: undefined, @@ -1999,7 +1993,7 @@ export class AgentSession { tokensBefore, details, }; - await this._emit({ type: "compaction_end", reason, result, aborted: false, willRetry }); + this._emit({ type: "compaction_end", reason, result, aborted: false, willRetry }); if (willRetry) { const messages = this.agent.state.messages; @@ -2015,7 +2009,7 @@ export class AgentSession { return this.agent.hasQueuedMessages(); } catch (error) { const errorMessage = error instanceof Error ? error.message : "compaction failed"; - await this._emit({ + this._emit({ type: "compaction_end", reason, result: undefined, @@ -2466,7 +2460,7 @@ export class AgentSession { const delayMs = settings.baseDelayMs * 2 ** (this._retryAttempt - 1); - await this._emit({ + this._emit({ type: "auto_retry_start", attempt: this._retryAttempt, maxAttempts: settings.maxRetries, @@ -2488,7 +2482,7 @@ export class AgentSession { // Aborted during sleep - emit end event so UI can clean up const attempt = this._retryAttempt; this._retryAttempt = 0; - await this._emit({ + this._emit({ type: "auto_retry_end", success: false, attempt, @@ -2642,7 +2636,7 @@ export class AgentSession { */ setSessionName(name: string): void { this.sessionManager.appendSessionInfo(name); - this._emitDetached({ type: "session_info_changed", name: this.sessionManager.getSessionName() }); + this._emit({ type: "session_info_changed", name: this.sessionManager.getSessionName() }); } // ========================================================================= diff --git a/packages/coding-agent/src/core/output-guard.ts b/packages/coding-agent/src/core/output-guard.ts index 40857f695..c07837604 100644 --- a/packages/coding-agent/src/core/output-guard.ts +++ b/packages/coding-agent/src/core/output-guard.ts @@ -1,5 +1,3 @@ -import { once } from "node:events"; - interface StdoutTakeoverState { rawStdoutWrite: (chunk: string, callback?: (error?: Error | null) => void) => boolean; rawStderrWrite: (chunk: string, callback?: (error?: Error | null) => void) => boolean; @@ -48,11 +46,12 @@ export function isStdoutTakenOver(): boolean { return stdoutTakeoverState !== undefined; } -export async function writeRawStdout(text: string): Promise { - const canContinue = stdoutTakeoverState ? stdoutTakeoverState.rawStdoutWrite(text) : process.stdout.write(text); - if (!canContinue) { - await once(process.stdout, "drain"); +export function writeRawStdout(text: string): void { + if (stdoutTakeoverState) { + stdoutTakeoverState.rawStdoutWrite(text); + return; } + process.stdout.write(text); } export async function flushRawStdout(): Promise { diff --git a/packages/coding-agent/src/modes/print-mode.ts b/packages/coding-agent/src/modes/print-mode.ts index 7a11d0fc7..c9553c558 100644 --- a/packages/coding-agent/src/modes/print-mode.ts +++ b/packages/coding-agent/src/modes/print-mode.ts @@ -100,9 +100,9 @@ export async function runPrintMode(runtimeHost: AgentSessionRuntime, options: Pr }); unsubscribe?.(); - unsubscribe = session.subscribe(async (event) => { + unsubscribe = session.subscribe((event) => { if (mode === "json") { - await writeRawStdout(`${JSON.stringify(event)}\n`); + writeRawStdout(`${JSON.stringify(event)}\n`); } }); }; @@ -111,7 +111,7 @@ export async function runPrintMode(runtimeHost: AgentSessionRuntime, options: Pr if (mode === "json") { const header = session.sessionManager.getHeader(); if (header) { - await writeRawStdout(`${JSON.stringify(header)}\n`); + writeRawStdout(`${JSON.stringify(header)}\n`); } } @@ -137,7 +137,7 @@ export async function runPrintMode(runtimeHost: AgentSessionRuntime, options: Pr } else { for (const content of assistantMsg.content) { if (content.type === "text") { - await writeRawStdout(`${content.text}\n`); + writeRawStdout(`${content.text}\n`); } } } diff --git a/packages/coding-agent/src/modes/rpc/rpc-mode.ts b/packages/coding-agent/src/modes/rpc/rpc-mode.ts index 59882a381..828c9f67f 100644 --- a/packages/coding-agent/src/modes/rpc/rpc-mode.ts +++ b/packages/coding-agent/src/modes/rpc/rpc-mode.ts @@ -50,14 +50,8 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise void) | undefined; - const output = async (obj: RpcResponse | RpcExtensionUIRequest | object): Promise => { - await writeRawStdout(serializeJsonLine(obj)); - }; - - const outputDetached = (obj: RpcResponse | RpcExtensionUIRequest | object): void => { - void output(obj).catch((err: unknown) => { - process.stderr.write(`RPC output failed: ${err instanceof Error ? err.message : String(err)}\n`); - }); + const output = (obj: RpcResponse | RpcExtensionUIRequest | object) => { + writeRawStdout(serializeJsonLine(obj)); }; const success = ( @@ -87,30 +81,28 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise void> = []; /** Helper for dialog methods with signal/timeout support */ - async function createDialogPromise( + function createDialogPromise( opts: ExtensionUIDialogOptions | undefined, defaultValue: T, request: Record, parseResponse: (response: RpcExtensionUIResponse) => T, ): Promise { - if (opts?.signal?.aborted) return defaultValue; + if (opts?.signal?.aborted) return Promise.resolve(defaultValue); const id = crypto.randomUUID(); - let cleanup = () => {}; - const responsePromise = new Promise((resolve, reject) => { + return new Promise((resolve, reject) => { let timeoutId: ReturnType | undefined; - const onAbort = () => { - cleanup(); - resolve(defaultValue); - }; - - cleanup = () => { + const cleanup = () => { if (timeoutId) clearTimeout(timeoutId); opts?.signal?.removeEventListener("abort", onAbort); pendingExtensionRequests.delete(id); }; + const onAbort = () => { + cleanup(); + resolve(defaultValue); + }; opts?.signal?.addEventListener("abort", onAbort, { once: true }); if (opts?.timeout) { @@ -125,20 +117,10 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise { - cleanup(); - reject(error); - }, + reject, }); + output({ type: "extension_ui_request", id, ...request } as RpcExtensionUIRequest); }); - - try { - await output({ type: "extension_ui_request", id, ...request } as RpcExtensionUIRequest); - } catch (err) { - cleanup(); - throw err; - } - return await responsePromise; } /** @@ -162,7 +144,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise { const id = crypto.randomUUID(); - let cleanup = () => {}; - const responsePromise = new Promise((resolve, reject) => { - cleanup = () => { - pendingExtensionRequests.delete(id); - }; + return new Promise((resolve, reject) => { pendingExtensionRequests.set(id, { resolve: (response: RpcExtensionUIResponse) => { - cleanup(); if ("cancelled" in response && response.cancelled) { resolve(undefined); } else if ("value" in response) { @@ -280,25 +257,10 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise { - cleanup(); - reject(error); - }, + reject, }); + output({ type: "extension_ui_request", id, method: "editor", title, prefill } as RpcExtensionUIRequest); }); - try { - await output({ - type: "extension_ui_request", - id, - method: "editor", - title, - prefill, - } as RpcExtensionUIRequest); - } catch (err) { - cleanup(); - throw err; - } - return await responsePromise; }, addAutocompleteProvider(): void { @@ -376,18 +338,13 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise { - outputDetached({ - type: "extension_error", - extensionPath: err.extensionPath, - event: err.event, - error: err.error, - }); + output({ type: "extension_error", extensionPath: err.extensionPath, event: err.event, error: err.error }); }, }); unsubscribe?.(); - unsubscribe = session.subscribe(async (event) => { - await output(event); + unsubscribe = session.subscribe((event) => { + output(event); }); }; @@ -428,17 +385,16 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise { + preflightResult: (didSucceed) => { if (didSucceed) { - await output(success(id, "prompt")); preflightSucceeded = true; + output(success(id, "prompt")); } }, }) - .catch((err: unknown) => { + .catch((e) => { if (!preflightSucceeded) { - const message = err instanceof Error ? err.message : String(err); - outputDetached(error(id, "prompt", message)); + output(error(id, "prompt", e.message)); } }); return undefined; @@ -734,7 +690,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise { const detachJsonl = attachJsonlLineReader(process.stdin, (line) => { - void handleInputLine(line).catch((err: unknown) => { - process.stderr.write(`RPC command handling failed: ${err instanceof Error ? err.message : String(err)}\n`); - }); + void handleInputLine(line); }); return () => { detachJsonl();