revert: fix rpc stdout backpressure

This reverts commit d0d1d8edca.
This commit is contained in:
Armin Ronacher
2026-05-24 11:50:53 +02:00
Unverified
parent d0d1d8edca
commit 9600ded922
6 changed files with 79 additions and 133 deletions
-1
View File
@@ -4,7 +4,6 @@
### Fixed
- Fixed RPC mode to respect stdout backpressure while streaming events, avoiding `ENOBUFS` crashes when clients drain stdout slowly ([#4897](https://github.com/earendil-works/pi/issues/4897)).
- Fixed managed npm extension updates to avoid package managers installing or resolving pi host packages as peer dependencies ([#4907](https://github.com/earendil-works/pi/issues/4907)).
## [0.75.5] - 2026-05-23
+2 -2
View File
@@ -81,7 +81,7 @@ interface AgentSession {
followUp(text: string): Promise<void>;
// Subscribe to events (returns unsubscribe function)
subscribe(listener: (event: AgentSessionEvent) => void | Promise<void>): () => void;
subscribe(listener: (event: AgentSessionEvent) => void): () => void;
// Session info
sessionFile: string | undefined;
@@ -191,7 +191,7 @@ interface PromptOptions {
images?: ImageContent[];
streamingBehavior?: "steer" | "followUp";
source?: InputSource;
preflightResult?: (success: boolean) => void | Promise<void>;
preflightResult?: (success: boolean) => void;
}
```
+37 -43
View File
@@ -147,7 +147,7 @@ export type AgentSessionEvent =
| { type: "auto_retry_end"; success: boolean; attempt: number; finalError?: string };
/** Listener function for agent session events */
export type AgentSessionEventListener = (event: AgentSessionEvent) => void | Promise<void>;
export type AgentSessionEventListener = (event: AgentSessionEvent) => void;
// ============================================================================
// Types
@@ -202,7 +202,7 @@ export interface PromptOptions {
/** Source of input for extension input event handlers. Defaults to "interactive". */
source?: InputSource;
/** Internal hook used by RPC mode to observe prompt preflight acceptance or rejection. */
preflightResult?: (success: boolean) => void | Promise<void>;
preflightResult?: (success: boolean) => void;
}
/** Result from cycleModel() */
@@ -448,18 +448,14 @@ export class AgentSession {
// =========================================================================
/** Emit an event to all listeners */
private async _emit(event: AgentSessionEvent): Promise<void> {
private _emit(event: AgentSessionEvent): void {
for (const l of this._eventListeners) {
await l(event);
l(event);
}
}
private _emitDetached(event: AgentSessionEvent): void {
void this._emit(event);
}
private async _emitQueueUpdate(): Promise<void> {
await this._emit({
private _emitQueueUpdate(): void {
this._emit({
type: "queue_update",
steering: [...this._steeringMessages],
followUp: [...this._followUpMessages],
@@ -481,13 +477,13 @@ export class AgentSession {
const steeringIndex = this._steeringMessages.indexOf(messageText);
if (steeringIndex !== -1) {
this._steeringMessages.splice(steeringIndex, 1);
await this._emitQueueUpdate();
this._emitQueueUpdate();
} else {
// Check follow-up queue
const followUpIndex = this._followUpMessages.indexOf(messageText);
if (followUpIndex !== -1) {
this._followUpMessages.splice(followUpIndex, 1);
await this._emitQueueUpdate();
this._emitQueueUpdate();
}
}
}
@@ -497,9 +493,7 @@ export class AgentSession {
await this._emitExtensionEvent(event);
// Notify all listeners
await this._emit(
event.type === "agent_end" ? { ...event, willRetry: this._willRetryAfterAgentEnd(event) } : event,
);
this._emit(event.type === "agent_end" ? { ...event, willRetry: this._willRetryAfterAgentEnd(event) } : event);
// Handle session persistence
if (event.type === "message_end") {
@@ -534,7 +528,7 @@ export class AgentSession {
// Reset retry counter immediately on successful assistant response
// This prevents accumulation across multiple LLM calls within a turn
if (assistantMsg.stopReason !== "error" && this._retryAttempt > 0) {
await this._emit({
this._emit({
type: "auto_retry_end",
success: true,
attempt: this._retryAttempt,
@@ -944,7 +938,7 @@ export class AgentSession {
}
if (msg.stopReason === "error" && this._retryAttempt > 0) {
await this._emit({
this._emit({
type: "auto_retry_end",
success: false,
attempt: this._retryAttempt,
@@ -977,7 +971,7 @@ export class AgentSession {
const handled = await this._tryExecuteExtensionCommand(text);
if (handled) {
// Extension command executed, no prompt to send
await preflightResult?.(true);
preflightResult?.(true);
return;
}
}
@@ -992,7 +986,7 @@ export class AgentSession {
options?.source ?? "interactive",
);
if (inputResult.action === "handled") {
await preflightResult?.(true);
preflightResult?.(true);
return;
}
if (inputResult.action === "transform") {
@@ -1020,7 +1014,7 @@ export class AgentSession {
} else {
await this._queueSteer(expandedText, currentImages);
}
await preflightResult?.(true);
preflightResult?.(true);
return;
}
@@ -1105,7 +1099,7 @@ export class AgentSession {
this.agent.state.systemPrompt = this._baseSystemPrompt;
}
} catch (error) {
await preflightResult?.(false);
preflightResult?.(false);
throw error;
}
@@ -1113,7 +1107,7 @@ export class AgentSession {
return;
}
await preflightResult?.(true);
preflightResult?.(true);
await this._runAgentPrompt(messages);
}
@@ -1223,7 +1217,7 @@ export class AgentSession {
*/
private async _queueSteer(text: string, images?: ImageContent[]): Promise<void> {
this._steeringMessages.push(text);
await this._emitQueueUpdate();
this._emitQueueUpdate();
const content: (TextContent | ImageContent)[] = [{ type: "text", text }];
if (images) {
content.push(...images);
@@ -1240,7 +1234,7 @@ export class AgentSession {
*/
private async _queueFollowUp(text: string, images?: ImageContent[]): Promise<void> {
this._followUpMessages.push(text);
await this._emitQueueUpdate();
this._emitQueueUpdate();
const content: (TextContent | ImageContent)[] = [{ type: "text", text }];
if (images) {
content.push(...images);
@@ -1309,8 +1303,8 @@ export class AgentSession {
message.display,
message.details,
);
await this._emit({ type: "message_start", message: appMessage });
await this._emit({ type: "message_end", message: appMessage });
this._emit({ type: "message_start", message: appMessage });
this._emit({ type: "message_end", message: appMessage });
}
}
@@ -1365,7 +1359,7 @@ export class AgentSession {
this._steeringMessages = [];
this._followUpMessages = [];
this.agent.clearAllQueues();
void this._emitQueueUpdate();
this._emitQueueUpdate();
return { steering, followUp };
}
@@ -1528,7 +1522,7 @@ export class AgentSession {
if (this.supportsThinking() || effectiveLevel !== "off") {
this.settingsManager.setDefaultThinkingLevel(effectiveLevel);
}
this._emitDetached({ type: "thinking_level_changed", level: effectiveLevel });
this._emit({ type: "thinking_level_changed", level: effectiveLevel });
void this._extensionRunner.emit({
type: "thinking_level_select",
level: effectiveLevel,
@@ -1618,7 +1612,7 @@ export class AgentSession {
this._disconnectFromAgent();
await this.abort();
this._compactionAbortController = new AbortController();
await this._emit({ type: "compaction_start", reason: "manual" });
this._emit({ type: "compaction_start", reason: "manual" });
try {
if (!this.model) {
@@ -1719,7 +1713,7 @@ export class AgentSession {
tokensBefore,
details,
};
await this._emit({
this._emit({
type: "compaction_end",
reason: "manual",
result: compactionResult,
@@ -1730,7 +1724,7 @@ export class AgentSession {
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
const aborted = message === "Compaction cancelled" || (error instanceof Error && error.name === "AbortError");
await this._emit({
this._emit({
type: "compaction_end",
reason: "manual",
result: undefined,
@@ -1800,7 +1794,7 @@ export class AgentSession {
// Case 1: Overflow - LLM returned context overflow error
if (sameModel && isContextOverflow(assistantMessage, contextWindow)) {
if (this._overflowRecoveryAttempted) {
await this._emit({
this._emit({
type: "compaction_end",
reason: "overflow",
result: undefined,
@@ -1857,12 +1851,12 @@ export class AgentSession {
private async _runAutoCompaction(reason: "overflow" | "threshold", willRetry: boolean): Promise<boolean> {
const settings = this.settingsManager.getCompactionSettings();
await this._emit({ type: "compaction_start", reason });
this._emit({ type: "compaction_start", reason });
this._autoCompactionAbortController = new AbortController();
try {
if (!this.model) {
await this._emit({
this._emit({
type: "compaction_end",
reason,
result: undefined,
@@ -1877,7 +1871,7 @@ export class AgentSession {
if (this.agent.streamFn === streamSimple) {
const authResult = await this._modelRegistry.getApiKeyAndHeaders(this.model);
if (!authResult.ok || !authResult.apiKey) {
await this._emit({
this._emit({
type: "compaction_end",
reason,
result: undefined,
@@ -1896,7 +1890,7 @@ export class AgentSession {
const preparation = prepareCompaction(pathEntries, settings);
if (!preparation) {
await this._emit({
this._emit({
type: "compaction_end",
reason,
result: undefined,
@@ -1919,7 +1913,7 @@ export class AgentSession {
})) as SessionBeforeCompactResult | undefined;
if (extensionResult?.cancel) {
await this._emit({
this._emit({
type: "compaction_end",
reason,
result: undefined,
@@ -1965,7 +1959,7 @@ export class AgentSession {
}
if (this._autoCompactionAbortController.signal.aborted) {
await this._emit({
this._emit({
type: "compaction_end",
reason,
result: undefined,
@@ -1999,7 +1993,7 @@ export class AgentSession {
tokensBefore,
details,
};
await this._emit({ type: "compaction_end", reason, result, aborted: false, willRetry });
this._emit({ type: "compaction_end", reason, result, aborted: false, willRetry });
if (willRetry) {
const messages = this.agent.state.messages;
@@ -2015,7 +2009,7 @@ export class AgentSession {
return this.agent.hasQueuedMessages();
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "compaction failed";
await this._emit({
this._emit({
type: "compaction_end",
reason,
result: undefined,
@@ -2466,7 +2460,7 @@ export class AgentSession {
const delayMs = settings.baseDelayMs * 2 ** (this._retryAttempt - 1);
await this._emit({
this._emit({
type: "auto_retry_start",
attempt: this._retryAttempt,
maxAttempts: settings.maxRetries,
@@ -2488,7 +2482,7 @@ export class AgentSession {
// Aborted during sleep - emit end event so UI can clean up
const attempt = this._retryAttempt;
this._retryAttempt = 0;
await this._emit({
this._emit({
type: "auto_retry_end",
success: false,
attempt,
@@ -2642,7 +2636,7 @@ export class AgentSession {
*/
setSessionName(name: string): void {
this.sessionManager.appendSessionInfo(name);
this._emitDetached({ type: "session_info_changed", name: this.sessionManager.getSessionName() });
this._emit({ type: "session_info_changed", name: this.sessionManager.getSessionName() });
}
// =========================================================================
@@ -1,5 +1,3 @@
import { once } from "node:events";
interface StdoutTakeoverState {
rawStdoutWrite: (chunk: string, callback?: (error?: Error | null) => void) => boolean;
rawStderrWrite: (chunk: string, callback?: (error?: Error | null) => void) => boolean;
@@ -48,11 +46,12 @@ export function isStdoutTakenOver(): boolean {
return stdoutTakeoverState !== undefined;
}
export async function writeRawStdout(text: string): Promise<void> {
const canContinue = stdoutTakeoverState ? stdoutTakeoverState.rawStdoutWrite(text) : process.stdout.write(text);
if (!canContinue) {
await once(process.stdout, "drain");
export function writeRawStdout(text: string): void {
if (stdoutTakeoverState) {
stdoutTakeoverState.rawStdoutWrite(text);
return;
}
process.stdout.write(text);
}
export async function flushRawStdout(): Promise<void> {
@@ -100,9 +100,9 @@ export async function runPrintMode(runtimeHost: AgentSessionRuntime, options: Pr
});
unsubscribe?.();
unsubscribe = session.subscribe(async (event) => {
unsubscribe = session.subscribe((event) => {
if (mode === "json") {
await writeRawStdout(`${JSON.stringify(event)}\n`);
writeRawStdout(`${JSON.stringify(event)}\n`);
}
});
};
@@ -111,7 +111,7 @@ export async function runPrintMode(runtimeHost: AgentSessionRuntime, options: Pr
if (mode === "json") {
const header = session.sessionManager.getHeader();
if (header) {
await writeRawStdout(`${JSON.stringify(header)}\n`);
writeRawStdout(`${JSON.stringify(header)}\n`);
}
}
@@ -137,7 +137,7 @@ export async function runPrintMode(runtimeHost: AgentSessionRuntime, options: Pr
} else {
for (const content of assistantMsg.content) {
if (content.type === "text") {
await writeRawStdout(`${content.text}\n`);
writeRawStdout(`${content.text}\n`);
}
}
}
+31 -77
View File
@@ -50,14 +50,8 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
let session = runtimeHost.session;
let unsubscribe: (() => void) | undefined;
const output = async (obj: RpcResponse | RpcExtensionUIRequest | object): Promise<void> => {
await writeRawStdout(serializeJsonLine(obj));
};
const outputDetached = (obj: RpcResponse | RpcExtensionUIRequest | object): void => {
void output(obj).catch((err: unknown) => {
process.stderr.write(`RPC output failed: ${err instanceof Error ? err.message : String(err)}\n`);
});
const output = (obj: RpcResponse | RpcExtensionUIRequest | object) => {
writeRawStdout(serializeJsonLine(obj));
};
const success = <T extends RpcCommand["type"]>(
@@ -87,30 +81,28 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
const signalCleanupHandlers: Array<() => void> = [];
/** Helper for dialog methods with signal/timeout support */
async function createDialogPromise<T>(
function createDialogPromise<T>(
opts: ExtensionUIDialogOptions | undefined,
defaultValue: T,
request: Record<string, unknown>,
parseResponse: (response: RpcExtensionUIResponse) => T,
): Promise<T> {
if (opts?.signal?.aborted) return defaultValue;
if (opts?.signal?.aborted) return Promise.resolve(defaultValue);
const id = crypto.randomUUID();
let cleanup = () => {};
const responsePromise = new Promise<T>((resolve, reject) => {
return new Promise((resolve, reject) => {
let timeoutId: ReturnType<typeof setTimeout> | undefined;
const onAbort = () => {
cleanup();
resolve(defaultValue);
};
cleanup = () => {
const cleanup = () => {
if (timeoutId) clearTimeout(timeoutId);
opts?.signal?.removeEventListener("abort", onAbort);
pendingExtensionRequests.delete(id);
};
const onAbort = () => {
cleanup();
resolve(defaultValue);
};
opts?.signal?.addEventListener("abort", onAbort, { once: true });
if (opts?.timeout) {
@@ -125,20 +117,10 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
cleanup();
resolve(parseResponse(response));
},
reject: (error) => {
cleanup();
reject(error);
},
reject,
});
output({ type: "extension_ui_request", id, ...request } as RpcExtensionUIRequest);
});
try {
await output({ type: "extension_ui_request", id, ...request } as RpcExtensionUIRequest);
} catch (err) {
cleanup();
throw err;
}
return await responsePromise;
}
/**
@@ -162,7 +144,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
notify(message: string, type?: "info" | "warning" | "error"): void {
// Fire and forget - no response needed
outputDetached({
output({
type: "extension_ui_request",
id: crypto.randomUUID(),
method: "notify",
@@ -178,7 +160,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
setStatus(key: string, text: string | undefined): void {
// Fire and forget - no response needed
outputDetached({
output({
type: "extension_ui_request",
id: crypto.randomUUID(),
method: "setStatus",
@@ -206,7 +188,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
setWidget(key: string, content: unknown, options?: ExtensionWidgetOptions): void {
// Only support string arrays in RPC mode - factory functions are ignored
if (content === undefined || Array.isArray(content)) {
outputDetached({
output({
type: "extension_ui_request",
id: crypto.randomUUID(),
method: "setWidget",
@@ -228,7 +210,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
setTitle(title: string): void {
// Fire and forget - host can implement terminal title control
outputDetached({
output({
type: "extension_ui_request",
id: crypto.randomUUID(),
method: "setTitle",
@@ -248,7 +230,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
setEditorText(text: string): void {
// Fire and forget - host can implement editor control
outputDetached({
output({
type: "extension_ui_request",
id: crypto.randomUUID(),
method: "set_editor_text",
@@ -264,14 +246,9 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
async editor(title: string, prefill?: string): Promise<string | undefined> {
const id = crypto.randomUUID();
let cleanup = () => {};
const responsePromise = new Promise<string | undefined>((resolve, reject) => {
cleanup = () => {
pendingExtensionRequests.delete(id);
};
return new Promise((resolve, reject) => {
pendingExtensionRequests.set(id, {
resolve: (response: RpcExtensionUIResponse) => {
cleanup();
if ("cancelled" in response && response.cancelled) {
resolve(undefined);
} else if ("value" in response) {
@@ -280,25 +257,10 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
resolve(undefined);
}
},
reject: (error) => {
cleanup();
reject(error);
},
reject,
});
output({ type: "extension_ui_request", id, method: "editor", title, prefill } as RpcExtensionUIRequest);
});
try {
await output({
type: "extension_ui_request",
id,
method: "editor",
title,
prefill,
} as RpcExtensionUIRequest);
} catch (err) {
cleanup();
throw err;
}
return await responsePromise;
},
addAutocompleteProvider(): void {
@@ -376,18 +338,13 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
shutdownRequested = true;
},
onError: (err) => {
outputDetached({
type: "extension_error",
extensionPath: err.extensionPath,
event: err.event,
error: err.error,
});
output({ type: "extension_error", extensionPath: err.extensionPath, event: err.event, error: err.error });
},
});
unsubscribe?.();
unsubscribe = session.subscribe(async (event) => {
await output(event);
unsubscribe = session.subscribe((event) => {
output(event);
});
};
@@ -428,17 +385,16 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
images: command.images,
streamingBehavior: command.streamingBehavior,
source: "rpc",
preflightResult: async (didSucceed) => {
preflightResult: (didSucceed) => {
if (didSucceed) {
await output(success(id, "prompt"));
preflightSucceeded = true;
output(success(id, "prompt"));
}
},
})
.catch((err: unknown) => {
.catch((e) => {
if (!preflightSucceeded) {
const message = err instanceof Error ? err.message : String(err);
outputDetached(error(id, "prompt", message));
output(error(id, "prompt", e.message));
}
});
return undefined;
@@ -734,7 +690,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
try {
parsed = JSON.parse(line);
} catch (parseError: unknown) {
await output(
output(
error(
undefined,
"parse",
@@ -764,11 +720,11 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
try {
const response = await handleCommand(command);
if (response) {
await output(response);
output(response);
}
await checkShutdownRequested();
} catch (commandError: unknown) {
await output(
output(
error(
command.id,
command.type,
@@ -785,9 +741,7 @@ export async function runRpcMode(runtimeHost: AgentSessionRuntime): Promise<neve
detachInput = (() => {
const detachJsonl = attachJsonlLineReader(process.stdin, (line) => {
void handleInputLine(line).catch((err: unknown) => {
process.stderr.write(`RPC command handling failed: ${err instanceof Error ? err.message : String(err)}\n`);
});
void handleInputLine(line);
});
return () => {
detachJsonl();