Python: Fix DevUI streaming memory growth and add cross-platform regression coverage (#5221)

* fix for memory leak in devui

* update async sleep

* remove old func
This commit is contained in:
Eduard van Valkenburg
2026-04-14 11:27:52 +02:00
committed by GitHub
Unverified
parent 7bb0feca59
commit 98e17764a4
6 changed files with 1211 additions and 309 deletions
File diff suppressed because one or more lines are too long
+56 -4
View File
@@ -3,7 +3,7 @@
* Features: Entity selection, layout management, debug coordination
*/
import { useEffect, useCallback, useState } from "react";
import { useEffect, useCallback, useRef, useState } from "react";
import { AppHeader, DebugPanel, SettingsModal, DeploymentModal } from "@/components/layout";
import { GalleryView } from "@/components/features/gallery";
import { AgentView } from "@/components/features/agent";
@@ -15,17 +15,22 @@ import type {
AgentInfo,
WorkflowInfo,
ExtendedResponseStreamEvent,
ResponseTextDeltaEvent,
} from "@/types";
import { Button } from "./components/ui/button";
import { Input } from "./components/ui/input";
import { useDevUIStore } from "@/stores";
const DEBUG_TEXT_EVENT_FLUSH_INTERVAL_MS = 50;
export default function App() {
// Local state for auth handling
const [authRequired, setAuthRequired] = useState(false);
const [authToken, setAuthToken] = useState("");
const [isTestingToken, setIsTestingToken] = useState(false);
const [authError, setAuthError] = useState("");
const bufferedDebugTextRef = useRef<ResponseTextDeltaEvent | null>(null);
const lastBufferedDebugFlushAtRef = useRef(0);
// Entity state from Zustand
const agents = useDevUIStore((state) => state.agents);
@@ -303,16 +308,63 @@ export default function App() {
[selectEntity, updateAgent, updateWorkflow, addToast]
);
const flushBufferedDebugText = useCallback(() => {
const bufferedEvent = bufferedDebugTextRef.current;
if (!bufferedEvent) {
return;
}
bufferedDebugTextRef.current = null;
lastBufferedDebugFlushAtRef.current = performance.now();
addDebugEvent(bufferedEvent);
}, [addDebugEvent]);
// Handle debug events from active view
const handleDebugEvent = useCallback(
(event: ExtendedResponseStreamEvent | "clear") => {
if (event === "clear") {
bufferedDebugTextRef.current = null;
clearDebugEvents();
} else {
addDebugEvent(event);
return;
}
if (
event.type === "response.output_text.delta" &&
"delta" in event &&
typeof event.delta === "string" &&
event.delta.length > 0
) {
const bufferedEvent = bufferedDebugTextRef.current;
const isSameOutput =
bufferedEvent !== null &&
bufferedEvent.item_id === event.item_id &&
bufferedEvent.output_index === event.output_index &&
bufferedEvent.content_index === event.content_index;
if (isSameOutput && bufferedEvent) {
bufferedDebugTextRef.current = {
...bufferedEvent,
delta: bufferedEvent.delta + event.delta,
sequence_number: event.sequence_number ?? bufferedEvent.sequence_number,
};
} else {
flushBufferedDebugText();
bufferedDebugTextRef.current = { ...event } as ResponseTextDeltaEvent;
}
if (
performance.now() - lastBufferedDebugFlushAtRef.current >=
DEBUG_TEXT_EVENT_FLUSH_INTERVAL_MS
) {
flushBufferedDebugText();
}
return;
}
flushBufferedDebugText();
addDebugEvent(event);
},
[addDebugEvent, clearDebugEvents]
[addDebugEvent, clearDebugEvents, flushBufferedDebugText]
);
// Show loading state while initializing
@@ -44,6 +44,8 @@ import { loadStreamingState } from "@/services/streaming-state";
type DebugEventHandler = (event: ExtendedResponseStreamEvent | "clear") => void;
const ASSISTANT_TEXT_RENDER_INTERVAL_MS = 50;
interface AgentViewProps {
selectedAgent: AgentInfo;
onDebugEvent: DebugEventHandler;
@@ -309,6 +311,71 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
} | null>(null);
const userJustSentMessage = useRef<boolean>(false);
const accumulatedTextRef = useRef<string>("");
const lastAssistantTextRenderAt = useRef(0);
const renderAssistantStreamingText = useCallback(
(
assistantMessageId: string,
status: "in_progress" | "completed" | "incomplete" = "in_progress",
force: boolean = false
) => {
const now = performance.now();
if (
!force &&
now - lastAssistantTextRenderAt.current < ASSISTANT_TEXT_RENDER_INTERVAL_MS
) {
return;
}
const currentItems = useDevUIStore.getState().chatItems;
let changed = false;
const nextItems = currentItems.map((item) => {
if (item.id !== assistantMessageId || item.type !== "message") {
return item;
}
const nextText = accumulatedTextRef.current;
const existingTextContent = item.content.find(
(content) => content.type === "text" || content.type === "output_text"
);
const currentText =
existingTextContent && "text" in existingTextContent
? existingTextContent.text
: "";
if (currentText === nextText && item.status === status) {
return item;
}
changed = true;
const existingNonTextContent = item.content.filter(
(content) => content.type !== "text" && content.type !== "output_text"
);
return {
...item,
content: nextText
? [
...existingNonTextContent,
{
type: "text",
text: nextText,
} as import("@/types/openai").MessageTextContent,
]
: existingNonTextContent,
status,
};
});
if (changed) {
lastAssistantTextRenderAt.current = now;
setChatItems(nextItems);
} else if (force) {
lastAssistantTextRenderAt.current = now;
}
},
[setChatItems]
);
// Auto-scroll to bottom when new items arrive
useEffect(() => {
@@ -382,6 +449,7 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
undefined, // No abort signal for resume
storedState.responseId // Pass response ID for resume
);
lastAssistantTextRenderAt.current = 0;
for await (const openAIEvent of streamGenerator) {
// Pass all events to debug panel
@@ -412,6 +480,12 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
: JSON.stringify(error)
: "Request failed";
if (accumulatedTextRef.current) {
renderAssistantStreamingText(assistantMessage.id, "incomplete", true);
setIsStreaming(false);
return;
}
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
@@ -434,6 +508,7 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
// Handle function approval request events
if (openAIEvent.type === "response.function_approval.requested") {
const approvalEvent = openAIEvent as import("@/types/openai").ResponseFunctionApprovalRequestedEvent;
renderAssistantStreamingText(assistantMessage.id, "in_progress", true);
setPendingApprovals([
...useDevUIStore.getState().pendingApprovals,
{
@@ -458,6 +533,12 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
const errorEvent = openAIEvent as ExtendedResponseStreamEvent & { message?: string };
const errorMessage = errorEvent.message || "An error occurred";
if (accumulatedTextRef.current) {
renderAssistantStreamingText(assistantMessage.id, "incomplete", true);
setIsStreaming(false);
return;
}
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
@@ -484,27 +565,13 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
openAIEvent.delta
) {
accumulatedTextRef.current += openAIEvent.delta;
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
? {
...item,
content: [
{
type: "text",
text: accumulatedTextRef.current,
} as import("@/types/openai").MessageTextContent,
],
status: "in_progress" as const,
}
: item
));
renderAssistantStreamingText(assistantMessage.id);
}
}
// Stream ended - mark as complete
const finalUsage = currentMessageUsage.current;
renderAssistantStreamingText(assistantMessage.id, "in_progress", true);
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
@@ -721,11 +788,12 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
setIsStreaming(false);
setCurrentConversation(undefined);
accumulatedTextRef.current = "";
lastAssistantTextRenderAt.current = 0;
loadConversations();
// currentConversation is intentionally excluded - this effect should only run when agent changes
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [selectedAgent, onDebugEvent, setChatItems, setIsStreaming, setLoadingConversations, setAvailableConversations, setCurrentConversation, setPendingApprovals, updateConversationUsage]);
}, [selectedAgent, onDebugEvent, renderAssistantStreamingText, setChatItems, setIsStreaming, setLoadingConversations, setAvailableConversations, setCurrentConversation, setPendingApprovals, updateConversationUsage]);
// Removed old input handling functions - now handled by ChatMessageInput component
@@ -1118,6 +1186,7 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
// Clear text accumulator for new response
accumulatedTextRef.current = "";
lastAssistantTextRenderAt.current = 0;
// Create new AbortController for this request
const signal = createAbortSignal();
@@ -1167,6 +1236,12 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
}
// Update assistant message with error
if (accumulatedTextRef.current) {
renderAssistantStreamingText(assistantMessage.id, "incomplete", true);
setIsStreaming(false);
return; // Exit stream processing on failure
}
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
@@ -1189,6 +1264,7 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
// Handle function approval request events
if (openAIEvent.type === "response.function_approval.requested") {
const approvalEvent = openAIEvent as import("@/types/openai").ResponseFunctionApprovalRequestedEvent;
renderAssistantStreamingText(assistantMessage.id, "in_progress", true);
// Add to pending approvals (for popup)
setPendingApprovals([
@@ -1267,6 +1343,12 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
const errorMessage = errorEvent.message || "An error occurred";
// Update assistant message with error and stop streaming
if (accumulatedTextRef.current) {
renderAssistantStreamingText(assistantMessage.id, "incomplete", true);
setIsStreaming(false);
return; // Exit stream processing early on error
}
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
@@ -1290,6 +1372,7 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
if (openAIEvent.type === "response.output_item.added") {
const outputItemEvent = openAIEvent as import("@/types/openai").ResponseOutputItemAddedEvent;
const item = outputItemEvent.item;
renderAssistantStreamingText(assistantMessage.id, "in_progress", true);
// Handle function calls as separate conversation items
if (item.type === "function_call") {
@@ -1363,28 +1446,7 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
openAIEvent.delta
) {
accumulatedTextRef.current += openAIEvent.delta;
// Update assistant message with accumulated content
// Preserve any existing non-text content (images, files, data)
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) => {
if (item.id === assistantMessage.id && item.type === "message") {
// Keep existing non-text content, update text content
const existingNonTextContent = item.content.filter(c => c.type !== "text");
return {
...item,
content: [
...existingNonTextContent,
{
type: "text",
text: accumulatedTextRef.current,
} as import("@/types/openai").MessageTextContent,
],
status: "in_progress" as const,
};
}
return item;
}));
renderAssistantStreamingText(assistantMessage.id);
}
// Handle completion/error by detecting when streaming stops
@@ -1394,6 +1456,7 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
// Stream ended - mark as complete
// Usage is provided via response.completed event (OpenAI standard)
const finalUsage = currentMessageUsage.current;
renderAssistantStreamingText(assistantMessage.id, "in_progress", true);
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
@@ -1419,45 +1482,42 @@ export function AgentView({ selectedAgent, onDebugEvent }: AgentViewProps) {
if (isAbortError(error)) {
// User cancelled - mark as cancelled for UI feedback
setWasCancelled(true);
// Mark the message as completed with what we have
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
? {
...item,
status: accumulatedTextRef.current ? "completed" as const : "incomplete" as const,
// Keep whatever text we have accumulated
content: item.content,
}
: item
));
renderAssistantStreamingText(
assistantMessage.id,
accumulatedTextRef.current ? "completed" : "incomplete",
true
);
} else {
// Other errors - show error message
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
? {
...item,
content: [
{
type: "text",
text: `Error: ${
error instanceof Error
? error.message
: "Failed to get response"
}`,
} as import("@/types/openai").MessageTextContent,
],
status: "incomplete" as const,
}
: item
));
if (accumulatedTextRef.current) {
renderAssistantStreamingText(assistantMessage.id, "incomplete", true);
} else {
const currentItems = useDevUIStore.getState().chatItems;
setChatItems(currentItems.map((item) =>
item.id === assistantMessage.id && item.type === "message"
? {
...item,
content: [
{
type: "text",
text: `Error: ${
error instanceof Error
? error.message
: "Failed to get response"
}`,
} as import("@/types/openai").MessageTextContent,
],
status: "incomplete" as const,
}
: item
));
}
}
setIsStreaming(false);
resetCancelling();
}
},
[selectedAgent, currentConversation, onDebugEvent, setChatItems, setIsStreaming, setCurrentConversation, setAvailableConversations, setPendingApprovals, updateConversationUsage, createAbortSignal, resetCancelling]
[selectedAgent, currentConversation, onDebugEvent, renderAssistantStreamingText, setChatItems, setIsStreaming, setCurrentConversation, setAvailableConversations, setPendingApprovals, updateConversationUsage, createAbortSignal, resetCancelling]
);
// Handle non-streaming message sending
@@ -16,9 +16,10 @@ import type {
import type { AgentFrameworkRequest } from "@/types/agent-framework";
import type { ExtendedResponseStreamEvent } from "@/types/openai";
import {
applyStreamingEventToState,
createStreamingState,
loadStreamingState,
updateStreamingState,
markStreamingCompleted,
saveStreamingState,
clearStreamingState,
} from "./streaming-state";
import { isAbortError } from "@/hooks";
@@ -72,6 +73,7 @@ const DEFAULT_API_BASE_URL =
// Retry configuration for streaming
const RETRY_INTERVAL_MS = 1000; // Base retry interval (will use exponential backoff)
const MAX_RETRY_ATTEMPTS = 10; // Max 10 retries (~30 seconds with exponential backoff)
const STREAMING_STATE_SAVE_INTERVAL_MS = 250;
// Get backend URL from localStorage or default
function getBackendUrl(): string {
@@ -223,7 +225,7 @@ class ApiClient {
chat_client_type: entity.chat_client_type,
context_provider: entity.context_provider,
middleware: entity.middleware,
};
} as AgentInfo;
} else {
// Workflow - prefer executors field, fall back to tools for backward compatibility
const executorList = entity.executors || entity.tools || [];
@@ -263,7 +265,7 @@ class ApiClient {
input_type_name: entity.input_type_name || "Input",
start_executor_id: startExecutorId,
tools: [],
};
} as WorkflowInfo;
}
});
@@ -484,31 +486,65 @@ class ApiClient {
let hasYieldedAnyEvent = false;
let currentResponseId: string | undefined = resumeResponseId;
let lastMessageId: string | undefined = undefined;
let lastStreamingStateSaveAt = 0;
let storedState = conversationId ? loadStreamingState(conversationId) : null;
let streamingState = storedState ? { ...storedState } : null;
const persistStreamingState = (force: boolean = false): void => {
if (!conversationId || !streamingState) {
return;
}
const now = Date.now();
if (!force && now - lastStreamingStateSaveAt < STREAMING_STATE_SAVE_INTERVAL_MS) {
return;
}
lastStreamingStateSaveAt = now;
saveStreamingState({
...streamingState,
timestamp: now,
});
};
const recordStreamingEvent = (event: ExtendedResponseStreamEvent): void => {
if (!conversationId || !currentResponseId) {
return;
}
streamingState = applyStreamingEventToState(
streamingState ?? createStreamingState({
conversationId,
responseId: currentResponseId,
lastMessageId,
lastSequenceNumber,
accumulatedText: storedState?.accumulatedText,
}),
event,
currentResponseId,
lastMessageId
);
const isTextDelta =
event.type === "response.output_text.delta" &&
"delta" in event &&
typeof event.delta === "string" &&
event.delta.length > 0;
persistStreamingState(!isTextDelta);
};
// Try to resume from stored state if conversation ID is provided
if (conversationId) {
const storedState = loadStreamingState(conversationId);
if (storedState) {
// Use stored response ID if no explicit one provided
if (!resumeResponseId) {
currentResponseId = storedState.responseId;
}
lastSequenceNumber = storedState.lastSequenceNumber;
lastMessageId = storedState.lastMessageId;
// Replay stored events only if we're not explicitly resuming
// (explicit resume means the caller already has the events)
if (!resumeResponseId) {
for (const event of storedState.events) {
hasYieldedAnyEvent = true;
yield event;
}
} else {
// Mark that we've already seen events up to this sequence number
hasYieldedAnyEvent = storedState.events.length > 0;
}
if (storedState) {
// Use stored response ID if no explicit one provided
if (!resumeResponseId) {
currentResponseId = storedState.responseId;
}
lastSequenceNumber = storedState.lastSequenceNumber;
lastMessageId = storedState.lastMessageId;
hasYieldedAnyEvent =
storedState.lastSequenceNumber >= 0 ||
Boolean(storedState.accumulatedText);
}
while (retryCount <= MAX_RETRY_ATTEMPTS) {
@@ -621,7 +657,8 @@ class ApiClient {
if (done) {
// Stream completed successfully
if (conversationId) {
markStreamingCompleted(conversationId);
clearStreamingState(conversationId);
streamingState = null;
}
return;
}
@@ -640,7 +677,8 @@ class ApiClient {
// Handle [DONE] signal
if (dataStr === "[DONE]") {
if (conversationId) {
markStreamingCompleted(conversationId);
clearStreamingState(conversationId);
streamingState = null;
}
return;
}
@@ -676,6 +714,9 @@ class ApiClient {
if (conversationId) {
clearStreamingState(conversationId);
}
storedState = null;
streamingState = null;
lastStreamingStateSaveAt = 0;
yield {
type: "error",
message: "Connection lost - previous response failed. Starting new response.",
@@ -684,9 +725,7 @@ class ApiClient {
hasYieldedAnyEvent = true;
// Save new event to storage
if (conversationId && currentResponseId) {
updateStreamingState(conversationId, openAIEvent, currentResponseId, lastMessageId);
}
recordStreamingEvent(openAIEvent);
yield openAIEvent;
}
@@ -698,9 +737,7 @@ class ApiClient {
hasYieldedAnyEvent = true;
// Save event to storage before yielding
if (conversationId && currentResponseId) {
updateStreamingState(conversationId, openAIEvent, currentResponseId, lastMessageId);
}
recordStreamingEvent(openAIEvent);
yield openAIEvent;
}
@@ -709,9 +746,7 @@ class ApiClient {
hasYieldedAnyEvent = true;
// Still save to storage if we have conversation context
if (conversationId && currentResponseId) {
updateStreamingState(conversationId, openAIEvent, currentResponseId, lastMessageId);
}
recordStreamingEvent(openAIEvent);
yield openAIEvent;
}
@@ -730,7 +765,8 @@ class ApiClient {
// Don't retry on abort
if (isAbortError(error)) {
if (conversationId) {
markStreamingCompleted(conversationId); // Clean up state
clearStreamingState(conversationId);
streamingState = null;
}
throw error; // Re-throw abort error without retrying
}
@@ -3,7 +3,6 @@
*
* Manages browser storage of streaming response state to enable:
* - Resume interrupted streams after page refresh
* - Replay cached events before fetching new ones
* - Graceful recovery from network disconnections
*/
@@ -14,7 +13,6 @@ export interface StreamingState {
responseId: string;
lastMessageId?: string;
lastSequenceNumber: number;
events: ExtendedResponseStreamEvent[];
timestamp: number; // When this state was last updated
completed: boolean; // Whether the stream completed successfully
accumulatedText?: string; // Accumulated text content for quick restoration
@@ -23,6 +21,14 @@ export interface StreamingState {
const STORAGE_KEY_PREFIX = "devui_streaming_state_";
const STATE_EXPIRY_MS = 24 * 60 * 60 * 1000; // 24 hours
interface CreateStreamingStateOptions {
conversationId: string;
responseId: string;
lastMessageId?: string;
lastSequenceNumber?: number;
accumulatedText?: string;
}
/**
* Storage key for a specific conversation
*/
@@ -31,16 +37,81 @@ function getStorageKey(conversationId: string): string {
}
/**
* Extract accumulated text from events (for quick restoration)
* Read raw streaming state from storage, including completed entries.
*/
function extractAccumulatedText(events: ExtendedResponseStreamEvent[]): string {
let text = "";
for (const event of events) {
if (event.type === "response.output_text.delta" && "delta" in event) {
text += event.delta;
}
function readStreamingState(conversationId: string): StreamingState | null {
const key = getStorageKey(conversationId);
const data = localStorage.getItem(key);
if (!data) {
return null;
}
return text;
const state: StreamingState = JSON.parse(data);
// Check if state has expired
const age = Date.now() - state.timestamp;
if (age > STATE_EXPIRY_MS) {
clearStreamingState(conversationId);
return null;
}
return state;
}
/**
* Create an initial streaming state snapshot.
*/
export function createStreamingState({
conversationId,
responseId,
lastMessageId,
lastSequenceNumber = -1,
accumulatedText,
}: CreateStreamingStateOptions): StreamingState {
return {
conversationId,
responseId,
lastMessageId,
lastSequenceNumber,
timestamp: Date.now(),
completed: false,
accumulatedText,
};
}
/**
* Apply an incoming stream event to an in-memory streaming state snapshot.
*/
export function applyStreamingEventToState(
state: StreamingState,
event: ExtendedResponseStreamEvent,
responseId: string,
lastMessageId?: string
): StreamingState {
const sequenceNumber = "sequence_number" in event ? event.sequence_number : undefined;
const nextState: StreamingState = {
...state,
responseId,
lastMessageId,
timestamp: Date.now(),
completed: event.type === "response.completed" || event.type === "response.failed",
};
if (sequenceNumber !== undefined) {
nextState.lastSequenceNumber = sequenceNumber;
}
if (
event.type === "response.output_text.delta" &&
"delta" in event &&
typeof event.delta === "string" &&
event.delta.length > 0
) {
nextState.accumulatedText = `${state.accumulatedText ?? ""}${event.delta}`;
}
return nextState;
}
/**
@@ -71,19 +142,8 @@ export function saveStreamingState(state: StreamingState): void {
*/
export function loadStreamingState(conversationId: string): StreamingState | null {
try {
const key = getStorageKey(conversationId);
const data = localStorage.getItem(key);
if (!data) {
return null;
}
const state: StreamingState = JSON.parse(data);
// Check if state has expired
const age = Date.now() - state.timestamp;
if (age > STATE_EXPIRY_MS) {
clearStreamingState(conversationId);
const state = readStreamingState(conversationId);
if (!state) {
return null;
}
@@ -99,54 +159,6 @@ export function loadStreamingState(conversationId: string): StreamingState | nul
}
}
/**
* Update streaming state with a new event
*/
export function updateStreamingState(
conversationId: string,
event: ExtendedResponseStreamEvent,
responseId: string,
lastMessageId?: string
): void {
try {
const existing = loadStreamingState(conversationId);
const sequenceNumber = "sequence_number" in event ? event.sequence_number : undefined;
const newEvents = existing ? [...existing.events, event] : [event];
const state: StreamingState = {
conversationId,
responseId,
lastMessageId,
lastSequenceNumber: sequenceNumber ?? (existing?.lastSequenceNumber ?? -1),
events: newEvents,
timestamp: Date.now(),
completed: event.type === "response.completed" || event.type === "response.failed",
accumulatedText: extractAccumulatedText(newEvents),
};
saveStreamingState(state);
} catch (error) {
console.error("Failed to update streaming state:", error);
}
}
/**
* Mark streaming state as completed
*/
export function markStreamingCompleted(conversationId: string): void {
try {
const existing = loadStreamingState(conversationId);
if (existing) {
existing.completed = true;
existing.timestamp = Date.now();
saveStreamingState(existing);
}
} catch (error) {
console.error("Failed to mark streaming as completed:", error);
}
}
/**
* Clear streaming state for a conversation
*/
@@ -0,0 +1,750 @@
# Copyright (c) Microsoft. All rights reserved.
"""Browser-based regression test for DevUI streaming memory growth."""
from __future__ import annotations
import asyncio
import contextlib
import http.client
import json
import os
import shutil
import signal
import socket
import subprocess
import sys
import tempfile
import threading
import time
from collections.abc import AsyncIterable, Awaitable, Generator
from dataclasses import dataclass
from pathlib import Path
from typing import Any
import pytest
import uvicorn
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
AgentSession,
BaseAgent,
Content,
Message,
ResponseStream,
)
from websockets.asyncio.client import connect as websocket_connect
from agent_framework_devui import DevServer
_BROWSER_COMMANDS = (
"chrome",
"chrome.exe",
"google-chrome",
"google-chrome-stable",
"chromium",
"chromium-browser",
"microsoft-edge",
"msedge",
"msedge.exe",
)
_BROWSER_ENV_VARS = ("DEVUI_TEST_BROWSER", "CHROME_BIN", "BROWSER_BIN")
_WINDOWS_PROCESS_QUERY = """
$rows = @(
Get-CimInstance Win32_Process | ForEach-Object {
if (-not $_.CommandLine) {
return
}
try {
$process = Get-Process -Id $_.ProcessId -ErrorAction Stop
[PSCustomObject]@{
pid = [int]$_.ProcessId
parent_pid = [int]$_.ParentProcessId
rss_kb = [int][Math]::Round($process.WorkingSet64 / 1KB)
command = [string]$_.CommandLine
}
}
catch {
}
}
)
$rows | ConvertTo-Json -Compress
""".strip()
_STREAM_CHUNK_COUNT = 12_000
_STREAM_CHUNK_SIZE = 128
_POST_SEND_DELAY_S = 1.0
_SAMPLE_INTERVAL_S = 0.5
_SAMPLE_WINDOW_S = 12.0
_MAX_RENDERER_GROWTH_MB = 500.0
@dataclass(frozen=True)
class _BrowserProcessRow:
pid: int
parent_pid: int
rss_kb: int
command: str
class MemoryStressAgent(BaseAgent):
"""Agent that emits many small streaming chunks."""
def __init__(self, *, chunk_count: int, chunk_size: int, delay_ms: float, **kwargs: Any) -> None:
super().__init__(**kwargs)
self._chunk_count = chunk_count
self._chunk_size = max(chunk_size, 24)
self._delay_s = max(delay_ms, 0.0) / 1000.0
def run(
self,
messages: str | Message | list[str] | list[Message] | None = None,
*,
stream: bool = False,
session: AgentSession | None = None,
**kwargs: Any,
) -> Awaitable[AgentResponse] | ResponseStream[AgentResponseUpdate, AgentResponse]:
del messages, session, kwargs
if stream:
return self._run_stream()
return self._run()
async def _run(self) -> AgentResponse:
text = "".join(self._make_chunk(index) for index in range(self._chunk_count))
return AgentResponse(messages=[Message("assistant", [Content.from_text(text=text)])])
def _run_stream(self) -> ResponseStream[AgentResponseUpdate, AgentResponse]:
async def _iter() -> AsyncIterable[AgentResponseUpdate]:
for index in range(self._chunk_count):
yield AgentResponseUpdate(
contents=[Content.from_text(text=self._make_chunk(index))],
role="assistant",
)
if self._delay_s:
await asyncio.sleep(self._delay_s)
return ResponseStream(_iter(), finalizer=AgentResponse.from_updates)
def _make_chunk(self, index: int) -> str:
prefix = f"[{index:06d}] "
payload_size = max(self._chunk_size - len(prefix), 1)
payload = ("x" * (payload_size - 1)) + ("\n" if index % 8 == 7 else " ")
return prefix + payload
class _CDPClient:
"""Minimal Chrome DevTools Protocol client for a single attached page."""
def __init__(self, websocket: Any) -> None:
self._websocket = websocket
self._next_id = 0
async def send(
self,
method: str,
params: dict[str, Any] | None = None,
*,
session_id: str | None = None,
) -> dict[str, Any]:
self._next_id += 1
command_id = self._next_id
payload: dict[str, Any] = {"id": command_id, "method": method}
if params is not None:
payload["params"] = params
if session_id is not None:
payload["sessionId"] = session_id
await self._websocket.send(json.dumps(payload))
while True:
raw_message = await self._websocket.recv()
if isinstance(raw_message, bytes):
raw_message = raw_message.decode("utf-8")
message = json.loads(raw_message)
if message.get("id") != command_id:
continue
error = message.get("error")
if isinstance(error, dict):
raise RuntimeError(f"CDP command {method} failed: {error}")
result = message.get("result")
return result if isinstance(result, dict) else {}
async def evaluate(self, expression: str, *, session_id: str) -> Any:
result = await self.send(
"Runtime.evaluate",
{
"expression": expression,
"awaitPromise": True,
"returnByValue": True,
},
session_id=session_id,
)
remote_result = result.get("result")
if isinstance(remote_result, dict):
return remote_result.get("value")
return None
def _get_browser_candidates() -> tuple[Path, ...]:
if sys.platform == "darwin":
return (
Path("/Applications/Microsoft Edge.app/Contents/MacOS/Microsoft Edge"),
Path("/Applications/Google Chrome.app/Contents/MacOS/Google Chrome"),
Path("/Applications/Chromium.app/Contents/MacOS/Chromium"),
)
if sys.platform == "win32":
windows_bases: list[Path] = []
for env_var in ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA"):
raw_value = os.environ.get(env_var)
if raw_value:
windows_bases.append(Path(raw_value))
return tuple(
dict.fromkeys(
[base / "Microsoft/Edge/Application/msedge.exe" for base in windows_bases]
+ [base / "Google/Chrome/Application/chrome.exe" for base in windows_bases]
+ [base / "Chromium/Application/chrome.exe" for base in windows_bases]
)
)
return (
Path("/usr/bin/google-chrome"),
Path("/usr/bin/google-chrome-stable"),
Path("/usr/bin/chromium"),
Path("/usr/bin/chromium-browser"),
Path("/usr/bin/microsoft-edge"),
Path("/opt/google/chrome/chrome"),
Path("/opt/microsoft/msedge/msedge"),
Path("/snap/bin/chromium"),
)
def _find_browser_executable() -> Path | None:
for env_var in _BROWSER_ENV_VARS:
configured_path = os.environ.get(env_var)
if not configured_path:
continue
candidate = Path(configured_path).expanduser()
if candidate.exists():
return candidate
for candidate in _get_browser_candidates():
if candidate.exists():
return candidate
for command in _BROWSER_COMMANDS:
resolved = shutil.which(command)
if resolved is not None:
return Path(resolved)
return None
def _find_available_port() -> int:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
sock.listen(1)
return int(sock.getsockname()[1])
def _get_json_response(*, host: str, port: int, path: str) -> dict[str, Any]:
connection = http.client.HTTPConnection(host, port, timeout=5)
try:
connection.request("GET", path)
response = connection.getresponse()
if response.status != 200:
raise RuntimeError(f"Request to {path} failed with status {response.status}")
payload = response.read().decode("utf-8")
finally:
connection.close()
data = json.loads(payload)
if isinstance(data, dict):
return data
raise RuntimeError(f"Expected JSON object from {path}, got: {type(data).__name__}")
async def _get_devtools_websocket_url(port: int) -> str:
deadline = time.monotonic() + 10.0
while time.monotonic() < deadline:
with contextlib.suppress(Exception):
version_data = _get_json_response(host="127.0.0.1", port=port, path="/json/version")
websocket_url = version_data.get("webSocketDebuggerUrl")
if isinstance(websocket_url, str) and websocket_url:
return websocket_url
await asyncio.sleep(0.1)
raise RuntimeError(f"Timed out waiting for DevTools on port {port}")
def _wait_for_server_details(server_instance: uvicorn.Server) -> tuple[int, str]:
deadline = time.monotonic() + 10.0
actual_port: int | None = None
while time.monotonic() < deadline:
if hasattr(server_instance, "servers") and server_instance.servers:
for uvicorn_server in server_instance.servers:
sockets = getattr(uvicorn_server, "sockets", None)
if not sockets:
continue
actual_port = int(sockets[0].getsockname()[1])
break
if actual_port is not None:
with contextlib.suppress(Exception):
health = _get_json_response(host="127.0.0.1", port=actual_port, path="/health")
if health.get("status") == "healthy":
entities = _get_json_response(host="127.0.0.1", port=actual_port, path="/v1/entities")
entity_list = entities.get("entities")
if isinstance(entity_list, list) and entity_list:
entity = entity_list[0]
if isinstance(entity, dict) and isinstance(entity.get("id"), str):
return actual_port, entity["id"]
time.sleep(0.1)
raise RuntimeError("Timed out waiting for DevUI server startup")
def _parse_posix_process_rows(output: str) -> list[_BrowserProcessRow]:
rows: list[_BrowserProcessRow] = []
for line in output.splitlines():
parts = line.strip().split(None, 3)
if len(parts) != 4:
continue
pid_text, parent_pid_text, rss_text, command = parts
with contextlib.suppress(ValueError):
rows.append(
_BrowserProcessRow(
pid=int(pid_text),
parent_pid=int(parent_pid_text),
rss_kb=int(rss_text),
command=command,
)
)
return rows
def _parse_windows_process_rows(output: str) -> list[_BrowserProcessRow]:
text = output.strip()
if not text:
return []
payload = json.loads(text)
items = payload if isinstance(payload, list) else [payload]
rows: list[_BrowserProcessRow] = []
for item in items:
if not isinstance(item, dict):
continue
pid = item.get("pid")
parent_pid = item.get("parent_pid")
rss_kb = item.get("rss_kb")
command = item.get("command")
if not all(isinstance(value, int) for value in (pid, parent_pid, rss_kb)):
continue
if not isinstance(command, str):
continue
rows.append(
_BrowserProcessRow(
pid=pid,
parent_pid=parent_pid,
rss_kb=rss_kb,
command=command,
)
)
return rows
def _read_process_rows() -> list[_BrowserProcessRow]:
if sys.platform == "win32":
result = subprocess.run(
["powershell", "-NoProfile", "-Command", _WINDOWS_PROCESS_QUERY],
capture_output=True,
text=True,
check=True,
encoding="utf-8",
)
return _parse_windows_process_rows(result.stdout)
result = subprocess.run(
["ps", "-axo", "pid=,ppid=,rss=,command="],
capture_output=True,
text=True,
check=True,
)
return _parse_posix_process_rows(result.stdout)
def _collect_process_tree(root_pids: set[int], process_rows: list[_BrowserProcessRow]) -> list[_BrowserProcessRow]:
process_by_pid = {row.pid: row for row in process_rows}
child_pids_by_parent: dict[int, list[int]] = {}
for row in process_rows:
child_pids_by_parent.setdefault(row.parent_pid, []).append(row.pid)
collected_rows: list[_BrowserProcessRow] = []
seen_pids: set[int] = set()
pending_pids = list(root_pids)
while pending_pids:
pid = pending_pids.pop()
if pid in seen_pids:
continue
seen_pids.add(pid)
process_row = process_by_pid.get(pid)
if process_row is None:
continue
collected_rows.append(process_row)
pending_pids.extend(child_pids_by_parent.get(pid, []))
return collected_rows
def _collect_browser_process_rows(root_pid: int, profile_dir: str) -> list[_BrowserProcessRow]:
process_rows = _read_process_rows()
normalized_profile_dir = profile_dir.casefold()
matched_root_pids = {row.pid for row in process_rows if normalized_profile_dir in row.command.casefold()}
matched_root_pids.add(root_pid)
return _collect_process_tree(matched_root_pids, process_rows)
def _sample_peak_renderer_rss_mb(root_pid: int, profile_dir: str) -> float:
renderer_rss_kb = [
row.rss_kb
for row in _collect_browser_process_rows(root_pid, profile_dir)
if "--type=renderer" in row.command.casefold()
]
return round((max(renderer_rss_kb, default=0)) / 1024, 2)
def _terminate_browser_processes(root_pid: int, profile_dir: str) -> None:
browser_rows = _collect_browser_process_rows(root_pid, profile_dir)
browser_pids = sorted({row.pid for row in browser_rows} | {root_pid}, reverse=True)
if sys.platform == "win32":
for pid in browser_pids:
with contextlib.suppress(subprocess.CalledProcessError):
subprocess.run(
["taskkill", "/PID", str(pid), "/T", "/F"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
return
for pid in browser_pids:
with contextlib.suppress(ProcessLookupError):
os.kill(pid, signal.SIGTERM)
def _launch_browser_process(*, browser_path: Path, debug_port: int, profile_dir: str) -> subprocess.Popen[str]:
return subprocess.Popen(
[
str(browser_path),
"--headless=new",
f"--remote-debugging-port={debug_port}",
"--remote-debugging-address=127.0.0.1",
f"--user-data-dir={profile_dir}",
"--no-first-run",
"--no-default-browser-check",
"--disable-background-networking",
"--disable-sync",
"--disable-renderer-backgrounding",
"--hide-scrollbars",
"--mute-audio",
"--enable-precise-memory-info",
"--no-sandbox",
"about:blank",
],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
text=True,
)
def _shutdown_browser_process(browser_process: subprocess.Popen[str], *, profile_dir: str) -> None:
with contextlib.suppress(Exception):
browser_process.terminate()
browser_process.wait(timeout=5)
_terminate_browser_processes(browser_process.pid, profile_dir)
def test_parse_posix_process_rows() -> None:
output = """
101 1 2048 /usr/bin/google-chrome --user-data-dir=/tmp/devui-memory
202 101 4096 /usr/bin/google-chrome --type=renderer --lang=en-US
""".strip()
assert _parse_posix_process_rows(output) == [
_BrowserProcessRow(
pid=101,
parent_pid=1,
rss_kb=2048,
command="/usr/bin/google-chrome --user-data-dir=/tmp/devui-memory",
),
_BrowserProcessRow(
pid=202,
parent_pid=101,
rss_kb=4096,
command="/usr/bin/google-chrome --type=renderer --lang=en-US",
),
]
def test_parse_windows_process_rows() -> None:
output = json.dumps([
{
"pid": 301,
"parent_pid": 1,
"rss_kb": 2048,
"command": r"C:\Program Files\Google\Chrome\Application\chrome.exe",
},
{
"pid": 302,
"parent_pid": 301,
"rss_kb": 6144,
"command": r"C:\Program Files\Google\Chrome\Application\chrome.exe --type=renderer",
},
])
assert _parse_windows_process_rows(output) == [
_BrowserProcessRow(
pid=301,
parent_pid=1,
rss_kb=2048,
command=r"C:\Program Files\Google\Chrome\Application\chrome.exe",
),
_BrowserProcessRow(
pid=302,
parent_pid=301,
rss_kb=6144,
command=r"C:\Program Files\Google\Chrome\Application\chrome.exe --type=renderer",
),
]
def test_sample_peak_renderer_rss_mb_uses_browser_process_tree(
monkeypatch: pytest.MonkeyPatch,
) -> None:
profile_dir = "/tmp/devui-memory-browser"
process_rows = [
_BrowserProcessRow(
pid=101,
parent_pid=1,
rss_kb=1024,
command="/usr/bin/google-chrome",
),
_BrowserProcessRow(
pid=102,
parent_pid=101,
rss_kb=4096,
command="/usr/bin/google-chrome --type=renderer",
),
_BrowserProcessRow(
pid=201,
parent_pid=1,
rss_kb=2048,
command=f"/usr/bin/google-chrome --user-data-dir={profile_dir}",
),
_BrowserProcessRow(
pid=202,
parent_pid=201,
rss_kb=8192,
command="/usr/bin/google-chrome --type=renderer",
),
_BrowserProcessRow(
pid=999,
parent_pid=1,
rss_kb=32768,
command="/usr/bin/google-chrome --type=renderer",
),
]
monkeypatch.setattr(sys.modules[__name__], "_read_process_rows", lambda: process_rows)
assert _sample_peak_renderer_rss_mb(101, profile_dir) == 8.0
@pytest.fixture
def memory_regression_server() -> Generator[tuple[str, str]]:
"""Start DevUI with a synthetic streaming agent and yield the base URL plus entity ID."""
server = DevServer(host="127.0.0.1", port=0)
server.register_entities([
MemoryStressAgent(
id="memory-stream-agent",
name="MemoryStreamAgent",
description="Streams many small chunks for UI memory profiling.",
chunk_count=_STREAM_CHUNK_COUNT,
chunk_size=_STREAM_CHUNK_SIZE,
delay_ms=1.0,
)
])
app = server.get_app()
server_config = uvicorn.Config(
app=app,
host="127.0.0.1",
port=0,
log_level="error",
ws="none",
)
server_instance = uvicorn.Server(server_config)
def run_server() -> None:
asyncio.run(server_instance.serve())
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
actual_port, entity_id = _wait_for_server_details(server_instance)
yield f"http://127.0.0.1:{actual_port}", entity_id
with contextlib.suppress(Exception):
server_instance.should_exit = True
server_thread.join(timeout=5)
async def _wait_for_expression(
client: _CDPClient,
*,
session_id: str,
expression: str,
timeout_s: float,
) -> Any:
deadline = time.monotonic() + timeout_s
while time.monotonic() < deadline:
value = await client.evaluate(expression, session_id=session_id)
if value:
return value
await asyncio.sleep(0.1)
raise AssertionError(f"Timed out waiting for expression: {expression}")
async def test_devui_streaming_renderer_memory_is_bounded(
memory_regression_server: tuple[str, str],
) -> None:
"""Fail when frontend renderer memory grows unbounded during streaming."""
browser_path = _find_browser_executable()
if browser_path is None:
pytest.skip("No Chromium-based browser found for DevUI memory regression test")
base_url, entity_id = memory_regression_server
debug_port = _find_available_port()
with tempfile.TemporaryDirectory(prefix="devui-memory-browser-") as profile_dir:
browser_process = _launch_browser_process(
browser_path=browser_path,
debug_port=debug_port,
profile_dir=profile_dir,
)
try:
websocket_url = await _get_devtools_websocket_url(debug_port)
async with websocket_connect(websocket_url, max_size=None) as websocket:
client = _CDPClient(websocket)
target = await client.send("Target.createTarget", {"url": "about:blank"})
target_id = target["targetId"]
attached = await client.send(
"Target.attachToTarget",
{"targetId": target_id, "flatten": True},
)
session_id = attached["sessionId"]
await client.send("Page.enable", session_id=session_id)
await client.send("Runtime.enable", session_id=session_id)
await client.send(
"Page.navigate",
{"url": f"{base_url}/?entity_id={entity_id}"},
session_id=session_id,
)
await _wait_for_expression(
client,
session_id=session_id,
expression=(
"Boolean("
"document.querySelector('textarea') && "
"document.querySelector('button[aria-label=\"Send message\"]')"
")"
),
timeout_s=30.0,
)
start_renderer_rss_mb = _sample_peak_renderer_rss_mb(
browser_process.pid,
profile_dir,
)
await client.evaluate(
"""
(() => {
const textarea = document.querySelector("textarea");
const valueSetter = Object.getOwnPropertyDescriptor(
HTMLTextAreaElement.prototype,
"value"
).set;
valueSetter.call(textarea, "Stream a very long answer.");
textarea.dispatchEvent(new Event("input", { bubbles: true }));
document.querySelector('button[aria-label="Send message"]').click();
return true;
})()
""",
session_id=session_id,
)
await _wait_for_expression(
client,
session_id=session_id,
expression="Boolean(document.querySelector('button[aria-label=\"Stop generating response\"]'))",
timeout_s=10.0,
)
await asyncio.sleep(_POST_SEND_DELAY_S)
peak_renderer_rss_mb = start_renderer_rss_mb
samples: list[tuple[float, float]] = [(0.0, start_renderer_rss_mb)]
start_time = time.monotonic()
while time.monotonic() - start_time < _SAMPLE_WINDOW_S:
current_sample = _sample_peak_renderer_rss_mb(
browser_process.pid,
profile_dir,
)
elapsed_s = round(time.monotonic() - start_time, 2)
samples.append((elapsed_s, current_sample))
peak_renderer_rss_mb = max(peak_renderer_rss_mb, current_sample)
if peak_renderer_rss_mb - start_renderer_rss_mb > _MAX_RENDERER_GROWTH_MB:
break
await asyncio.sleep(_SAMPLE_INTERVAL_S)
renderer_growth_mb = round(peak_renderer_rss_mb - start_renderer_rss_mb, 2)
assert renderer_growth_mb <= _MAX_RENDERER_GROWTH_MB, (
"DevUI renderer memory grew too much during a ~1.5 MB streaming response. "
f"start={start_renderer_rss_mb:.2f}MB "
f"peak={peak_renderer_rss_mb:.2f}MB "
f"growth={renderer_growth_mb:.2f}MB "
f"budget={_MAX_RENDERER_GROWTH_MB:.2f}MB "
f"samples={samples}"
)
finally:
_shutdown_browser_process(browser_process, profile_dir=profile_dir)