Python: Improve DevUI, add Context Inspector view as new tab under traces (#2742)

* Improve DevUI, add Context Inspector view as new tab under traces

* fix mypy errors

* fix: Handle stale MCP connections in DevUI executor

MCP tools can become stale when HTTP streaming responses end - the underlying
stdio streams close but `is_connected` remains True. This causes subsequent
requests to fail with `ClosedResourceError`.

Add `_ensure_mcp_connections()` to detect and reconnect stale MCP tools before
agent execution. This is a workaround for an upstream Agent Framework issue
where connection state isn't properly tracked.

Fixes MCP tools failing on second HTTP request in DevUI.

fixes  #1476 #1515 #2865

* fix #1572 report import dependency errors more clearly

* Ensure there is streaming toggle where users can select streaming vs non streaming mode in devui . Fixes .NET: [Python] DevUI tool call rendering in non-streaming mode?

* remove unused dead code

* improve ux - workflows with agents show a chat component in execution timelien, also ensure magentic final output shows correctly

* update ui build

* update devui to use instrumentation instead of tracing, other instrumentation and type/instance check fixes
This commit is contained in:
Victor Dibia
2026-01-07 00:26:08 -08:00
committed by GitHub
Unverified
parent db283cd396
commit 2e1189ca65
36 changed files with 7430 additions and 1662 deletions
@@ -94,7 +94,7 @@ def serve(
auto_open: bool = False,
cors_origins: list[str] | None = None,
ui_enabled: bool = True,
tracing_enabled: bool = False,
instrumentation_enabled: bool = False,
mode: str = "developer",
auth_enabled: bool = False,
auth_token: str | None = None,
@@ -109,7 +109,7 @@ def serve(
auto_open: Whether to automatically open browser
cors_origins: List of allowed CORS origins
ui_enabled: Whether to enable the UI
tracing_enabled: Whether to enable OpenTelemetry tracing
instrumentation_enabled: Whether to enable OpenTelemetry instrumentation
mode: Server mode - 'developer' (full access, verbose errors) or 'user' (restricted APIs, generic errors)
auth_enabled: Whether to enable Bearer token authentication
auth_token: Custom authentication token (auto-generated if not provided with auth_enabled=True)
@@ -172,22 +172,12 @@ def serve(
os.environ["AUTH_REQUIRED"] = "true"
os.environ["DEVUI_AUTH_TOKEN"] = auth_token
# Configure tracing environment variables if enabled
if tracing_enabled:
import os
# Enable instrumentation if requested
if instrumentation_enabled:
from agent_framework.observability import enable_instrumentation
# Only set if not already configured by user
if not os.environ.get("ENABLE_INSTRUMENTATION"):
os.environ["ENABLE_INSTRUMENTATION"] = "true"
logger.info("Set ENABLE_INSTRUMENTATION=true for tracing")
if not os.environ.get("ENABLE_SENSITIVE_DATA"):
os.environ["ENABLE_SENSITIVE_DATA"] = "true"
logger.info("Set ENABLE_SENSITIVE_DATA=true for tracing")
if not os.environ.get("OTLP_ENDPOINT"):
os.environ["OTLP_ENDPOINT"] = "http://localhost:4317"
logger.info("Set OTLP_ENDPOINT=http://localhost:4317 for tracing")
enable_instrumentation(enable_sensitive_data=True)
logger.info("Enabled Agent Framework instrumentation with sensitive data")
# Create server with direct parameters
server = DevServer(
@@ -28,7 +28,7 @@ Examples:
devui ./agents # Scan specific directory
devui --port 8000 # Custom port
devui --headless # API only, no UI
devui --tracing # Enable OpenTelemetry tracing
devui --instrumentation # Enable OpenTelemetry instrumentation
""",
)
@@ -53,7 +53,7 @@ Examples:
parser.add_argument("--reload", action="store_true", help="Enable auto-reload for development")
parser.add_argument("--tracing", action="store_true", help="Enable OpenTelemetry tracing for Agent Framework")
parser.add_argument("--instrumentation", action="store_true", help="Enable OpenTelemetry instrumentation")
parser.add_argument(
"--mode",
@@ -182,7 +182,7 @@ def main() -> None:
host=args.host,
auto_open=not args.no_open,
ui_enabled=ui_enabled,
tracing_enabled=args.tracing,
instrumentation_enabled=args.instrumentation,
mode=mode,
auth_enabled=args.auth,
auth_token=args.auth_token, # Pass through explicit token only
@@ -176,6 +176,31 @@ class ConversationStore(ABC):
"""
pass
@abstractmethod
def add_trace(self, conversation_id: str, trace_event: dict[str, Any]) -> None:
"""Add a trace event to the conversation for context inspection.
Traces capture execution metadata like token usage, timing, and LLM context
that isn't stored in the AgentThread but is useful for debugging.
Args:
conversation_id: Conversation ID
trace_event: Trace event data (from ResponseTraceEvent.data)
"""
pass
@abstractmethod
def get_traces(self, conversation_id: str) -> list[dict[str, Any]]:
"""Get all trace events for a conversation.
Args:
conversation_id: Conversation ID
Returns:
List of trace event dicts, or empty list if not found
"""
pass
class InMemoryConversationStore(ConversationStore):
"""In-memory conversation storage wrapping AgentThread.
@@ -215,6 +240,7 @@ class InMemoryConversationStore(ConversationStore):
"metadata": metadata or {},
"created_at": created_at,
"items": [],
"traces": [], # Trace events for context inspection (token usage, timing, etc.)
}
# Initialize item index for this conversation
@@ -407,10 +433,20 @@ class InMemoryConversationStore(ConversationStore):
elif content_type == "function_result":
# Function result - create separate ConversationItem
call_id = getattr(content, "call_id", None)
# Output is stored in additional_properties
output = ""
if hasattr(content, "additional_properties"):
output = content.additional_properties.get("output", "")
# Output is stored in the 'result' field of FunctionResultContent
result_value = getattr(content, "result", None)
# Convert result to string (it could be dict, list, or other types)
if result_value is None:
output = ""
elif isinstance(result_value, str):
output = result_value
else:
import json
try:
output = json.dumps(result_value)
except (TypeError, ValueError):
output = str(result_value)
if call_id:
function_results.append(
@@ -556,6 +592,34 @@ class InMemoryConversationStore(ConversationStore):
conv_data = self._conversations.get(conversation_id)
return conv_data["thread"] if conv_data else None
def add_trace(self, conversation_id: str, trace_event: dict[str, Any]) -> None:
"""Add a trace event to the conversation for context inspection.
Traces capture execution metadata like token usage, timing, and LLM context
that isn't stored in the AgentThread but is useful for debugging.
Args:
conversation_id: Conversation ID
trace_event: Trace event data (from ResponseTraceEvent.data)
"""
conv_data = self._conversations.get(conversation_id)
if conv_data:
traces = conv_data.get("traces", [])
traces.append(trace_event)
conv_data["traces"] = traces
def get_traces(self, conversation_id: str) -> list[dict[str, Any]]:
"""Get all trace events for a conversation.
Args:
conversation_id: Conversation ID
Returns:
List of trace event dicts, or empty list if not found
"""
conv_data = self._conversations.get(conversation_id)
return conv_data.get("traces", []) if conv_data else []
async def list_conversations_by_metadata(self, metadata_filter: dict[str, str]) -> list[Conversation]:
"""Filter conversations by metadata (e.g., agent_id)."""
results = []
@@ -666,7 +666,16 @@ class EntityDiscovery:
logger.debug(f"Successfully imported {pattern}")
return module, None
except ModuleNotFoundError:
except ModuleNotFoundError as e:
# Distinguish between "module pattern doesn't exist" vs "module has import errors"
# If the missing module is the pattern itself, it's just not found (try next pattern)
# If the missing module is something else (a dependency), capture the error
missing_module = getattr(e, "name", None)
if missing_module and missing_module != pattern and not pattern.endswith(f".{missing_module}"):
# The module exists but has an import error (missing dependency)
logger.warning(f"Error importing {pattern}: {e}")
return None, e
# The module pattern itself doesn't exist - this is expected, try next pattern
logger.debug(f"Import pattern {pattern} not found")
return None, None
except Exception as e:
@@ -4,7 +4,6 @@
import json
import logging
import os
from collections.abc import AsyncGenerator
from typing import Any
@@ -45,8 +44,8 @@ class AgentFrameworkExecutor:
"""
self.entity_discovery = entity_discovery
self.message_mapper = message_mapper
self._setup_tracing_provider()
self._setup_agent_framework_tracing()
self._setup_instrumentation_provider()
self._setup_agent_framework_instrumentation()
# Use provided conversation store or default to in-memory
self.conversation_store = conversation_store or InMemoryConversationStore()
@@ -56,7 +55,7 @@ class AgentFrameworkExecutor:
self.checkpoint_manager = CheckpointConversationManager(self.conversation_store)
def _setup_tracing_provider(self) -> None:
def _setup_instrumentation_provider(self) -> None:
"""Set up our own TracerProvider so we can add processors."""
try:
from opentelemetry import trace
@@ -71,7 +70,7 @@ class AgentFrameworkExecutor:
})
provider = TracerProvider(resource=resource)
trace.set_tracer_provider(provider)
logger.info("Set up TracerProvider for server tracing")
logger.info("Set up TracerProvider for instrumentation")
else:
logger.debug("TracerProvider already exists")
@@ -80,25 +79,86 @@ class AgentFrameworkExecutor:
except Exception as e:
logger.warning(f"Failed to setup TracerProvider: {e}")
def _setup_agent_framework_tracing(self) -> None:
"""Set up Agent Framework's built-in tracing."""
# Configure Agent Framework tracing only if ENABLE_INSTRUMENTATION is set
if os.environ.get("ENABLE_INSTRUMENTATION"):
try:
from agent_framework.observability import OBSERVABILITY_SETTINGS, configure_otel_providers
def _setup_agent_framework_instrumentation(self) -> None:
"""Set up Agent Framework's built-in instrumentation."""
try:
from agent_framework.observability import OBSERVABILITY_SETTINGS, configure_otel_providers
# Only configure if not already executed
# Configure if instrumentation is enabled (via enable_instrumentation() or env var)
if OBSERVABILITY_SETTINGS.ENABLED:
# Only configure providers if not already executed
if not OBSERVABILITY_SETTINGS._executed_setup:
# Run the configure_otel_providers
# This ensures OTLP exporters are created even if env vars were set late
configure_otel_providers(enable_sensitive_data=True)
# Call configure_otel_providers to set up exporters.
# If OTEL_EXPORTER_OTLP_ENDPOINT is set, exporters will be created automatically.
# If not set, no exporters are created (no console spam), but DevUI's
# TracerProvider from _setup_instrumentation_provider() remains active for local capture.
configure_otel_providers(enable_sensitive_data=OBSERVABILITY_SETTINGS.SENSITIVE_DATA_ENABLED)
logger.info("Enabled Agent Framework observability")
else:
logger.debug("Agent Framework observability already configured")
else:
logger.debug("Instrumentation not enabled, skipping observability setup")
except Exception as e:
logger.warning(f"Failed to enable Agent Framework observability: {e}")
async def _ensure_mcp_connections(self, agent: Any) -> None:
"""Ensure MCP tool connections are healthy before agent execution.
This is a workaround for an Agent Framework bug where MCP tool connections
can become stale (underlying streams closed) but is_connected remains True.
This happens when HTTP streaming responses end and GeneratorExit propagates.
This method detects stale connections and reconnects them. It's designed to
be a no-op once the Agent Framework fixes this issue upstream.
Args:
agent: Agent object that may have MCP tools
"""
if not hasattr(agent, "_local_mcp_tools"):
return
for mcp_tool in agent._local_mcp_tools:
if not getattr(mcp_tool, "is_connected", False):
continue
tool_name = getattr(mcp_tool, "name", "unknown")
try:
# Check if underlying write stream is closed
session = getattr(mcp_tool, "session", None)
if session is None:
continue
write_stream = getattr(session, "_write_stream", None)
if write_stream is None:
continue
# Detect stale connection: is_connected=True but stream is closed
is_closed = getattr(write_stream, "_closed", False)
if not is_closed:
continue # Connection is healthy
# Stale connection detected - reconnect
logger.warning(f"MCP tool '{tool_name}' has stale connection (stream closed), reconnecting...")
# Clean up old connection
try:
if hasattr(mcp_tool, "close"):
await mcp_tool.close()
except Exception as close_err:
logger.debug(f"Error closing stale MCP tool '{tool_name}': {close_err}")
# Force reset state
mcp_tool.is_connected = False
mcp_tool.session = None
# Reconnect
if hasattr(mcp_tool, "connect"):
await mcp_tool.connect()
logger.info(f"MCP tool '{tool_name}' reconnected successfully")
except Exception as e:
logger.warning(f"Failed to enable Agent Framework observability: {e}")
else:
logger.debug("ENABLE_INSTRUMENTATION not set, skipping observability setup")
# If detection fails, log and continue - let it fail naturally during execution
logger.debug(f"Error checking MCP tool '{tool_name}' connection: {e}")
async def discover_entities(self) -> list[EntityInfo]:
"""Discover all available entities.
@@ -192,11 +252,11 @@ class AgentFrameworkExecutor:
logger.info(f"Executing {entity_info.type}: {entity_id}")
# Extract session_id from request for trace context
session_id = getattr(request.extra_body, "session_id", None) if request.extra_body else None
# Extract response_id from request for trace context (added by _server.py)
response_id = request.extra_body.get("response_id") if request.extra_body else None
# Use simplified trace capture
with capture_traces(session_id=session_id, entity_id=entity_id) as trace_collector:
with capture_traces(response_id=response_id, entity_id=entity_id) as trace_collector:
if entity_info.type == "agent":
async for event in self._execute_agent(entity_obj, request, trace_collector):
yield event
@@ -260,6 +320,12 @@ class AgentFrameworkExecutor:
logger.debug(f"Executing agent with text input: {user_message[:100]}...")
else:
logger.debug(f"Executing agent with multimodal ChatMessage: {type(user_message)}")
# Workaround for MCP tool stale connection bug (GitHub issue pending)
# When HTTP streaming ends, GeneratorExit can close MCP stdio streams
# but is_connected stays True. Detect and reconnect before execution.
await self._ensure_mcp_connections(agent)
# Check if agent supports streaming
if hasattr(agent, "run_stream") and callable(agent.run_stream):
# Use Agent Framework's native streaming with optional thread
@@ -12,6 +12,7 @@ from datetime import datetime
from typing import Any, Union
from uuid import uuid4
from agent_framework import ChatMessage, TextContent
from openai.types.responses import (
Response,
ResponseContentPartAddedEvent,
@@ -225,27 +226,128 @@ class MessageMapper:
Final aggregated OpenAI response
"""
try:
# Extract text content from events
content_parts = []
# Collect output items in order
output_items: list[Any] = []
# Track text content parts per message (keyed by item_id)
text_parts_by_message: dict[str, list[str]] = {}
# Track function calls (keyed by call_id) to accumulate arguments
function_calls: dict[str, dict[str, Any]] = {}
# Track function results (keyed by call_id)
function_results: dict[str, dict[str, Any]] = {}
for event in events:
# Extract delta text from ResponseTextDeltaEvent
if hasattr(event, "delta") and hasattr(event, "type") and event.type == "response.output_text.delta":
content_parts.append(event.delta)
event_type = getattr(event, "type", None)
# Combine content
full_content = "".join(content_parts)
# Handle text deltas - accumulate text per message
if event_type == "response.output_text.delta":
item_id = getattr(event, "item_id", "default")
if item_id not in text_parts_by_message:
text_parts_by_message[item_id] = []
text_parts_by_message[item_id].append(event.delta)
# Create proper OpenAI Response
response_output_text = ResponseOutputText(type="output_text", text=full_content, annotations=[])
# Handle output_item.added events (function_call, message, etc.)
elif event_type == "response.output_item.added":
item = getattr(event, "item", None)
if item:
# Handle both object and dict formats
item_type = item.get("type") if isinstance(item, dict) else getattr(item, "type", None)
response_output_message = ResponseOutputMessage(
type="message",
role="assistant",
content=[response_output_text],
id=f"msg_{uuid.uuid4().hex[:8]}",
status="completed",
)
# Track function calls to accumulate their arguments
if item_type == "function_call":
# Handle both object and dict formats
if isinstance(item, dict):
call_id = item.get("call_id") or item.get("id")
if call_id:
function_calls[call_id] = {
"id": item.get("id", call_id),
"call_id": call_id,
"name": item.get("name", ""),
"arguments": item.get("arguments", ""),
"type": "function_call",
"status": item.get("status", "completed"),
}
else:
call_id = getattr(item, "call_id", None) or getattr(item, "id", None)
if call_id:
function_calls[call_id] = {
"id": getattr(item, "id", call_id),
"call_id": call_id,
"name": getattr(item, "name", ""),
"arguments": getattr(item, "arguments", ""),
"type": "function_call",
"status": getattr(item, "status", "completed"),
}
# Other output items (message, etc.) - track for later
elif item_type == "message":
# Messages will be built from text_parts_by_message
pass
# Handle function call arguments delta - accumulate arguments
elif event_type == "response.function_call_arguments.delta":
item_id = getattr(event, "item_id", None)
delta = getattr(event, "delta", "")
# item_id for function calls is the call_id
if item_id and item_id in function_calls:
function_calls[item_id]["arguments"] += delta
# Handle function result complete events
elif event_type == "response.function_result.complete":
call_id = getattr(event, "call_id", None)
if call_id:
function_results[call_id] = {
"type": "function_call_output",
"call_id": call_id,
"output": getattr(event, "output", ""),
"status": getattr(event, "status", "completed"),
}
# Build output array in order: function_calls, then final message
# Add function call items
for _call_id, fc_data in function_calls.items():
output_items.append(ResponseFunctionToolCall(**fc_data))
# Note: function_call_output items are NOT added to output array
# In OpenAI's Responses API, function results are user inputs, not assistant outputs
# The function_results dict is kept for potential future use or debugging
# but we don't include them in the Response output
_ = function_results # Acknowledge but don't use
# Build final text message from accumulated deltas
# Combine all text parts (usually there's just one message)
all_text_parts = []
for _item_id, parts in text_parts_by_message.items():
all_text_parts.extend(parts)
full_content = "".join(all_text_parts)
# Only add message if there's text content
if full_content:
response_output_text = ResponseOutputText(type="output_text", text=full_content, annotations=[])
response_output_message = ResponseOutputMessage(
type="message",
role="assistant",
content=[response_output_text],
id=f"msg_{uuid.uuid4().hex[:8]}",
status="completed",
)
output_items.append(response_output_message)
# If no output items at all, create an empty message
if not output_items:
response_output_text = ResponseOutputText(type="output_text", text="", annotations=[])
response_output_message = ResponseOutputMessage(
type="message",
role="assistant",
content=[response_output_text],
id=f"msg_{uuid.uuid4().hex[:8]}",
status="completed",
)
output_items.append(response_output_message)
# Get usage from accumulator (OpenAI standard)
request_id = str(id(request))
@@ -278,7 +380,7 @@ class MessageMapper:
object="response",
created_at=datetime.now().timestamp(),
model=request.model or "devui",
output=[response_output_message],
output=output_items,
usage=usage,
parallel_tool_calls=False,
tool_choice="none",
@@ -501,7 +603,7 @@ class MessageMapper:
return events
# Check if we're streaming text content
has_text_content = any(content.__class__.__name__ == "TextContent" for content in update.contents)
has_text_content = any(isinstance(content, TextContent) for content in update.contents)
# Check if we're in an executor context with an existing item
executor_id = context.get("current_executor_id")
@@ -791,17 +893,35 @@ class MessageMapper:
# Extract text from output data based on type
text = None
if hasattr(output_data, "__class__") and output_data.__class__.__name__ == "ChatMessage":
if isinstance(output_data, ChatMessage):
# Handle ChatMessage (from Magentic and AgentExecutor with output_response=True)
text = getattr(output_data, "text", None)
if not text:
# Fallback to string representation
text = str(output_data)
elif isinstance(output_data, list):
# Handle list of ChatMessage objects (from Magentic yield_output([final_answer]))
text_parts = []
for item in output_data:
if isinstance(item, ChatMessage):
item_text = getattr(item, "text", None)
if item_text:
text_parts.append(item_text)
else:
text_parts.append(str(item))
elif isinstance(item, str):
text_parts.append(item)
else:
try:
text_parts.append(json.dumps(item, indent=2))
except (TypeError, ValueError):
text_parts.append(str(item))
text = "\n".join(text_parts) if text_parts else str(output_data)
elif isinstance(output_data, str):
# String output
text = output_data
else:
# Object/dict/list → JSON string
# Object/dict → JSON string
try:
text = json.dumps(output_data, indent=2)
except (TypeError, ValueError):
@@ -1081,275 +1201,6 @@ class MessageMapper:
return [trace_event]
# Handle Magentic-specific events
if event_class == "MagenticAgentDeltaEvent":
agent_id = getattr(event, "agent_id", "unknown_agent")
text = getattr(event, "text", None)
if text:
# Check if we're inside an executor - route to executor's item
# This prevents duplicate timeline entries (executor + inner agent)
current_executor_id = context.get("current_executor_id")
executor_item_key = f"exec_item_{current_executor_id}" if current_executor_id else None
if executor_item_key and executor_item_key in context:
# Route delta to the executor's item instead of creating a new message item
item_id = context[executor_item_key]
# Emit text delta event routed to the executor's item
return [
ResponseTextDeltaEvent(
type="response.output_text.delta",
output_index=context.get("output_index", 0),
content_index=0,
item_id=item_id,
delta=text,
logprobs=[],
sequence_number=self._next_sequence(context),
)
]
# Fallback: No executor context - create separate message item (original behavior)
# This handles cases where MagenticAgentDeltaEvent is emitted outside an executor
events = []
# Track Magentic agent messages separately from regular messages
# Use timestamp to ensure uniqueness for multiple runs of same agent
magentic_key = f"magentic_message_{agent_id}"
# Check if this is the first delta from this agent (need to create message container)
if magentic_key not in context:
# Create a unique message ID for this agent's streaming session
message_id = f"msg_{agent_id}_{uuid4().hex[:8]}"
context[magentic_key] = message_id
context["output_index"] = context.get("output_index", -1) + 1
# Import required types for creating message containers
from openai.types.responses import ResponseOutputMessage, ResponseOutputText
from openai.types.responses.response_content_part_added_event import (
ResponseContentPartAddedEvent,
)
from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
# Emit message output item (container for the agent's message)
# This matches what _convert_agent_update does for regular agents
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
item=ResponseOutputMessage(
type="message",
id=message_id,
role="assistant",
content=[],
status="in_progress",
# Add metadata to identify this as a Magentic agent message
metadata={"agent_id": agent_id, "source": "magentic"}, # type: ignore[call-arg]
),
)
)
# Add content part for text (establishes the text container)
events.append(
ResponseContentPartAddedEvent(
type="response.content_part.added",
output_index=context["output_index"],
content_index=0,
item_id=message_id,
sequence_number=self._next_sequence(context),
part=ResponseOutputText(type="output_text", text="", annotations=[]),
)
)
# Get the message ID for this agent
message_id = context[magentic_key]
# Emit text delta event using the message ID (matches regular agent behavior)
events.append(
ResponseTextDeltaEvent(
type="response.output_text.delta",
output_index=context["output_index"],
content_index=0, # Always 0 for single text content
item_id=message_id,
delta=text,
logprobs=[],
sequence_number=self._next_sequence(context),
)
)
return events
# Handle function calls from Magentic agents
if getattr(event, "function_call_id", None) and getattr(event, "function_call_name", None):
# Handle function call initiation
function_call_id = getattr(event, "function_call_id", None)
function_call_name = getattr(event, "function_call_name", None)
function_call_arguments = getattr(event, "function_call_arguments", None)
# Track function call for accumulating arguments
context["active_function_calls"][function_call_id] = {
"item_id": function_call_id,
"name": function_call_name,
"arguments_chunks": [],
}
# Emit function call output item
return [
ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=ResponseFunctionToolCall(
id=function_call_id,
call_id=function_call_id,
name=function_call_name,
arguments=json.dumps(function_call_arguments) if function_call_arguments else "",
type="function_call",
status="in_progress",
),
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
]
# For other non-text deltas, emit as trace for debugging
return [
ResponseTraceEventComplete(
type="response.trace.completed",
data={
"trace_type": "magentic_delta",
"agent_id": agent_id,
"function_call_id": getattr(event, "function_call_id", None),
"function_call_name": getattr(event, "function_call_name", None),
"function_result_id": getattr(event, "function_result_id", None),
"timestamp": datetime.now().isoformat(),
},
span_id=f"magentic_delta_{uuid4().hex[:8]}",
item_id=context["item_id"],
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
)
]
if event_class == "MagenticAgentMessageEvent":
agent_id = getattr(event, "agent_id", "unknown_agent")
message = getattr(event, "message", None)
# Check if we're inside an executor - if so, deltas were already routed there
# We don't need to emit a separate message completion event
current_executor_id = context.get("current_executor_id")
executor_item_key = f"exec_item_{current_executor_id}" if current_executor_id else None
if executor_item_key and executor_item_key in context:
# Deltas were routed to executor item - no separate message item to complete
# The executor's output_item.done will mark completion
logger.debug(
f"MagenticAgentMessageEvent from {agent_id} - "
f"deltas routed to executor {current_executor_id}, skipping"
)
return []
# Fallback: Handle case where we created a separate message item (no executor context)
magentic_key = f"magentic_message_{agent_id}"
# Check if we were streaming for this agent
if magentic_key in context:
# Mark the streaming message as complete
message_id = context[magentic_key]
# Import required types
from openai.types.responses import ResponseOutputMessage
from openai.types.responses.response_output_item_done_event import ResponseOutputItemDoneEvent
# Extract text from ChatMessage for the completed message
text = None
if message and hasattr(message, "text"):
text = message.text
# Emit output_item.done to mark message as complete
events = [
ResponseOutputItemDoneEvent(
type="response.output_item.done",
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
item=ResponseOutputMessage(
type="message",
id=message_id,
role="assistant",
content=[], # Content already streamed via deltas
status="completed",
metadata={"agent_id": agent_id, "source": "magentic"}, # type: ignore[call-arg]
),
)
]
# Clean up context for this agent
del context[magentic_key]
logger.debug(f"MagenticAgentMessageEvent from {agent_id} marked streaming message as complete")
return events
# No streaming occurred, create a complete message (shouldn't happen normally)
# Extract text from ChatMessage
text = None
if message and hasattr(message, "text"):
text = message.text
if text:
# Emit as output item for this agent
from openai.types.responses import ResponseOutputMessage, ResponseOutputText
from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
context["output_index"] = context.get("output_index", -1) + 1
text_content = ResponseOutputText(type="output_text", text=text, annotations=[])
output_message = ResponseOutputMessage(
type="message",
id=f"msg_{agent_id}_{uuid4().hex[:8]}",
role="assistant",
content=[text_content],
status="completed",
metadata={"agent_id": agent_id, "source": "magentic"}, # type: ignore[call-arg]
)
logger.debug(
f"MagenticAgentMessageEvent from {agent_id} converted to output_item.added (non-streaming)"
)
return [
ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=output_message,
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
]
if event_class == "MagenticOrchestratorMessageEvent":
orchestrator_id = getattr(event, "orchestrator_id", "orchestrator")
message = getattr(event, "message", None)
kind = getattr(event, "kind", "unknown")
# Extract text from ChatMessage
text = None
if message and hasattr(message, "text"):
text = message.text
# Emit as trace event for orchestrator messages (typically task ledger, instructions)
return [
ResponseTraceEventComplete(
type="response.trace.completed",
data={
"trace_type": "magentic_orchestrator",
"orchestrator_id": orchestrator_id,
"kind": kind,
"text": text or "",
"timestamp": datetime.now().isoformat(),
},
span_id=f"magentic_orch_{uuid4().hex[:8]}",
item_id=context["item_id"],
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
)
]
# For unknown/legacy events, still emit as workflow event for backward compatibility
# Get event data and serialize if it's a SerializationMixin
raw_event_data = getattr(event, "data", None)
@@ -407,7 +407,7 @@ class DevServer:
framework="agent_framework",
runtime="python", # Python DevUI backend
capabilities={
"tracing": os.getenv("ENABLE_INSTRUMENTATION") == "true",
"instrumentation": os.getenv("ENABLE_INSTRUMENTATION") == "true",
"openai_proxy": openai_executor.is_configured,
"deployment": True, # Deployment feature is available
},
@@ -748,6 +748,11 @@ class DevServer:
response_id = f"resp_{uuid.uuid4().hex[:8]}"
logger.info(f"[CANCELLATION] Creating response {response_id} for entity {entity_id}")
# Inject response_id into extra_body for trace context
if request.extra_body is None:
request.extra_body = {}
request.extra_body["response_id"] = response_id
return StreamingResponse(
self._stream_with_cancellation(executor, request, response_id),
media_type="text/event-stream",
@@ -1000,10 +1005,16 @@ class DevServer:
logger.warning(f"Unexpected item type: {type(item)}, converting to dict")
serialized_items.append(dict(item))
# Get stored traces for context inspection (DevUI extension)
traces = executor.conversation_store.get_traces(conversation_id)
return {
"object": "list",
"data": serialized_items,
"has_more": has_more,
"metadata": {
"traces": traces, # Trace events for token usage, timing, LLM context
},
}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e)) from e
@@ -1080,10 +1091,22 @@ class DevServer:
# Collect events for final response.completed event
events = []
# Get conversation_id for trace storage
conversation_id = request._get_conversation_id()
# Stream all events
async for event in executor.execute_streaming(request):
events.append(event)
# Store trace events for context inspection (persisted with conversation)
if conversation_id and hasattr(event, "type") and event.type == "response.trace.completed":
try:
trace_data = event.data if hasattr(event, "data") else None
if trace_data:
executor.conversation_store.add_trace(conversation_id, trace_data)
except Exception as e:
logger.debug(f"Failed to store trace event: {e}")
# IMPORTANT: Check model_dump_json FIRST because to_json() can have newlines (pretty-printing)
# which breaks SSE format. model_dump_json() returns single-line JSON.
if hasattr(event, "model_dump_json"):
@@ -18,14 +18,14 @@ logger = logging.getLogger(__name__)
class SimpleTraceCollector(SpanExporter):
"""Simple trace collector that captures spans for direct yielding."""
def __init__(self, session_id: str | None = None, entity_id: str | None = None) -> None:
def __init__(self, response_id: str | None = None, entity_id: str | None = None) -> None:
"""Initialize trace collector.
Args:
session_id: Session identifier for context
response_id: Response identifier for grouping traces by turn
entity_id: Entity identifier for context
"""
self.session_id = session_id
self.response_id = response_id
self.entity_id = entity_id
self.collected_events: list[ResponseTraceEvent] = []
@@ -93,7 +93,7 @@ class SimpleTraceCollector(SpanExporter):
"duration_ms": duration_ms,
"attributes": dict(span.attributes) if span.attributes else {},
"status": str(span.status.status_code) if hasattr(span, "status") else "OK",
"session_id": self.session_id,
"response_id": self.response_id,
"entity_id": self.entity_id,
}
@@ -121,18 +121,18 @@ class SimpleTraceCollector(SpanExporter):
@contextmanager
def capture_traces(
session_id: str | None = None, entity_id: str | None = None
response_id: str | None = None, entity_id: str | None = None
) -> Generator[SimpleTraceCollector, None, None]:
"""Context manager to capture traces during execution.
Args:
session_id: Session identifier for context
response_id: Response identifier for grouping traces by turn
entity_id: Entity identifier for context
Yields:
SimpleTraceCollector instance to get trace events from
"""
collector = SimpleTraceCollector(session_id, entity_id)
collector = SimpleTraceCollector(response_id, entity_id)
try:
from opentelemetry import trace
@@ -146,7 +146,7 @@ def capture_traces(
# Check if this is a real TracerProvider (not the default NoOpTracerProvider)
if isinstance(provider, TracerProvider):
provider.add_span_processor(processor)
logger.debug(f"Added trace collector to TracerProvider for session: {session_id}, entity: {entity_id}")
logger.debug(f"Added trace collector to TracerProvider for response: {response_id}, entity: {entity_id}")
try:
yield collector
@@ -390,7 +390,7 @@ class MetaResponse(BaseModel):
"""Backend runtime/language - 'python' or 'dotnet' for deployment guides and feature availability."""
capabilities: dict[str, bool] = {}
"""Server capabilities (e.g., tracing, openai_proxy)."""
"""Server capabilities (e.g., instrumentation, openai_proxy)."""
auth_required: bool = False
"""Whether the server requires Bearer token authentication."""
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long