mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
1ef24d3e91
* add initial backend service code for devui * add tests * add frontend code * ui updates * update readme * ui updates and tweaks * update ui bundle * improve ui, add react flow base * add react flow ui, fix background * update ui, fix introspection bug * update readme * update ui build * add support for multimodal input - both backend and frontend * update ui build * refactor as main framework package * backend and tests refactor * ui build update * ui build update and refactor * update pyproject.toml, update uv.lock * update ui build * ui update to fit oai responses types * add backend update and readme update * mypy and other fixes * add initial dev guide * update ui and fix workflow bug * update ui build, add thread support * type fixes * update workflow view * update uv.lock * fix workflow import errors * lint and other fixes * mypy fixes * minor update * update ui build * refactor to use oai dependencies directly, update examples to samples, improve typing * readme update * update ui and ui build * fix workflow pyright error * update ui, fix issues with run workflow placement, minimap menu, etc * make samples integrate serve --------- Co-authored-by: Chris <66376200+crickman@users.noreply.github.com> Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
285 lines
8.5 KiB
Python
285 lines
8.5 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Message capture script for debugging message flow.

This script is intended to provide a reference for the types of events that
are emitted by the server when agents and workflows are executed.
"""
|
|
|
|
import asyncio
import contextlib
import http.client
import json
import threading
import time
from pathlib import Path
from typing import Any
from urllib.parse import urlsplit

import uvicorn
from openai import OpenAI

from agent_framework_devui import DevServer
|
|
|
|
|
|
def start_server(host: str = "127.0.0.1", port: int = 8085) -> tuple[str, Any]:
    """Start a DevServer over the samples directory on a background thread.

    The server is considered ready only once ``GET /health`` answers with
    HTTP 200. Any other outcome (connection error or non-200 status) is
    retried with a delay and, if it persists, reported as a failure.

    Args:
        host: Interface the server binds to and the health probe connects to.
        port: TCP port the server binds to and the health probe connects to.

    Returns:
        A ``(base_url, server_instance)`` tuple. ``server_instance`` is the
        ``uvicorn.Server`` running on the daemon thread; the caller requests
        shutdown by setting ``server_instance.should_exit = True``.

    Raises:
        RuntimeError: If the health endpoint never returns HTTP 200 within
            the retry budget. The last observed failure is chained as the cause.
    """
    # Samples live next to this script's parent directory.
    current_dir = Path(__file__).parent
    samples_dir = current_dir.parent / "samples"

    server = DevServer(
        entities_dir=str(samples_dir.resolve()),
        host=host,
        port=port,
        ui_enabled=False,
    )
    app = server.get_app()

    server_config = uvicorn.Config(
        app=app,
        host=host,
        port=port,
        log_level="info",  # More verbose to see tracing setup
    )
    server_instance = uvicorn.Server(server_config)

    def run_server() -> None:
        # Each thread needs its own event loop; asyncio.run creates and tears one down.
        asyncio.run(server_instance.serve())

    server_thread = threading.Thread(target=run_server, daemon=True)
    server_thread.start()

    # Give the server a head start before the first probe.
    time.sleep(5)

    max_retries = 10
    last_error: BaseException | None = None
    for attempt in range(max_retries):
        try:
            conn = http.client.HTTPConnection(host, port, timeout=5)
            try:
                conn.request("GET", "/health")
                status = conn.getresponse().status
            finally:
                conn.close()
        except Exception as e:
            last_error = e
        else:
            if status == 200:
                break
            # A reachable but unhealthy server is still a failed attempt.
            last_error = RuntimeError(f"health check returned HTTP {status}")

        if attempt < max_retries - 1:
            time.sleep(2)
    else:
        # Never became healthy: ask the background server to stop before reporting.
        server_instance.should_exit = True
        raise RuntimeError(f"Server failed to start after {max_retries} attempts: {last_error}") from last_error

    return f"http://{host}:{port}", server_instance
|
|
|
|
|
|
def capture_agent_stream_with_tracing(
    client: OpenAI, agent_id: str, scenario: str = "success", *, max_events: int = 200
) -> list[dict[str, Any]]:
    """Capture the streaming events emitted while an agent handles one request.

    Args:
        client: OpenAI client pointed at the server's ``/v1`` endpoint.
        agent_id: Entity id forwarded to the server as ``extra_body["entity_id"]``.
        scenario: Label copied into the error event when the request fails.
        max_events: Stop reading once this many events were captured. At least
            one event is always read, so values below 1 behave like 1.

    Returns:
        The captured events, each serialized to a JSON-compatible dict. If
        anything raises, a single-element list holding an error event with the
        keys ``type``, ``scenario``, ``error_message``, ``error_type`` and
        ``timestamp`` is returned instead; events read before the failure are
        discarded.
    """
    try:
        stream = client.responses.create(
            model="agent-framework",
            input="Tell me about the weather in Tokyo. I want details.",
            stream=True,
            extra_body={"entity_id": agent_id},
        )

        events: list[dict[str, Any]] = []
        # closing() releases the underlying response even when we stop reading
        # early at the event cap or an event fails mid-stream.
        with contextlib.closing(stream):
            for event in stream:
                try:
                    event_dict = json.loads(event.model_dump_json())
                except Exception:
                    # Fallback to dict conversion if model_dump_json fails
                    event_dict = event.__dict__ if hasattr(event, "__dict__") else str(event)

                events.append(event_dict)

                if len(events) >= max_events:
                    break

        return events

    except Exception as e:
        # Report the failure as data so the capture run can continue.
        return [
            {
                "type": "error",
                "scenario": scenario,
                "error_message": str(e),
                "error_type": type(e).__name__,
                "timestamp": time.time(),
            }
        ]
|
|
|
|
|
|
def capture_workflow_stream_with_tracing(
    client: OpenAI, workflow_id: str, scenario: str = "success", *, max_events: int = 200
) -> list[dict[str, Any]]:
    """Capture the streaming events emitted while a workflow handles one request.

    Args:
        client: OpenAI client pointed at the server's ``/v1`` endpoint.
        workflow_id: Entity id forwarded to the server as ``extra_body["entity_id"]``.
        scenario: Label copied into the error event when the request fails.
        max_events: Stop reading once this many events were captured. At least
            one event is always read, so values below 1 behave like 1.

    Returns:
        The captured events, each serialized to a JSON-compatible dict. If
        anything raises, a single-element list holding an error event is
        returned instead (tagged with ``"entity_type": "workflow"``); events
        read before the failure are discarded.
    """
    try:
        stream = client.responses.create(
            model="agent-framework",
            input=(
                "Process this spam detection workflow with multiple emails: "
                "'Buy now!', 'Hello mom', 'URGENT: Click here!'"
            ),
            stream=True,
            extra_body={"entity_id": workflow_id},
        )

        events: list[dict[str, Any]] = []
        # closing() releases the underlying response even when we stop reading
        # early at the event cap or an event fails mid-stream.
        with contextlib.closing(stream):
            for event in stream:
                try:
                    event_dict = json.loads(event.model_dump_json())
                except Exception:
                    # Fallback to dict conversion if model_dump_json fails
                    event_dict = event.__dict__ if hasattr(event, "__dict__") else str(event)

                events.append(event_dict)

                if len(events) >= max_events:
                    break

        return events

    except Exception as e:
        # Report the failure as data so the capture run can continue.
        return [
            {
                "type": "error",
                "scenario": scenario,
                "error_message": str(e),
                "error_type": type(e).__name__,
                "timestamp": time.time(),
                "entity_type": "workflow",
            }
        ]
|
|
|
|
|
|
def capture_agent_with_bad_config(base_url: str, agent_id: str) -> list[dict[str, Any]]:
    """Capture agent events using an intentionally invalid API key.

    Args:
        base_url: Server root URL; ``/v1`` is appended for the OpenAI client.
        agent_id: Entity id of the agent to run.

    Returns:
        Whatever ``capture_agent_stream_with_tracing`` returns for the
        ``"bad_api_key"`` scenario, or a single error event if building the
        client itself fails.
    """
    scenario = "bad_api_key"

    try:
        # Building the client is the step that can raise here, so it belongs
        # inside the handler; the capture call reports its own failures as events.
        bad_client = OpenAI(base_url=f"{base_url}/v1", api_key="invalid-api-key-123")
        return capture_agent_stream_with_tracing(bad_client, agent_id, scenario)
    except Exception as e:
        return [
            {
                "type": "error",
                "scenario": scenario,
                "error_message": str(e),
                "error_type": type(e).__name__,
                "timestamp": time.time(),
            }
        ]
|
|
|
|
|
|
def capture_agent_with_wrong_model(base_url: str, agent_id: str, *, max_events: int = 200) -> list[dict[str, Any]]:
    """Capture agent events when the request names a nonexistent model.

    Args:
        base_url: Server root URL; ``/v1`` is appended for the OpenAI client.
        agent_id: Entity id forwarded to the server as ``extra_body["entity_id"]``.
        max_events: Stop reading once this many events were captured. At least
            one event is always read, so values below 1 behave like 1.

    Returns:
        The captured events, each serialized to a JSON-compatible dict. If
        anything raises (including client construction), a single-element list
        holding an error event for the ``"wrong_model"`` scenario is returned.
    """
    try:
        client = OpenAI(
            base_url=f"{base_url}/v1",
            api_key="dummy-key",  # Use the same key as success case
        )

        stream = client.responses.create(
            model="gpt-4-nonexistent-model",  # Wrong model name
            input="Tell me about the weather in Tokyo. I want details.",
            stream=True,
            extra_body={"entity_id": agent_id},
        )

        events: list[dict[str, Any]] = []
        # closing() releases the underlying response even when we stop reading
        # early at the event cap or an event fails mid-stream.
        with contextlib.closing(stream):
            for event in stream:
                try:
                    event_dict = json.loads(event.model_dump_json())
                except Exception:
                    # Fallback to dict conversion if model_dump_json fails
                    event_dict = event.__dict__ if hasattr(event, "__dict__") else str(event)

                events.append(event_dict)

                if len(events) >= max_events:
                    break

        return events

    except Exception as e:
        return [
            {
                "type": "error",
                "scenario": "wrong_model",
                "error_message": str(e),
                "error_type": type(e).__name__,
                "timestamp": time.time(),
            }
        ]
|
|
|
|
|
|
def main() -> None:
    """Run the capture: start the server, stream every entity once, save the events.

    Each discovered agent or workflow is run with the ``"success"`` scenario and
    the results are written to ``captured_messages/entities_stream_events.json``
    next to this script. The failure-scenario helpers defined in this module
    are not invoked here.

    Raises:
        RuntimeError: If the server does not start or entity discovery does not
            return HTTP 200.
    """
    output_dir = Path(__file__).parent / "captured_messages"
    output_dir.mkdir(exist_ok=True)

    base_url, server_instance = start_server()

    try:
        client = OpenAI(base_url=f"{base_url}/v1", api_key="dummy-key")

        # Discover entities on the address the server reported rather than
        # repeating the host and port here.
        target = urlsplit(base_url)
        conn = http.client.HTTPConnection(target.hostname or "127.0.0.1", target.port, timeout=10)
        try:
            conn.request("GET", "/v1/entities")
            response = conn.getresponse()
            status = response.status
            response_data = response.read().decode("utf-8")
        finally:
            conn.close()

        if status != 200:
            # Fail with the real cause instead of a KeyError/JSONDecodeError below.
            raise RuntimeError(f"Entity discovery failed with HTTP {status}: {response_data[:200]}")
        entities = json.loads(response_data)["entities"]

        all_results: dict[str, Any] = {}

        for entity in entities:
            entity_type = entity["type"]
            entity_id = entity["id"]

            if entity_type == "agent":
                events = capture_agent_stream_with_tracing(client, entity_id, "success")
            elif entity_type == "workflow":
                events = capture_workflow_stream_with_tracing(client, entity_id, "success")
            else:
                # Unknown entity kinds are skipped.
                continue

            all_results[f"{entity_type}_{entity_id}"] = {"entity_info": entity, "events": events}

        file_path = output_dir / "entities_stream_events.json"
        with open(file_path, "w", encoding="utf-8") as f:
            json.dump(
                {"timestamp": time.time(), "server_type": "DevServer", "entities_tested": all_results},
                f,
                indent=2,
                default=str,  # events may carry values json cannot encode natively
            )

    finally:
        # Best-effort shutdown request; the server runs on a daemon thread.
        with contextlib.suppress(Exception):
            server_instance.should_exit = True
|
|
|
|
|
|
# Run the capture only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|