Python: Fix DevUI streaming memory growth regression (#6038)

* Fix DevUI streaming memory growth regression

Bounds retained streaming/debug state in DevUI and strengthens browser regression coverage for long streamed responses.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address DevUI memory review feedback

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix DevUI bundle trailing whitespace

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-05-27 09:48:29 +02:00
committed by GitHub
Unverified
parent e1e6e3d35e
commit 3242d8a4c4
8 changed files with 259 additions and 512 deletions
File diff suppressed because one or more lines are too long
@@ -40,11 +40,22 @@ import type {
ExtendedResponseStreamEvent,
} from "@/types";
import { useDevUIStore } from "@/stores";
import { loadStreamingState } from "@/services/streaming-state";
import { loadStreamingState, type StreamingState } from "@/services/streaming-state";
type DebugEventHandler = (event: ExtendedResponseStreamEvent | "clear") => void;
const ASSISTANT_TEXT_RENDER_INTERVAL_MS = 50;
const STREAMING_PREVIEW_PREFIX = "[Earlier streaming content omitted after refresh]\n\n";
function getRestoredStreamingText(state: StreamingState): string {
if (!state.accumulatedText) {
return "";
}
return state.accumulatedTextIsPreview
? `${STREAMING_PREVIEW_PREFIX}${state.accumulatedText}`
: state.accumulatedText;
}
interface AgentViewProps {
selectedAgent: AgentInfo;
@@ -683,13 +694,14 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
const state = loadStreamingState(mostRecent.id);
if (state && !state.completed) {
accumulatedTextRef.current = state.accumulatedText || "";
const restoredText = getRestoredStreamingText(state);
accumulatedTextRef.current = restoredText;
// Add assistant message with resumed text
const assistantMsg: import("@/types/openai").ConversationMessage = {
id: state.lastMessageId || `assistant-${Date.now()}`,
type: "message",
role: "assistant",
content: state.accumulatedText ? [{ type: "text", text: state.accumulatedText }] : [],
content: restoredText ? [{ type: "text", text: restoredText }] : [],
status: "in_progress",
};
setChatItems([...allItems as import("@/types/openai").ConversationItem[], assistantMsg]);
@@ -988,13 +1000,14 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
// Check for incomplete stream and restore accumulated text
const state = loadStreamingState(conversationId);
if (state?.accumulatedText) {
accumulatedTextRef.current = state.accumulatedText;
const restoredText = getRestoredStreamingText(state);
accumulatedTextRef.current = restoredText;
// Add assistant message with resumed text - streaming will continue automatically
const assistantMsg: import("@/types/openai").ConversationMessage = {
id: `assistant-${Date.now()}`,
type: "message",
role: "assistant",
content: [{ type: "output_text", text: state.accumulatedText }],
content: [{ type: "output_text", text: restoredText }],
status: "in_progress",
};
setChatItems([...items, assistantMsg]);
@@ -49,6 +49,28 @@ interface WorkflowViewProps {
onDebugEvent: DebugEventHandler;
}
function getWorkflowEventTimestamp(event: ExtendedResponseStreamEvent): number | undefined {
if ("created_at" in event && typeof event.created_at === "number" && event.created_at) {
return event.created_at;
}
const response = "response" in event ? event.response : undefined;
if (response && typeof response === "object" && "created_at" in response) {
const createdAt = response.created_at;
if (typeof createdAt === "number") {
return createdAt;
}
}
const data = "data" in event ? event.data : undefined;
if (data && typeof data === "object" && "timestamp" in data && typeof data.timestamp === "string") {
const milliseconds = new Date(data.timestamp).getTime();
return Number.isFinite(milliseconds) ? milliseconds / 1000 : undefined;
}
return undefined;
}
// TODO: CheckpointSelector is not currently used but may be needed for checkpoint resumption feature
// Smart Run Workflow Button Component moved to separate file
@@ -581,20 +603,7 @@ export function WorkflowView({
// 2. response.created_at (response.created / lifecycle events)
// 3. data.timestamp (response.workflow_event.completed ISO string)
// Fall back to a synthesized timestamp only when none is present.
const anyEvent = openAIEvent as Record<string, unknown>;
const eventTimestamp: number | undefined =
typeof anyEvent["created_at"] === "number" && anyEvent["created_at"]
? (anyEvent["created_at"] as number)
: typeof (anyEvent["response"] as Record<string, unknown> | undefined)?.["created_at"] === "number"
? ((anyEvent["response"] as Record<string, number>)["created_at"] as number)
: (() => {
const ts = (anyEvent["data"] as Record<string, unknown> | undefined)?.["timestamp"];
if (typeof ts !== "string") return undefined;
const ms = new Date(ts).getTime();
// Guard against NaN: Python isoformat() emits microseconds without Z,
// which some JS engines cannot parse. Number.isFinite rejects NaN.
return Number.isFinite(ms) ? ms / 1000 : undefined;
})();
const eventTimestamp = getWorkflowEventTimestamp(openAIEvent);
const baseTimestamp = Math.floor(Date.now() / 1000);
const lastTimestamp =
prev.length > 0
@@ -1018,20 +1027,7 @@ export function WorkflowView({
// 2. response.created_at (response.created / lifecycle events)
// 3. data.timestamp (response.workflow_event.completed ISO string)
// Fall back to a synthesized timestamp only when none is present.
const anyEvent = openAIEvent as Record<string, unknown>;
const eventTimestamp: number | undefined =
typeof anyEvent["created_at"] === "number" && anyEvent["created_at"]
? (anyEvent["created_at"] as number)
: typeof (anyEvent["response"] as Record<string, unknown> | undefined)?.["created_at"] === "number"
? ((anyEvent["response"] as Record<string, number>)["created_at"] as number)
: (() => {
const ts = (anyEvent["data"] as Record<string, unknown> | undefined)?.["timestamp"];
if (typeof ts !== "string") return undefined;
const ms = new Date(ts).getTime();
// Guard against NaN: Python isoformat() emits microseconds without Z,
// which some JS engines cannot parse. Number.isFinite rejects NaN.
return Number.isFinite(ms) ? ms / 1000 : undefined;
})();
const eventTimestamp = getWorkflowEventTimestamp(openAIEvent);
const baseTimestamp = Math.floor(Date.now() / 1000);
const lastTimestamp =
prev.length > 0
@@ -600,7 +600,10 @@ function EventItem({ event }: EventItemProps) {
event.type === "error";
return (
<div className="border-l-2 border-muted pl-3 py-2 hover:bg-muted/50 transition-colors">
<div
className="border-l-2 border-muted pl-3 py-2 hover:bg-muted/50 transition-colors"
data-devui-debug-event={eventType}
>
<div className="flex items-center gap-2 text-xs text-muted-foreground mb-1">
<Icon className={`h-3 w-3 ${colorClass}`} />
<span className="font-mono">{timestamp}</span>
@@ -1088,18 +1091,17 @@ function EventExpandedContent({
function EventsTab({
events,
processedEvents,
isStreaming,
}: {
events: ExtendedResponseStreamEvent[];
processedEvents: ExtendedResponseStreamEvent[];
isStreaming?: boolean;
}) {
const scrollRef = useRef<HTMLDivElement>(null);
// Process events to accumulate tool calls and reduce noise
const processedEvents = processEventsForDisplay(events);
// Add separators between message rounds
const eventsWithSeparators = addSeparatorsToEvents(processedEvents);
const eventsWithSeparators = useMemo(() => addSeparatorsToEvents(processedEvents), [processedEvents]);
// Reverse events so latest appears at top
const reversedEvents = [...eventsWithSeparators].reverse();
@@ -1565,10 +1567,13 @@ function TracesTab({ events }: { events: ExtendedResponseStreamEvent[] }) {
);
}
function ToolsTab({ events }: { events: ExtendedResponseStreamEvent[] }) {
// Process events first to get clean tool calls
const processedEvents = processEventsForDisplay(events);
function ToolsTab({
events,
processedEvents,
}: {
events: ExtendedResponseStreamEvent[];
processedEvents: ExtendedResponseStreamEvent[];
}) {
// Create call->result pairs in chronological order
const toolEvents: ExtendedResponseStreamEvent[] = [];
const functionCalls = processedEvents.filter(
@@ -1755,15 +1760,16 @@ export function DebugPanel({
const activeTab = useDevUIStore((state) => state.debugPanelTab);
const setActiveTab = useDevUIStore((state) => state.setDebugPanelTab);
const processedEvents = useMemo(() => processEventsForDisplay(events), [events]);
// Compute counts once for tab badges (memoized to avoid perf hits)
const counts = useMemo(() => {
const processedEvents = processEventsForDisplay(events);
const eventsCount = processedEvents.length;
const tracesCount = events.filter(e => e.type === "response.trace.completed").length;
const toolsCount = processedEvents.filter(e => e.type === "response.function_call.complete").length
+ events.filter(e => getFunctionResultFromEvent(e) !== null).length;
return { eventsCount, tracesCount, toolsCount };
}, [events]);
}, [events, processedEvents]);
return (
<div className="flex-1 border-l flex flex-col min-h-0">
@@ -1809,7 +1815,7 @@ export function DebugPanel({
</div>
<TabsContent value="events" className="flex-1 mt-0 overflow-hidden">
<EventsTab events={events} isStreaming={isStreaming} />
<EventsTab events={events} processedEvents={processedEvents} isStreaming={isStreaming} />
</TabsContent>
<TabsContent value="traces" className="flex-1 mt-0 overflow-hidden">
@@ -1817,7 +1823,7 @@ export function DebugPanel({
</TabsContent>
<TabsContent value="tools" className="flex-1 mt-0 overflow-hidden">
<ToolsTab events={events} />
<ToolsTab events={events} processedEvents={processedEvents} />
</TabsContent>
</Tabs>
</div>
@@ -519,6 +519,7 @@ class ApiClient {
lastMessageId,
lastSequenceNumber,
accumulatedText: storedState?.accumulatedText,
accumulatedTextIsPreview: storedState?.accumulatedTextIsPreview,
}),
event,
currentResponseId,
@@ -15,11 +15,13 @@ export interface StreamingState {
lastSequenceNumber: number;
timestamp: number; // When this state was last updated
completed: boolean; // Whether the stream completed successfully
accumulatedText?: string; // Accumulated text content for quick restoration
accumulatedText?: string; // Bounded tail preview for refresh restoration
accumulatedTextIsPreview?: boolean;
}
const STORAGE_KEY_PREFIX = "devui_streaming_state_";
const STATE_EXPIRY_MS = 24 * 60 * 60 * 1000; // 24 hours
const MAX_ACCUMULATED_TEXT_PREVIEW_CHARS = 16 * 1024;
interface CreateStreamingStateOptions {
conversationId: string;
@@ -27,6 +29,7 @@ interface CreateStreamingStateOptions {
lastMessageId?: string;
lastSequenceNumber?: number;
accumulatedText?: string;
accumulatedTextIsPreview?: boolean;
}
/**
@@ -36,6 +39,21 @@ function getStorageKey(conversationId: string): string {
return `${STORAGE_KEY_PREFIX}${conversationId}`;
}
function normalizeAccumulatedTextPreview(state: StreamingState): StreamingState {
if (
state.accumulatedText === undefined ||
state.accumulatedText.length <= MAX_ACCUMULATED_TEXT_PREVIEW_CHARS
) {
return state;
}
return {
...state,
accumulatedText: state.accumulatedText.slice(-MAX_ACCUMULATED_TEXT_PREVIEW_CHARS),
accumulatedTextIsPreview: true,
};
}
/**
* Read raw streaming state from storage, including completed entries.
*/
@@ -56,7 +74,7 @@ function readStreamingState(conversationId: string): StreamingState | null {
return null;
}
return state;
return normalizeAccumulatedTextPreview(state);
}
/**
@@ -68,8 +86,9 @@ export function createStreamingState({
lastMessageId,
lastSequenceNumber = -1,
accumulatedText,
accumulatedTextIsPreview = false,
}: CreateStreamingStateOptions): StreamingState {
return {
return normalizeAccumulatedTextPreview({
conversationId,
responseId,
lastMessageId,
@@ -77,7 +96,8 @@ export function createStreamingState({
timestamp: Date.now(),
completed: false,
accumulatedText,
};
accumulatedTextIsPreview,
});
}
/**
@@ -108,7 +128,15 @@ export function applyStreamingEventToState(
typeof event.delta === "string" &&
event.delta.length > 0
) {
nextState.accumulatedText = `${state.accumulatedText ?? ""}${event.delta}`;
const accumulatedText = `${state.accumulatedText ?? ""}${event.delta}`;
const isPreview =
state.accumulatedTextIsPreview ||
accumulatedText.length > MAX_ACCUMULATED_TEXT_PREVIEW_CHARS;
nextState.accumulatedText = isPreview
? accumulatedText.slice(-MAX_ACCUMULATED_TEXT_PREVIEW_CHARS)
: accumulatedText;
nextState.accumulatedTextIsPreview = isPreview;
}
return nextState;
@@ -120,7 +148,7 @@ export function applyStreamingEventToState(
export function saveStreamingState(state: StreamingState): void {
try {
const key = getStorageKey(state.conversationId);
const data = JSON.stringify(state);
const data = JSON.stringify(normalizeAccumulatedTextPreview(state));
localStorage.setItem(key, data);
} catch (error) {
console.error("Failed to save streaming state:", error);
@@ -129,7 +157,7 @@ export function saveStreamingState(state: StreamingState): void {
clearExpiredStreamingStates();
// Try again
const key = getStorageKey(state.conversationId);
const data = JSON.stringify(state);
const data = JSON.stringify(normalizeAccumulatedTextPreview(state));
localStorage.setItem(key, data);
} catch {
console.error("Failed to save streaming state even after cleanup");
@@ -18,6 +18,26 @@ import type {
import type { ConversationItem } from "@/types/openai";
import type { AttachmentItem } from "@/components/ui/attachment-gallery";
const MAX_DEBUG_EVENTS = 1000;
const MAX_DEBUG_TEXT_DELTA_CHARS = 2048;
function prepareDebugEvent(event: ExtendedResponseStreamEvent): ExtendedResponseStreamEvent {
if (
event.type !== "response.output_text.delta" ||
!("delta" in event) ||
typeof event.delta !== "string" ||
event.delta.length <= MAX_DEBUG_TEXT_DELTA_CHARS
) {
return event;
}
const omittedChars = event.delta.length - MAX_DEBUG_TEXT_DELTA_CHARS;
return {
...event,
delta: `${event.delta.slice(0, MAX_DEBUG_TEXT_DELTA_CHARS)}\n...[${omittedChars} chars omitted from debug view]`,
};
}
// ========================================
// State Interface
// ========================================
@@ -395,6 +415,7 @@ export const useDevUIStore = create<DevUIStore>()(
setStreamingEnabled: (enabled) => set({ streamingEnabled: enabled }),
addDebugEvent: (event) =>
set((state) => {
const eventForStorage = prepareDebugEvent(event);
// Generate unique timestamp for each event
// Use current time + small increment to ensure uniqueness even for rapid events
const baseTimestamp = Math.floor(Date.now() / 1000);
@@ -404,16 +425,19 @@ export const useDevUIStore = create<DevUIStore>()(
const lastTimestamp = lastEvent?._uiTimestamp ?? 0;
// Ensure new timestamp is always greater than the last one
const uniqueTimestamp = Math.max(baseTimestamp, lastTimestamp + 1);
const retainedEvents = state.debugEvents.length >= MAX_DEBUG_EVENTS
? state.debugEvents.slice(-(MAX_DEBUG_EVENTS - 1))
: state.debugEvents;
return {
debugEvents: [
...state.debugEvents,
...retainedEvents,
{
...event,
...eventForStorage,
// Add UI display timestamp when event is received (Unix seconds)
// Each event gets a unique timestamp to preserve chronological order
_uiTimestamp: ('created_at' in event && event.created_at)
? event.created_at
_uiTimestamp: ('created_at' in eventForStorage && eventForStorage.created_at)
? eventForStorage.created_at
: uniqueTimestamp,
} as ExtendedResponseStreamEvent & { _uiTimestamp: number },
],
@@ -79,6 +79,8 @@ _POST_SEND_DELAY_S = 1.0
_SAMPLE_INTERVAL_S = 0.5
_SAMPLE_WINDOW_S = 12.0
_MAX_RENDERER_GROWTH_MB = 500.0
_MAX_STREAMING_STATE_STORAGE_BYTES = 64 * 1024
_MAX_DEBUG_EVENT_DOM_ITEMS = 1000
@dataclass(frozen=True)
@@ -89,6 +91,13 @@ class _BrowserProcessRow:
command: str
@dataclass(frozen=True)
class _BrowserMemoryProbe:
streaming_state_storage_bytes: int
debug_event_dom_items: int
js_heap_bytes: int | None
class MemoryStressAgent(BaseAgent):
"""Agent that emits many small streaming chunks."""
@@ -430,6 +439,52 @@ def _sample_peak_renderer_rss_mb(root_pid: int, profile_dir: str) -> float:
return round((max(renderer_rss_kb, default=0)) / 1024, 2)
async def _sample_browser_memory_probe(client: _CDPClient, *, session_id: str) -> _BrowserMemoryProbe:
value = await client.evaluate(
"""
(() => {
const storagePrefix = "devui_streaming_state_";
const textEncoder = new TextEncoder();
let streamingStateStorageBytes = 0;
for (let index = 0; index < localStorage.length; index += 1) {
const key = localStorage.key(index);
if (!key || !key.startsWith(storagePrefix)) {
continue;
}
const item = localStorage.getItem(key) || "";
streamingStateStorageBytes += textEncoder.encode(key).length + textEncoder.encode(item).length;
}
return {
streamingStateStorageBytes,
debugEventDomItems: document.querySelectorAll("[data-devui-debug-event]").length,
jsHeapBytes: performance.memory ? performance.memory.usedJSHeapSize : null,
};
})()
""",
session_id=session_id,
)
if not isinstance(value, dict):
raise AssertionError(f"Expected browser memory probe object, got: {type(value).__name__}")
streaming_state_storage_bytes = value.get("streamingStateStorageBytes")
debug_event_dom_items = value.get("debugEventDomItems")
js_heap_bytes = value.get("jsHeapBytes")
if not isinstance(streaming_state_storage_bytes, int):
raise AssertionError("Browser memory probe did not return streamingStateStorageBytes")
if not isinstance(debug_event_dom_items, int):
raise AssertionError("Browser memory probe did not return debugEventDomItems")
if js_heap_bytes is not None and not isinstance(js_heap_bytes, int):
raise AssertionError("Browser memory probe returned invalid jsHeapBytes")
return _BrowserMemoryProbe(
streaming_state_storage_bytes=streaming_state_storage_bytes,
debug_event_dom_items=debug_event_dom_items,
js_heap_bytes=js_heap_bytes,
)
def _terminate_browser_processes(root_pid: int, profile_dir: str) -> None:
browser_rows = _collect_browser_process_rows(root_pid, profile_dir)
browser_pids = sorted({row.pid for row in browser_rows} | {root_pid}, reverse=True)
@@ -727,6 +782,7 @@ async def test_devui_streaming_renderer_memory_is_bounded(
peak_renderer_rss_mb = start_renderer_rss_mb
samples: list[tuple[float, float]] = [(0.0, start_renderer_rss_mb)]
probe_samples: list[tuple[float, _BrowserMemoryProbe]] = []
start_time = time.monotonic()
while time.monotonic() - start_time < _SAMPLE_WINDOW_S:
@@ -737,6 +793,10 @@ async def test_devui_streaming_renderer_memory_is_bounded(
elapsed_s = round(time.monotonic() - start_time, 2)
samples.append((elapsed_s, current_sample))
peak_renderer_rss_mb = max(peak_renderer_rss_mb, current_sample)
probe_samples.append((
elapsed_s,
await _sample_browser_memory_probe(client, session_id=session_id),
))
if peak_renderer_rss_mb - start_renderer_rss_mb > _MAX_RENDERER_GROWTH_MB:
break
@@ -744,13 +804,44 @@ async def test_devui_streaming_renderer_memory_is_bounded(
await asyncio.sleep(_SAMPLE_INTERVAL_S)
renderer_growth_mb = round(peak_renderer_rss_mb - start_renderer_rss_mb, 2)
max_streaming_state_storage_bytes = max(
(probe.streaming_state_storage_bytes for _, probe in probe_samples),
default=0,
)
max_debug_event_dom_items = max(
(probe.debug_event_dom_items for _, probe in probe_samples),
default=0,
)
assert renderer_growth_mb <= _MAX_RENDERER_GROWTH_MB, (
"DevUI renderer memory grew too much during a ~1.5 MB streaming response. "
f"start={start_renderer_rss_mb:.2f}MB "
f"peak={peak_renderer_rss_mb:.2f}MB "
f"growth={renderer_growth_mb:.2f}MB "
f"budget={_MAX_RENDERER_GROWTH_MB:.2f}MB "
f"samples={samples}"
f"samples={samples} "
f"probe_samples={probe_samples}"
)
assert max_streaming_state_storage_bytes <= _MAX_STREAMING_STATE_STORAGE_BYTES, (
"DevUI streaming resume state retained too much text in browser storage. "
f"peak={max_streaming_state_storage_bytes} bytes "
f"budget={_MAX_STREAMING_STATE_STORAGE_BYTES} bytes "
f"probe_samples={probe_samples}"
)
assert max_streaming_state_storage_bytes > 0, (
"DevUI streaming state storage was never written during the stress run "
"(cap assertion would be vacuous). "
f"probe_samples={probe_samples}"
)
assert max_debug_event_dom_items <= _MAX_DEBUG_EVENT_DOM_ITEMS, (
"DevUI debug panel rendered too many retained streaming events. "
f"peak={max_debug_event_dom_items} "
f"budget={_MAX_DEBUG_EVENT_DOM_ITEMS} "
f"probe_samples={probe_samples}"
)
assert max_debug_event_dom_items > 0, (
"DevUI debug panel rendered zero events during the stress run "
"(cap assertion would be vacuous). "
f"probe_samples={probe_samples}"
)
finally:
_shutdown_browser_process(browser_process, profile_dir=profile_dir)