Python: DevUI Fix Serialization, Timestamp and Other Issues (#1584)

* refactor(devui): adopt standard OpenAI lifecycle events for agents and workflows

- Replace custom workflow events with OpenAI Responses API standard lifecycle events
- Add AgentStartedEvent, AgentCompletedEvent, AgentFailedEvent for clean separation
- Implement ExecutorActionItem for workflow executor tracking
- Convert informational events to trace events to reduce noise
- Update README mapper table with comprehensive event mappings
- Maintain full backward compatibility with legacy events

* fix(devui): resolve timestamp overwriting and Content serialization errors

- Fix tool call timestamps being overwritten on each render (#1483)
- Add recursive Content serialization to handle ChatMessage and nested objects (#1548)
- Implement proper MCP tool cleanup on server shutdown
- Add timestamp field to function_result.complete events
- Enhance credential and client resource cleanup

Fixes #1483, #1548
Partial improvements for #1476
This commit is contained in:
Victor Dibia
2025-10-23 11:19:20 -07:00
committed by GitHub
Unverified
parent 064ee8afbe
commit 6b66a34609
21 changed files with 1859 additions and 682 deletions
+55 -20
View File
@@ -49,6 +49,19 @@ devui ./agents --port 8080
When DevUI starts with no discovered entities, it displays a **sample entity gallery** with curated examples from the Agent Framework repository. You can download these samples, review them, and run them locally to get started quickly.
## Using MCP Tools
**Important:** Don't use `async with` context managers when creating agents with MCP tools for DevUI - connections will close before execution.
```python
# ✅ Correct - DevUI handles cleanup automatically
mcp_tool = MCPStreamableHTTPTool(url="http://localhost:8011/mcp", chat_client=chat_client)
agent = ChatAgent(tools=mcp_tool)
serve(entities=[agent])
```
MCP tools use lazy initialization and connect automatically on first use. DevUI attempts to clean up connections on shutdown
## Directory Structure
For your agents to be discovered by the DevUI, they must be organized in a directory structure like below. Each agent/workflow must have an `__init__.py` that exports the required variable (`agent` or `workflow`).
@@ -157,42 +170,62 @@ Options:
Given that DevUI offers an OpenAI Responses API, it internally maps messages and events from Agent Framework to OpenAI Responses API events (in `_mapper.py`). For transparency, this mapping is shown below:
| Agent Framework Content | OpenAI Event/Type | Status |
| ------------------------------- | ---------------------------------------- | -------- |
| `TextContent` | `response.output_text.delta` | Standard |
| `TextReasoningContent` | `response.reasoning_text.delta` | Standard |
| `FunctionCallContent` (initial) | `response.output_item.added` | Standard |
| `FunctionCallContent` (args) | `response.function_call_arguments.delta` | Standard |
| `FunctionResultContent` | `response.function_result.complete` | DevUI |
| `FunctionApprovalRequestContent`| `response.function_approval.requested` | DevUI |
| `FunctionApprovalResponseContent`| `response.function_approval.responded` | DevUI |
| `ErrorContent` | `error` | Standard |
| `UsageContent` | Final `Response.usage` field (not streamed) | Standard |
| `WorkflowEvent` | `response.workflow_event.complete` | DevUI |
| `DataContent` | `response.trace.complete` | DevUI |
| `UriContent` | `response.trace.complete` | DevUI |
| `HostedFileContent` | `response.trace.complete` | DevUI |
| `HostedVectorStoreContent` | `response.trace.complete` | DevUI |
| OpenAI Event/Type | Agent Framework Content | Status |
| ------------------------------------------------------------ | --------------------------------- | -------- |
| | **Lifecycle Events** | |
| `response.created` + `response.in_progress` | `AgentStartedEvent` | OpenAI |
| `response.completed` | `AgentCompletedEvent` | OpenAI |
| `response.failed` | `AgentFailedEvent` | OpenAI |
| `response.created` + `response.in_progress` | `WorkflowStartedEvent` | OpenAI |
| `response.completed` | `WorkflowCompletedEvent` | OpenAI |
| `response.failed` | `WorkflowFailedEvent` | OpenAI |
| | **Content Types** | |
| `response.content_part.added` + `response.output_text.delta` | `TextContent` | OpenAI |
| `response.reasoning_text.delta` | `TextReasoningContent` | OpenAI |
| `response.output_item.added` | `FunctionCallContent` (initial) | OpenAI |
| `response.function_call_arguments.delta` | `FunctionCallContent` (args) | OpenAI |
| `response.function_result.complete` | `FunctionResultContent` | DevUI |
| `response.function_approval.requested` | `FunctionApprovalRequestContent` | DevUI |
| `response.function_approval.responded` | `FunctionApprovalResponseContent` | DevUI |
| `error` | `ErrorContent` | OpenAI |
| Final `Response.usage` field (not streamed) | `UsageContent` | OpenAI |
| | **Workflow Events** | |
| `response.output_item.added` (ExecutorActionItem)* | `ExecutorInvokedEvent` | OpenAI |
| `response.output_item.done` (ExecutorActionItem)* | `ExecutorCompletedEvent` | OpenAI |
| `response.output_item.done` (ExecutorActionItem with error)* | `ExecutorFailedEvent` | OpenAI |
| `response.workflow_event.complete` | `WorkflowEvent` (other) | DevUI |
| `response.trace.complete` | `WorkflowStatusEvent` | DevUI |
| `response.trace.complete` | `WorkflowWarningEvent` | DevUI |
| | **Trace Content** | |
| `response.trace.complete` | `DataContent` | DevUI |
| `response.trace.complete` | `UriContent` | DevUI |
| `response.trace.complete` | `HostedFileContent` | DevUI |
| `response.trace.complete` | `HostedVectorStoreContent` | DevUI |
- **Standard** = OpenAI Responses API spec
- **DevUI** = Custom extensions for Agent Framework features (workflows, traces, function approvals)
\*Uses standard OpenAI event structure but carries DevUI-specific `ExecutorActionItem` payload
- **OpenAI** = Standard OpenAI Responses API event types
- **DevUI** = Custom event types specific to Agent Framework (e.g., workflows, traces, function approvals)
### OpenAI Responses API Compliance
DevUI follows the OpenAI Responses API specification for maximum compatibility:
**Standard OpenAI Types Used:**
**OpenAI Standard Event Types Used:**
- `ResponseOutputItemAddedEvent` - Output item notifications (function calls and results)
- `ResponseOutputItemDoneEvent` - Output item completion notifications
- `Response.usage` - Token usage (in final response, not streamed)
- All standard text, reasoning, and function call events
**Custom DevUI Extensions:**
- `response.function_approval.requested` - Function approval requests (for interactive approval workflows)
- `response.function_approval.responded` - Function approval responses (user approval/rejection)
- `response.workflow_event.complete` - Agent Framework workflow events
- `response.trace.complete` - Execution traces and internal content (DataContent, UriContent, hosted files/stores)
These custom extensions are clearly namespaced and can be safely ignored by standard OpenAI clients.
These custom extensions are clearly namespaced and can be safely ignored by standard OpenAI clients. Note that DevUI also uses standard OpenAI events with custom payloads (e.g., `ExecutorActionItem` within `response.output_item.added`).
### Entity Management
@@ -224,12 +257,14 @@ These custom extensions are clearly namespaced and can be safely ignored by stan
DevUI is designed as a **sample application for local development** and should not be exposed to untrusted networks or used in production environments.
**Security features:**
- Only loads entities from local directories or in-memory registration
- No remote code execution capabilities
- Binds to localhost (127.0.0.1) by default
- All samples must be manually downloaded and reviewed before running
**Best practices:**
- Never expose DevUI to the internet
- Review all agent/workflow code before running
- Only load entities from trusted sources
@@ -127,7 +127,7 @@ class EntityDiscovery:
# Cache the loaded object
self._loaded_objects[entity_id] = entity_obj
logger.info(f"Successfully loaded entity: {entity_id} (type: {enriched_info.type})")
logger.info(f"Successfully loaded entity: {entity_id} (type: {enriched_info.type})")
return entity_obj
@@ -217,7 +217,7 @@ class EntityDiscovery:
if entity_info and "lazy_loaded" in entity_info.metadata:
entity_info.metadata["lazy_loaded"] = False
logger.info(f"♻️ Entity invalidated: {entity_id} (will reload on next access)")
logger.info(f"Entity invalidated: {entity_id} (will reload on next access)")
def invalidate_all(self) -> None:
"""Invalidate all cached entities.
@@ -217,6 +217,11 @@ class AgentFrameworkExecutor:
Agent update events and trace events
"""
try:
# Emit agent lifecycle start event
from .models._openai_custom import AgentStartedEvent
yield AgentStartedEvent()
# Convert input to proper ChatMessage or string
user_message = self._convert_input_to_chat_message(request.input)
@@ -266,8 +271,19 @@ class AgentFrameworkExecutor:
else:
raise ValueError("Agent must implement either run() or run_stream() method")
# Emit agent lifecycle completion event
from .models._openai_custom import AgentCompletedEvent
yield AgentCompletedEvent()
except Exception as e:
logger.error(f"Error in agent execution: {e}")
# Emit agent lifecycle failure event
from .models._openai_custom import AgentFailedEvent
yield AgentFailedEvent(error=e)
# Still yield the error for backward compatibility
yield {"type": "error", "message": f"Agent execution error: {e!s}"}
async def _execute_workflow(
@@ -284,14 +300,9 @@ class AgentFrameworkExecutor:
Workflow events and trace events
"""
try:
# Get input data - prefer structured data from extra_body
input_data: str | list[Any] | dict[str, Any]
if request.extra_body and isinstance(request.extra_body, dict) and request.extra_body.get("input_data"):
input_data = request.extra_body.get("input_data") # type: ignore
logger.debug(f"Using structured input_data from extra_body: {type(input_data)}")
else:
input_data = request.input
logger.debug(f"Using input field as fallback: {type(input_data)}")
# Get input data directly from request.input field
input_data = request.input
logger.debug(f"Using input field: {type(input_data)}")
# Parse input based on workflow's expected input type
parsed_input = await self._parse_workflow_input(workflow, input_data)
@@ -4,17 +4,32 @@
import json
import logging
import time
import uuid
from collections import OrderedDict
from collections.abc import Sequence
from datetime import datetime
from typing import Any, Union
from uuid import uuid4
from openai.types.responses import (
Response,
ResponseContentPartAddedEvent,
ResponseCreatedEvent,
ResponseError,
ResponseFailedEvent,
ResponseInProgressEvent,
)
from .models import (
AgentFrameworkRequest,
CustomResponseOutputItemAddedEvent,
CustomResponseOutputItemDoneEvent,
ExecutorActionItem,
InputTokensDetails,
OpenAIResponse,
OutputTokensDetails,
ResponseCompletedEvent,
ResponseErrorEvent,
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionResultComplete,
@@ -41,6 +56,56 @@ EventType = Union[
]
def _serialize_content_recursive(value: Any) -> Any:
"""Recursively serialize Agent Framework Content objects to JSON-compatible values.
This handles nested Content objects (like TextContent inside FunctionResultContent.result)
that can't be directly serialized by json.dumps().
Args:
value: Value to serialize (can be Content object, dict, list, primitive, etc.)
Returns:
JSON-serializable version with all Content objects converted to dicts/primitives
"""
# Handle None and basic JSON-serializable types
if value is None or isinstance(value, (str, int, float, bool)):
return value
# Check if it's a SerializationMixin (includes all Content types)
# Content objects have to_dict() method
if hasattr(value, "to_dict") and callable(getattr(value, "to_dict", None)):
try:
return value.to_dict()
except Exception as e:
# If to_dict() fails, fall through to other methods
logger.debug(f"Failed to serialize with to_dict(): {e}")
# Handle dictionaries - recursively process values
if isinstance(value, dict):
return {key: _serialize_content_recursive(val) for key, val in value.items()}
# Handle lists and tuples - recursively process elements
if isinstance(value, (list, tuple)):
serialized = [_serialize_content_recursive(item) for item in value]
# For single-item lists containing text Content, extract just the text
# This handles the MCP case where result = [TextContent(text="Hello")]
# and we want output = "Hello" not output = '[{"type": "text", "text": "Hello"}]'
if len(serialized) == 1 and isinstance(serialized[0], dict) and serialized[0].get("type") == "text":
return serialized[0].get("text", "")
return serialized
# For other objects with model_dump(), try that
if hasattr(value, "model_dump") and callable(getattr(value, "model_dump", None)):
try:
return value.model_dump()
except Exception as e:
logger.debug(f"Failed to serialize with model_dump(): {e}")
# Return as-is and let json.dumps handle it (may raise TypeError for non-serializable types)
return value
class MessageMapper:
"""Maps Agent Framework messages/responses to OpenAI format."""
@@ -102,6 +167,12 @@ class MessageMapper:
)
]
# Handle Agent lifecycle events first
from .models._openai_custom import AgentCompletedEvent, AgentFailedEvent, AgentStartedEvent
if isinstance(raw_event, (AgentStartedEvent, AgentCompletedEvent, AgentFailedEvent)):
return await self._convert_agent_lifecycle_event(raw_event, context)
# Import Agent Framework types for proper isinstance checks
try:
from agent_framework import AgentRunResponse, AgentRunResponseUpdate, WorkflowEvent
@@ -245,6 +316,7 @@ class MessageMapper:
"content_index": 0,
"output_index": 0,
"request_id": str(request_key), # For usage accumulation
"request": request, # Store the request for model name access
# Track active function calls: {call_id: {name, item_id, args_chunks}}
"active_function_calls": {},
}
@@ -267,7 +339,7 @@ class MessageMapper:
return int(context["sequence_counter"])
async def _convert_agent_update(self, update: Any, context: dict[str, Any]) -> Sequence[Any]:
"""Convert AgentRunResponseUpdate to OpenAI events using comprehensive content mapping.
"""Convert agent text updates to proper content part events.
Args:
update: Agent run response update
@@ -283,10 +355,60 @@ class MessageMapper:
if not hasattr(update, "contents") or not update.contents:
return events
# Check if we're streaming text content
has_text_content = any(content.__class__.__name__ == "TextContent" for content in update.contents)
# If we have text content and haven't created a message yet, create one
if has_text_content and "current_message_id" not in context:
message_id = f"msg_{uuid4().hex[:8]}"
context["current_message_id"] = message_id
context["output_index"] = context.get("output_index", -1) + 1
# Add message output item
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
item=ResponseOutputMessage(
type="message", id=message_id, role="assistant", content=[], status="in_progress"
),
)
)
# Add content part for text
context["content_index"] = 0
events.append(
ResponseContentPartAddedEvent(
type="response.content_part.added",
output_index=context["output_index"],
content_index=context["content_index"],
item_id=message_id,
sequence_number=self._next_sequence(context),
part=ResponseOutputText(type="output_text", text="", annotations=[]),
)
)
# Process each content item
for content in update.contents:
content_type = content.__class__.__name__
if content_type in self.content_mappers:
# Special handling for TextContent to use proper delta events
if content_type == "TextContent" and "current_message_id" in context:
# Stream text content via proper delta events
events.append(
ResponseTextDeltaEvent(
type="response.output_text.delta",
output_index=context["output_index"],
content_index=context.get("content_index", 0),
item_id=context["current_message_id"],
delta=content.text,
logprobs=[], # We don't have logprobs from Agent Framework
sequence_number=self._next_sequence(context),
)
)
elif content_type in self.content_mappers:
# Use existing mappers for other content types
mapped_events = await self.content_mappers[content_type](content, context)
if mapped_events is not None: # Handle None returns (e.g., UsageContent)
if isinstance(mapped_events, list):
@@ -297,7 +419,9 @@ class MessageMapper:
# Graceful fallback for unknown content types
events.append(await self._create_unknown_content_event(content, context))
context["content_index"] += 1
# Don't increment content_index for text deltas within the same part
if content_type != "TextContent":
context["content_index"] = context.get("content_index", 0) + 1
except Exception as e:
logger.warning(f"Error converting agent update: {e}")
@@ -358,8 +482,105 @@ class MessageMapper:
return events
async def _convert_agent_lifecycle_event(self, event: Any, context: dict[str, Any]) -> Sequence[Any]:
"""Convert agent lifecycle events to OpenAI response events.
Args:
event: AgentStartedEvent, AgentCompletedEvent, or AgentFailedEvent
context: Conversion context
Returns:
List of OpenAI response stream events
"""
from .models._openai_custom import AgentCompletedEvent, AgentFailedEvent, AgentStartedEvent
try:
# Get model name from context (the agent name)
model_name = context.get("request", {}).model if context.get("request") else "agent"
if isinstance(event, AgentStartedEvent):
execution_id = f"agent_{uuid4().hex[:12]}"
context["execution_id"] = execution_id
# Create Response object
response_obj = Response(
id=f"resp_{execution_id}",
object="response",
created_at=float(time.time()),
model=model_name,
output=[],
status="in_progress",
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)
# Emit both created and in_progress events
return [
ResponseCreatedEvent(
type="response.created", sequence_number=self._next_sequence(context), response=response_obj
),
ResponseInProgressEvent(
type="response.in_progress", sequence_number=self._next_sequence(context), response=response_obj
),
]
if isinstance(event, AgentCompletedEvent):
execution_id = context.get("execution_id", f"agent_{uuid4().hex[:12]}")
response_obj = Response(
id=f"resp_{execution_id}",
object="response",
created_at=float(time.time()),
model=model_name,
output=[],
status="completed",
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)
return [
ResponseCompletedEvent(
type="response.completed", sequence_number=self._next_sequence(context), response=response_obj
)
]
if isinstance(event, AgentFailedEvent):
execution_id = context.get("execution_id", f"agent_{uuid4().hex[:12]}")
# Create error object
response_error = ResponseError(
message=str(event.error) if event.error else "Unknown error", code="server_error"
)
response_obj = Response(
id=f"resp_{execution_id}",
object="response",
created_at=float(time.time()),
model=model_name,
output=[],
status="failed",
error=response_error,
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)
return [
ResponseFailedEvent(
type="response.failed", sequence_number=self._next_sequence(context), response=response_obj
)
]
return []
except Exception as e:
logger.warning(f"Error converting agent lifecycle event: {e}")
return [await self._create_error_event(str(e), context)]
async def _convert_workflow_event(self, event: Any, context: dict[str, Any]) -> Sequence[Any]:
"""Convert workflow event to structured OpenAI events.
"""Convert workflow events to standard OpenAI event objects.
Args:
event: Workflow event
@@ -369,22 +590,247 @@ class MessageMapper:
List of OpenAI response stream events
"""
try:
event_class = event.__class__.__name__
# Response-level events - construct proper OpenAI objects
if event_class == "WorkflowStartedEvent":
workflow_id = getattr(event, "workflow_id", str(uuid4()))
context["workflow_id"] = workflow_id
# Import Response type for proper construction
from openai.types.responses import Response
# Return proper OpenAI event objects
events: list[Any] = []
# Determine the model name - use request model or default to "workflow"
# The request model will be the agent name for agents, workflow name for workflows
model_name = context.get("request", {}).model if context.get("request") else "workflow"
# Create a full Response object with all required fields
response_obj = Response(
id=f"resp_{workflow_id}",
object="response",
created_at=float(time.time()),
model=model_name, # Use the actual model/agent name
output=[], # Empty output list initially
status="in_progress",
# Required fields with safe defaults
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)
# First emit response.created
events.append(
ResponseCreatedEvent(
type="response.created", sequence_number=self._next_sequence(context), response=response_obj
)
)
# Then emit response.in_progress (reuse same response object)
events.append(
ResponseInProgressEvent(
type="response.in_progress", sequence_number=self._next_sequence(context), response=response_obj
)
)
return events
if event_class in ["WorkflowCompletedEvent", "WorkflowOutputEvent"]:
workflow_id = context.get("workflow_id", str(uuid4()))
# Import Response type for proper construction
from openai.types.responses import Response
# Get model name from context
model_name = context.get("request", {}).model if context.get("request") else "workflow"
# Create a full Response object for completed state
response_obj = Response(
id=f"resp_{workflow_id}",
object="response",
created_at=float(time.time()),
model=model_name,
output=[], # Output should be populated by this point from text streaming
status="completed",
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)
return [
ResponseCompletedEvent(
type="response.completed", sequence_number=self._next_sequence(context), response=response_obj
)
]
if event_class == "WorkflowFailedEvent":
workflow_id = context.get("workflow_id", str(uuid4()))
error_info = getattr(event, "error", None)
# Import Response and ResponseError types
from openai.types.responses import Response, ResponseError
# Get model name from context
model_name = context.get("request", {}).model if context.get("request") else "workflow"
# Create error object
error_message = str(error_info) if error_info else "Unknown error"
# Create ResponseError object (code must be one of the allowed values)
response_error = ResponseError(
message=error_message,
code="server_error", # Use generic server_error code for workflow failures
)
# Create a full Response object for failed state
response_obj = Response(
id=f"resp_{workflow_id}",
object="response",
created_at=float(time.time()),
model=model_name,
output=[],
status="failed",
error=response_error,
parallel_tool_calls=False,
tool_choice="none",
tools=[],
)
return [
ResponseFailedEvent(
type="response.failed", sequence_number=self._next_sequence(context), response=response_obj
)
]
# Executor-level events (output items)
if event_class == "ExecutorInvokedEvent":
executor_id = getattr(event, "executor_id", "unknown")
item_id = f"exec_{executor_id}_{uuid4().hex[:8]}"
context[f"exec_item_{executor_id}"] = item_id
context["output_index"] = context.get("output_index", -1) + 1
# Create ExecutorActionItem with proper type
executor_item = ExecutorActionItem(
type="executor_action",
id=item_id,
executor_id=executor_id,
status="in_progress",
metadata=getattr(event, "metadata", {}),
)
# Use our custom event type that accepts ExecutorActionItem
return [
CustomResponseOutputItemAddedEvent(
type="response.output_item.added",
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
item=executor_item,
)
]
if event_class == "ExecutorCompletedEvent":
executor_id = getattr(event, "executor_id", "unknown")
item_id = context.get(f"exec_item_{executor_id}", f"exec_{executor_id}_unknown")
# Create ExecutorActionItem with completed status
# ExecutorCompletedEvent uses 'data' field, not 'result'
executor_item = ExecutorActionItem(
type="executor_action",
id=item_id,
executor_id=executor_id,
status="completed",
result=getattr(event, "data", None),
)
# Use our custom event type
return [
CustomResponseOutputItemDoneEvent(
type="response.output_item.done",
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
item=executor_item,
)
]
if event_class == "ExecutorFailedEvent":
executor_id = getattr(event, "executor_id", "unknown")
item_id = context.get(f"exec_item_{executor_id}", f"exec_{executor_id}_unknown")
error_info = getattr(event, "error", None)
# Create ExecutorActionItem with failed status
executor_item = ExecutorActionItem(
type="executor_action",
id=item_id,
executor_id=executor_id,
status="failed",
error={"message": str(error_info)} if error_info else None,
)
# Use our custom event type
return [
CustomResponseOutputItemDoneEvent(
type="response.output_item.done",
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
item=executor_item,
)
]
# Handle informational workflow events (status, warnings, errors)
if event_class in ["WorkflowStatusEvent", "WorkflowWarningEvent", "WorkflowErrorEvent", "RequestInfoEvent"]:
# These are informational events that don't map to OpenAI lifecycle events
# Convert them to trace events for debugging visibility
event_data: dict[str, Any] = {}
# Extract relevant data based on event type
if event_class == "WorkflowStatusEvent":
event_data["state"] = str(getattr(event, "state", "unknown"))
elif event_class == "WorkflowWarningEvent":
event_data["message"] = str(getattr(event, "message", ""))
elif event_class == "WorkflowErrorEvent":
event_data["message"] = str(getattr(event, "message", ""))
event_data["error"] = str(getattr(event, "error", ""))
elif event_class == "RequestInfoEvent":
request_info = getattr(event, "data", {})
event_data["request_info"] = request_info if isinstance(request_info, dict) else str(request_info)
# Create a trace event for debugging
trace_event = ResponseTraceEventComplete(
type="response.trace.complete",
data={
"trace_type": "workflow_info",
"event_type": event_class,
"data": event_data,
"timestamp": datetime.now().isoformat(),
},
span_id=f"workflow_info_{uuid4().hex[:8]}",
item_id=context["item_id"],
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
)
return [trace_event]
# For unknown/legacy events, still emit as workflow event for backward compatibility
# Get event data and serialize if it's a SerializationMixin
event_data = getattr(event, "data", None)
if event_data is not None and hasattr(event_data, "to_dict"):
raw_event_data = getattr(event, "data", None)
serialized_event_data: dict[str, Any] | str | None = raw_event_data
if raw_event_data is not None and hasattr(raw_event_data, "to_dict"):
# SerializationMixin objects - convert to dict for JSON serialization
try:
event_data = event_data.to_dict()
serialized_event_data = raw_event_data.to_dict()
except Exception as e:
logger.debug(f"Failed to serialize event data with to_dict(): {e}")
event_data = str(event_data)
serialized_event_data = str(raw_event_data)
# Create structured workflow event
# Create structured workflow event (keeping for backward compatibility)
workflow_event = ResponseWorkflowEventComplete(
type="response.workflow_event.complete",
data={
"event_type": event.__class__.__name__,
"data": event_data,
"data": serialized_event_data,
"executor_id": getattr(event, "executor_id", None),
"timestamp": datetime.now().isoformat(),
},
@@ -394,6 +840,7 @@ class MessageMapper:
sequence_number=self._next_sequence(context),
)
logger.debug(f"Unhandled workflow event type: {event_class}, emitting as legacy workflow event")
return [workflow_event]
except Exception as e:
@@ -538,8 +985,16 @@ class MessageMapper:
result = getattr(content, "result", None)
exception = getattr(content, "exception", None)
# Convert result to string
output = result if isinstance(result, str) else json.dumps(result) if result is not None else ""
# Convert result to string, handling nested Content objects from MCP tools
if isinstance(result, str):
output = result
elif result is not None:
# Recursively serialize any nested Content objects (e.g., from MCP tools)
serialized = _serialize_content_recursive(result)
# Convert to JSON string if still not a string
output = serialized if isinstance(serialized, str) else json.dumps(serialized)
else:
output = ""
# Determine status based on exception
status = "incomplete" if exception else "completed"
@@ -556,6 +1011,7 @@ class MessageMapper:
item_id=item_id,
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
timestamp=datetime.now().isoformat(),
)
async def _map_error_content(self, content: Any, context: dict[str, Any]) -> ResponseErrorEvent:
@@ -723,7 +1179,7 @@ class MessageMapper:
async def _create_unknown_content_event(self, content: Any, context: dict[str, Any]) -> ResponseStreamEvent:
"""Create event for unknown content types."""
content_type = content.__class__.__name__
text = f"⚠️ Unknown content type: {content_type}\n"
text = f"Warning: Unknown content type: {content_type}\n"
return self._create_text_delta_event(text, context)
async def _create_error_response(self, error_message: str, request: AgentFrameworkRequest) -> OpenAIResponse:
@@ -85,19 +85,25 @@ class DevServer:
return self.executor
async def _cleanup_entities(self) -> None:
"""Cleanup entity resources (close clients, credentials, etc.)."""
"""Cleanup entity resources (close clients, MCP tools, credentials, etc.)."""
if not self.executor:
return
logger.info("Cleaning up entity resources...")
entities = self.executor.entity_discovery.list_entities()
closed_count = 0
mcp_tools_closed = 0
credentials_closed = 0
for entity_info in entities:
try:
entity_obj = self.executor.entity_discovery.get_entity_object(entity_info.id)
# Close chat clients and their credentials
if entity_obj and hasattr(entity_obj, "chat_client"):
client = entity_obj.chat_client
# Close the chat client itself
if hasattr(client, "close") and callable(client.close):
if inspect.iscoroutinefunction(client.close):
await client.close()
@@ -105,11 +111,47 @@ class DevServer:
client.close()
closed_count += 1
logger.debug(f"Closed client for entity: {entity_info.id}")
# Close credentials attached to chat clients (e.g., AzureCliCredential)
credential_attrs = ["credential", "async_credential", "_credential", "_async_credential"]
for attr in credential_attrs:
if hasattr(client, attr):
cred = getattr(client, attr)
if cred and hasattr(cred, "close") and callable(cred.close):
try:
if inspect.iscoroutinefunction(cred.close):
await cred.close()
else:
cred.close()
credentials_closed += 1
logger.debug(f"Closed credential for entity: {entity_info.id}")
except Exception as e:
logger.warning(f"Error closing credential for {entity_info.id}: {e}")
# Close MCP tools (framework tracks them in _local_mcp_tools)
if entity_obj and hasattr(entity_obj, "_local_mcp_tools"):
for mcp_tool in entity_obj._local_mcp_tools:
if hasattr(mcp_tool, "close") and callable(mcp_tool.close):
try:
if inspect.iscoroutinefunction(mcp_tool.close):
await mcp_tool.close()
else:
mcp_tool.close()
mcp_tools_closed += 1
tool_name = getattr(mcp_tool, "name", "unknown")
logger.debug(f"Closed MCP tool '{tool_name}' for entity: {entity_info.id}")
except Exception as e:
logger.warning(f"Error closing MCP tool for {entity_info.id}: {e}")
except Exception as e:
logger.warning(f"Error closing entity {entity_info.id}: {e}")
if closed_count > 0:
logger.info(f"Closed {closed_count} entity client(s)")
if credentials_closed > 0:
logger.info(f"Closed {credentials_closed} credential(s)")
if mcp_tools_closed > 0:
logger.info(f"Closed {mcp_tools_closed} MCP tool(s)")
def create_app(self) -> FastAPI:
"""Create the FastAPI application."""
@@ -30,6 +30,9 @@ from openai.types.shared import Metadata, ResponsesModel
from ._discovery_models import DiscoveryResponse, EntityInfo
from ._openai_custom import (
AgentFrameworkRequest,
CustomResponseOutputItemAddedEvent,
CustomResponseOutputItemDoneEvent,
ExecutorActionItem,
OpenAIError,
ResponseFunctionResultComplete,
ResponseTraceEvent,
@@ -46,8 +49,11 @@ __all__ = [
"Conversation",
"ConversationDeletedResource",
"ConversationItem",
"CustomResponseOutputItemAddedEvent",
"CustomResponseOutputItemDoneEvent",
"DiscoveryResponse",
"EntityInfo",
"ExecutorActionItem",
"InputTokensDetails",
"Metadata",
"OpenAIError",
@@ -8,6 +8,7 @@ to support Agent Framework specific features like workflows and traces.
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Literal
from pydantic import BaseModel, ConfigDict
@@ -15,6 +16,69 @@ from pydantic import BaseModel, ConfigDict
# Custom Agent Framework OpenAI event types for structured data
# Agent lifecycle events - simple and clear
class AgentStartedEvent:
"""Event emitted when an agent starts execution."""
pass
class AgentCompletedEvent:
"""Event emitted when an agent completes execution successfully."""
pass
@dataclass
class AgentFailedEvent:
"""Event emitted when an agent fails during execution."""
error: Exception | None = None
class ExecutorActionItem(BaseModel):
"""Custom item type for workflow executor actions.
This is a DevUI-specific extension to represent workflow executors as output items.
Since OpenAI's ResponseOutputItemAddedEvent only accepts specific item types,
and executor actions are not part of the standard, we need this custom type.
"""
type: Literal["executor_action"] = "executor_action"
id: str
executor_id: str
status: Literal["in_progress", "completed", "failed", "cancelled"] = "in_progress"
metadata: dict[str, Any] | None = None
result: Any | None = None
error: dict[str, Any] | None = None
class CustomResponseOutputItemAddedEvent(BaseModel):
"""Custom version of ResponseOutputItemAddedEvent that accepts any item type.
This allows us to emit executor action items while maintaining the same
event structure as OpenAI's standard.
"""
type: Literal["response.output_item.added"] = "response.output_item.added"
output_index: int
sequence_number: int
item: dict[str, Any] | ExecutorActionItem | Any # Flexible item type
class CustomResponseOutputItemDoneEvent(BaseModel):
"""Custom version of ResponseOutputItemDoneEvent that accepts any item type.
This allows us to emit executor action items while maintaining the same
event structure as OpenAI's standard.
"""
type: Literal["response.output_item.done"] = "response.output_item.done"
output_index: int
sequence_number: int
item: dict[str, Any] | ExecutorActionItem | Any # Flexible item type
class ResponseWorkflowEventComplete(BaseModel):
"""Complete workflow event data."""
@@ -57,6 +121,7 @@ class ResponseFunctionResultComplete(BaseModel):
item_id: str
output_index: int = 0
sequence_number: int
timestamp: str | None = None # Optional timestamp for UI display
# Agent Framework extension fields
@@ -64,7 +129,7 @@ class AgentFrameworkExtraBody(BaseModel):
"""Agent Framework specific routing fields for OpenAI requests."""
entity_id: str
input_data: dict[str, Any] | None = None
# input_data removed - now using standard input field for all data
model_config = ConfigDict(extra="allow")
@@ -80,7 +145,7 @@ class AgentFrameworkRequest(BaseModel):
# All OpenAI fields from ResponseCreateParams
model: str # Used as entity_id in DevUI!
input: str | list[Any] # ResponseInputParam
input: str | list[Any] | dict[str, Any] # ResponseInputParam + dict for workflow structured input
stream: bool | None = False
# OpenAI conversation parameter (standard!)
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -5,7 +5,7 @@
<link rel="icon" type="image/svg+xml" href="/agentframework.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Agent Framework Dev UI</title>
<script type="module" crossorigin src="/assets/index-DmL7WSFa.js"></script>
<script type="module" crossorigin src="/assets/index-D_Y1oSGu.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-CE4pGoXh.css">
</head>
<body>
+3 -1
View File
@@ -241,6 +241,8 @@ export default function App() {
// Show error state if loading failed
if (entityError) {
const currentBackendUrl = apiClient.getBaseUrl();
return (
<div className="h-screen flex flex-col bg-background">
<AppHeader
@@ -290,7 +292,7 @@ export default function App() {
<p className="text-xs text-muted-foreground">
Default:{" "}
<span className="font-mono">http://localhost:8080</span>
<span className="font-mono">{currentBackendUrl}</span>
</p>
</div>
@@ -408,46 +408,82 @@ export function WorkflowView({
// This preserves the workflow's final output for display
};
// Extract workflow events from OpenAI events for executor tracking
// Extract workflow and output item events from OpenAI events for executor tracking
const workflowEvents = useMemo(() => {
return openAIEvents.filter(
(event) => event.type === "response.workflow_event.complete"
(event) =>
event.type === "response.output_item.added" ||
event.type === "response.output_item.done" ||
event.type === "response.created" ||
event.type === "response.in_progress" ||
event.type === "response.completed" ||
event.type === "response.failed" ||
// Keep legacy support for older backends
event.type === "response.workflow_event.complete"
);
}, [openAIEvents]);
// Extract executor history from workflow events (filter out workflow-level events)
const executorHistory = useMemo(() => {
return workflowEvents
.filter((event) => {
if ("data" in event && event.data && typeof event.data === "object") {
const data = event.data as Record<string, unknown>;
// Filter out workflow-level events (those without executor_id)
// These include: WorkflowStartedEvent, WorkflowOutputEvent, WorkflowStatusEvent, etc.
return data.executor_id != null;
const history: Array<{
executorId: string;
message: string;
timestamp: string;
status: "running" | "completed" | "error";
}> = [];
workflowEvents.forEach((event) => {
// Handle new standard OpenAI events
if (
event.type === "response.output_item.added" ||
event.type === "response.output_item.done"
) {
const item = (event as any).item;
if (item && item.type === "executor_action" && item.executor_id) {
history.push({
executorId: item.executor_id,
message:
event.type === "response.output_item.added"
? "Executor started"
: item.status === "completed"
? "Executor completed"
: item.status === "failed"
? "Executor failed"
: "Executor processing",
timestamp: new Date().toISOString(),
status:
item.status === "completed"
? "completed"
: item.status === "failed"
? "error"
: "running",
});
}
return false;
})
.map((event) => {
if ("data" in event && event.data && typeof event.data === "object") {
const data = event.data as Record<string, unknown>;
return {
}
// Legacy support for older backends
else if (
event.type === "response.workflow_event.complete" &&
"data" in event &&
event.data &&
typeof event.data === "object"
) {
const data = event.data as Record<string, unknown>;
if (data.executor_id != null) {
history.push({
executorId: String(data.executor_id),
message: String(data.event_type || "Processing"),
timestamp: String(data.timestamp || new Date().toISOString()),
status: String(data.event_type || "").includes("Completed")
? ("completed" as const)
? "completed"
: String(data.event_type || "").includes("Error")
? ("error" as const)
: ("running" as const),
};
? "error"
: "running",
});
}
return {
executorId: "unknown",
message: "Processing",
timestamp: new Date().toISOString(),
status: "running" as const,
};
});
}
});
return history;
}, [workflowEvents]);
// Track active executors
@@ -525,16 +561,51 @@ export function WorkflowView({
);
for await (const openAIEvent of streamGenerator) {
// Only store workflow events in state for performance
// Text deltas are processed directly without state updates
if (openAIEvent.type === "response.workflow_event.complete") {
// Store workflow-related events for tracking
if (
openAIEvent.type === "response.output_item.added" ||
openAIEvent.type === "response.output_item.done" ||
openAIEvent.type === "response.created" ||
openAIEvent.type === "response.in_progress" ||
openAIEvent.type === "response.completed" ||
openAIEvent.type === "response.failed" ||
openAIEvent.type === "response.workflow_event.complete" // Legacy
) {
setOpenAIEvents((prev) => [...prev, openAIEvent]);
}
// Pass to debug panel
onDebugEvent(openAIEvent);
// Handle workflow events to track current executor
// Handle new standard OpenAI events
if (openAIEvent.type === "response.output_item.added") {
const item = (openAIEvent as any).item;
if (item && item.type === "executor_action" && item.executor_id) {
currentStreamingExecutor.current = item.executor_id;
// Initialize output for this executor if not exists
if (!executorOutputs.current[item.executor_id]) {
executorOutputs.current[item.executor_id] = "";
}
}
}
// Handle workflow completion
if (openAIEvent.type === "response.completed") {
// Workflow completed successfully
// Final output is already in workflowResult from text streaming
}
// Handle workflow failure
if (openAIEvent.type === "response.failed") {
const error = (openAIEvent as any).response?.error;
if (error) {
setWorkflowError(
typeof error === "string" ? error : JSON.stringify(error)
);
}
}
// Legacy support for older backends
if (
openAIEvent.type === "response.workflow_event.complete" &&
"data" in openAIEvent &&
@@ -116,6 +116,39 @@ function getFunctionResultFromEvent(event: ExtendedResponseStreamEvent): {
return null;
}
// Helper to get a stable timestamp for an event
// Uses event's own timestamp fields if available
function getEventTimestamp(event: ExtendedResponseStreamEvent): string {
// Priority 1: Check for top-level timestamp (DevUI custom events like function_result.complete)
if ('timestamp' in event && typeof event.timestamp === 'string') {
return new Date(event.timestamp).toLocaleTimeString();
}
// Priority 2: Check for nested data.timestamp (workflow/trace events)
if ('data' in event && event.data && typeof event.data === 'object' && 'timestamp' in event.data) {
const dataTimestamp = (event.data as any).timestamp;
if (typeof dataTimestamp === 'string') {
return new Date(dataTimestamp).toLocaleTimeString();
}
}
// Priority 3: Check for created_at in response object (lifecycle events)
if ('response' in event && event.response && typeof event.response === 'object' && 'created_at' in event.response) {
const createdAt = (event.response as any).created_at;
if (typeof createdAt === 'number') {
return new Date(createdAt * 1000).toLocaleTimeString();
}
}
// Fallback: use sequence number as label (better than showing same time for all)
if ('sequence_number' in event && typeof event.sequence_number === 'number') {
return `#${event.sequence_number}`;
}
// Last resort: hide timestamp by returning empty string
return '';
}
// Helper function to accumulate OpenAI events into meaningful units
function processEventsForDisplay(
events: ExtendedResponseStreamEvent[]
@@ -551,7 +584,7 @@ function EventItem({ event }: EventItemProps) {
const [isExpanded, setIsExpanded] = useState(false);
const Icon = getEventIcon(event.type);
const colorClass = getEventColor(event.type);
const timestamp = new Date().toLocaleTimeString();
const timestamp = getEventTimestamp(event);
const summary = getEventSummary(event);
// Determine if this event has expandable content
@@ -1487,7 +1520,7 @@ function ToolsTab({ events }: { events: ExtendedResponseStreamEvent[] }) {
}
function ToolEventItem({ event }: { event: ExtendedResponseStreamEvent }) {
const timestamp = new Date().toLocaleTimeString();
const timestamp = getEventTimestamp(event);
// Check if this is a function call or result event
const isFunctionCall = event.type === "response.function_call.complete";
@@ -18,7 +18,7 @@
* - Horizontal rules (---)
*/
import React, { useState } from "react";
import React, { useState, useRef, useEffect } from "react";
interface MarkdownRendererProps {
content: string;
@@ -35,10 +35,10 @@ interface CodeBlockProps {
*/
function CodeBlock({ code, language }: CodeBlockProps) {
const [copied, setCopied] = useState(false);
const timeoutRef = React.useRef<NodeJS.Timeout | null>(null);
const timeoutRef = useRef<NodeJS.Timeout | null>(null);
// Cleanup timeout on unmount
React.useEffect(() => {
useEffect(() => {
return () => {
if (timeoutRef.current) {
clearTimeout(timeoutRef.current);
@@ -397,9 +397,7 @@ class ApiClient {
// Convert to OpenAI format - use model field for entity_id (same as agents)
const openAIRequest: AgentFrameworkRequest = {
model: workflowId, // Use workflow ID in model field (matches agent pattern)
input: typeof request.input_data === 'string'
? request.input_data
: JSON.stringify(request.input_data || ""), // Convert input_data to string
input: request.input_data || "", // Send dict directly, no stringification needed
stream: true,
conversation: request.conversation_id, // Include conversation if present
};
@@ -68,13 +68,13 @@ export type ResponseInputParam = ResponseInputItem[];
// Agent Framework extension fields (matches backend AgentFrameworkExtraBody)
export interface AgentFrameworkExtraBody {
entity_id: string;
input_data?: Record<string, unknown>;
// input_data removed - now using standard input field for all data
}
// Agent Framework Request - OpenAI ResponseCreateParams with extensions
export interface AgentFrameworkRequest {
model: string;
input: string | ResponseInputParam; // Union type matching OpenAI
input: string | ResponseInputParam | Record<string, unknown>; // Union type matching OpenAI + dict for workflows
stream?: boolean;
// OpenAI conversation parameter (standard!)
@@ -104,11 +104,19 @@ export type {
ResponseWorkflowEventComplete,
ResponseTraceEventComplete,
ResponseOutputItemAddedEvent,
ResponseOutputItemDoneEvent,
ResponseCreatedEvent,
ResponseInProgressEvent,
ResponseCompletedEvent,
ResponseFailedEvent,
ResponseFunctionResultComplete,
StructuredEvent,
WorkflowItem,
ExecutorActionItem,
} from "./openai";
export { isExecutorAction } from "./openai";
// Re-export Agent Framework types
export type {
AgentFrameworkRequest,
@@ -21,6 +21,48 @@ export interface ResponseStreamEvent {
created_at?: number;
}
// Standard OpenAI Response Lifecycle Events
export interface ResponseCreatedEvent {
type: "response.created";
response: {
id: string;
status: "in_progress";
created_at: number;
output?: any[];
};
sequence_number?: number;
}
export interface ResponseInProgressEvent {
type: "response.in_progress";
response: {
id: string;
status: "in_progress";
};
sequence_number?: number;
}
export interface ResponseCompletedEvent {
type: "response.completed";
response: {
id: string;
status: "completed";
usage?: any; // Optional usage information
model?: string; // Optional model information
};
sequence_number?: number;
}
export interface ResponseFailedEvent {
type: "response.failed";
response: {
id: string;
status: "failed";
error?: any;
};
sequence_number?: number;
}
// Custom Agent Framework OpenAI event types with structured data
export interface ResponseWorkflowEventComplete {
type: "response.workflow_event.complete";
@@ -83,13 +125,41 @@ export interface ResponseFunctionToolCall {
status?: "in_progress" | "completed" | "incomplete";
}
// OpenAI Responses API - Output Item Added Event
// OpenAI standard: Output item added event
// Workflow Item Types - flexible interface for any workflow item
export interface WorkflowItem {
type: string; // "executor_action", "workflow_action", "message", or any future type
id: string;
status?: "in_progress" | "completed" | "failed" | "cancelled";
[key: string]: any; // Allow any additional fields
}
// Executor Action Item (DevUI specific)
export interface ExecutorActionItem extends WorkflowItem {
type: "executor_action";
executor_id: string;
metadata?: Record<string, any>;
result?: any;
error?: any;
}
// Type guard for executor actions
export function isExecutorAction(item: WorkflowItem): item is ExecutorActionItem {
return item.type === "executor_action" && "executor_id" in item;
}
// OpenAI Responses API - Output Item Events
export interface ResponseOutputItemAddedEvent {
type: "response.output_item.added";
item: ResponseFunctionToolCall;
item: WorkflowItem | ResponseFunctionToolCall | any; // Flexible to support various item types
output_index: number;
sequence_number: number;
sequence_number?: number;
}
export interface ResponseOutputItemDoneEvent {
type: "response.output_item.done";
item: WorkflowItem | ResponseFunctionToolCall | any;
output_index: number;
sequence_number?: number;
}
// Trace event - matching actual backend output
@@ -171,6 +241,7 @@ export interface ResponseFunctionResultComplete {
item_id: string;
output_index: number;
sequence_number: number;
timestamp?: string; // Optional ISO timestamp for UI display
}
// DevUI Extension: Turn Separator (UI-only event for grouping)
@@ -182,11 +253,15 @@ export interface TurnSeparatorEvent {
// Union type for all structured events
export type StructuredEvent =
| ResponseCreatedEvent
| ResponseInProgressEvent
| ResponseCompletedEvent
| ResponseFailedEvent
| ResponseWorkflowEventComplete
| ResponseTraceEventComplete
| ResponseTraceComplete
| ResponseOutputItemAddedEvent
| ResponseOutputItemDoneEvent
| ResponseFunctionCallComplete
| ResponseFunctionCallDelta
| ResponseFunctionCallArgumentsDelta
@@ -249,12 +324,6 @@ export interface ResponseUsage {
};
}
// OpenAI standard: response.completed event
export interface ResponseCompletedEvent {
type: "response.completed";
response: OpenAIResponse;
sequence_number: number;
}
// Request format for Agent Framework
// AgentFrameworkRequest moved to agent-framework.ts to avoid conflicts
@@ -307,6 +307,7 @@ export function applyDagreLayout(
/**
* Process workflow events and extract node updates
* Handles both new standard OpenAI events and legacy workflow events
*/
export function processWorkflowEvents(
events: ExtendedResponseStreamEvent[],
@@ -316,7 +317,43 @@ export function processWorkflowEvents(
let hasWorkflowStarted = false;
events.forEach((event) => {
if (
// Handle new standard OpenAI events
if (event.type === "response.output_item.added" || event.type === "response.output_item.done") {
const item = (event as any).item;
if (item && item.type === "executor_action" && item.executor_id) {
const executorId = item.executor_id;
let state: ExecutorState = "pending";
let error: string | undefined;
if (event.type === "response.output_item.added") {
state = "running";
} else if (event.type === "response.output_item.done") {
if (item.status === "completed") {
state = "completed";
} else if (item.status === "failed") {
state = "failed";
error = item.error ? (typeof item.error === "string" ? item.error : JSON.stringify(item.error)) : "Execution failed";
} else if (item.status === "cancelled") {
state = "cancelled";
}
}
nodeUpdates[executorId] = {
nodeId: executorId,
state,
data: item.result,
error,
timestamp: new Date().toISOString(),
};
}
}
// Handle workflow lifecycle events
else if (event.type === "response.created" || event.type === "response.in_progress") {
hasWorkflowStarted = true;
}
// Legacy support for older backends
else if (
event.type === "response.workflow_event.complete" &&
"data" in event &&
event.data
@@ -417,7 +454,20 @@ export function getCurrentlyExecutingExecutors(
// Process events to find the most recent event for each executor
events.forEach((event) => {
if (
// Handle new standard OpenAI events
if (event.type === "response.output_item.added" || event.type === "response.output_item.done") {
const item = (event as any).item;
if (item && item.type === "executor_action" && item.executor_id) {
const executorId = item.executor_id;
executorTimeline[executorId] = {
lastEvent: event.type === "response.output_item.added" ? "ExecutorInvokedEvent" : "ExecutorCompletedEvent",
timestamp: new Date().toISOString(),
};
}
}
// Legacy support for older backends
else if (
event.type === "response.workflow_event.complete" &&
"data" in event &&
event.data
+238 -5
View File
@@ -13,7 +13,14 @@ import pytest
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "main"))
# Import Agent Framework types (assuming they are always available)
from agent_framework._types import AgentRunResponseUpdate, ErrorContent, FunctionCallContent, Role, TextContent
from agent_framework._types import (
AgentRunResponseUpdate,
ErrorContent,
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
)
from agent_framework_devui._mapper import MessageMapper
from agent_framework_devui.models._openai_custom import AgentFrameworkRequest
@@ -79,15 +86,30 @@ async def test_critical_isinstance_bug_detection(mapper: MessageMapper, test_req
async def test_text_content_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
"""Test TextContent mapping."""
"""Test TextContent mapping with proper OpenAI event hierarchy."""
content = create_test_content("text", text="Hello, clean test!")
update = create_test_agent_update([content])
events = await mapper.convert_event(update, test_request)
assert len(events) == 1
assert events[0].type == "response.output_text.delta"
assert events[0].delta == "Hello, clean test!"
# With proper OpenAI hierarchy, we expect 3 events:
# 1. response.output_item.added (message)
# 2. response.content_part.added (text part)
# 3. response.output_text.delta (actual text)
assert len(events) == 3
# Check message output item
assert events[0].type == "response.output_item.added"
assert events[0].item.type == "message"
assert events[0].item.role == "assistant"
# Check content part
assert events[1].type == "response.content_part.added"
assert events[1].part.type == "output_text"
# Check text delta
assert events[2].type == "response.output_text.delta"
assert events[2].delta == "Hello, clean test!"
async def test_function_call_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
@@ -108,6 +130,83 @@ async def test_function_call_mapping(mapper: MessageMapper, test_request: AgentF
assert "TestCity" in full_json
async def test_function_result_content_with_string_result(
mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
"""Test FunctionResultContent with plain string result (regular tools)."""
content = FunctionResultContent(
call_id="test_call_123",
result="Hello, World!", # Plain string like regular Python function tools
)
update = create_test_agent_update([content])
events = await mapper.convert_event(update, test_request)
# Should produce response.function_result.complete event
assert len(events) >= 1
result_events = [e for e in events if e.type == "response.function_result.complete"]
assert len(result_events) == 1
assert result_events[0].output == "Hello, World!"
assert result_events[0].call_id == "test_call_123"
assert result_events[0].status == "completed"
async def test_function_result_content_with_nested_content_objects(
mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
"""Test FunctionResultContent with nested Content objects (MCP tools case).
This tests the issue from GitHub #1476 where MCP tools return FunctionResultContent
with nested TextContent objects that fail to serialize properly.
"""
# This is what MCP tools return - result contains nested Content objects
content = FunctionResultContent(
call_id="mcp_call_456",
result=[TextContent(text="Hello from MCP!")], # List containing TextContent object
)
update = create_test_agent_update([content])
events = await mapper.convert_event(update, test_request)
# Should successfully serialize the nested Content object
assert len(events) >= 1
result_events = [e for e in events if e.type == "response.function_result.complete"]
assert len(result_events) == 1
# The output should contain the text from the nested TextContent
# Should not have TypeError or empty output
assert result_events[0].output != ""
assert "Hello from MCP!" in result_events[0].output
assert result_events[0].call_id == "mcp_call_456"
async def test_function_result_content_with_multiple_nested_content_objects(
mapper: MessageMapper, test_request: AgentFrameworkRequest
) -> None:
"""Test FunctionResultContent with multiple nested Content objects."""
# MCP tools can return multiple Content objects
content = FunctionResultContent(
call_id="mcp_call_789",
result=[
TextContent(text="First result"),
TextContent(text="Second result"),
],
)
update = create_test_agent_update([content])
events = await mapper.convert_event(update, test_request)
assert len(events) >= 1
result_events = [e for e in events if e.type == "response.function_result.complete"]
assert len(result_events) == 1
# Should serialize all nested Content objects
output = result_events[0].output
assert output != ""
assert "First result" in output
assert "Second result" in output
async def test_error_content_mapping(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
"""Test ErrorContent mapping."""
content = create_test_content("error", message="Test error", code="test_code")
@@ -182,6 +281,140 @@ async def test_agent_run_response_mapping(mapper: MessageMapper, test_request: A
assert text_events[0].delta == "Complete response from run()"
async def test_agent_lifecycle_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
"""Test that agent lifecycle events are properly converted to OpenAI format."""
from agent_framework_devui.models._openai_custom import AgentCompletedEvent, AgentFailedEvent, AgentStartedEvent
# Test AgentStartedEvent
start_event = AgentStartedEvent()
events = await mapper.convert_event(start_event, test_request)
assert len(events) == 2 # Should emit response.created and response.in_progress
assert events[0].type == "response.created"
assert events[1].type == "response.in_progress"
assert events[0].response.model == "test_agent" # Should use model from request
assert events[0].response.status == "in_progress"
# Test AgentCompletedEvent
complete_event = AgentCompletedEvent()
events = await mapper.convert_event(complete_event, test_request)
assert len(events) == 1
assert events[0].type == "response.completed"
assert events[0].response.status == "completed"
# Test AgentFailedEvent
error = Exception("Test error")
failed_event = AgentFailedEvent(error=error)
events = await mapper.convert_event(failed_event, test_request)
assert len(events) == 1
assert events[0].type == "response.failed"
assert events[0].response.status == "failed"
assert events[0].response.error.message == "Test error"
assert events[0].response.error.code == "server_error"
@pytest.mark.skip(reason="Workflow events need real classes from agent_framework.workflows")
async def test_workflow_lifecycle_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
"""Test that workflow lifecycle events are properly converted to OpenAI format."""
# Create mock workflow events (since we don't have access to the real ones in tests)
class WorkflowStartedEvent: # noqa: B903
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
class WorkflowCompletedEvent: # noqa: B903
def __init__(self, workflow_id: str):
self.workflow_id = workflow_id
class WorkflowFailedEvent: # noqa: B903
def __init__(self, workflow_id: str, error_info: dict | None = None):
self.workflow_id = workflow_id
self.error_info = error_info
# Test WorkflowStartedEvent
start_event = WorkflowStartedEvent(workflow_id="test_workflow_123")
events = await mapper.convert_event(start_event, test_request)
assert len(events) == 2 # Should emit response.created and response.in_progress
assert events[0].type == "response.created"
assert events[1].type == "response.in_progress"
assert events[0].response.model == "test_agent" # Should use model from request
assert events[0].response.status == "in_progress"
# Test WorkflowCompletedEvent
complete_event = WorkflowCompletedEvent(workflow_id="test_workflow_123")
events = await mapper.convert_event(complete_event, test_request)
assert len(events) == 1
assert events[0].type == "response.completed"
assert events[0].response.status == "completed"
# Test WorkflowFailedEvent with error info
failed_event = WorkflowFailedEvent(workflow_id="test_workflow_123", error_info={"message": "Workflow failed"})
events = await mapper.convert_event(failed_event, test_request)
assert len(events) == 1
assert events[0].type == "response.failed"
assert events[0].response.status == "failed"
assert events[0].response.error.message == "{'message': 'Workflow failed'}"
assert events[0].response.error.code == "server_error"
@pytest.mark.skip(reason="Executor events need real classes from agent_framework.workflows")
async def test_executor_action_events(mapper: MessageMapper, test_request: AgentFrameworkRequest) -> None:
"""Test that workflow executor events are properly converted to custom output item events."""
# Create mock executor events (since we don't have access to the real ones in tests)
class ExecutorInvokedEvent: # noqa: B903
def __init__(self, executor_id: str, executor_type: str = "test"):
self.executor_id = executor_id
self.executor_type = executor_type
class ExecutorCompletedEvent: # noqa: B903
def __init__(self, executor_id: str, result: Any = None):
self.executor_id = executor_id
self.result = result
class ExecutorFailedEvent: # noqa: B903
def __init__(self, executor_id: str, error: Exception | None = None):
self.executor_id = executor_id
self.error = error
# Test ExecutorInvokedEvent
invoked_event = ExecutorInvokedEvent(executor_id="exec_123", executor_type="test_executor")
events = await mapper.convert_event(invoked_event, test_request)
assert len(events) == 1
assert events[0].type == "response.output_item.added"
assert events[0].item["type"] == "executor_action"
assert events[0].item["executor_id"] == "exec_123"
assert events[0].item["status"] == "in_progress"
# Test ExecutorCompletedEvent
complete_event = ExecutorCompletedEvent(executor_id="exec_123", result={"data": "success"})
events = await mapper.convert_event(complete_event, test_request)
assert len(events) == 1
assert events[0].type == "response.output_item.done"
assert events[0].item["type"] == "executor_action"
assert events[0].item["executor_id"] == "exec_123"
assert events[0].item["status"] == "completed"
assert events[0].item["result"] == {"data": "success"}
# Test ExecutorFailedEvent
failed_event = ExecutorFailedEvent(executor_id="exec_123", error=Exception("Executor failed"))
events = await mapper.convert_event(failed_event, test_request)
assert len(events) == 1
assert events[0].type == "response.output_item.done"
assert events[0].item["type"] == "executor_action"
assert events[0].item["executor_id"] == "exec_123"
assert events[0].item["status"] == "failed"
assert "Executor failed" in str(events[0].item["error"]["message"])
if __name__ == "__main__":
# Simple test runner
async def run_all_tests() -> None:
@@ -143,6 +143,104 @@ def test_select_primary_input_type_prefers_string_and_dict():
assert fallback is int
@pytest.mark.asyncio
async def test_credential_cleanup() -> None:
"""Test that async credentials are properly closed during server cleanup."""
from unittest.mock import AsyncMock, Mock
from agent_framework import ChatAgent
# Create mock credential with async close
mock_credential = AsyncMock()
mock_credential.close = AsyncMock()
# Create mock chat client with credential
mock_client = Mock()
mock_client.async_credential = mock_credential
mock_client.model_id = "test-model"
# Create agent with mock client
agent = ChatAgent(name="TestAgent", chat_client=mock_client, instructions="Test agent")
# Create DevUI server with agent
server = DevServer()
server._pending_entities = [agent]
await server._ensure_executor()
# Run cleanup
await server._cleanup_entities()
# Verify credential.close() was called
assert mock_credential.close.called, "Async credential close should have been called"
assert mock_credential.close.call_count == 1
@pytest.mark.asyncio
async def test_credential_cleanup_error_handling() -> None:
"""Test that credential cleanup errors are handled gracefully."""
from unittest.mock import AsyncMock, Mock
from agent_framework import ChatAgent
# Create mock credential that raises error on close
mock_credential = AsyncMock()
mock_credential.close = AsyncMock(side_effect=Exception("Close failed"))
# Create mock chat client with credential
mock_client = Mock()
mock_client.async_credential = mock_credential
mock_client.model_id = "test-model"
# Create agent with mock client
agent = ChatAgent(name="TestAgent", chat_client=mock_client, instructions="Test agent")
# Create DevUI server with agent
server = DevServer()
server._pending_entities = [agent]
await server._ensure_executor()
# Run cleanup - should not raise despite credential error
await server._cleanup_entities()
# Verify close was attempted
assert mock_credential.close.called
@pytest.mark.asyncio
async def test_multiple_credential_attributes() -> None:
"""Test that we check all common credential attribute names."""
from unittest.mock import AsyncMock, Mock
from agent_framework import ChatAgent
# Create mock credentials
mock_cred1 = Mock()
mock_cred1.close = Mock()
mock_cred2 = AsyncMock()
mock_cred2.close = AsyncMock()
# Create mock chat client with multiple credential attributes
mock_client = Mock()
mock_client.credential = mock_cred1
mock_client.async_credential = mock_cred2
mock_client.model_id = "test-model"
# Create agent with mock client
agent = ChatAgent(name="TestAgent", chat_client=mock_client, instructions="Test agent")
# Create DevUI server with agent
server = DevServer()
server._pending_entities = [agent]
await server._ensure_executor()
# Run cleanup
await server._cleanup_entities()
# Verify both credentials were closed
assert mock_cred1.close.called, "Sync credential should be closed"
assert mock_cred2.close.called, "Async credential should be closed"
if __name__ == "__main__":
# Simple test runner
async def run_tests():