Files
agent-framework/python/packages/devui/tests/capture_messages.py
T
Victor Dibia 1ef24d3e91 Python: Add DevUI to AgentFramework (#781)
* add initial backend service code for devui

* add tests

* add frontendcode

* ui updates

* update readme

* ui updates and tweaks

* update ui bundle

* improve ui, add react flow base

* add react flow ui, fix background

* update ui, fix introspection bug

* update readme

* update ui build

* add support for multimodal input - both backend and frontend

* update ui build

* refactor as main framework package

* backend and tests refactor

* ui build update

* ui build update and refactor

* update pyproject.toml, update uv.lock

* update ui build

* ui update to fit oai responses types

* add backend update and readme update

* mypy and other fixes

* add initial dev guide

* update ui and fix workflow bug

* update ui build, add thread support

* type fixes

* update workflow view

* update uv.lock

* fix workflow import errors

* lint and other fixes

* mypy fixes

* minor update

* update ui build

* refactor to use oai dependencies directly, update examples to samples, improve typing

* readme update

* update ui and ui build

* fix workflow pyright error

* update ui, fix issues with run workflow placement, minimap menu, etc

* make samples integrate serve

---------

Co-authored-by: Chris <66376200+crickman@users.noreply.github.com>
Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
2025-09-22 23:30:08 +00:00

285 lines
8.5 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""
Message Capture Script - Debug message flow
- This script is intended to provide a reference for the types of events
that are emitted by the server when agents and workflows are executed
"""
import asyncio
import contextlib
import http.client
import json
import threading
import time
from pathlib import Path
from typing import Any
import uvicorn
from openai import OpenAI
from agent_framework_devui import DevServer
def start_server(host: str = "127.0.0.1", port: int = 8085) -> tuple[str, Any]:
    """Start a DevServer over the samples directory on a background thread.

    The server is built with the UI disabled, served by uvicorn on a daemon
    thread, and then polled on ``/health`` until it answers with HTTP 200.

    Args:
        host: Address the server binds to and the health probe connects to.
        port: TCP port the server binds to (8085 by default).

    Returns:
        A ``(base_url, server_instance)`` tuple, where ``server_instance`` is
        the ``uvicorn.Server`` so the caller can request shutdown.

    Raises:
        RuntimeError: If no health probe returns HTTP 200 within the retry
            budget, whether probes failed to connect or answered with another
            status.
    """
    # The samples directory sits next to the directory containing this file.
    samples_dir = Path(__file__).parent.parent / "samples"

    server = DevServer(
        entities_dir=str(samples_dir.resolve()),
        host=host,
        port=port,
        ui_enabled=False,
    )
    server_config = uvicorn.Config(
        app=server.get_app(),
        host=host,
        port=port,
        log_level="info",  # More verbose to see tracing setup
    )
    server_instance = uvicorn.Server(server_config)

    def run_server() -> None:
        asyncio.run(server_instance.serve())

    # Daemon thread: it must not keep the interpreter alive once main() returns.
    threading.Thread(target=run_server, daemon=True).start()

    # Give the server a head start before the first probe.
    time.sleep(5)

    max_retries = 10
    failure = ""
    cause = None
    for attempt in range(max_retries):
        if attempt:
            # Back off before every retry, whatever the reason for the failure.
            time.sleep(2)
        try:
            conn = http.client.HTTPConnection(host, port, timeout=5)
            try:
                conn.request("GET", "/health")
                status = conn.getresponse().status
            finally:
                conn.close()
        except (OSError, http.client.HTTPException) as exc:
            failure, cause = str(exc), exc
            continue
        if status == 200:
            break
        # An answer that is not 200 still means "not ready"; keep retrying.
        failure, cause = f"/health returned HTTP {status}", None
    else:
        # The loop ran out of attempts without ever seeing a healthy response.
        raise RuntimeError(f"Server failed to start after {max_retries} attempts: {failure}") from cause

    return f"http://{host}:{port}", server_instance
def capture_agent_stream_with_tracing(
    client: OpenAI, agent_id: str, scenario: str = "success", max_events: int = 200
) -> list[dict[str, Any]]:
    """Stream one agent request and return the emitted events as dicts.

    Args:
        client: OpenAI client pointed at the server's ``/v1`` endpoint.
        agent_id: Entity id sent as ``extra_body["entity_id"]``.
        scenario: Label copied into the error event when the capture fails.
        max_events: Upper bound on captured events; the stream is abandoned
            once this many have been collected. Values below 1 capture nothing
            and send no request.

    Returns:
        One dict per captured event. If anything raises, a single-element list
        holding an ``{"type": "error", ...}`` dict is returned instead; this
        function does not propagate ``Exception``.
    """
    if max_events < 1:
        return []
    try:
        stream = client.responses.create(
            model="agent-framework",
            input="Tell me about the weather in Tokyo. I want details.",
            stream=True,
            extra_body={"entity_id": agent_id},
        )
        events: list[dict[str, Any]] = []
        # Use the stream as a context manager so the underlying HTTP response
        # is closed even when we stop early at max_events.
        with stream:
            for event in stream:
                try:
                    event_dict = json.loads(event.model_dump_json())
                except Exception:
                    # Fall back to the attribute dict; wrap a bare string so
                    # every element stays a dict as the return type promises.
                    if hasattr(event, "__dict__"):
                        event_dict = dict(vars(event))
                    else:
                        event_dict = {"raw_event": str(event)}
                events.append(event_dict)
                if len(events) >= max_events:
                    break
        return events
    except Exception as e:
        # Report the failure as data so the caller can store it with the capture.
        return [
            {
                "type": "error",
                "scenario": scenario,
                "error_message": str(e),
                "error_type": type(e).__name__,
                "timestamp": time.time(),
            }
        ]
def capture_workflow_stream_with_tracing(
    client: OpenAI, workflow_id: str, scenario: str = "success", max_events: int = 200
) -> list[dict[str, Any]]:
    """Stream one workflow request and return the emitted events as dicts.

    Args:
        client: OpenAI client pointed at the server's ``/v1`` endpoint.
        workflow_id: Entity id sent as ``extra_body["entity_id"]``.
        scenario: Label copied into the error event when the capture fails.
        max_events: Upper bound on captured events; the stream is abandoned
            once this many have been collected. Values below 1 capture nothing
            and send no request.

    Returns:
        One dict per captured event. If anything raises, a single-element list
        holding an ``{"type": "error", ..., "entity_type": "workflow"}`` dict
        is returned instead; this function does not propagate ``Exception``.
    """
    if max_events < 1:
        return []
    try:
        stream = client.responses.create(
            model="agent-framework",
            input=(
                "Process this spam detection workflow with multiple emails: "
                "'Buy now!', 'Hello mom', 'URGENT: Click here!'"
            ),
            stream=True,
            extra_body={"entity_id": workflow_id},
        )
        events: list[dict[str, Any]] = []
        # Use the stream as a context manager so the underlying HTTP response
        # is closed even when we stop early at max_events.
        with stream:
            for event in stream:
                try:
                    event_dict = json.loads(event.model_dump_json())
                except Exception:
                    # Fall back to the attribute dict; wrap a bare string so
                    # every element stays a dict as the return type promises.
                    if hasattr(event, "__dict__"):
                        event_dict = dict(vars(event))
                    else:
                        event_dict = {"raw_event": str(event)}
                events.append(event_dict)
                if len(events) >= max_events:
                    break
        return events
    except Exception as e:
        # Report the failure as data so the caller can store it with the capture.
        return [
            {
                "type": "error",
                "scenario": scenario,
                "error_message": str(e),
                "error_type": type(e).__name__,
                "timestamp": time.time(),
                "entity_type": "workflow",
            }
        ]
def capture_agent_with_bad_config(base_url: str, agent_id: str) -> list[dict[str, Any]]:
    """Run the agent capture through a client that carries an invalid API key.

    Args:
        base_url: Server root URL; ``/v1`` is appended for the client.
        agent_id: Entity id of the agent to exercise.

    Returns:
        Whatever the agent capture returns for the ``bad_api_key`` scenario, or
        a single error event if the capture call itself raises.
    """
    scenario_name = "bad_api_key"
    # Deliberately wrong credentials: the point is to record the error path.
    invalid_key_client = OpenAI(api_key="invalid-api-key-123", base_url=f"{base_url}/v1")
    try:
        captured = capture_agent_stream_with_tracing(invalid_key_client, agent_id, scenario_name)
    except Exception as failure:
        # Safety net: turn any escaping exception into the same error-event shape.
        captured = [
            {
                "type": "error",
                "scenario": scenario_name,
                "error_message": str(failure),
                "error_type": type(failure).__name__,
                "timestamp": time.time(),
            }
        ]
    return captured
def capture_agent_with_wrong_model(base_url: str, agent_id: str, max_events: int = 200) -> list[dict[str, Any]]:
    """Stream an agent request that names a nonexistent model and capture the result.

    Args:
        base_url: Server root URL; ``/v1`` is appended for the client.
        agent_id: Entity id sent as ``extra_body["entity_id"]``.
        max_events: Upper bound on captured events; the stream is abandoned
            once this many have been collected. Values below 1 capture nothing
            and send no request.

    Returns:
        One dict per captured event. If the request or the stream raises, a
        single-element list holding an error event for the ``wrong_model``
        scenario is returned instead.
    """
    if max_events < 1:
        return []
    client = OpenAI(
        base_url=f"{base_url}/v1",
        api_key="dummy-key",  # Use the same key as success case
    )
    try:
        stream = client.responses.create(
            model="gpt-4-nonexistent-model",  # Wrong model name
            input="Tell me about the weather in Tokyo. I want details.",
            stream=True,
            extra_body={"entity_id": agent_id},
        )
        events: list[dict[str, Any]] = []
        # Use the stream as a context manager so the underlying HTTP response
        # is closed even when we stop early at max_events.
        with stream:
            for event in stream:
                try:
                    event_dict = json.loads(event.model_dump_json())
                except Exception:
                    # Fall back to the attribute dict; wrap a bare string so
                    # every element stays a dict as the return type promises.
                    if hasattr(event, "__dict__"):
                        event_dict = dict(vars(event))
                    else:
                        event_dict = {"raw_event": str(event)}
                events.append(event_dict)
                if len(events) >= max_events:
                    break
        return events
    except Exception as e:
        # Report the failure as data so the caller can store it with the capture.
        return [
            {
                "type": "error",
                "scenario": "wrong_model",
                "error_message": str(e),
                "error_type": type(e).__name__,
                "timestamp": time.time(),
            }
        ]
def main() -> None:
    """Capture streaming events for every discovered entity and save them as JSON.

    Starts a local server, lists entities via ``GET /v1/entities``, streams one
    request per agent or workflow, and writes the results to
    ``captured_messages/entities_stream_events.json`` next to this file.

    Only the ``success`` scenario is exercised here; the failure-scenario
    capture helpers defined in this module are not invoked by this entry point.

    Raises:
        RuntimeError: If the server does not start or entity discovery does
            not answer with HTTP 200.
    """
    from urllib.parse import urlsplit

    # Setup
    output_dir = Path(__file__).parent / "captured_messages"
    output_dir.mkdir(exist_ok=True)

    base_url, server_instance = start_server()
    try:
        # Create OpenAI client for success scenario
        client = OpenAI(base_url=f"{base_url}/v1", api_key="dummy-key")

        # Discover entities on the address the server actually reported,
        # rather than repeating the host and port here.
        target = urlsplit(base_url)
        conn = http.client.HTTPConnection(target.hostname, target.port, timeout=10)
        try:
            conn.request("GET", "/v1/entities")
            response = conn.getresponse()
            response_data = response.read().decode("utf-8")
            if response.status != 200:
                raise RuntimeError(f"Entity discovery failed with HTTP {response.status}: {response_data}")
        finally:
            conn.close()
        entities = json.loads(response_data)["entities"]

        # Entity types without a capture function are skipped.
        capture_by_type = {
            "agent": capture_agent_stream_with_tracing,
            "workflow": capture_workflow_stream_with_tracing,
        }
        all_results = {}
        for entity in entities:
            entity_type = entity["type"]
            entity_id = entity["id"]
            capture = capture_by_type.get(entity_type)
            if capture is None:
                continue
            events = capture(client, entity_id, "success")
            all_results[f"{entity_type}_{entity_id}"] = {"entity_info": entity, "events": events}

        # Save results; the encoding is explicit so the file does not depend on
        # the platform default.
        file_path = output_dir / "entities_stream_events.json"
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(
                {"timestamp": time.time(), "server_type": "DevServer", "entities_tested": all_results},
                f,
                indent=2,
                default=str,  # stringify anything json cannot encode natively
            )
    finally:
        # Ask uvicorn to shut down; best-effort, so errors here are ignored.
        with contextlib.suppress(Exception):
            server_instance.should_exit = True
# Script entry point: run the capture only when executed directly, not on import.
if __name__ == "__main__":
    main()