Files
agent-framework/python/packages/devui/tests/devui/capture_messages.py
Eduard van Valkenburg a2856d3b92 Python: restructure: Python samples into progressive 01-05 layout (#3862)
* restructure: Python samples into progressive 01-05 layout

- 01-get-started/: 6 numbered steps (hello agent → hosting)
- 02-agents/: all agent concept samples (tools, middleware, providers, etc.)
- 03-workflows/: ALL existing workflow samples preserved as-is
- 04-hosting/: azure-functions, durabletask, a2a
- 05-end-to-end/: demos, evaluation, hosted agents
- Old files moved to _to_delete/ for review
- Added AGENTS.md with structure documentation
- autogen-migration/ and semantic-kernel-migration/ preserved at root

* fix: switch to AzureOpenAI Foundry, fix CI failures

- Switch all 01-get-started samples to AzureOpenAIResponsesClient with
  Azure AI Foundry project endpoint (AZURE_AI_PROJECT_ENDPOINT +
  AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME + AzureCliCredential)
- Add _to_delete/ and 05-end-to-end/ to pyrightconfig.samples.json excludes
- Fix test paths in packages/ that referenced old getting_started/ dirs:
  durabletask conftest + streaming test, azurefunctions conftest,
  devui conftest + capture_messages + openai_sdk_integration
- Fix workflow_as_agent_human_in_the_loop.py import (sibling import)
- Update hosting READMEs and tool comment paths
- Replace root README.md with new structure overview
- Update AGENTS.md to document Azure OpenAI Foundry as default provider

* cleanup: remove _to_delete folder, copy resource files to active dirs

All files in _to_delete/ were either:
- Exact duplicates of files in the new structure (240 files)
- Same file with only comment path updates (100 files)
- One import-fix diff (workflow_as_agent_human_in_the_loop.py)
- One superseded minimal_sample.py

Resource files (sample.pdf, countries.json, employees.pdf, weather.json)
copied to 02-agents/sample_assets/ and 02-agents/resources/ since active
samples reference them.

* fix: address PR review comments, centralize resources, remove root duplicates

- Fix type annotation in 04_memory.py (string union -> proper types)
- Fix old sample paths in observability files
- Fix grammar/spelling in observability samples
- Move sample_assets/ and resources/ to shared/ folder
- Remove 8 duplicate observability files from 02-agents root
- Update resource path references in multimodal_input and provider samples

* fix: update broken links from old getting_started paths to new structure

- Update relative paths in READMEs: getting_started/ → 01-get-started/,
  02-agents/, 03-workflows/, 04-hosting/, 05-end-to-end/
- Fix absolute GitHub URLs in package READMEs
- Fix broken link in ollama package README

* fix: convert absolute GitHub URLs to relative paths for link checker

Absolute URLs to python/samples/ on main branch 404 until PR merges.
Converted to relative paths that linkspector can verify locally.

* fix: update link for handoff sample moved to orchestrations/

* fix: update chatkit-integration README path from demos/ to 05-end-to-end/

* fix: update broken links in orchestrations README to match flat directory structure
2026-02-12 17:36:36 +00:00

228 lines
6.8 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""
Message Capture Script - Debug message flow
- This script is intended to provide a reference for the types of events
that are emitted by the server when agents and workflows are executed
"""
import asyncio
import contextlib
import http.client
import json
import logging
import threading
import time
from pathlib import Path
from typing import Any
import uvicorn
from openai import OpenAI
from agent_framework_devui import DevServer
logger = logging.getLogger(__name__)
def start_server() -> tuple[str, Any]:
"""Start server with samples directory."""
# Get samples directory - updated path after samples were moved
current_dir = Path(__file__).parent
# Samples are now in python/samples/02-agents/devui
samples_dir = current_dir.parent.parent.parent / "samples" / "02-agents" / "devui"
if not samples_dir.exists():
raise RuntimeError(f"Samples directory not found: {samples_dir}")
logger.info(f"Using samples directory: {samples_dir}")
# Create and start server with simplified parameters
server = DevServer(
entities_dir=str(samples_dir.resolve()),
host="127.0.0.1",
port=8085, # Use different port
ui_enabled=False,
)
app = server.get_app()
server_config = uvicorn.Config(
app=app,
host="127.0.0.1",
port=8085,
# log_level="info", # More verbose to see tracing setup
)
server_instance = uvicorn.Server(server_config)
def run_server():
asyncio.run(server_instance.serve())
server_thread = threading.Thread(target=run_server, daemon=True)
server_thread.start()
# Wait for server to start
time.sleep(5) # Increased wait time
# Verify server is running with retries
max_retries = 10
for attempt in range(max_retries):
try:
conn = http.client.HTTPConnection("127.0.0.1", 8085, timeout=5)
try:
conn.request("GET", "/health")
response = conn.getresponse()
if response.status == 200:
break
finally:
conn.close()
except Exception as e:
if attempt < max_retries - 1:
time.sleep(2)
else:
raise RuntimeError(f"Server failed to start after {max_retries} attempts: {e}") from e
return "http://127.0.0.1:8085", server_instance
def capture_agent_stream_with_tracing(client: OpenAI, agent_id: str, scenario: str = "success") -> list[dict[str, Any]]:
"""Capture agent streaming events."""
try:
stream = client.responses.create(
metadata={"entity_id": agent_id},
input="Tell me about the weather in Tokyo. I want details.",
stream=True,
)
events = []
for event in stream:
# Serialize the entire event object
try:
event_dict = json.loads(event.model_dump_json())
except Exception:
# Fallback to dict conversion if model_dump_json fails
event_dict = event.__dict__ if hasattr(event, "__dict__") else str(event)
events.append(event_dict)
# Just capture everything as-is
if len(events) >= 200: # Increased limit
break
return events
except Exception as e:
# Return error information as events
error_event = {
"type": "error",
"scenario": scenario,
"error_message": str(e),
"error_type": type(e).__name__,
"timestamp": time.time(),
}
return [error_event]
def capture_workflow_stream_with_tracing(
client: OpenAI, workflow_id: str, scenario: str = "success"
) -> list[dict[str, Any]]:
"""Capture workflow streaming events."""
try:
stream = client.responses.create(
metadata={"entity_id": workflow_id},
input=(
"Process this spam detection workflow with multiple emails: "
"'Buy now!', 'Hello mom', 'URGENT: Click here!'"
),
stream=True,
)
events = []
for event in stream:
# Serialize the entire event object
try:
event_dict = json.loads(event.model_dump_json())
except Exception:
# Fallback to dict conversion if model_dump_json fails
event_dict = event.__dict__ if hasattr(event, "__dict__") else str(event)
events.append(event_dict)
# Just capture everything as-is
if len(events) >= 200: # Increased limit
break
return events
except Exception as e:
# Return error information as events
error_event = {
"type": "error",
"scenario": scenario,
"error_message": str(e),
"error_type": type(e).__name__,
"timestamp": time.time(),
"entity_type": "workflow",
}
return [error_event]
def main():
"""Main capture script - testing both success and failure scenarios."""
# Setup
output_dir = Path(__file__).parent / "captured_messages"
output_dir.mkdir(exist_ok=True)
# Start server
base_url, server_instance = start_server()
try:
# Create OpenAI client for success scenario
client = OpenAI(base_url=f"{base_url}/v1", api_key="dummy-key")
# Discover entities
conn = http.client.HTTPConnection("127.0.0.1", 8085, timeout=10)
try:
conn.request("GET", "/v1/entities")
response = conn.getresponse()
response_data = response.read().decode("utf-8")
entities = json.loads(response_data)["entities"]
finally:
conn.close()
all_results = {}
# Test each entity
for entity in entities:
entity_type = entity["type"]
entity_id = entity["id"]
if entity_type == "agent":
events = capture_agent_stream_with_tracing(client, entity_id, "success")
elif entity_type == "workflow":
events = capture_workflow_stream_with_tracing(client, entity_id, "success")
else:
continue
all_results[f"{entity_type}_{entity_id}"] = {"entity_info": entity, "events": events}
# Save results
file_path = output_dir / "entities_stream_events.json"
with open(file_path, "w") as f:
json.dump(
{"timestamp": time.time(), "server_type": "DevServer", "entities_tested": all_results},
f,
indent=2,
default=str,
)
finally:
# Cleanup server
with contextlib.suppress(Exception):
server_instance.should_exit = True
if __name__ == "__main__":
main()