Revert "Merge from main"

This reverts commit b8206a85d7.
This commit is contained in:
Dmytro Struk
2025-11-11 18:44:25 -08:00
Unverified
parent b8206a85d7
commit 85fcd230bf
231 changed files with 4138 additions and 19654 deletions
@@ -5,87 +5,20 @@
import importlib.metadata
import logging
import webbrowser
from collections.abc import Callable
from typing import Any
from ._conversations import CheckpointConversationManager
from ._server import DevServer
from .models import AgentFrameworkRequest, OpenAIError, OpenAIResponse, ResponseStreamEvent
from .models._discovery_models import DiscoveryResponse, EntityInfo, EnvVarRequirement
logger = logging.getLogger(__name__)
# Module-level cleanup registry (before serve() is called)
_cleanup_registry: dict[int, list[Callable[[], Any]]] = {}
try:
__version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0" # Fallback for development mode
def register_cleanup(entity: Any, *hooks: Callable[[], Any]) -> None:
    """Attach one or more cleanup callables to an entity.

    The hooks are appended to the module-level cleanup registry under
    ``id(entity)``. This function only records them; per the package
    documentation they are run during DevUI server shutdown and may be
    synchronous or asynchronous callables.

    Args:
        entity: Agent, workflow, or other entity object the hooks belong to.
        *hooks: One or more zero-argument cleanup callables.

    Raises:
        ValueError: If no hooks are provided.

    Examples:
        Single cleanup hook:

        >>> from agent_framework.devui import serve, register_cleanup
        >>> credential = DefaultAzureCredential()
        >>> agent = ChatAgent(...)
        >>> register_cleanup(agent, credential.close)
        >>> serve(entities=[agent])

        Multiple cleanup hooks:

        >>> register_cleanup(agent, credential.close, session.close, db_pool.close)
    """
    if len(hooks) == 0:
        raise ValueError("At least one cleanup hook required")

    # The registry is keyed by object identity, not by the entity itself.
    # NOTE(review): id() values are only unique while the object is alive and the
    # registry holds no reference to the entity - confirm entities outlive it.
    key = id(entity)
    registered = _cleanup_registry.setdefault(key, [])
    registered.extend(hooks)

    added = len(hooks)
    kind = type(entity).__name__
    logger.debug(f"Registered {added} cleanup hook(s) for {kind} (id: {key}, total: {len(registered)})")
def _get_registered_cleanup_hooks(entity: Any) -> list[Callable[[], Any]]:
    """Look up the cleanup hooks recorded for an entity (internal use).

    Args:
        entity: Entity object whose hooks are requested.

    Returns:
        The list stored in the registry for ``id(entity)`` (the registry's own
        list object, not a copy), or a new empty list when nothing is registered.
    """
    try:
        return _cleanup_registry[id(entity)]
    except KeyError:
        return []
def serve(
entities: list[Any] | None = None,
entities_dir: str | None = None,
@@ -95,9 +28,6 @@ def serve(
cors_origins: list[str] | None = None,
ui_enabled: bool = True,
tracing_enabled: bool = False,
mode: str = "developer",
auth_enabled: bool = False,
auth_token: str | None = None,
) -> None:
"""Launch Agent Framework DevUI with simple API.
@@ -110,9 +40,6 @@ def serve(
cors_origins: List of allowed CORS origins
ui_enabled: Whether to enable the UI
tracing_enabled: Whether to enable OpenTelemetry tracing
mode: Server mode - 'developer' (full access, verbose errors) or 'user' (restricted APIs, generic errors)
auth_enabled: Whether to enable Bearer token authentication
auth_token: Custom authentication token (auto-generated if not provided with auth_enabled=True)
"""
import re
@@ -126,52 +53,6 @@ def serve(
if not isinstance(port, int) or not (1 <= port <= 65535):
raise ValueError(f"Invalid port: {port}. Must be integer between 1 and 65535")
# Security check: Warn if network-exposed without authentication
if host not in ("127.0.0.1", "localhost") and not auth_enabled:
logger.warning("⚠️ WARNING: Exposing DevUI to network without authentication!")
logger.warning("⚠️ This is INSECURE - anyone on your network can access your agents")
logger.warning("💡 For network exposure, add --auth flag: devui --host 0.0.0.0 --auth")
# Handle authentication configuration
if auth_enabled:
import os
import secrets
# Check if token is in environment variable first
if not auth_token:
auth_token = os.environ.get("DEVUI_AUTH_TOKEN")
# Auto-generate token if STILL not provided
if not auth_token:
# Check if we're in a production-like environment
is_production = (
host not in ("127.0.0.1", "localhost") # Exposed to network
or os.environ.get("CI") == "true" # Running in CI
or os.environ.get("KUBERNETES_SERVICE_HOST") # Running in k8s
)
if is_production:
# REFUSE to start without explicit token
logger.error("❌ Authentication enabled but no token provided")
logger.error("❌ Auto-generated tokens are NOT secure for network-exposed deployments")
logger.error("💡 Set token: export DEVUI_AUTH_TOKEN=<your-secure-token>")
logger.error("💡 Or pass: serve(entities=[...], auth_token='your-token')")
raise ValueError("DEVUI_AUTH_TOKEN required when host is not localhost")
# Development mode: auto-generate and show
auth_token = secrets.token_urlsafe(32)
logger.info("🔒 Authentication enabled with auto-generated token")
logger.info("\n" + "=" * 70)
logger.info("🔑 DEV TOKEN (localhost only, shown once):")
logger.info(f" {auth_token}")
logger.info("=" * 70 + "\n")
else:
logger.info("🔒 Authentication enabled with provided token")
# Set environment variable for server to use
os.environ["AUTH_REQUIRED"] = "true"
os.environ["DEVUI_AUTH_TOKEN"] = auth_token
# Configure tracing environment variables if enabled
if tracing_enabled:
import os
@@ -191,12 +72,7 @@ def serve(
# Create server with direct parameters
server = DevServer(
entities_dir=entities_dir,
port=port,
host=host,
cors_origins=cors_origins,
ui_enabled=ui_enabled,
mode=mode,
entities_dir=entities_dir, port=port, host=host, cors_origins=cors_origins, ui_enabled=ui_enabled
)
# Register in-memory entities if provided
@@ -263,7 +139,6 @@ def main() -> None:
# Export main public API
__all__ = [
"AgentFrameworkRequest",
"CheckpointConversationManager",
"DevServer",
"DiscoveryResponse",
"EntityInfo",
@@ -272,6 +147,5 @@ __all__ = [
"OpenAIResponse",
"ResponseStreamEvent",
"main",
"register_cleanup",
"serve",
]
@@ -55,41 +55,6 @@ Examples:
parser.add_argument("--tracing", action="store_true", help="Enable OpenTelemetry tracing for Agent Framework")
parser.add_argument(
"--mode",
choices=["developer", "user"],
default=None,
help="Server mode - 'developer' (full access, verbose errors) or 'user' (restricted APIs, generic errors)",
)
# Add --dev/--no-dev as a convenient alternative to --mode
parser.add_argument(
"--dev",
dest="dev_mode",
action="store_true",
default=None,
help="Enable developer mode (shorthand for --mode developer)",
)
parser.add_argument(
"--no-dev",
dest="dev_mode",
action="store_false",
help="Disable developer mode (shorthand for --mode user)",
)
parser.add_argument(
"--auth",
action="store_true",
help="Enable authentication via Bearer token (required for deployed environments)",
)
parser.add_argument(
"--auth-token",
type=str,
help="Custom authentication token (auto-generated if not provided with --auth)",
)
parser.add_argument("--version", action="version", version=f"Agent Framework DevUI {get_version()}")
return parser
@@ -113,35 +78,26 @@ def validate_directory(directory: str) -> str:
abs_dir = os.path.abspath(directory)
if not os.path.exists(abs_dir):
print(f"Error: Directory '{directory}' does not exist", file=sys.stderr) # noqa: T201
print(f"Error: Directory '{directory}' does not exist", file=sys.stderr) # noqa: T201
sys.exit(1)
if not os.path.isdir(abs_dir):
print(f"Error: '{directory}' is not a directory", file=sys.stderr) # noqa: T201
print(f"Error: '{directory}' is not a directory", file=sys.stderr) # noqa: T201
sys.exit(1)
return abs_dir
def print_startup_info(
entities_dir: str, host: str, port: int, ui_enabled: bool, reload: bool, auth_token: str | None = None
) -> None:
def print_startup_info(entities_dir: str, host: str, port: int, ui_enabled: bool, reload: bool) -> None:
"""Print startup information."""
print("Agent Framework DevUI") # noqa: T201
print("🤖 Agent Framework DevUI") # noqa: T201
print("=" * 50) # noqa: T201
print(f"Entities directory: {entities_dir}") # noqa: T201
print(f"Server URL: http://{host}:{port}") # noqa: T201
print(f"UI enabled: {'Yes' if ui_enabled else 'No'}") # noqa: T201
print(f"Auto-reload: {'Yes' if reload else 'No'}") # noqa: T201
# Display auth token if authentication is enabled
if auth_token:
print("Authentication: Enabled") # noqa: T201
print(f"Auth token: {auth_token}") # noqa: T201
print("💡 Use this token in Authorization: Bearer <token> header") # noqa: T201
print(f"📁 Entities directory: {entities_dir}") # noqa: T201
print(f"🌐 Server URL: http://{host}:{port}") # noqa: T201
print(f"🎨 UI enabled: {'Yes' if ui_enabled else 'No'}") # noqa: T201
print(f"🔄 Auto-reload: {'Yes' if reload else 'No'}") # noqa: T201
print("=" * 50) # noqa: T201
print("Scanning for entities...") # noqa: T201
print("🔍 Scanning for entities...") # noqa: T201
def main() -> None:
@@ -158,19 +114,8 @@ def main() -> None:
# Extract parameters directly from args
ui_enabled = not args.headless
# Determine mode from --mode or --dev/--no-dev flags
if args.dev_mode is not None:
# --dev or --no-dev was specified
mode = "developer" if args.dev_mode else "user"
elif args.mode is not None:
# --mode was specified
mode = args.mode
else:
# Default to developer mode
mode = "developer"
# Print startup info (don't show token - serve() will handle it)
print_startup_info(entities_dir, args.host, args.port, ui_enabled, args.reload, None)
# Print startup info
print_startup_info(entities_dir, args.host, args.port, ui_enabled, args.reload)
# Import and start server
try:
@@ -183,17 +128,14 @@ def main() -> None:
auto_open=not args.no_open,
ui_enabled=ui_enabled,
tracing_enabled=args.tracing,
mode=mode,
auth_enabled=args.auth,
auth_token=args.auth_token, # Pass through explicit token only
)
except KeyboardInterrupt:
print("\nShutting down Agent Framework DevUI...") # noqa: T201
print("\n👋 Shutting down Agent Framework DevUI...") # noqa: T201
sys.exit(0)
except Exception as e:
logger.exception("Failed to start server")
print(f"Error: {e}", file=sys.stderr) # noqa: T201
print(f"Error: {e}", file=sys.stderr) # noqa: T201
sys.exit(1)
@@ -12,7 +12,6 @@ from abc import ABC, abstractmethod
from typing import Any, Literal, cast
from agent_framework import AgentThread, ChatMessage
from agent_framework._workflows._checkpoint import InMemoryCheckpointStorage
from openai.types.conversations import Conversation, ConversationDeletedResource
from openai.types.conversations.conversation_item import ConversationItem
from openai.types.conversations.message import Message
@@ -27,10 +26,6 @@ from openai.types.responses import (
# Type alias for OpenAI Message role literals
MessageRole = Literal["unknown", "user", "assistant", "system", "critic", "discriminator", "developer", "tool"]
# Checkpoint item type constants
CONVERSATION_ITEM_TYPE_CHECKPOINT = "checkpoint"
CONVERSATION_TYPE_CHECKPOINT_CONTAINER = "checkpoint_container"
class ConversationStore(ABC):
"""Abstract base class for conversation storage.
@@ -40,17 +35,14 @@ class ConversationStore(ABC):
"""
@abstractmethod
def create_conversation(
self, metadata: dict[str, str] | None = None, conversation_id: str | None = None
) -> Conversation:
def create_conversation(self, metadata: dict[str, str] | None = None) -> Conversation:
"""Create a new conversation (wraps AgentThread creation).
Args:
metadata: Optional metadata dict (e.g., {"agent_id": "weather_agent"})
conversation_id: Optional conversation ID (if None, generates one)
Returns:
Conversation object with generated or provided ID
Conversation object with generated ID
"""
pass
@@ -135,7 +127,7 @@ class ConversationStore(ABC):
@abstractmethod
def get_item(self, conversation_id: str, item_id: str) -> ConversationItem | None:
"""Get a specific conversation item by ID.
"""Get specific conversation item.
Args:
conversation_id: Conversation ID
@@ -192,23 +184,17 @@ class InMemoryConversationStore(ConversationStore):
# Item index for O(1) lookup: {conversation_id: {item_id: ConversationItem}}
self._item_index: dict[str, dict[str, ConversationItem]] = {}
def create_conversation(
self, metadata: dict[str, str] | None = None, conversation_id: str | None = None
) -> Conversation:
"""Create a new conversation with underlying AgentThread and checkpoint storage."""
conv_id = conversation_id or f"conv_{uuid.uuid4().hex}"
def create_conversation(self, metadata: dict[str, str] | None = None) -> Conversation:
"""Create a new conversation with underlying AgentThread."""
conv_id = f"conv_{uuid.uuid4().hex}"
created_at = int(time.time())
# Create AgentThread with default ChatMessageStore
thread = AgentThread()
# Create session-scoped checkpoint storage (one per conversation)
checkpoint_storage = InMemoryCheckpointStorage()
self._conversations[conv_id] = {
"id": conv_id,
"thread": thread,
"checkpoint_storage": checkpoint_storage, # Stored alongside thread
"metadata": metadata or {},
"created_at": created_at,
"items": [],
@@ -438,23 +424,6 @@ class InMemoryConversationStore(ConversationStore):
# Add function result items
items.extend(function_results)
# Include checkpoints from checkpoint storage as conversation items
checkpoint_storage = conv_data.get("checkpoint_storage")
if checkpoint_storage:
# Get all checkpoints for this conversation
checkpoints = await checkpoint_storage.list_checkpoints()
for checkpoint in checkpoints:
# Create a conversation item for each checkpoint
checkpoint_item = {
"id": f"checkpoint_{checkpoint.checkpoint_id}",
"type": "checkpoint",
"checkpoint_id": checkpoint.checkpoint_id,
"workflow_id": checkpoint.workflow_id,
"timestamp": checkpoint.timestamp,
"status": "completed",
}
items.append(cast(ConversationItem, checkpoint_item))
# Apply pagination
if order == "desc":
items = items[::-1]
@@ -473,9 +442,12 @@ class InMemoryConversationStore(ConversationStore):
return paginated_items, has_more
def get_item(self, conversation_id: str, item_id: str) -> ConversationItem | None:
"""Get a specific conversation item by ID."""
# Use the item index for O(1) lookup
conv_items = self._item_index.get(conversation_id, {})
"""Get specific conversation item - O(1) lookup via index."""
# Use index for O(1) lookup instead of linear search
conv_items = self._item_index.get(conversation_id)
if not conv_items:
return None
return conv_items.get(item_id)
def get_thread(self, conversation_id: str) -> AgentThread | None:
@@ -499,42 +471,3 @@ class InMemoryConversationStore(ConversationStore):
)
)
return results
class CheckpointConversationManager:
    """Thin accessor for per-conversation checkpoint storage.

    Each conversation record is expected to carry its own
    ``InMemoryCheckpointStorage`` under the ``"checkpoint_storage"`` key; this
    class only looks that instance up. Isolation between sessions follows from
    every conversation holding a separate storage object.
    """

    def __init__(self, conversation_store: ConversationStore):
        """Wrap a conversation store.

        Args:
            conversation_store: Must be an ``InMemoryConversationStore`` because
                its private ``_conversations`` dict is read directly.

        Raises:
            TypeError: If another ``ConversationStore`` implementation is passed.
        """
        if not isinstance(conversation_store, InMemoryConversationStore):
            raise TypeError("CheckpointConversationManager currently requires InMemoryConversationStore")

        # Public alias kept for backward compatibility with tests.
        self.conversation_store = conversation_store
        self._store: InMemoryConversationStore = conversation_store

    def get_checkpoint_storage(self, conversation_id: str) -> InMemoryCheckpointStorage:
        """Return the checkpoint storage that belongs to one conversation.

        Args:
            conversation_id: Conversation ID.

        Returns:
            The ``InMemoryCheckpointStorage`` stored on the conversation record.

        Raises:
            ValueError: If the conversation is unknown (or its record is empty).
            KeyError: If the record has no ``"checkpoint_storage"`` entry.
            TypeError: If the stored object is not an ``InMemoryCheckpointStorage``.
        """
        record = self._store._conversations.get(conversation_id)
        if not record:
            raise ValueError(f"Conversation {conversation_id} not found")

        storage = record["checkpoint_storage"]
        if isinstance(storage, InMemoryCheckpointStorage):
            return storage
        raise TypeError(f"Expected InMemoryCheckpointStorage but got {type(storage)}")
@@ -1,588 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
"""Azure Container Apps deployment manager for DevUI entities."""
import asyncio
import logging
import re
import secrets
import uuid
from collections.abc import AsyncGenerator
from datetime import datetime, timezone
from pathlib import Path
from .models._discovery_models import Deployment, DeploymentConfig, DeploymentEvent
logger = logging.getLogger(__name__)
class DeploymentManager:
"""Manages entity deployments to Azure Container Apps."""
def __init__(self) -> None:
"""Create a deployment manager with an empty deployment registry."""
# Deployment records keyed by deployment ID; held in this dict in memory.
self._deployments: dict[str, Deployment] = {}
async def deploy(self, config: DeploymentConfig, entity_path: Path) -> AsyncGenerator[DeploymentEvent, None]:
    """Deploy entity to Azure Container Apps with streaming events.

    Progress is reported by yielding events. Failures are NOT raised to the
    caller: any exception is logged, recorded as a ``failed`` deployment and
    reported through a final ``deploy.failed`` event.

    Args:
        config: Deployment configuration
        entity_path: Path to entity directory

    Yields:
        DeploymentEvent objects for real-time progress updates. The last event
        is either ``deploy.completed`` (carrying the URL and the generated
        auth token) or ``deploy.failed``.
    """
    deployment_id = str(uuid.uuid4())
    deployment_task: asyncio.Task[str] | None = None

    try:
        # Step 1: Validate prerequisites
        yield DeploymentEvent(
            type="deploy.validating",
            message="Checking prerequisites (Azure CLI, Docker, authentication)...",
        )
        await self._validate_prerequisites()

        # Step 2: Generate Dockerfile
        yield DeploymentEvent(
            type="deploy.dockerfile",
            message="Generating Dockerfile with authentication enabled...",
        )
        await self._generate_dockerfile(entity_path, config)

        # Step 3: Generate auth token
        yield DeploymentEvent(
            type="deploy.token",
            message="Generating secure authentication token...",
        )
        auth_token = secrets.token_urlsafe(32)

        # Step 4: Announce environment discovery (the lookup itself happens
        # inside _deploy_to_azure)
        yield DeploymentEvent(
            type="deploy.environment",
            message="Checking for existing Container App Environment...",
        )

        # Step 5: Build and deploy with Azure CLI
        yield DeploymentEvent(
            type="deploy.building",
            message=f"Deploying to Azure Container Apps ({config.region})...",
        )

        # The deployment runs as a background task and reports progress
        # through this queue so events can be yielded while it is running.
        event_queue: asyncio.Queue[DeploymentEvent] = asyncio.Queue()
        deployment_task = asyncio.create_task(self._deploy_to_azure(config, entity_path, auth_token, event_queue))

        while not deployment_task.done():
            try:
                # Short timeout so task completion is noticed promptly.
                event = await asyncio.wait_for(event_queue.get(), timeout=0.1)
            except asyncio.TimeoutError:
                continue
            yield event

        # Forward everything that was queued just before the task finished.
        # This must happen before awaiting the task: on failure the queue
        # typically holds the "deploy.error" lines explaining the problem.
        while not event_queue.empty():
            yield event_queue.get_nowait()

        # Returns the URL or re-raises the task's exception.
        deployment_url = await deployment_task

        # Step 6: Store deployment record
        deployment = Deployment(
            id=deployment_id,
            entity_id=config.entity_id,
            resource_group=config.resource_group,
            app_name=config.app_name,
            region=config.region,
            url=deployment_url,
            status="deployed",
            created_at=datetime.now(timezone.utc).isoformat(),
        )
        self._deployments[deployment_id] = deployment

        # Step 7: Success - return URL and token
        yield DeploymentEvent(
            type="deploy.completed",
            message=f"Deployment successful! URL: {deployment_url}",
            url=deployment_url,
            auth_token=auth_token,  # Shown once to user
        )

    except Exception as e:
        error_msg = f"Deployment failed: {e!s}"
        logger.exception(error_msg)

        # Store failed deployment
        deployment = Deployment(
            id=deployment_id,
            entity_id=config.entity_id,
            resource_group=config.resource_group,
            app_name=config.app_name,
            region=config.region,
            url="",
            status="failed",
            created_at=datetime.now(timezone.utc).isoformat(),
            error=str(e),
        )
        self._deployments[deployment_id] = deployment

        yield DeploymentEvent(
            type="deploy.failed",
            message=error_msg,
        )
    finally:
        # If the consumer stops iterating early (generator closed/cancelled),
        # do not leave the background deployment task running unobserved.
        if deployment_task is not None and not deployment_task.done():
            deployment_task.cancel()
async def _validate_prerequisites(self) -> None:
    """Validate that Azure CLI, Docker, authentication, and resource providers are available.

    Runs ``az --version``, ``docker --version``, ``az account show`` and
    ``az provider list``. A failure to list or parse providers only logs a
    warning; every other failed check raises.

    Raises:
        ValueError: If a tool is missing or exits non-zero, the user is not
            logged in to Azure, or a required resource provider is not
            registered.
    """
    import json

    az_missing = "Azure CLI not found. Install from: https://learn.microsoft.com/cli/azure/install-azure-cli"
    docker_missing = "Docker not found. Install from: https://www.docker.com/get-started"

    async def run(*cmd: str, missing_message: str) -> tuple[int | None, bytes]:
        """Run a command, returning (exit code, stdout); map a missing executable to ValueError."""
        try:
            proc = await asyncio.create_subprocess_exec(
                *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
            )
        except FileNotFoundError as e:
            # A missing executable raises here instead of yielding a non-zero
            # exit code, so translate it into the same actionable message.
            raise ValueError(missing_message) from e
        stdout, _ = await proc.communicate()
        return proc.returncode, stdout

    # Check Azure CLI
    returncode, _ = await run("az", "--version", missing_message=az_missing)
    if returncode != 0:
        raise ValueError(az_missing)

    # Check Docker
    returncode, _ = await run("docker", "--version", missing_message=docker_missing)
    if returncode != 0:
        raise ValueError(docker_missing)

    # Check Azure authentication
    returncode, _ = await run("az", "account", "show", missing_message=az_missing)
    if returncode != 0:
        raise ValueError("Not authenticated with Azure. Run: az login")

    # Check required resource providers are registered
    required_providers = ["Microsoft.App", "Microsoft.ContainerRegistry", "Microsoft.OperationalInsights"]
    unregistered_providers: list[str] = []

    returncode, stdout = await run(
        "az",
        "provider",
        "list",
        "--query",
        "[?registrationState=='Registered'].namespace",
        "--output",
        "json",
        missing_message=az_missing,
    )

    if returncode == 0:
        try:
            registered = json.loads(stdout.decode())
        except json.JSONDecodeError:
            # Best effort: provider validation is skipped rather than failing the deployment.
            logger.warning("Could not parse provider list, skipping provider validation")
        else:
            unregistered_providers = [p for p in required_providers if p not in registered]
    else:
        logger.warning("Could not check provider registration status")

    if unregistered_providers:
        commands = "\n".join(f"az provider register -n {p} --wait" for p in unregistered_providers)
        raise ValueError(
            f"Required Azure resource providers not registered: {', '.join(unregistered_providers)}\n\n"
            f"Register them by running:\n{commands}\n\n"
            "This is a one-time setup per Azure subscription."
        )

    logger.info("All prerequisites validated successfully")
async def _generate_dockerfile(self, entity_path: Path, config: DeploymentConfig) -> Path:
    """Write a Dockerfile for the entity into its directory.

    The image installs DevUI from PyPI, optionally installs the entity's
    ``requirements.txt`` first, copies the entity code and starts ``devui``
    with ``--auth`` in the configured UI mode. An existing ``Dockerfile`` in
    ``entity_path`` is overwritten after a warning is logged.

    Args:
        entity_path: Path to entity directory
        config: Deployment configuration (only ``ui_mode`` is read here)

    Returns:
        Path to generated Dockerfile

    Raises:
        ValueError: If ``config.ui_mode`` is neither 'user' nor 'developer'.
    """
    # ui_mode is interpolated into the CMD line below, so restrict it up front.
    if config.ui_mode not in ("user", "developer"):
        raise ValueError(f"Invalid ui_mode: {config.ui_mode}. Must be 'user' or 'developer'.")

    # Only add the dependency-install layer when the entity ships a requirements file.
    if not (entity_path / "requirements.txt").exists():
        logger.info(f"No requirements.txt found in {entity_path}, skipping dependency installation")
        requirements_section = ""
    else:
        logger.info(f"Found requirements.txt in {entity_path}, will include in Dockerfile")
        requirements_section = """# Install entity dependencies
COPY requirements.txt ./
RUN pip install -r requirements.txt
"""

    dockerfile_content = f"""FROM python:3.11-slim
WORKDIR /app
{requirements_section}# Install DevUI from PyPI
RUN pip install agent-framework-devui --pre
# Copy entity code
COPY . /app/entity/
ENV PORT=8080
EXPOSE 8080
# Launch DevUI with auth enabled (token from environment variable)
CMD ["devui", "/app/entity", "--mode", "{config.ui_mode}", "--host", "0.0.0.0", "--port", "8080", "--auth"]
"""

    target = entity_path / "Dockerfile"
    if target.exists():
        # The existing file is replaced, not merged.
        logger.warning(f"Dockerfile already exists at {target}, overwriting...")

    target.write_text(dockerfile_content)
    logger.info(f"Generated Dockerfile at {target}")
    return target
async def _discover_container_app_environment(self, resource_group: str, region: str) -> str | None:
    """Look for an existing Container App Environment in a resource group.

    Runs ``az containerapp env list`` and takes the first environment's name.

    Args:
        resource_group: Resource group name
        region: Azure region (accepted but not used by the query)

    Returns:
        Environment name if found, None if there is none or the query failed
    """
    logger.info(f"Discovering existing Container App Environments in {resource_group}...")

    process = await asyncio.create_subprocess_exec(
        "az",
        "containerapp",
        "env",
        "list",
        "--resource-group",
        resource_group,
        "--query",
        "[0].name",
        "--output",
        "tsv",
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    out, err = await process.communicate()

    # A failed query is treated like "nothing found"; the caller then creates an environment.
    if process.returncode != 0:
        logger.warning(f"Failed to query environments: {err.decode()}")
        return None

    found = out.decode().strip()
    if not found:
        logger.info("No existing environments found in resource group")
        return None

    logger.info(f"Found existing environment: {found}")
    return found
async def _deploy_to_azure(
    self, config: DeploymentConfig, entity_path: Path, auth_token: str, event_queue: asyncio.Queue[DeploymentEvent]
) -> str:
    """Deploy to Azure Container Apps, reusing existing environments.

    Runs ``az containerapp up`` with the entity directory as source, reads the
    combined stdout/stderr line by line and turns recognised lines into
    progress events on ``event_queue``.

    Args:
        config: Deployment configuration
        entity_path: Path to entity directory
        auth_token: Authentication token to inject as ``DEVUI_AUTH_TOKEN``
        event_queue: Queue for streaming progress events

    Returns:
        Deployment URL

    Raises:
        ValueError: If the CLI exits non-zero, produces no output for 10
            minutes, or the URL cannot be extracted from its output
    """
    token_var = "DEVUI_AUTH_TOKEN="

    # Step 1: Try to discover existing Container App Environment
    existing_env = await self._discover_container_app_environment(config.resource_group, config.region)

    if existing_env:
        # Use existing environment - avoids needing environment creation permissions
        logger.info(f"Reusing existing Container App Environment: {existing_env} (cost efficient, no side effects)")
        placement = ["--environment", existing_env]
        logger.info(f"Creating new Container App '{config.app_name}' in environment '{existing_env}'...")
    else:
        # No existing environment - try to create one (may fail if no permissions)
        logger.warning(
            "No existing Container App Environment found. "
            "Attempting to create new environment (requires Microsoft.App/managedEnvironments/write permission)..."
        )
        placement = ["--location", config.region]

    # NOTE(review): the token is passed on the command line and is therefore
    # visible in the local process list while `az` runs - consider a secret ref.
    cmd = [
        "az",
        "containerapp",
        "up",
        "--name",
        config.app_name,
        "--resource-group",
        config.resource_group,
        *placement,
        "--source",
        str(entity_path),
        "--env-vars",
        f"{token_var}{auth_token}",
        "--ingress",
        "external",
        "--target-port",
        "8080",
    ]

    # Never write the auth token to the log: mask it in the echoed command.
    loggable_cmd = [f"{token_var}***" if arg.startswith(token_var) else arg for arg in cmd]
    logger.info(f"Running: {' '.join(loggable_cmd)}")

    # Ordered (substring, message) pairs for "WARNING:" lines; the first match
    # wins. A message of None marks the provider line, built from the line itself.
    warning_progress: list[tuple[str, str | None]] = [
        ("Creating resource group", f"Creating resource group '{config.resource_group}'..."),
        ("Creating ContainerAppEnvironment", "Setting up Container App Environment (this may take 2-3 minutes)..."),
        ("Registering resource provider", None),
        ("Creating Azure Container Registry", "Creating Container Registry for your images..."),
        ("No Log Analytics workspace", "Creating Log Analytics workspace for monitoring..."),
        ("Building image", "Building Docker image (this may take several minutes)..."),
        ("Pushing image", "Pushing image to Azure Container Registry..."),
        ("Creating Container App", "Creating your Container App..."),
        ("Container app created", "Container app created successfully!"),
    ]

    def provider_message(line_text: str) -> str:
        """Build the progress message for a 'Registering resource provider <name> ...' line."""
        provider = line_text.split("provider")[-1].strip().removesuffix("...").strip()
        return f"Registering Azure provider {provider}..." if provider else "Registering Azure provider..."

    def classify(line_text: str) -> DeploymentEvent | None:
        """Map one CLI output line to a progress/error event, or None when it is not interesting."""
        if "WARNING:" in line_text:
            for needle, message in warning_progress:
                if needle in line_text:
                    text = provider_message(line_text) if message is None else message
                    return DeploymentEvent(type="deploy.progress", message=text)
            return None
        if "ERROR:" in line_text:
            # Stream errors immediately
            return DeploymentEvent(type="deploy.error", message=line_text)
        if "Step" in line_text and "/" in line_text:
            # Docker build steps
            return DeploymentEvent(type="deploy.progress", message=f"Docker build: {line_text}")
        if "https://" in line_text and ".azurecontainerapps.io" in line_text:
            # Deployment URL detected
            return DeploymentEvent(type="deploy.progress", message="Deployment URL generated!")
        return None

    # stderr is merged into stdout so a single stream carries everything.
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT
    )

    output_lines: list[str] = []
    try:
        if not process.stdout:
            raise ValueError("Failed to capture process output")

        while True:
            # The timeout applies to each read, i.e. to 10 minutes without any new output line.
            line = await asyncio.wait_for(process.stdout.readline(), timeout=600)
            if not line:
                break

            # Undecodable bytes must not abort an otherwise healthy deployment.
            line_text = line.decode(errors="replace").strip()
            if not line_text:
                continue

            output_lines.append(line_text)
            event = classify(line_text)
            if event is not None:
                await event_queue.put(event)

        # Wait for process to complete
        return_code = await process.wait()
        if return_code != 0:
            error_output = "\n".join(output_lines[-10:])  # Last 10 lines for context
            raise ValueError(f"Azure deployment failed:\n{error_output}")

    except asyncio.TimeoutError as e:
        process.kill()
        # Reap the killed process so it does not linger as a zombie.
        await process.wait()
        raise ValueError(
            "Azure deployment timed out after 10 minutes. Please check Azure portal for status."
        ) from e

    # Parse output to extract FQDN
    output = "\n".join(output_lines)
    logger.debug(f"Azure CLI output: {output}")

    # Extract FQDN from output (az containerapp up returns it)
    # Format: https://<app-name>.<random-id>.<region>.azurecontainerapps.io
    deployment_url = self._extract_fqdn_from_output(output, config.app_name)
    logger.info(f"Deployment successful: {deployment_url}")
    return deployment_url
def _extract_fqdn_from_output(self, output: str, app_name: str) -> str:
"""Extract FQDN from Azure CLI output.
Args:
output: Azure CLI command output
app_name: Container app name
Returns:
Full HTTPS URL to deployed app
"""
# Try to find FQDN in output
for line in output.split("\n"):
if "fqdn" in line.lower() or app_name in line:
# Extract URL-like string
match = re.search(r"https?://[\w\-\.]+\.azurecontainerapps\.io", line)
if match:
return match.group(0)
# If we can't extract FQDN, fail explicitly rather than return a broken URL
logger.error(f"Could not extract FQDN from Azure CLI output. Output:\n{output}")
raise ValueError(
"Could not extract deployment URL from Azure CLI output. "
"The deployment may have succeeded - check the Azure portal for your container app URL."
)
async def list_deployments(self, entity_id: str | None = None) -> list[Deployment]:
    """Return the stored deployment records.

    Args:
        entity_id: Optional entity ID to filter by; a falsy value (None or
            an empty string) disables filtering.

    Returns:
        A new list of deployment records, restricted to ``entity_id`` when given
    """
    records = list(self._deployments.values())
    if not entity_id:
        return records
    return [record for record in records if record.entity_id == entity_id]
async def get_deployment(self, deployment_id: str) -> Deployment | None:
    """Look up a single deployment record.

    Args:
        deployment_id: Deployment ID

    Returns:
        The stored record, or None when the ID is unknown
    """
    try:
        return self._deployments[deployment_id]
    except KeyError:
        return None
async def delete_deployment(self, deployment_id: str) -> None:
    """Delete a deployment's Container App and forget its record.

    Runs ``az containerapp delete --yes`` for the recorded app; the record is
    removed from the registry only after the CLI succeeded.

    Args:
        deployment_id: Deployment ID to delete

    Raises:
        ValueError: If deployment not found or deletion fails
    """
    record = self._deployments.get(deployment_id)
    if not record:
        raise ValueError(f"Deployment {deployment_id} not found")

    # Execute: az containerapp delete
    argv = ["az", "containerapp", "delete"]
    argv += ["--name", record.app_name]
    argv += ["--resource-group", record.resource_group]
    argv.append("--yes")  # Skip confirmation

    logger.info(f"Deleting deployment: {' '.join(argv)}")

    process = await asyncio.create_subprocess_exec(
        *argv, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    out, err = await process.communicate()

    if process.returncode != 0:
        # Prefer stderr for the error text; fall back to stdout when stderr is empty.
        details = err.decode() if err else out.decode()
        raise ValueError(f"Deployment deletion failed: {details}")

    # Remove from store
    self._deployments.pop(deployment_id)
    logger.info(f"Deployment {deployment_id} deleted successfully")
@@ -4,7 +4,6 @@
from __future__ import annotations
import ast
import importlib
import importlib.util
import logging
@@ -32,7 +31,6 @@ class EntityDiscovery:
self.entities_dir = entities_dir
self._entities: dict[str, EntityInfo] = {}
self._loaded_objects: dict[str, Any] = {}
self._cleanup_hooks: dict[str, list[Any]] = {}
async def discover_entities(self) -> list[EntityInfo]:
"""Scan for Agent Framework entities.
@@ -72,15 +70,14 @@ class EntityDiscovery:
"""
return self._loaded_objects.get(entity_id)
async def load_entity(self, entity_id: str, checkpoint_manager: Any = None) -> Any:
"""Load entity on-demand and inject checkpoint storage for workflows.
async def load_entity(self, entity_id: str) -> Any:
"""Load entity on-demand (lazy loading).
This method implements lazy loading by importing the entity module only when needed.
In-memory entities are returned from cache immediately.
Args:
entity_id: Entity identifier
checkpoint_manager: Optional checkpoint manager for workflow storage injection
Returns:
Loaded entity object
@@ -110,13 +107,9 @@ class EntityDiscovery:
else:
raise ValueError(
f"Unsupported entity source: {entity_info.source}. "
f"Only 'directory' and 'in-memory' sources are supported."
f"Only 'directory' and 'in_memory' sources are supported."
)
# Note: Checkpoint storage is now injected at runtime via run_stream() parameter,
# not at load time. This provides cleaner architecture and explicit control flow.
# See _executor.py _execute_workflow() for runtime checkpoint storage injection.
# Enrich metadata with actual entity data
# Don't pass entity_type if it's "unknown" - let inference determine the real type
enriched_info = await self.create_entity_info_from_object(
@@ -129,27 +122,11 @@ class EntityDiscovery:
# Preserve the original path from sparse metadata
if "path" in entity_info.metadata:
enriched_info.metadata["path"] = entity_info.metadata["path"]
# Now that we have the path, properly check deployment support
entity_path = Path(entity_info.metadata["path"])
deployment_supported, deployment_reason = self._check_deployment_support(entity_path, entity_info.source)
enriched_info.deployment_supported = deployment_supported
enriched_info.deployment_reason = deployment_reason
enriched_info.metadata["lazy_loaded"] = True
self._entities[entity_id] = enriched_info
# Cache the loaded object
self._loaded_objects[entity_id] = entity_obj
# Check module-level registry for cleanup hooks
from . import _get_registered_cleanup_hooks
registered_hooks = _get_registered_cleanup_hooks(entity_obj)
if registered_hooks:
if entity_id not in self._cleanup_hooks:
self._cleanup_hooks[entity_id] = []
self._cleanup_hooks[entity_id].extend(registered_hooks)
logger.debug(f"Discovered {len(registered_hooks)} registered cleanup hook(s) for: {entity_id}")
logger.info(f"Successfully loaded entity: {entity_id} (type: {enriched_info.type})")
return entity_obj
@@ -210,17 +187,6 @@ class EntityDiscovery:
"""
return list(self._entities.values())
def get_cleanup_hooks(self, entity_id: str) -> list[Any]:
    """Return the cleanup hooks recorded for an entity.

    Args:
        entity_id: Entity identifier

    Returns:
        The stored hook list itself (not a copy) when the entity has one,
        otherwise a fresh empty list
    """
    if entity_id in self._cleanup_hooks:
        return self._cleanup_hooks[entity_id]
    return []
def invalidate_entity(self, entity_id: str) -> None:
"""Invalidate (clear cache for) an entity to enable hot reload.
@@ -273,17 +239,6 @@ class EntityDiscovery:
"""
self._entities[entity_id] = entity_info
self._loaded_objects[entity_id] = entity_object
# Check module-level registry for cleanup hooks
from . import _get_registered_cleanup_hooks
registered_hooks = _get_registered_cleanup_hooks(entity_object)
if registered_hooks:
if entity_id not in self._cleanup_hooks:
self._cleanup_hooks[entity_id] = []
self._cleanup_hooks[entity_id].extend(registered_hooks)
logger.debug(f"Discovered {len(registered_hooks)} registered cleanup hook(s) for: {entity_id}")
logger.debug(f"Registered entity: {entity_id} ({entity_info.type})")
async def create_entity_info_from_object(
@@ -350,17 +305,6 @@ class EntityDiscovery:
elif not has_run_stream and not has_run:
logger.warning(f"Agent '{entity_id}' lacks both run() and run_stream() methods. May not work.")
# Check deployment support based on source
# For directory-based entities, we need the path to verify deployment support
deployment_supported = False
deployment_reason = "In-memory entities cannot be deployed (no source directory)"
if source == "directory":
# Directory-based entity - will be checked properly after enrichment when path is available
# For now, mark as potentially deployable - will be re-evaluated after enrichment
deployment_supported = True
deployment_reason = "Ready for deployment (pending path verification)"
# Create EntityInfo with Agent Framework specifics
return EntityInfo(
id=entity_id,
@@ -377,8 +321,6 @@ class EntityDiscovery:
executors=tools_list if entity_type == "workflow" else [],
input_schema={"type": "string"}, # Default schema
start_executor_id=tools_list[0] if tools_list and entity_type == "workflow" else None,
deployment_supported=deployment_supported,
deployment_reason=deployment_reason,
metadata={
"source": "agent_framework_object",
"class_name": entity_object.__class__.__name__
@@ -462,31 +404,6 @@ class EntityDiscovery:
# Has __init__.py but no specific file
return "unknown"
def _check_deployment_support(self, entity_path: Path, source: str) -> tuple[bool, str | None]:
"""Check if entity can be deployed to Azure Container Apps.
Args:
entity_path: Path to entity directory or file
source: Entity source ("directory" or "in_memory")
Returns:
Tuple of (supported, reason) explaining deployment eligibility
"""
# In-memory entities cannot be deployed
if source == "in_memory":
return False, "In-memory entities cannot be deployed (no source directory)"
# File-based entities need a directory structure for deployment
if not entity_path.is_dir():
return False, "Only directory-based entities can be deployed"
# Must have __init__.py
if not (entity_path / "__init__.py").exists():
return False, "Missing __init__.py file"
# Passed all checks
return True, "Ready for deployment"
def _register_sparse_entity(self, dir_path: Path) -> None:
"""Register entity with sparse metadata (no import).
@@ -496,9 +413,6 @@ class EntityDiscovery:
entity_id = dir_path.name
entity_type = self._detect_entity_type(dir_path)
# Check deployment support
deployment_supported, deployment_reason = self._check_deployment_support(dir_path, "directory")
entity_info = EntityInfo(
id=entity_id,
name=entity_id.replace("_", " ").title(),
@@ -507,8 +421,6 @@ class EntityDiscovery:
tools=[], # Sparse - will be populated on load
description="", # Sparse - will be populated on load
source="directory",
deployment_supported=deployment_supported,
deployment_reason=deployment_reason,
metadata={
"path": str(dir_path),
"discovered": True,
@@ -519,52 +431,14 @@ class EntityDiscovery:
self._entities[entity_id] = entity_info
logger.debug(f"Registered sparse entity: {entity_id} (type: {entity_type})")
def _has_entity_exports(self, file_path: Path) -> bool:
"""Check if a Python file has entity exports (agent or workflow) using AST parsing.
This safely checks for module-level assignments like:
- agent = ChatAgent(...)
- workflow = WorkflowBuilder()...
Args:
file_path: Python file to check
Returns:
True if file has 'agent' or 'workflow' exports
"""
try:
# Read and parse the file's AST
source = file_path.read_text(encoding="utf-8")
tree = ast.parse(source, filename=str(file_path))
# Look for module-level assignments of 'agent' or 'workflow'
for node in ast.walk(tree):
if isinstance(node, ast.Assign):
for target in node.targets:
if isinstance(target, ast.Name) and target.id in ("agent", "workflow"):
return True
except Exception as e:
logger.debug(f"Could not parse {file_path} for entity exports: {e}")
return False
return False
def _register_sparse_file_entity(self, file_path: Path) -> None:
"""Register file-based entity with sparse metadata (no import).
Args:
file_path: Entity Python file
"""
# Check if file has valid entity exports using AST parsing
if not self._has_entity_exports(file_path):
logger.debug(f"Skipping {file_path.name} - no 'agent' or 'workflow' exports found")
return
entity_id = file_path.stem
# Check deployment support (file-based entities cannot be deployed)
deployment_supported, deployment_reason = self._check_deployment_support(file_path, "directory")
# File-based entities are typically agents, but we can't know for sure without importing
entity_info = EntityInfo(
id=entity_id,
@@ -574,8 +448,6 @@ class EntityDiscovery:
tools=[],
description="",
source="directory",
deployment_supported=deployment_supported,
deployment_reason=deployment_reason,
metadata={
"path": str(file_path),
"discovered": True,
@@ -9,7 +9,6 @@ from collections.abc import AsyncGenerator
from typing import Any
from agent_framework import AgentProtocol
from agent_framework._workflows._events import RequestInfoEvent
from ._conversations import ConversationStore, InMemoryConversationStore
from ._discovery import EntityDiscovery
@@ -51,11 +50,6 @@ class AgentFrameworkExecutor:
# Use provided conversation store or default to in-memory
self.conversation_store = conversation_store or InMemoryConversationStore()
# Create checkpoint manager (wraps conversation store)
from ._conversations import CheckpointConversationManager
self.checkpoint_manager = CheckpointConversationManager(self.conversation_store)
def _setup_tracing_provider(self) -> None:
"""Set up our own TracerProvider so we can add processors."""
try:
@@ -85,20 +79,10 @@ class AgentFrameworkExecutor:
# Configure Agent Framework tracing only if ENABLE_OTEL is set
if os.environ.get("ENABLE_OTEL"):
try:
from agent_framework.observability import OBSERVABILITY_SETTINGS, setup_observability
from agent_framework.observability import setup_observability
# Only configure if not already executed
if not OBSERVABILITY_SETTINGS._executed_setup:
# Get OTLP endpoint from either custom or standard env var
# This handles the case where env vars are set after ObservabilitySettings was imported
otlp_endpoint = os.environ.get("OTLP_ENDPOINT") or os.environ.get("OTEL_EXPORTER_OTLP_ENDPOINT")
# Pass the endpoint explicitly to setup_observability
# This ensures OTLP exporters are created even if env vars were set late
setup_observability(enable_sensitive_data=True, otlp_endpoint=otlp_endpoint)
logger.info("Enabled Agent Framework observability")
else:
logger.debug("Agent Framework observability already configured")
setup_observability(enable_sensitive_data=True)
logger.info("Enabled Agent Framework observability")
except Exception as e:
logger.warning(f"Failed to enable Agent Framework observability: {e}")
else:
@@ -189,7 +173,7 @@ class AgentFrameworkExecutor:
entity_info = self.get_entity_info(entity_id)
# Trigger lazy loading (will return from cache if already loaded)
entity_obj = await self.entity_discovery.load_entity(entity_id, checkpoint_manager=self.checkpoint_manager)
entity_obj = await self.entity_discovery.load_entity(entity_id)
if not entity_obj:
raise EntityNotFoundError(f"Entity object for '{entity_id}' not found")
@@ -206,15 +190,6 @@ class AgentFrameworkExecutor:
yield event
elif entity_info.type == "workflow":
async for event in self._execute_workflow(entity_obj, request, trace_collector):
# Log RequestInfoEvent for debugging HIL flow
event_class = event.__class__.__name__ if hasattr(event, "__class__") else type(event).__name__
if event_class == "RequestInfoEvent":
logger.info("🔔 [EXECUTOR] RequestInfoEvent detected from workflow!")
logger.info(f" request_id: {getattr(event, 'request_id', 'N/A')}")
logger.info(f" source_executor_id: {getattr(event, 'source_executor_id', 'N/A')}")
logger.info(f" request_type: {getattr(event, 'request_type', 'N/A')}")
data = getattr(event, "data", None)
logger.info(f" data type: {type(data).__name__ if data else 'None'}")
yield event
else:
raise ValueError(f"Unsupported entity type: {entity_info.type}")
@@ -314,7 +289,7 @@ class AgentFrameworkExecutor:
async def _execute_workflow(
self, workflow: Any, request: AgentFrameworkRequest, trace_collector: Any
) -> AsyncGenerator[Any, None]:
"""Execute Agent Framework workflow with checkpoint support via conversation items.
"""Execute Agent Framework workflow with trace collection.
Args:
workflow: Workflow object to execute
@@ -325,199 +300,23 @@ class AgentFrameworkExecutor:
Workflow events and trace events
"""
try:
entity_id = request.get_entity_id() or "unknown"
# Get input data directly from request.input field
input_data = request.input
logger.debug(f"Using input field: {type(input_data)}")
# Get or create session conversation for checkpoint storage
conversation_id = request.get_conversation_id()
if not conversation_id:
# Create default session if not provided
import time
import uuid
# Parse input based on workflow's expected input type
parsed_input = await self._parse_workflow_input(workflow, input_data)
conversation_id = f"session_{entity_id}_{uuid.uuid4().hex[:8]}"
logger.info(f"Created new workflow session: {conversation_id}")
logger.debug(f"Executing workflow with parsed input type: {type(parsed_input)}")
# Create conversation in store
self.conversation_store.create_conversation(
metadata={
"entity_id": entity_id,
"type": "workflow_session",
"created_at": str(int(time.time())),
},
conversation_id=conversation_id,
)
else:
# Validate conversation exists, create if missing (handles deleted conversations)
import time
# Use Agent Framework workflow's native streaming
async for event in workflow.run_stream(parsed_input):
# Yield any pending trace events first
for trace_event in trace_collector.get_pending_events():
yield trace_event
existing = self.conversation_store.get_conversation(conversation_id)
if not existing:
logger.warning(f"Conversation {conversation_id} not found (may have been deleted), recreating")
self.conversation_store.create_conversation(
metadata={
"entity_id": entity_id,
"type": "workflow_session",
"created_at": str(int(time.time())),
},
conversation_id=conversation_id,
)
# Get session-scoped checkpoint storage (InMemoryCheckpointStorage from conv_data)
# Each conversation has its own storage instance, providing automatic session isolation.
# This storage is passed to workflow.run_stream() which sets it as runtime override,
# ensuring all checkpoint operations (save/load) use THIS conversation's storage.
# The framework guarantees runtime storage takes precedence over build-time storage.
checkpoint_storage = self.checkpoint_manager.get_checkpoint_storage(conversation_id)
# Check for HIL responses first
hil_responses = self._extract_workflow_hil_responses(request.input)
# Determine checkpoint_id (explicit or auto-latest for HIL responses)
checkpoint_id = None
if request.extra_body and "checkpoint_id" in request.extra_body:
checkpoint_id = request.extra_body["checkpoint_id"]
logger.debug(f"Using explicit checkpoint_id from request: {checkpoint_id}")
elif hil_responses:
# Only auto-resume from latest checkpoint when we have HIL responses
# Regular "Run" clicks should start fresh, not resume from checkpoints
checkpoints = await checkpoint_storage.list_checkpoints() # No workflow_id filter needed!
if checkpoints:
latest = max(checkpoints, key=lambda cp: cp.timestamp)
checkpoint_id = latest.checkpoint_id
logger.info(f"Auto-resuming from latest checkpoint in session {conversation_id}: {checkpoint_id}")
else:
logger.warning(f"HIL responses received but no checkpoints in session {conversation_id}")
if hil_responses:
# HIL continuation mode requires checkpointing
if not checkpoint_id:
error_msg = (
"Cannot process HIL responses without a checkpoint. "
"Workflows using HIL must be configured with .with_checkpointing() "
"and a checkpoint must exist before sending responses."
)
logger.error(error_msg)
yield {"type": "error", "message": error_msg}
return
logger.info(f"Resuming workflow with HIL responses for {len(hil_responses)} request(s)")
# Unwrap primitive responses if they're wrapped in {response: value} format
from ._utils import parse_input_for_type
unwrapped_responses = {}
for request_id, response_value in hil_responses.items():
if isinstance(response_value, dict) and "response" in response_value:
response_value = response_value["response"]
unwrapped_responses[request_id] = response_value
hil_responses = unwrapped_responses
# NOTE: Two-step approach for stateless HTTP (framework limitation):
# 1. Restore checkpoint to load pending requests into workflow's in-memory state
# 2. Then send responses using send_responses_streaming
# Future: Framework should support run_stream(checkpoint_id, responses) in single call
# (checkpoint_id is guaranteed to exist due to earlier validation)
logger.debug(f"Restoring checkpoint {checkpoint_id} then sending HIL responses")
try:
# Step 1: Restore checkpoint to populate workflow's in-memory pending requests
restored = False
async for _event in workflow.run_stream(
checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage
):
restored = True
break # Stop immediately after restoration, don't process events
if not restored:
raise RuntimeError("Checkpoint restoration did not yield any events")
# Reset running flags so we can call send_responses_streaming
if hasattr(workflow, "_is_running"):
workflow._is_running = False
if hasattr(workflow, "_runner") and hasattr(workflow._runner, "_running"):
workflow._runner._running = False
# Extract response types from restored workflow and convert responses to proper types
try:
if hasattr(workflow, "_runner") and hasattr(workflow._runner, "context"):
runner_context = workflow._runner.context
pending_requests_dict = await runner_context.get_pending_request_info_events()
converted_responses = {}
for request_id, response_value in hil_responses.items():
if request_id in pending_requests_dict:
pending_request = pending_requests_dict[request_id]
if hasattr(pending_request, "response_type"):
response_type = pending_request.response_type
try:
response_value = parse_input_for_type(response_value, response_type)
logger.debug(
f"Converted HIL response for {request_id} to {type(response_value)}"
)
except Exception as e:
logger.warning(f"Failed to convert HIL response for {request_id}: {e}")
converted_responses[request_id] = response_value
hil_responses = converted_responses
except Exception as e:
logger.warning(f"Could not convert HIL responses to proper types: {e}")
# Step 2: Now send responses to the in-memory workflow
async for event in workflow.send_responses_streaming(hil_responses):
for trace_event in trace_collector.get_pending_events():
yield trace_event
yield event
except (AttributeError, ValueError, RuntimeError) as e:
error_msg = f"Failed to send HIL responses: {e}"
logger.error(error_msg)
yield {"type": "error", "message": error_msg}
elif checkpoint_id:
# Resume from checkpoint (explicit or auto-latest) using unified API
logger.info(f"Resuming workflow from checkpoint {checkpoint_id} in session {conversation_id}")
try:
async for event in workflow.run_stream(
checkpoint_id=checkpoint_id, checkpoint_storage=checkpoint_storage
):
if isinstance(event, RequestInfoEvent):
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
yield trace_event
yield event
# Note: Removed break on RequestInfoEvent - continue yielding all events
# The workflow is already paused by ctx.request_info() in the framework
# DevUI should continue yielding events even during HIL pause
except ValueError as e:
error_msg = f"Cannot resume from checkpoint: {e}"
logger.error(error_msg)
yield {"type": "error", "message": error_msg}
else:
# First run - pass DevUI's checkpoint storage to enable checkpointing
logger.info(f"Starting fresh workflow in session {conversation_id}")
parsed_input = await self._parse_workflow_input(workflow, request.input)
async for event in workflow.run_stream(parsed_input, checkpoint_storage=checkpoint_storage):
if isinstance(event, RequestInfoEvent):
self._enrich_request_info_event_with_response_schema(event, workflow)
for trace_event in trace_collector.get_pending_events():
yield trace_event
yield event
# Note: Removed break on RequestInfoEvent - continue yielding all events
# The workflow is already paused by ctx.request_info() in the framework
# DevUI should continue yielding events even during HIL pause
# Then yield the workflow event
yield event
except Exception as e:
logger.error(f"Error in workflow execution: {e}")
@@ -770,59 +569,6 @@ class AgentFrameworkExecutor:
return start_executor, message_types
def _extract_workflow_hil_responses(self, input_data: Any) -> dict[str, Any] | None:
"""Extract workflow HIL responses from OpenAI input format.
Looks for special content type: workflow_hil_response
Args:
input_data: OpenAI ResponseInputParam
Returns:
Dict of {request_id: response_value} if found, None otherwise
"""
if not isinstance(input_data, list):
return None
for item in input_data:
if isinstance(item, dict) and item.get("type") == "message":
message_content = item.get("content", [])
if isinstance(message_content, list):
for content_item in message_content:
if isinstance(content_item, dict):
content_type = content_item.get("type")
if content_type == "workflow_hil_response":
# Extract responses dict
# dict.get() returns Any, so we explicitly type it
responses: dict[str, Any] = content_item.get("responses", {}) # type: ignore[assignment]
logger.info(f"Found workflow HIL responses: {list(responses.keys())}")
return responses
return None
def _get_or_create_conversation(self, conversation_id: str, entity_id: str) -> Any:
"""Get existing conversation or create a new one.
Args:
conversation_id: Conversation ID from frontend
entity_id: Entity ID (e.g., "spam_workflow") for metadata filtering
Returns:
Conversation object
"""
conversation = self.conversation_store.get_conversation(conversation_id)
if not conversation:
# Create conversation with frontend's ID
# Use agent_id in metadata so it can be filtered by list_conversations(agent_id=...)
conversation = self.conversation_store.create_conversation(
metadata={"agent_id": entity_id}, conversation_id=conversation_id
)
logger.info(f"Created conversation {conversation_id} for entity {entity_id}")
return conversation
def _parse_structured_workflow_input(self, workflow: Any, input_data: dict[str, Any]) -> Any:
"""Parse structured input data for workflow execution.
@@ -898,53 +644,3 @@ class AgentFrameworkExecutor:
except Exception as e:
logger.debug(f"Error parsing workflow input: {e}")
return raw_input
def _enrich_request_info_event_with_response_schema(self, event: Any, workflow: Any) -> None:
    """Attach a response JSON schema to a RequestInfoEvent as ``event._response_schema``.

    The schema is derived from the response type that the event's source
    executor declares for the event's request type. When no response type can
    be extracted, a plain string schema is attached instead. The event is left
    untouched if it lacks the identifying attributes, if the workflow has no
    ``executors`` dict, or if the executor is not found. Any exception is
    logged as a warning and swallowed.

    Args:
        event: RequestInfoEvent to enrich
        workflow: Workflow object containing executors
    """
    try:
        from agent_framework_devui._utils import extract_response_type_from_executor, generate_input_schema

        # Both attributes are needed to locate the handler that defines the response type
        executor_id = getattr(event, "source_executor_id", None)
        request_type = getattr(event, "request_type", None)
        if not (executor_id and request_type):
            logger.debug("RequestInfoEvent missing source_executor_id or request_type")
            return

        executors = getattr(workflow, "executors", None)
        if not isinstance(executors, dict):
            logger.debug("Workflow doesn't have executors dict")
            return

        executor = executors.get(executor_id)
        if not executor:
            logger.debug(f"Could not find executor '{executor_id}' in workflow")
            return

        response_type = extract_response_type_from_executor(executor, request_type)
        if not response_type:
            # Extraction failed: fall back to a plain string schema so the event still carries one
            logger.debug(
                f"Could not extract response type for {request_type.__name__}, using default string schema"
            )
            event._response_schema = {"type": "string"}
            return

        schema = generate_input_schema(response_type)
        event._response_schema = schema
        logger.debug(f"Extracted response schema for {request_type.__name__}: {schema}")

    except Exception as e:
        logger.warning(f"Failed to enrich RequestInfoEvent with response schema: {e}")
@@ -34,9 +34,6 @@ from .models import (
ResponseFunctionCallArgumentsDeltaEvent,
ResponseFunctionResultComplete,
ResponseFunctionToolCall,
ResponseOutputData,
ResponseOutputFile,
ResponseOutputImage,
ResponseOutputItemAddedEvent,
ResponseOutputMessage,
ResponseOutputText,
@@ -163,7 +160,7 @@ class MessageMapper:
if isinstance(raw_event, ResponseTraceEvent):
return [
ResponseTraceEventComplete(
type="response.trace.completed",
type="response.trace.complete",
data=raw_event.data,
item_id=context["item_id"],
sequence_number=self._next_sequence(context),
@@ -276,7 +273,7 @@ class MessageMapper:
id=f"resp_{uuid.uuid4().hex[:12]}",
object="response",
created_at=datetime.now().timestamp(),
model=request.model or "devui",
model=request.model,
output=[response_output_message],
usage=usage,
parallel_tool_calls=False,
@@ -341,147 +338,6 @@ class MessageMapper:
context["sequence_counter"] += 1
return int(context["sequence_counter"])
def _serialize_value(self, value: Any) -> Any:
"""Recursively serialize a value, handling complex nested objects.
Handles:
- Primitives (str, int, float, bool, None)
- Collections (list, tuple, set, dict)
- SerializationMixin objects (ChatMessage, etc.) - calls to_dict()
- Pydantic models - calls model_dump()
- Dataclasses - recursively serializes with asdict()
- Enums - extracts value
- datetime/date/UUID - converts to ISO string
Args:
value: Value to serialize
Returns:
JSON-serializable representation
"""
from dataclasses import is_dataclass
from datetime import date, datetime
from enum import Enum
from uuid import UUID
# Handle None
if value is None:
return None
# Handle primitives
if isinstance(value, (str, int, float, bool)):
return value
# Handle datetime/date - convert to ISO format
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, date):
return value.isoformat()
# Handle UUID - convert to string
if isinstance(value, UUID):
return str(value)
# Handle Enums - extract value
if isinstance(value, Enum):
return value.value
# Handle lists/tuples/sets - recursively serialize elements
if isinstance(value, (list, tuple)):
return [self._serialize_value(item) for item in value]
if isinstance(value, set):
return [self._serialize_value(item) for item in value]
# Handle dicts - recursively serialize values
if isinstance(value, dict):
return {k: self._serialize_value(v) for k, v in value.items()}
# Handle SerializationMixin (like ChatMessage) - call to_dict()
if hasattr(value, "to_dict") and callable(getattr(value, "to_dict", None)):
try:
return value.to_dict() # type: ignore[attr-defined, no-any-return]
except Exception as e:
logger.debug(f"Failed to serialize with to_dict(): {e}")
return str(value)
# Handle Pydantic models - call model_dump()
if hasattr(value, "model_dump") and callable(getattr(value, "model_dump", None)):
try:
return value.model_dump() # type: ignore[attr-defined, no-any-return]
except Exception as e:
logger.debug(f"Failed to serialize Pydantic model: {e}")
return str(value)
# Handle dataclasses - recursively serialize with asdict
if is_dataclass(value) and not isinstance(value, type):
try:
from dataclasses import asdict
# Use our custom serializer as dict_factory
return asdict(value, dict_factory=lambda items: {k: self._serialize_value(v) for k, v in items})
except Exception as e:
logger.debug(f"Failed to serialize nested dataclass: {e}")
return str(value)
# Fallback: convert to string (for unknown types)
logger.debug(f"Serializing unknown type {type(value).__name__} as string")
return str(value)
def _serialize_request_data(self, request_data: Any) -> dict[str, Any]:
"""Serialize RequestInfoMessage to dict for JSON transmission.
Handles nested SerializationMixin objects (like ChatMessage) within dataclasses.
Args:
request_data: The RequestInfoMessage instance
Returns:
Serialized dict representation
"""
from dataclasses import asdict, fields, is_dataclass
if request_data is None:
return {}
# Handle dict first (most common)
if isinstance(request_data, dict):
return {k: self._serialize_value(v) for k, v in request_data.items()}
# Handle dataclasses with nested SerializationMixin objects
# We can't use asdict() directly because it doesn't handle ChatMessage
if is_dataclass(request_data) and not isinstance(request_data, type):
try:
# Manually serialize each field to handle nested SerializationMixin
result = {}
for field in fields(request_data):
field_value = getattr(request_data, field.name)
result[field.name] = self._serialize_value(field_value)
return result
except Exception as e:
logger.debug(f"Failed to serialize dataclass fields: {e}")
# Fallback to asdict() if our custom serialization fails
try:
return asdict(request_data) # type: ignore[arg-type]
except Exception as e2:
logger.debug(f"Failed to serialize dataclass with asdict(): {e2}")
# Handle Pydantic models (have model_dump method)
if hasattr(request_data, "model_dump") and callable(getattr(request_data, "model_dump", None)):
try:
return request_data.model_dump() # type: ignore[attr-defined, no-any-return]
except Exception as e:
logger.debug(f"Failed to serialize Pydantic model: {e}")
# Handle SerializationMixin (have to_dict method)
if hasattr(request_data, "to_dict") and callable(getattr(request_data, "to_dict", None)):
try:
return request_data.to_dict() # type: ignore[attr-defined, no-any-return]
except Exception as e:
logger.debug(f"Failed to serialize with to_dict(): {e}")
# Fallback: string representation
return {"raw": str(request_data)}
async def _convert_agent_update(self, update: Any, context: dict[str, Any]) -> Sequence[Any]:
"""Convert agent text updates to proper content part events.
@@ -639,9 +495,8 @@ class MessageMapper:
from .models._openai_custom import AgentCompletedEvent, AgentFailedEvent, AgentStartedEvent
try:
# Get model name from request or use 'devui' as default
request_obj = context.get("request")
model_name = request_obj.model if request_obj and request_obj.model else "devui"
# Get model name from context (the agent name)
model_name = context.get("request", {}).model if context.get("request") else "agent"
if isinstance(event, AgentStartedEvent):
execution_id = f"agent_{uuid4().hex[:12]}"
@@ -748,16 +603,16 @@ class MessageMapper:
# Return proper OpenAI event objects
events: list[Any] = []
# Get model name from request or use 'devui' as default
request_obj = context.get("request")
model_name = request_obj.model if request_obj and request_obj.model else "devui"
# Determine the model name - use request model or default to "workflow"
# The request model will be the agent name for agents, workflow name for workflows
model_name = context.get("request", {}).model if context.get("request") else "workflow"
# Create a full Response object with all required fields
response_obj = Response(
id=f"resp_{workflow_id}",
object="response",
created_at=float(time.time()),
model=model_name,
model=model_name, # Use the actual model/agent name
output=[], # Empty output list initially
status="in_progress",
# Required fields with safe defaults
@@ -782,73 +637,14 @@ class MessageMapper:
return events
# Handle WorkflowOutputEvent separately to preserve output data
if event_class == "WorkflowOutputEvent":
output_data = getattr(event, "data", None)
source_executor_id = getattr(event, "source_executor_id", "unknown")
if output_data is not None:
# Import required types
from openai.types.responses import ResponseOutputMessage, ResponseOutputText
from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
# Increment output index for each yield_output
context["output_index"] = context.get("output_index", -1) + 1
# Extract text from output data based on type
text = None
if hasattr(output_data, "__class__") and output_data.__class__.__name__ == "ChatMessage":
# Handle ChatMessage (from Magentic and AgentExecutor with output_response=True)
text = getattr(output_data, "text", None)
if not text:
# Fallback to string representation
text = str(output_data)
elif isinstance(output_data, str):
# String output
text = output_data
else:
# Object/dict/list → JSON string
try:
text = json.dumps(output_data, indent=2)
except (TypeError, ValueError):
# Fallback to string representation if not JSON serializable
text = str(output_data)
# Create output message with text content
text_content = ResponseOutputText(type="output_text", text=text, annotations=[])
output_message = ResponseOutputMessage(
type="message",
id=f"msg_{uuid4().hex[:8]}",
role="assistant",
content=[text_content],
status="completed",
)
# Emit output_item.added for each yield_output
logger.debug(
f"WorkflowOutputEvent converted to output_item.added "
f"(executor: {source_executor_id}, length: {len(text)})"
)
return [
ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=output_message,
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
]
# Handle WorkflowCompletedEvent - emit response.completed
if event_class == "WorkflowCompletedEvent":
if event_class in ["WorkflowCompletedEvent", "WorkflowOutputEvent"]:
workflow_id = context.get("workflow_id", str(uuid4()))
# Import Response type for proper construction
from openai.types.responses import Response
# Get model name from request or use 'devui' as default
request_obj = context.get("request")
model_name = request_obj.model if request_obj and request_obj.model else "devui"
# Get model name from context
model_name = context.get("request", {}).model if context.get("request") else "workflow"
# Create a full Response object for completed state
response_obj = Response(
@@ -856,7 +652,7 @@ class MessageMapper:
object="response",
created_at=float(time.time()),
model=model_name,
output=[], # Output items already sent via output_item.added events
output=[], # Output should be populated by this point from text streaming
status="completed",
parallel_tool_calls=False,
tool_choice="none",
@@ -876,9 +672,8 @@ class MessageMapper:
# Import Response and ResponseError types
from openai.types.responses import Response, ResponseError
# Get model name from request or use 'devui' as default
request_obj = context.get("request")
model_name = request_obj.model if request_obj and request_obj.model else "devui"
# Get model name from context
model_name = context.get("request", {}).model if context.get("request") else "workflow"
# Create error object
error_message = str(error_info) if error_info else "Unknown error"
@@ -983,77 +778,8 @@ class MessageMapper:
)
]
# Handle RequestInfoEvent specially - emit as HIL event with schema
if event_class == "RequestInfoEvent":
from .models._openai_custom import ResponseRequestInfoEvent
request_id = getattr(event, "request_id", "")
source_executor_id = getattr(event, "source_executor_id", "")
request_type_class = getattr(event, "request_type", None)
request_data = getattr(event, "data", None)
logger.info("📨 [MAPPER] Processing RequestInfoEvent")
logger.info(f" request_id: {request_id}")
logger.info(f" source_executor_id: {source_executor_id}")
logger.info(f" request_type_class: {request_type_class}")
logger.info(f" request_data: {request_data}")
# Serialize request data
serialized_data = self._serialize_request_data(request_data)
logger.info(f" serialized_data: {serialized_data}")
# Get request type name for debugging
request_type_name = "Unknown"
if request_type_class:
request_type_name = f"{request_type_class.__module__}:{request_type_class.__name__}"
# Get response schema that was attached by executor
# This tells the UI what format to collect from the user
response_schema = getattr(event, "_response_schema", None)
if not response_schema:
# Fallback to string if somehow not set (shouldn't happen with current executor enrichment)
logger.warning(f"⚠️ Response schema not found for {request_type_name}, using default")
response_schema = {"type": "string"}
else:
logger.info(f" response_schema: {response_schema}")
# Wrap primitive schemas in object for form rendering
# The UI's SchemaFormRenderer expects an object with properties
if response_schema.get("type") in ["string", "integer", "number", "boolean"]:
# Wrap primitive type in object with "response" field
wrapped_schema = {
"type": "object",
"properties": {"response": response_schema},
"required": ["response"],
}
logger.info(" wrapped primitive schema in object")
else:
wrapped_schema = response_schema
# Create HIL request event with response schema
hil_event = ResponseRequestInfoEvent(
type="response.request_info.requested",
request_id=request_id,
source_executor_id=source_executor_id,
request_type=request_type_name,
request_data=serialized_data,
request_schema=wrapped_schema, # Send wrapped schema for form rendering
response_schema=response_schema, # Keep original for reference
item_id=context["item_id"],
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
timestamp=datetime.now().isoformat(),
)
logger.info("✅ [MAPPER] Created ResponseRequestInfoEvent:")
logger.info(f" type: {hil_event.type}")
logger.info(f" request_id: {hil_event.request_id}")
logger.info(f" sequence_number: {hil_event.sequence_number}")
return [hil_event]
# Handle other informational workflow events (status, warnings, errors)
if event_class in ["WorkflowStatusEvent", "WorkflowWarningEvent", "WorkflowErrorEvent"]:
# Handle informational workflow events (status, warnings, errors)
if event_class in ["WorkflowStatusEvent", "WorkflowWarningEvent", "WorkflowErrorEvent", "RequestInfoEvent"]:
# These are informational events that don't map to OpenAI lifecycle events
# Convert them to trace events for debugging visibility
event_data: dict[str, Any] = {}
@@ -1066,10 +792,13 @@ class MessageMapper:
elif event_class == "WorkflowErrorEvent":
event_data["message"] = str(getattr(event, "message", ""))
event_data["error"] = str(getattr(event, "error", ""))
elif event_class == "RequestInfoEvent":
request_info = getattr(event, "data", {})
event_data["request_info"] = request_info if isinstance(request_info, dict) else str(request_info)
# Create a trace event for debugging
trace_event = ResponseTraceEventComplete(
type="response.trace.completed",
type="response.trace.complete",
data={
"trace_type": "workflow_info",
"event_type": event_class,
@@ -1084,237 +813,6 @@ class MessageMapper:
return [trace_event]
# Handle Magentic-specific events
if event_class == "MagenticAgentDeltaEvent":
agent_id = getattr(event, "agent_id", "unknown_agent")
text = getattr(event, "text", None)
if text:
events = []
# Track Magentic agent messages separately from regular messages
# Use timestamp to ensure uniqueness for multiple runs of same agent
magentic_key = f"magentic_message_{agent_id}"
# Check if this is the first delta from this agent (need to create message container)
if magentic_key not in context:
# Create a unique message ID for this agent's streaming session
message_id = f"msg_{agent_id}_{uuid4().hex[:8]}"
context[magentic_key] = message_id
context["output_index"] = context.get("output_index", -1) + 1
# Import required types
from openai.types.responses import ResponseOutputMessage, ResponseOutputText
from openai.types.responses.response_content_part_added_event import (
ResponseContentPartAddedEvent,
)
from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
# Emit message output item (container for the agent's message)
# This matches what _convert_agent_update does for regular agents
events.append(
ResponseOutputItemAddedEvent(
type="response.output_item.added",
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
item=ResponseOutputMessage(
type="message",
id=message_id,
role="assistant",
content=[],
status="in_progress",
# Add metadata to identify this as a Magentic agent message
metadata={"agent_id": agent_id, "source": "magentic"}, # type: ignore[call-arg]
),
)
)
# Add content part for text (establishes the text container)
events.append(
ResponseContentPartAddedEvent(
type="response.content_part.added",
output_index=context["output_index"],
content_index=0,
item_id=message_id,
sequence_number=self._next_sequence(context),
part=ResponseOutputText(type="output_text", text="", annotations=[]),
)
)
# Get the message ID for this agent
message_id = context[magentic_key]
# Emit text delta event using the message ID (matches regular agent behavior)
events.append(
ResponseTextDeltaEvent(
type="response.output_text.delta",
output_index=context["output_index"],
content_index=0, # Always 0 for single text content
item_id=message_id,
delta=text,
logprobs=[],
sequence_number=self._next_sequence(context),
)
)
return events
# Handle function calls from Magentic agents
if getattr(event, "function_call_id", None) and getattr(event, "function_call_name", None):
# Handle function call initiation
function_call_id = getattr(event, "function_call_id", None)
function_call_name = getattr(event, "function_call_name", None)
function_call_arguments = getattr(event, "function_call_arguments", None)
# Track function call for accumulating arguments
context["active_function_calls"][function_call_id] = {
"item_id": function_call_id,
"name": function_call_name,
"arguments_chunks": [],
}
# Emit function call output item
return [
ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=ResponseFunctionToolCall(
id=function_call_id,
call_id=function_call_id,
name=function_call_name,
arguments=json.dumps(function_call_arguments) if function_call_arguments else "",
type="function_call",
status="in_progress",
),
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
]
# For other non-text deltas, emit as trace for debugging
return [
ResponseTraceEventComplete(
type="response.trace.completed",
data={
"trace_type": "magentic_delta",
"agent_id": agent_id,
"function_call_id": getattr(event, "function_call_id", None),
"function_call_name": getattr(event, "function_call_name", None),
"function_result_id": getattr(event, "function_result_id", None),
"timestamp": datetime.now().isoformat(),
},
span_id=f"magentic_delta_{uuid4().hex[:8]}",
item_id=context["item_id"],
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
)
]
if event_class == "MagenticAgentMessageEvent":
agent_id = getattr(event, "agent_id", "unknown_agent")
message = getattr(event, "message", None)
# Track Magentic agent messages
magentic_key = f"magentic_message_{agent_id}"
# Check if we were streaming for this agent
if magentic_key in context:
# Mark the streaming message as complete
message_id = context[magentic_key]
# Import required types
from openai.types.responses import ResponseOutputMessage
from openai.types.responses.response_output_item_done_event import ResponseOutputItemDoneEvent
# Extract text from ChatMessage for the completed message
text = None
if message and hasattr(message, "text"):
text = message.text
# Emit output_item.done to mark message as complete
events = [
ResponseOutputItemDoneEvent(
type="response.output_item.done",
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
item=ResponseOutputMessage(
type="message",
id=message_id,
role="assistant",
content=[], # Content already streamed via deltas
status="completed",
metadata={"agent_id": agent_id, "source": "magentic"}, # type: ignore[call-arg]
),
)
]
# Clean up context for this agent
del context[magentic_key]
logger.debug(f"MagenticAgentMessageEvent from {agent_id} marked streaming message as complete")
return events
# No streaming occurred, create a complete message (shouldn't happen normally)
# Extract text from ChatMessage
text = None
if message and hasattr(message, "text"):
text = message.text
if text:
# Emit as output item for this agent
from openai.types.responses import ResponseOutputMessage, ResponseOutputText
from openai.types.responses.response_output_item_added_event import ResponseOutputItemAddedEvent
context["output_index"] = context.get("output_index", -1) + 1
text_content = ResponseOutputText(type="output_text", text=text, annotations=[])
output_message = ResponseOutputMessage(
type="message",
id=f"msg_{agent_id}_{uuid4().hex[:8]}",
role="assistant",
content=[text_content],
status="completed",
metadata={"agent_id": agent_id, "source": "magentic"}, # type: ignore[call-arg]
)
logger.debug(
f"MagenticAgentMessageEvent from {agent_id} converted to output_item.added (non-streaming)"
)
return [
ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=output_message,
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
]
if event_class == "MagenticOrchestratorMessageEvent":
orchestrator_id = getattr(event, "orchestrator_id", "orchestrator")
message = getattr(event, "message", None)
kind = getattr(event, "kind", "unknown")
# Extract text from ChatMessage
text = None
if message and hasattr(message, "text"):
text = message.text
# Emit as trace event for orchestrator messages (typically task ledger, instructions)
return [
ResponseTraceEventComplete(
type="response.trace.completed",
data={
"trace_type": "magentic_orchestrator",
"orchestrator_id": orchestrator_id,
"kind": kind,
"text": text or str(message),
"timestamp": datetime.now().isoformat(),
},
span_id=f"magentic_orch_{uuid4().hex[:8]}",
item_id=context["item_id"],
output_index=context.get("output_index", 0),
sequence_number=self._next_sequence(context),
)
]
# For unknown/legacy events, still emit as workflow event for backward compatibility
# Get event data and serialize if it's a SerializationMixin
raw_event_data = getattr(event, "data", None)
@@ -1329,7 +827,7 @@ class MessageMapper:
# Create structured workflow event (keeping for backward compatibility)
workflow_event = ResponseWorkflowEventComplete(
type="response.workflow_event.completed",
type="response.workflow_event.complete",
data={
"event_type": event.__class__.__name__,
"data": serialized_event_data,
@@ -1555,227 +1053,30 @@ class MessageMapper:
# NO EVENT RETURNED - usage goes in final Response only
return
async def _map_data_content(
self, content: Any, context: dict[str, Any]
) -> ResponseOutputItemAddedEvent | ResponseTraceEventComplete:
"""Map DataContent to proper output item (image/file/data) or fallback to trace.
Maps Agent Framework DataContent to appropriate output types:
- Images (image/*) → ResponseOutputImage
- Common files (pdf, audio, video) → ResponseOutputFile
- Generic data → ResponseOutputData
- Unknown/debugging content → ResponseTraceEventComplete (fallback)
"""
mime_type = getattr(content, "mime_type", "application/octet-stream")
item_id = f"item_{uuid.uuid4().hex[:16]}"
# Extract data/uri
data_value = getattr(content, "data", None)
uri_value = getattr(content, "uri", None)
# Handle images
if mime_type.startswith("image/"):
# Prefer URI, but create data URI from data if needed
if uri_value:
image_url = uri_value
elif data_value:
# Convert bytes to base64 data URI
import base64
if isinstance(data_value, bytes):
b64_data = base64.b64encode(data_value).decode("utf-8")
else:
b64_data = str(data_value)
image_url = f"data:{mime_type};base64,{b64_data}"
else:
# No data available, fallback to trace
logger.warning(f"DataContent with {mime_type} has no data or uri, falling back to trace")
return ResponseTraceEventComplete(
type="response.trace.completed",
data={"content_type": "data", "mime_type": mime_type, "error": "No data or uri"},
item_id=context["item_id"],
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
return ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=ResponseOutputImage( # type: ignore[arg-type]
id=item_id,
type="output_image",
image_url=image_url,
mime_type=mime_type,
alt_text=None,
),
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
# Handle common file types
if mime_type in [
"application/pdf",
"audio/mp3",
"audio/wav",
"audio/m4a",
"audio/ogg",
"audio/flac",
"audio/aac",
"audio/mpeg",
"video/mp4",
"video/webm",
]:
# Determine filename from mime type
ext = mime_type.split("/")[-1]
if ext == "mpeg":
ext = "mp3" # audio/mpeg → .mp3
filename = f"output.{ext}"
# Prefer URI
if uri_value:
file_url = uri_value
file_data = None
elif data_value:
# Convert bytes to base64
import base64
if isinstance(data_value, bytes):
b64_data = base64.b64encode(data_value).decode("utf-8")
else:
b64_data = str(data_value)
file_url = f"data:{mime_type};base64,{b64_data}"
file_data = b64_data
else:
# No data available, fallback to trace
logger.warning(f"DataContent with {mime_type} has no data or uri, falling back to trace")
return ResponseTraceEventComplete(
type="response.trace.completed",
data={"content_type": "data", "mime_type": mime_type, "error": "No data or uri"},
item_id=context["item_id"],
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
return ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=ResponseOutputFile( # type: ignore[arg-type]
id=item_id,
type="output_file",
filename=filename,
file_url=file_url,
file_data=file_data,
mime_type=mime_type,
),
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
# Handle generic data (structured data, JSON, etc.)
data_str = ""
if uri_value:
data_str = uri_value
elif data_value:
if isinstance(data_value, bytes):
try:
data_str = data_value.decode("utf-8")
except UnicodeDecodeError:
# Binary data, encode as base64 for display
import base64
data_str = base64.b64encode(data_value).decode("utf-8")
else:
data_str = str(data_value)
return ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=ResponseOutputData( # type: ignore[arg-type]
id=item_id,
type="output_data",
data=data_str,
mime_type=mime_type,
description=None,
),
async def _map_data_content(self, content: Any, context: dict[str, Any]) -> ResponseTraceEventComplete:
    """Map DataContent to a structured trace event.

    Args:
        content: Content object; its ``data`` and ``mime_type`` attributes are
            read with ``getattr`` so either may be absent.
        context: Conversion context. Must contain ``item_id`` and
            ``output_index``; it is also passed to ``_next_sequence``.

    Returns:
        A ``response.trace.complete`` event carrying the payload, its MIME type
        (``application/octet-stream`` when absent), its size, and a timestamp.
    """
    # Read the payload once instead of calling getattr for every field.
    payload = getattr(content, "data", None)

    if not payload:
        size_bytes = 0
    elif isinstance(payload, (bytes, bytearray)):
        # len(str(b"...")) would measure the repr ("b'...'" plus escapes),
        # not the payload, so binary data is measured directly.
        size_bytes = len(payload)
    else:
        size_bytes = len(str(payload))

    return ResponseTraceEventComplete(
        type="response.trace.complete",
        data={
            "content_type": "data",
            "data": payload,
            "mime_type": getattr(content, "mime_type", "application/octet-stream"),
            "size_bytes": size_bytes,
            "timestamp": datetime.now().isoformat(),
        },
        item_id=context["item_id"],
        output_index=context["output_index"],
        sequence_number=self._next_sequence(context),
    )
async def _map_uri_content(
self, content: Any, context: dict[str, Any]
) -> ResponseOutputItemAddedEvent | ResponseTraceEventComplete:
"""Map UriContent to proper output item (image/file) based on MIME type.
UriContent has a URI and MIME type, so we can create appropriate output items:
- Images → ResponseOutputImage
- Common files → ResponseOutputFile
- Other URIs → ResponseTraceEventComplete (fallback for debugging)
"""
mime_type = getattr(content, "mime_type", "text/plain")
uri = getattr(content, "uri", "")
item_id = f"item_{uuid.uuid4().hex[:16]}"
if not uri:
# No URI available, fallback to trace
logger.warning("UriContent has no uri, falling back to trace")
return ResponseTraceEventComplete(
type="response.trace.completed",
data={"content_type": "uri", "mime_type": mime_type, "error": "No uri"},
item_id=context["item_id"],
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
# Handle images
if mime_type.startswith("image/"):
return ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=ResponseOutputImage( # type: ignore[arg-type]
id=item_id,
type="output_image",
image_url=uri,
mime_type=mime_type,
alt_text=None,
),
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
# Handle common file types
if mime_type in [
"application/pdf",
"audio/mp3",
"audio/wav",
"audio/m4a",
"audio/ogg",
"audio/flac",
"audio/aac",
"audio/mpeg",
"video/mp4",
"video/webm",
]:
# Extract filename from URI or use generic name
filename = uri.split("/")[-1] if "/" in uri else f"output.{mime_type.split('/')[-1]}"
return ResponseOutputItemAddedEvent(
type="response.output_item.added",
item=ResponseOutputFile( # type: ignore[arg-type]
id=item_id,
type="output_file",
filename=filename,
file_url=uri,
file_data=None,
mime_type=mime_type,
),
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
# For other URI types (text/plain, application/json, etc.), use trace for now
logger.debug(f"UriContent with unsupported MIME type {mime_type}, using trace event")
async def _map_uri_content(self, content: Any, context: dict[str, Any]) -> ResponseTraceEventComplete:
"""Map UriContent to structured trace event."""
return ResponseTraceEventComplete(
type="response.trace.completed",
type="response.trace.complete",
data={
"content_type": "uri",
"uri": uri,
"mime_type": mime_type,
"uri": getattr(content, "uri", ""),
"mime_type": getattr(content, "mime_type", "text/plain"),
"timestamp": datetime.now().isoformat(),
},
item_id=context["item_id"],
@@ -1784,15 +1085,9 @@ class MessageMapper:
)
async def _map_hosted_file_content(self, content: Any, context: dict[str, Any]) -> ResponseTraceEventComplete:
"""Map HostedFileContent to trace event.
HostedFileContent references external file IDs (like OpenAI file IDs).
These remain as traces since they're metadata about hosted resources,
not direct content to display. To display them, agents should return
DataContent or UriContent with the actual file data/URL.
"""
"""Map HostedFileContent to structured trace event."""
return ResponseTraceEventComplete(
type="response.trace.completed",
type="response.trace.complete",
data={
"content_type": "hosted_file",
"file_id": getattr(content, "file_id", "unknown"),
@@ -1806,14 +1101,9 @@ class MessageMapper:
async def _map_hosted_vector_store_content(
self, content: Any, context: dict[str, Any]
) -> ResponseTraceEventComplete:
"""Map HostedVectorStoreContent to trace event.
HostedVectorStoreContent references external vector store IDs.
These remain as traces since they're metadata about hosted resources,
not direct content to display.
"""
"""Map HostedVectorStoreContent to structured trace event."""
return ResponseTraceEventComplete(
type="response.trace.completed",
type="response.trace.complete",
data={
"content_type": "hosted_vector_store",
"vector_store_id": getattr(content, "vector_store_id", "unknown"),
@@ -1918,7 +1208,7 @@ class MessageMapper:
id=f"resp_{uuid.uuid4().hex[:12]}",
object="response",
created_at=datetime.now().timestamp(),
model=request.model or "devui",
model=request.model,
output=[response_output_message],
usage=usage,
parallel_tool_calls=False,
@@ -1,9 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
"""OpenAI integration for DevUI - proxy support for OpenAI Responses API."""
from ._executor import OpenAIExecutor
__all__ = [
"OpenAIExecutor",
]
@@ -1,270 +0,0 @@
# Copyright (c) Microsoft. All rights reserved.
"""OpenAI Executor - proxies requests to OpenAI Responses API.
This executor mirrors the AgentFrameworkExecutor interface but routes
requests to OpenAI's API instead of executing local entities.
"""
import logging
import os
from collections.abc import AsyncGenerator
from typing import Any
from openai import APIStatusError, AsyncOpenAI, AsyncStream, AuthenticationError, PermissionDeniedError, RateLimitError
from openai.types.responses import Response, ResponseStreamEvent
from .._conversations import ConversationStore
from ..models import AgentFrameworkRequest, OpenAIResponse
logger = logging.getLogger(__name__)
class OpenAIExecutor:
    """Executor that proxies requests to the OpenAI Responses API.

    Exposes ``execute_streaming()`` and ``execute_sync()`` so it can be used in
    place of a local executor, but forwards each request to OpenAI instead of
    running a local entity.

    Configuration is read from the environment at construction time:
    ``OPENAI_API_KEY`` (required to be non-empty) and ``OPENAI_BASE_URL``
    (defaults to ``https://api.openai.com/v1``).
    """

    def __init__(self, conversation_store: ConversationStore):
        """Initialize the executor.

        Args:
            conversation_store: Conversation store kept on the instance as
                ``self.conversation_store``.
        """
        self.conversation_store = conversation_store

        # Load configuration from environment
        self.api_key = os.getenv("OPENAI_API_KEY")
        self.base_url = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")

        # Created lazily by _get_client() and released by close().
        self._client: AsyncOpenAI | None = None

    @property
    def is_configured(self) -> bool:
        """Whether a usable API key is available.

        Returns:
            True if ``OPENAI_API_KEY`` was set to a non-empty value. An empty
            string counts as not configured, matching the check in
            ``_get_client`` so both paths report the same configuration error.
        """
        return bool(self.api_key)

    def _get_client(self) -> AsyncOpenAI:
        """Return the cached AsyncOpenAI client, creating it on first use.

        Returns:
            AsyncOpenAI client instance.

        Raises:
            ValueError: If OPENAI_API_KEY is not configured.
        """
        if self._client is None:
            if not self.api_key:
                raise ValueError("OPENAI_API_KEY environment variable not set")

            self._client = AsyncOpenAI(
                api_key=self.api_key,
                base_url=self.base_url,
            )
            logger.debug(f"Created OpenAI client with base_url: {self.base_url}")

        return self._client

    @staticmethod
    def _failed_event(message: Any, error_type: Any, code: Any) -> dict[str, Any]:
        """Build a ``response.failed`` event payload with a fresh response id."""
        return {
            "type": "response.failed",
            "response": {
                "id": f"resp_{os.urandom(16).hex()}",
                "status": "failed",
                "error": {
                    "message": message,
                    "type": error_type,
                    "code": code,
                },
            },
        }

    @classmethod
    def _failed_event_from_api_error(cls, error: Exception, default_type: str, default_code: str) -> dict[str, Any]:
        """Build a ``response.failed`` event from an API exception.

        Details are taken from ``error.body["error"]`` when that is a dict,
        otherwise from ``error.body`` itself when it is a dict. Any field that
        is missing or empty falls back to ``str(error)`` / the given defaults,
        so a malformed body can never raise from inside an ``except`` handler.
        """
        body = getattr(error, "body", None)
        details: dict[str, Any] = {}
        if isinstance(body, dict):
            nested = body.get("error")
            details = nested if isinstance(nested, dict) else body

        return cls._failed_event(
            details.get("message") or str(error),
            details.get("type") or default_type,
            details.get("code") or default_code,
        )

    async def execute_streaming(self, request: AgentFrameworkRequest) -> AsyncGenerator[Any, None]:
        """Execute a request via OpenAI and stream the resulting events.

        Failures are not raised to the caller: every error path yields a single
        ``response.failed`` dict and then ends the stream.

        Args:
            request: Request to execute.

        Yields:
            Events from the OpenAI stream, passed through unchanged, or one
            ``response.failed`` dict when the call cannot be made or fails.
        """
        if not self.is_configured:
            logger.error("OpenAI executor not configured (missing OPENAI_API_KEY)")
            yield self._failed_event(
                "OpenAI not configured on server. Set OPENAI_API_KEY environment variable.",
                "configuration_error",
                "openai_not_configured",
            )
            return

        try:
            client = self._get_client()

            # Convert AgentFrameworkRequest to OpenAI params
            params = request.to_openai_params()

            # Remove DevUI-specific fields that OpenAI doesn't recognize
            params.pop("extra_body", None)

            # Drop any caller-supplied "stream" so the forced value below is
            # not passed twice.
            params.pop("stream", None)

            logger.info(f"🔀 Proxying to OpenAI Responses API: model={params.get('model')}")
            logger.debug(f"Request params: {params}")

            stream: AsyncStream[ResponseStreamEvent] = await client.responses.create(
                **params,
                stream=True,  # Force streaming
            )

            # Events are forwarded as-is; no conversion is applied.
            async for event in stream:
                yield event

        except AuthenticationError as e:
            # 401 - Invalid API key or authentication issue
            logger.error(f"OpenAI authentication error: {e}", exc_info=True)
            yield self._failed_event_from_api_error(e, "authentication_error", "invalid_api_key")
        except PermissionDeniedError as e:
            # 403 - Permission denied
            logger.error(f"OpenAI permission denied: {e}", exc_info=True)
            yield self._failed_event_from_api_error(e, "permission_denied", "insufficient_permissions")
        except RateLimitError as e:
            # 429 - Rate limit exceeded
            logger.error(f"OpenAI rate limit exceeded: {e}", exc_info=True)
            yield self._failed_event_from_api_error(e, "rate_limit_error", "rate_limit_exceeded")
        except APIStatusError as e:
            # Other OpenAI API errors
            logger.error(f"OpenAI API error: {e}", exc_info=True)
            yield self._failed_event_from_api_error(e, "api_error", "unknown_error")
        except Exception as e:
            # Catch-all so the stream always terminates with a failure event
            logger.error(f"Unexpected error in OpenAI proxy: {e}", exc_info=True)
            yield self._failed_event(f"Unexpected error: {e!s}", "internal_error", "unexpected_error")

    async def execute_sync(self, request: AgentFrameworkRequest) -> OpenAIResponse:
        """Execute a request via OpenAI and return the complete response.

        Args:
            request: Request to execute.

        Returns:
            The response object returned by the OpenAI Responses API.

        Raises:
            ValueError: If OpenAI is not configured.
            Exception: Any error from the OpenAI call is logged and re-raised.
        """
        if not self.is_configured:
            raise ValueError("OpenAI not configured on server. Set OPENAI_API_KEY environment variable.")

        try:
            client = self._get_client()

            # Convert AgentFrameworkRequest to OpenAI params
            params = request.to_openai_params()

            # Remove DevUI-specific fields
            params.pop("extra_body", None)

            # Drop any caller-supplied "stream" so the forced value below is
            # not passed twice.
            params.pop("stream", None)

            logger.info(f"🔀 Proxying to OpenAI Responses API (non-streaming): model={params.get('model')}")
            logger.debug(f"Request params: {params}")

            response: Response = await client.responses.create(
                **params,
                stream=False,  # Force non-streaming
            )

            return response

        except Exception as e:
            logger.error(f"OpenAI proxy error: {e}", exc_info=True)
            raise

    async def close(self) -> None:
        """Close the OpenAI client, if one was created, and drop the reference."""
        if self._client is not None:
            await self._client.close()
            self._client = None
            logger.debug("Closed OpenAI client")
@@ -5,9 +5,7 @@
import inspect
import json
import logging
import os
import secrets
from collections.abc import AsyncGenerator, Awaitable, Callable
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any
@@ -16,20 +14,15 @@ from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.staticfiles import StaticFiles
from ._deployment import DeploymentManager
from ._discovery import EntityDiscovery
from ._executor import AgentFrameworkExecutor
from ._mapper import MessageMapper
from ._openai import OpenAIExecutor
from .models import AgentFrameworkRequest, MetaResponse, OpenAIError
from .models._discovery_models import Deployment, DeploymentConfig, DiscoveryResponse, EntityInfo
from .models import AgentFrameworkRequest, OpenAIError
from .models._discovery_models import DiscoveryResponse, EntityInfo
logger = logging.getLogger(__name__)
# No AuthMiddleware class needed - we'll use the decorator pattern instead
class DevServer:
"""Development Server - OpenAI compatible API server for debugging agents."""
@@ -40,7 +33,6 @@ class DevServer:
host: str = "127.0.0.1",
cors_origins: list[str] | None = None,
ui_enabled: bool = True,
mode: str = "developer",
) -> None:
"""Initialize the development server.
@@ -50,79 +42,16 @@ class DevServer:
host: Host to bind server to
cors_origins: List of allowed CORS origins
ui_enabled: Whether to enable the UI
mode: Server mode - 'developer' (full access, verbose errors) or 'user' (restricted APIs, generic errors)
"""
self.entities_dir = entities_dir
self.port = port
self.host = host
# Smart CORS defaults: permissive for localhost, restrictive for network-exposed deployments
if cors_origins is None:
# Localhost development: allow cross-origin for dev tools (e.g., frontend dev server)
# Network-exposed: empty list (same-origin only, no CORS)
cors_origins = ["*"] if host in ("127.0.0.1", "localhost") else []
self.cors_origins = cors_origins
self.cors_origins = cors_origins or ["*"]
self.ui_enabled = ui_enabled
self.mode = mode
self.executor: AgentFrameworkExecutor | None = None
self.openai_executor: OpenAIExecutor | None = None
self.deployment_manager = DeploymentManager()
self._app: FastAPI | None = None
self._pending_entities: list[Any] | None = None
def _is_dev_mode(self) -> bool:
"""Check if running in developer mode.
Returns:
True if in developer mode, False if in user mode
"""
return self.mode == "developer"
def _format_error(self, error: Exception, context: str = "Operation") -> str:
"""Format error message based on server mode.
In developer mode: Returns detailed error message for debugging.
In user mode: Returns generic message and logs details internally.
Args:
error: The exception that occurred
context: Description of the operation that failed (e.g., "Request execution")
Returns:
Formatted error message appropriate for the current mode
"""
if self._is_dev_mode():
# Developer mode: Show full error details for debugging
return f"{context} failed: {error!s}"
# User mode: Generic message to user, detailed logging internally
logger.error(f"{context} failed: {error}", exc_info=True)
return f"{context} failed"
def _require_developer_mode(self, feature: str = "operation") -> None:
"""Check if current mode allows developer operations.
Args:
feature: Name of the feature being accessed (for error message)
Raises:
HTTPException: If in user mode
"""
if self.mode == "user":
logger.warning(f"Blocked {feature} access in user mode")
raise HTTPException(
status_code=403,
detail={
"error": {
"message": f"Access denied: {feature} requires developer mode",
"type": "permission_denied",
"code": "developer_mode_required",
"current_mode": self.mode,
}
},
)
async def _ensure_executor(self) -> AgentFrameworkExecutor:
"""Ensure executor is initialized."""
if self.executor is None:
@@ -155,29 +84,6 @@ class DevServer:
return self.executor
async def _ensure_openai_executor(self) -> OpenAIExecutor:
"""Ensure OpenAI executor is initialized.
Returns:
OpenAI executor instance
Raises:
ValueError: If OpenAI executor cannot be initialized
"""
if self.openai_executor is None:
# Initialize local executor first to get conversation_store
local_executor = await self._ensure_executor()
# Create OpenAI executor with shared conversation store
self.openai_executor = OpenAIExecutor(local_executor.conversation_store)
if self.openai_executor.is_configured:
logger.info("OpenAI proxy mode available (OPENAI_API_KEY configured)")
else:
logger.info("OpenAI proxy mode disabled (OPENAI_API_KEY not set)")
return self.openai_executor
async def _cleanup_entities(self) -> None:
"""Cleanup entity resources (close clients, MCP tools, credentials, etc.)."""
if not self.executor:
@@ -188,28 +94,12 @@ class DevServer:
closed_count = 0
mcp_tools_closed = 0
credentials_closed = 0
hook_count = 0
for entity_info in entities:
entity_id = entity_info.id
try:
# Step 1: Execute registered cleanup hooks (NEW)
cleanup_hooks = self.executor.entity_discovery.get_cleanup_hooks(entity_id)
for hook in cleanup_hooks:
try:
if inspect.iscoroutinefunction(hook):
await hook()
else:
hook()
hook_count += 1
logger.debug(f"✓ Executed cleanup hook for: {entity_id}")
except Exception as e:
logger.warning(f"⚠ Cleanup hook failed for {entity_id}: {e}")
# Step 2: Close chat clients and their credentials (EXISTING)
entity_obj = self.executor.entity_discovery.get_entity_object(entity_id)
entity_obj = self.executor.entity_discovery.get_entity_object(entity_info.id)
# Close chat clients and their credentials
if entity_obj and hasattr(entity_obj, "chat_client"):
client = entity_obj.chat_client
@@ -254,24 +144,14 @@ class DevServer:
logger.warning(f"Error closing MCP tool for {entity_info.id}: {e}")
except Exception as e:
logger.warning(f"Error cleaning up entity {entity_id}: {e}")
logger.warning(f"Error closing entity {entity_info.id}: {e}")
if hook_count > 0:
logger.info(f"✓ Executed {hook_count} cleanup hook(s)")
if closed_count > 0:
logger.info(f"Closed {closed_count} entity client(s)")
logger.info(f"Closed {closed_count} entity client(s)")
if credentials_closed > 0:
logger.info(f"Closed {credentials_closed} credential(s)")
logger.info(f"Closed {credentials_closed} credential(s)")
if mcp_tools_closed > 0:
logger.info(f"Closed {mcp_tools_closed} MCP tool(s)")
# Close OpenAI executor if it exists
if self.openai_executor:
try:
await self.openai_executor.close()
logger.info("Closed OpenAI executor")
except Exception as e:
logger.warning(f"Error closing OpenAI executor: {e}")
logger.info(f"Closed {mcp_tools_closed} MCP tool(s)")
def create_app(self) -> FastAPI:
"""Create the FastAPI application."""
@@ -281,7 +161,6 @@ class DevServer:
# Startup
logger.info("Starting Agent Framework Server")
await self._ensure_executor()
await self._ensure_openai_executor() # Initialize OpenAI executor
yield
# Shutdown
logger.info("Shutting down Agent Framework Server")
@@ -298,74 +177,14 @@ class DevServer:
)
# Add CORS middleware
# Note: allow_credentials cannot be True when allow_origins is ["*"]
# For localhost dev with wildcard origins, credentials are disabled
# For network deployments with specific origins or empty list, credentials can be enabled
allow_credentials = self.cors_origins != ["*"]
app.add_middleware(
CORSMiddleware,
allow_origins=self.cors_origins,
allow_credentials=allow_credentials,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Add authentication middleware using decorator pattern
# Auth is enabled by presence of DEVUI_AUTH_TOKEN
auth_token = os.getenv("DEVUI_AUTH_TOKEN", "")
auth_required = bool(auth_token)
if auth_required:
logger.info("Authentication middleware enabled")
@app.middleware("http")
async def auth_middleware(request: Request, call_next: Callable[[Request], Awaitable[Any]]) -> Any:
    """Validate Bearer token authentication.

    Requests pass through untouched when they are OPTIONS (CORS preflight)
    requests, or when they target ``/health``, ``/meta``, ``/`` or anything
    under ``/assets``. Every other request must carry
    ``Authorization: Bearer <token>`` matching the configured token.

    Args:
        request: Incoming HTTP request.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        The downstream response, or a 401 JSON error response when the
        header is missing/malformed (``missing_token``) or the token does
        not match (``invalid_token``).
    """
    # Skip auth for OPTIONS (CORS preflight) requests
    if request.method == "OPTIONS":
        return await call_next(request)

    # Skip auth for health checks, meta endpoint, and static files
    path = request.url.path
    if path in ("/health", "/meta", "/") or path.startswith("/assets"):
        return await call_next(request)

    # Check Authorization header
    auth_header = request.headers.get("Authorization")
    if not auth_header or not auth_header.startswith("Bearer "):
        return JSONResponse(
            status_code=401,
            content={
                "error": {
                    "message": (
                        "Missing or invalid Authorization header. Expected: Authorization: Bearer <token>"
                    ),
                    "type": "authentication_error",
                    "code": "missing_token",
                }
            },
        )

    # Extract and validate token
    token = auth_header.replace("Bearer ", "", 1).strip()

    # compare_digest only accepts str operands that are pure ASCII and raises
    # TypeError otherwise, which would turn a bad token into a 500. Comparing
    # the encoded bytes keeps the constant-time check and always yields a 401.
    supplied = token.encode("utf-8")
    expected = auth_token.encode("utf-8")
    if not secrets.compare_digest(supplied, expected):
        return JSONResponse(
            status_code=401,
            content={
                "error": {
                    "message": "Invalid authentication token",
                    "type": "authentication_error",
                    "code": "invalid_token",
                }
            },
        )

    # Token valid, proceed
    return await call_next(request)
self._register_routes(app)
self._mount_ui(app)
@@ -383,29 +202,6 @@ class DevServer:
return {"status": "healthy", "entities_count": len(entities), "framework": "agent_framework"}
@app.get("/meta", response_model=MetaResponse)
async def get_meta() -> MetaResponse:
    """Describe this server: UI mode, version, runtime and feature flags."""
    import os

    from . import __version__

    # The OpenAI executor has to exist before its configuration can be reported.
    proxy_executor = await self._ensure_openai_executor()

    feature_flags = {
        "tracing": os.getenv("ENABLE_OTEL") == "true",
        "openai_proxy": proxy_executor.is_configured,
        "deployment": True,  # Deployment feature is available
    }
    # Auth counts as required whenever DEVUI_AUTH_TOKEN is set to a non-empty value.
    token_configured = bool(os.getenv("DEVUI_AUTH_TOKEN"))

    return MetaResponse(
        ui_mode=self.mode,  # type: ignore[arg-type]
        version=__version__,
        framework="agent_framework",
        runtime="python",  # Python DevUI backend
        capabilities=feature_flags,
        auth_required=token_configured,
    )
@app.get("/v1/entities", response_model=DiscoveryResponse)
async def discover_entities() -> DiscoveryResponse:
"""List all registered entities."""
@@ -430,10 +226,7 @@ class DevServer:
# Trigger lazy loading if entity not yet loaded
# This will import the module and enrich metadata
# Pass checkpoint_manager to ensure workflows get checkpoint storage injected
entity_obj = await executor.entity_discovery.load_entity(
entity_id, checkpoint_manager=executor.checkpoint_manager
)
entity_obj = await executor.entity_discovery.load_entity(entity_id)
# Get updated entity info (may have been enriched during load)
entity_info = executor.get_entity_info(entity_id) or entity_info
@@ -512,7 +305,6 @@ class DevServer:
executor_list = [getattr(ex, "executor_id", str(ex)) for ex in entity_obj.executors]
# Create copy of entity info and populate workflow-specific fields
# Note: DevUI provides runtime checkpoint storage for ALL workflows via conversations
update_payload: dict[str, Any] = {
"workflow_dump": workflow_dump,
"input_schema": input_schema,
@@ -528,13 +320,9 @@ class DevServer:
except HTTPException:
raise
except ValueError as e:
# ValueError from load_entity indicates entity not found or invalid
error_msg = self._format_error(e, "Entity loading")
raise HTTPException(status_code=404, detail=error_msg) from e
except Exception as e:
error_msg = self._format_error(e, "Entity info retrieval")
raise HTTPException(status_code=500, detail=error_msg) from e
logger.error(f"Error getting entity info for {entity_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get entity info: {e!s}") from e
@app.post("/v1/entities/{entity_id}/reload")
async def reload_entity(entity_id: str) -> dict[str, Any]:
@@ -543,7 +331,6 @@ class DevServer:
This enables hot reload during development - edit entity code, call this endpoint,
and the next execution will use the updated code without server restart.
"""
self._require_developer_mode("entity hot reload")
try:
executor = await self._ensure_executor()
@@ -566,150 +353,20 @@ class DevServer:
logger.error(f"Error reloading entity {entity_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to reload entity: {e!s}") from e
# ============================================================================
# Deployment Endpoints
# ============================================================================
@app.post("/v1/deployments")
async def create_deployment(config: DeploymentConfig) -> StreamingResponse:
    """Deploy an entity to Azure Container Apps, streaming progress as SSE.

    Requires developer mode. The entity named by ``config.entity_id`` must
    exist (404 otherwise), be flagged as deployable (400 otherwise) and have
    a ``path`` entry in its metadata (400 otherwise). Any other failure is
    reported as a 500 whose message is produced by ``_format_error``.

    Returns:
        A ``text/event-stream`` response with one ``data:`` line per
        deployment event.
    """
    self._require_developer_mode("deployment")

    import json
    from pathlib import Path

    try:
        executor = await self._ensure_executor()

        # Validate entity exists and supports deployment
        entity_info = executor.get_entity_info(config.entity_id)
        if not entity_info:
            raise HTTPException(status_code=404, detail=f"Entity {config.entity_id} not found")
        if not entity_info.deployment_supported:
            raise HTTPException(
                status_code=400,
                detail=entity_info.deployment_reason or "Deployment not supported for this entity",
            )

        # The on-disk location comes from the entity's metadata.
        raw_path = entity_info.metadata.get("path")
        if not raw_path:
            raise HTTPException(
                status_code=400,
                detail="Entity path not found in metadata (in-memory entities cannot be deployed)",
            )
        entity_path = Path(raw_path)

        async def event_generator() -> AsyncGenerator[str, None]:
            # Each deployment event becomes one SSE "data:" frame.
            async for event in self.deployment_manager.deploy(config, entity_path):
                yield f"data: {json.dumps(event.model_dump())}\n\n"

        return StreamingResponse(event_generator(), media_type="text/event-stream")

    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=self._format_error(e, "Deployment creation")) from e
@app.get("/v1/deployments")
async def list_deployments(entity_id: str | None = None) -> list[Deployment]:
    """Return the known deployments, optionally restricted to one entity.

    Requires developer mode. Any failure from the deployment manager is
    reported as a 500 whose message is produced by ``_format_error``.
    """
    self._require_developer_mode("deployment listing")

    try:
        deployments = await self.deployment_manager.list_deployments(entity_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=self._format_error(e, "Deployment listing")) from e
    return deployments
@app.get("/v1/deployments/{deployment_id}")
async def get_deployment(deployment_id: str) -> Deployment:
    """Look up a single deployment by its ID.

    Requires developer mode. Responds with 404 when the deployment manager
    returns nothing for the ID and with 500 on any other failure.
    """
    self._require_developer_mode("deployment details")

    try:
        found = await self.deployment_manager.get_deployment(deployment_id)
        if found:
            return found
        raise HTTPException(status_code=404, detail=f"Deployment {deployment_id} not found")
    except HTTPException:
        # Keep the 404 above from being rewrapped as a 500.
        raise
    except Exception as e:
        logger.error(f"Error getting deployment: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to get deployment: {e!s}") from e
@app.delete("/v1/deployments/{deployment_id}")
async def delete_deployment(deployment_id: str) -> dict[str, Any]:
    """Delete a deployment from Azure Container Apps.

    Requires developer mode. A ``ValueError`` from the deployment manager is
    reported as 404; any other failure is logged and reported as 500.

    Returns:
        A ``{"success": True, "message": ...}`` payload on success.
    """
    self._require_developer_mode("deployment deletion")

    try:
        await self.deployment_manager.delete_deployment(deployment_id)
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error deleting deployment: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to delete deployment: {e!s}") from e
    else:
        return {"success": True, "message": f"Deployment {deployment_id} deleted successfully"}
# Convenience endpoint: deploy specific entity
@app.post("/v1/entities/{entity_id}/deploy")
async def deploy_entity(entity_id: str, config: DeploymentConfig) -> StreamingResponse:
    """Deploy the entity named in the URL path (shortcut for /v1/deployments).

    Requires developer mode. The ``entity_id`` from the path replaces
    whatever ``config.entity_id`` the request body carried before the call
    is handed to ``create_deployment``.
    """
    self._require_developer_mode("deployment")

    # The path parameter wins over the body's entity_id.
    config.entity_id = entity_id
    deployment_stream = await create_deployment(config)
    return deployment_stream
# ============================================================================
# Response/Conversation Endpoints
# ============================================================================
@app.post("/v1/responses")
async def create_response(request: AgentFrameworkRequest, raw_request: Request) -> Any:
"""OpenAI Responses API endpoint - routes to local or OpenAI executor."""
"""OpenAI Responses API endpoint."""
try:
# Check if frontend requested OpenAI proxy mode
proxy_mode = raw_request.headers.get("X-Proxy-Backend")
if proxy_mode == "openai":
# Route to OpenAI executor
logger.info("🔀 Routing to OpenAI proxy mode")
openai_executor = await self._ensure_openai_executor()
if not openai_executor.is_configured:
error = OpenAIError.create(
"OpenAI proxy mode not configured. Set OPENAI_API_KEY environment variable."
)
return JSONResponse(status_code=503, content=error.to_dict())
# Execute via OpenAI with dedicated streaming method
if request.stream:
return StreamingResponse(
self._stream_openai_execution(openai_executor, request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Access-Control-Allow-Origin": "*",
},
)
return await openai_executor.execute_sync(request)
# Route to local Agent Framework executor (original behavior)
raw_body = await raw_request.body()
logger.info(f"Raw request body: {raw_body.decode()}")
logger.info(f"Parsed request: metadata={request.metadata}")
logger.info(f"Parsed request: model={request.model}, extra_body={request.extra_body}")
# Get entity_id from metadata
# Get entity_id using the new method
entity_id = request.get_entity_id()
logger.info(f"Extracted entity_id: {entity_id}")
if not entity_id:
error = OpenAIError.create("Missing entity_id in metadata. Provide metadata.entity_id in request.")
error = OpenAIError.create(f"Missing entity_id. Request extra_body: {request.extra_body}")
return JSONResponse(status_code=400, content=error.to_dict())
# Get executor and validate entity exists
@@ -735,86 +392,18 @@ class DevServer:
return await executor.execute_sync(request)
except Exception as e:
error_msg = self._format_error(e, "Request execution")
error = OpenAIError.create(error_msg)
logger.error(f"Error executing request: {e}")
error = OpenAIError.create(f"Execution failed: {e!s}")
return JSONResponse(status_code=500, content=error.to_dict())
# ========================================
# OpenAI Conversations API (Standard)
# ========================================
@app.post("/v1/conversations", response_model=None)
async def create_conversation(raw_request: Request) -> dict[str, Any] | JSONResponse:
"""Create a new conversation - routes to OpenAI or local based on mode."""
@app.post("/v1/conversations")
async def create_conversation(request_data: dict[str, Any]) -> dict[str, Any]:
"""Create a new conversation - OpenAI standard."""
try:
# Parse request body
request_data = await raw_request.json()
# Check if frontend requested OpenAI proxy mode
proxy_mode = raw_request.headers.get("X-Proxy-Backend")
if proxy_mode == "openai":
# Create conversation in OpenAI
openai_executor = await self._ensure_openai_executor()
if not openai_executor.is_configured:
error = OpenAIError.create(
"OpenAI proxy mode not configured. Set OPENAI_API_KEY environment variable.",
type="configuration_error",
code="openai_not_configured",
)
return JSONResponse(status_code=503, content=error.to_dict())
# Use OpenAI client to create conversation
from openai import APIStatusError, AsyncOpenAI, AuthenticationError, PermissionDeniedError
client = AsyncOpenAI(
api_key=openai_executor.api_key,
base_url=openai_executor.base_url,
)
try:
metadata = request_data.get("metadata")
logger.debug(f"Creating OpenAI conversation with metadata: {metadata}")
conversation = await client.conversations.create(metadata=metadata)
logger.info(f"Created OpenAI conversation: {conversation.id}")
return conversation.model_dump()
except AuthenticationError as e:
# 401 - Invalid API key or authentication issue
logger.error(f"OpenAI authentication error creating conversation: {e}")
error_body = e.body if hasattr(e, "body") else {}
error_data = error_body.get("error", {}) if isinstance(error_body, dict) else {}
error = OpenAIError.create(
message=error_data.get("message", str(e)),
type=error_data.get("type", "authentication_error"),
code=error_data.get("code", "invalid_api_key"),
)
return JSONResponse(status_code=401, content=error.to_dict())
except PermissionDeniedError as e:
# 403 - Permission denied
logger.error(f"OpenAI permission denied creating conversation: {e}")
error_body = e.body if hasattr(e, "body") else {}
error_data = error_body.get("error", {}) if isinstance(error_body, dict) else {}
error = OpenAIError.create(
message=error_data.get("message", str(e)),
type=error_data.get("type", "permission_denied"),
code=error_data.get("code", "insufficient_permissions"),
)
return JSONResponse(status_code=403, content=error.to_dict())
except APIStatusError as e:
# Other OpenAI API errors (rate limit, etc.)
logger.error(f"OpenAI API error creating conversation: {e}")
error_body = e.body if hasattr(e, "body") else {}
error_data = error_body.get("error", {}) if isinstance(error_body, dict) else {}
error = OpenAIError.create(
message=error_data.get("message", str(e)),
type=error_data.get("type", "api_error"),
code=error_data.get("code", "unknown_error"),
)
return JSONResponse(
status_code=e.status_code if hasattr(e, "status_code") else 500, content=error.to_dict()
)
# Local mode - use DevUI conversation store
metadata = request_data.get("metadata")
executor = await self._ensure_executor()
conversation = executor.conversation_store.create_conversation(metadata=metadata)
@@ -822,39 +411,22 @@ class DevServer:
except HTTPException:
raise
except Exception as e:
logger.error(f"Error creating conversation: {e}", exc_info=True)
error = OpenAIError.create(f"Failed to create conversation: {e!s}")
return JSONResponse(status_code=500, content=error.to_dict())
logger.error(f"Error creating conversation: {e}")
raise HTTPException(status_code=500, detail=f"Failed to create conversation: {e!s}") from e
@app.get("/v1/conversations")
async def list_conversations(
agent_id: str | None = None,
entity_id: str | None = None,
type: str | None = None,
) -> dict[str, Any]:
"""List conversations, optionally filtered by agent_id, entity_id, and/or type.
Query Parameters:
- agent_id: Filter by agent_id (for agent conversations)
- entity_id: Filter by entity_id (for workflow sessions or other entities)
- type: Filter by conversation type (e.g., "workflow_session")
Multiple filters can be combined (AND logic).
"""
async def list_conversations(agent_id: str | None = None) -> dict[str, Any]:
"""List conversations, optionally filtered by agent_id."""
try:
executor = await self._ensure_executor()
# Build filter criteria
filters = {}
if agent_id:
filters["agent_id"] = agent_id
if entity_id:
filters["entity_id"] = entity_id
if type:
filters["type"] = type
# Apply filters
conversations = executor.conversation_store.list_conversations_by_metadata(filters)
# Filter by agent_id metadata
conversations = executor.conversation_store.list_conversations_by_metadata({"agent_id": agent_id})
else:
# Return all conversations (for InMemoryStore, list all)
# Note: This assumes list_conversations_by_metadata({}) returns all
conversations = executor.conversation_store.list_conversations_by_metadata({})
return {
"object": "list",
@@ -939,20 +511,9 @@ class DevServer:
items, has_more = await executor.conversation_store.list_items(
conversation_id, limit=limit, after=after, order=order
)
# Handle both Pydantic models and dicts (some stores return raw dicts)
serialized_items = []
for item in items:
if hasattr(item, "model_dump"):
serialized_items.append(item.model_dump())
elif isinstance(item, dict):
serialized_items.append(item)
else:
logger.warning(f"Unexpected item type: {type(item)}, converting to dict")
serialized_items.append(dict(item))
return {
"object": "list",
"data": serialized_items,
"data": [item.model_dump() for item in items],
"has_more": has_more,
}
except ValueError as e:
@@ -971,51 +532,13 @@ class DevServer:
item = executor.conversation_store.get_item(conversation_id, item_id)
if not item:
raise HTTPException(status_code=404, detail="Item not found")
result: dict[str, Any] = item.model_dump()
return result
return item.model_dump()
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting item {item_id} from conversation {conversation_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get item: {e!s}") from e
@app.delete("/v1/conversations/{conversation_id}/items/{item_id}")
async def delete_conversation_item(conversation_id: str, item_id: str) -> dict[str, Any]:
    """Delete a conversation item; only checkpoint items are supported.

    Item IDs of the form ``checkpoint_<checkpoint_id>`` are deleted through
    the conversation's checkpoint storage. Any other item ID yields 501.

    Returns:
        An ``item.deleted`` payload echoing the item ID.

    Raises:
        HTTPException: 404 when the checkpoint does not exist or a
            ``ValueError`` is raised, 501 for non-checkpoint items, 500 for
            any other failure.
    """
    checkpoint_prefix = "checkpoint_"

    try:
        executor = await self._ensure_executor()

        if not item_id.startswith(checkpoint_prefix):
            # For other items, deletion is not available.
            raise HTTPException(status_code=501, detail="Deletion of non-checkpoint items not implemented")

        # Everything after the prefix is the checkpoint ID.
        checkpoint_id = item_id[len(checkpoint_prefix) :]
        storage = executor.checkpoint_manager.get_checkpoint_storage(conversation_id)
        if not await storage.delete_checkpoint(checkpoint_id):
            raise HTTPException(status_code=404, detail="Checkpoint not found")

        return {
            "id": item_id,
            "object": "item.deleted",
            "deleted": True,
        }
    except HTTPException:
        raise
    except ValueError as e:
        raise HTTPException(status_code=404, detail=str(e)) from e
    except Exception as e:
        logger.error(f"Error deleting item {item_id} from conversation {conversation_id}: {e}")
        raise HTTPException(status_code=500, detail=f"Failed to delete item: {e!s}") from e
# ============================================================================
# Checkpoint Management - Now handled through conversation items API
# Checkpoints are exposed as conversation items with type="checkpoint"
# ============================================================================
async def _stream_execution(
self, executor: AgentFrameworkExecutor, request: AgentFrameworkRequest
) -> AsyncGenerator[str, None]:
@@ -1064,63 +587,6 @@ class DevServer:
error_event = {"id": "error", "object": "error", "error": {"message": str(e), "type": "execution_error"}}
yield f"data: {json.dumps(error_event)}\n\n"
async def _stream_openai_execution(
self, executor: OpenAIExecutor, request: AgentFrameworkRequest
) -> AsyncGenerator[str, None]:
"""Stream execution through OpenAI executor.
OpenAI events are already in final format - no conversion or aggregation needed.
Just serialize and stream them as SSE.
Args:
executor: OpenAI executor instance
request: Request to execute
Yields:
SSE-formatted event strings
"""
try:
# Stream events from OpenAI - they're already ResponseStreamEvent objects
async for event in executor.execute_streaming(request):
# Handle error dicts from executor
if isinstance(event, dict):
payload = json.dumps(event)
yield f"data: {payload}\n\n"
continue
# OpenAI SDK events have model_dump_json() - use it for single-line JSON
if hasattr(event, "model_dump_json"):
payload = event.model_dump_json() # type: ignore[attr-defined]
yield f"data: {payload}\n\n"
else:
# Fallback (shouldn't happen with OpenAI SDK)
logger.warning(f"Unexpected event type from OpenAI: {type(event)}")
payload = json.dumps(str(event))
yield f"data: {payload}\n\n"
# OpenAI already sends response.completed event - no aggregation needed!
# Just send [DONE] marker
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Error in OpenAI streaming execution: {e}", exc_info=True)
# Emit proper response.failed event
import os
error_event = {
"type": "response.failed",
"response": {
"id": f"resp_{os.urandom(16).hex()}",
"status": "failed",
"error": {
"message": str(e),
"type": "internal_error",
"code": "streaming_error",
},
},
}
yield f"data: {json.dumps(error_event)}\n\n"
def _mount_ui(self, app: FastAPI) -> None:
"""Mount the UI as static files."""
from pathlib import Path
@@ -324,71 +324,6 @@ def generate_schema_from_dataclass(cls: type[Any]) -> dict[str, Any]:
return schema
def extract_response_type_from_executor(executor: Any, request_type: type) -> type | None:
    """Find the response type an executor's handler expects for a request type.

    Scans the executor's public callables for one whose first two annotated
    parameters (ignoring ``self`` and the return annotation) look like
    ``(original_request: RequestType, response: ResponseType, ...)``, i.e.
    the ``@response_handler`` signature. The first parameter matches when it
    equals ``request_type`` or shares its ``__name__``.

    Args:
        executor: Executor object that should have a handler for the request type.
        request_type: The request message type.

    Returns:
        The class annotated on the handler's second parameter, or None when
        no handler matches or introspection fails.
    """
    try:
        from typing import get_type_hints

        for attr_name in dir(executor):
            # Only public attributes are candidates.
            if attr_name.startswith("_"):
                continue

            candidate = getattr(executor, attr_name, None)
            if not callable(candidate):
                continue

            try:
                hints = get_type_hints(candidate)
                annotated = [hint for name, hint in hints.items() if name not in ("self", "return")]

                # Need at least original_request and response (ctx is optional).
                if len(annotated) < 2:
                    continue
                request_hint, response_hint = annotated[0], annotated[1]

                same_request = request_hint == request_type or (
                    hasattr(request_hint, "__name__")
                    and hasattr(request_type, "__name__")
                    and request_hint.__name__ == request_type.__name__
                )

                # The response annotation must be an actual class to be usable.
                if same_request and response_hint is not None and isinstance(response_hint, type):
                    logger.debug(
                        f"Found response type {response_hint} for request {request_type} "
                        f"via @response_handler"
                    )
                    return response_hint

            except Exception as e:
                logger.debug(f"Failed to get type hints for {attr_name}: {e}")
                continue

    except Exception as e:
        logger.debug(f"Failed to extract response type from executor: {e}")

    return None
def generate_input_schema(input_type: type) -> dict[str, Any]:
"""Generate JSON schema for workflow input type.
@@ -27,18 +27,14 @@ from openai.types.responses import (
from openai.types.responses.response_usage import InputTokensDetails, OutputTokensDetails
from openai.types.shared import Metadata, ResponsesModel
from ._discovery_models import Deployment, DeploymentConfig, DeploymentEvent, DiscoveryResponse, EntityInfo
from ._discovery_models import DiscoveryResponse, EntityInfo
from ._openai_custom import (
AgentFrameworkRequest,
CustomResponseOutputItemAddedEvent,
CustomResponseOutputItemDoneEvent,
ExecutorActionItem,
MetaResponse,
OpenAIError,
ResponseFunctionResultComplete,
ResponseOutputData,
ResponseOutputFile,
ResponseOutputImage,
ResponseTraceEvent,
ResponseTraceEventComplete,
ResponseWorkflowEventComplete,
@@ -55,14 +51,10 @@ __all__ = [
"ConversationItem",
"CustomResponseOutputItemAddedEvent",
"CustomResponseOutputItemDoneEvent",
"Deployment",
"DeploymentConfig",
"DeploymentEvent",
"DiscoveryResponse",
"EntityInfo",
"ExecutorActionItem",
"InputTokensDetails",
"MetaResponse",
"Metadata",
"OpenAIError",
"OpenAIResponse",
@@ -75,9 +67,6 @@ __all__ = [
"ResponseFunctionToolCall",
"ResponseFunctionToolCallOutputItem",
"ResponseInputParam",
"ResponseOutputData",
"ResponseOutputFile",
"ResponseOutputImage",
"ResponseOutputItemAddedEvent",
"ResponseOutputItemDoneEvent",
"ResponseOutputMessage",
@@ -4,10 +4,9 @@
from __future__ import annotations
import re
from typing import Any
from pydantic import BaseModel, Field, field_validator
from pydantic import BaseModel, Field
class EnvVarRequirement(BaseModel):
@@ -37,10 +36,6 @@ class EntityInfo(BaseModel):
# Environment variable requirements
required_env_vars: list[EnvVarRequirement] | None = None
# Deployment support
deployment_supported: bool = False # Whether entity can be deployed
deployment_reason: str | None = None # Explanation of why/why not entity can be deployed
# Agent-specific fields (optional, populated when available)
instructions: str | None = None
model_id: str | None = None
@@ -60,144 +55,3 @@ class DiscoveryResponse(BaseModel):
"""Response model for entity discovery."""
entities: list[EntityInfo] = Field(default_factory=list)
# ============================================================================
# Deployment Models
# ============================================================================
class DeploymentConfig(BaseModel):
    """Configuration for deploying an entity.

    Every string field is validated against a strict whitelist before use.
    All pattern checks use ``re.fullmatch`` rather than ``re.match`` with a
    ``$`` anchor: ``$`` also matches just before a trailing newline, which
    would let values such as ``"my-app\\n"`` slip through validation.
    """

    entity_id: str = Field(description="Entity ID to deploy")
    resource_group: str = Field(description="Azure resource group name")
    app_name: str = Field(description="Azure Container App name")
    region: str = Field(default="eastus", description="Azure region")
    ui_mode: str = Field(default="user", description="UI mode (user or developer)")
    ui_enabled: bool = Field(default=True, description="Whether to enable web interface")
    stream: bool = Field(default=True, description="Stream deployment events")

    @field_validator("app_name")
    @classmethod
    def validate_app_name(cls, v: str) -> str:
        """Validate Azure Container App name format.

        Azure Container App names must:
        - Be 3-32 characters long
        - Contain only lowercase letters, numbers, and hyphens
        - Start with a lowercase letter
        - End with a lowercase letter or number
        - Not contain consecutive hyphens

        Raises:
            ValueError: If the name violates any of the rules above.
        """
        if not v:
            raise ValueError("app_name cannot be empty")

        if len(v) < 3 or len(v) > 32:
            raise ValueError("app_name must be between 3 and 32 characters")

        # fullmatch: the whole string must conform (no trailing newline allowed).
        if not re.fullmatch(r"[a-z][a-z0-9-]*[a-z0-9]", v):
            raise ValueError(
                "app_name must start with a lowercase letter, "
                "end with a letter or number, and contain only lowercase letters, numbers, and hyphens"
            )

        if "--" in v:
            raise ValueError("app_name cannot contain consecutive hyphens")

        return v

    @field_validator("resource_group")
    @classmethod
    def validate_resource_group(cls, v: str) -> str:
        """Validate Azure resource group name format.

        Azure resource group names must:
        - Be 1-90 characters long
        - Contain only alphanumeric, underscore, parentheses, hyphen, period (except at end)
        - Not end with a period

        Raises:
            ValueError: If the name violates any of the rules above.
        """
        if not v:
            raise ValueError("resource_group cannot be empty")

        if len(v) > 90:
            raise ValueError("resource_group must be 90 characters or less")

        # fullmatch: the whole string must conform (no trailing newline allowed).
        if not re.fullmatch(r"[a-zA-Z0-9._()-]+", v):
            raise ValueError(
                "resource_group can only contain alphanumeric characters, "
                "underscores, hyphens, periods, and parentheses"
            )

        if v.endswith("."):
            raise ValueError("resource_group cannot end with a period")

        return v

    @field_validator("region")
    @classmethod
    def validate_region(cls, v: str) -> str:
        """Validate Azure region format.

        Validates that the region string is a reasonable format.
        Does not validate against the full list of Azure regions (which changes).

        Raises:
            ValueError: If the region is empty, too long, or malformed.
        """
        if not v:
            raise ValueError("region cannot be empty")

        if len(v) > 50:
            raise ValueError("region name too long")

        # Azure regions are typically lowercase with no spaces (e.g., eastus, westeurope)
        if not re.fullmatch(r"[a-z0-9]+", v):
            raise ValueError("region must contain only lowercase letters and numbers (e.g., eastus, westeurope)")

        return v

    @field_validator("entity_id")
    @classmethod
    def validate_entity_id(cls, v: str) -> str:
        """Validate entity_id format to prevent injection attacks.

        Raises:
            ValueError: If the ID is empty, longer than 256 characters, or
                contains anything other than the whitelisted characters.
        """
        if not v:
            raise ValueError("entity_id cannot be empty")

        if len(v) > 256:
            raise ValueError("entity_id too long")

        # Allow alphanumeric, hyphens, underscores, and periods.
        # fullmatch matters here: a "$"-anchored match would accept "id\n".
        if not re.fullmatch(r"[a-zA-Z0-9._-]+", v):
            raise ValueError("entity_id contains invalid characters")

        return v

    @field_validator("ui_mode")
    @classmethod
    def validate_ui_mode(cls, v: str) -> str:
        """Validate ui_mode is one of the allowed values.

        Raises:
            ValueError: If ``v`` is neither ``"user"`` nor ``"developer"``.
        """
        if v not in ("user", "developer"):
            raise ValueError("ui_mode must be 'user' or 'developer'")
        return v
class DeploymentEvent(BaseModel):
    """Real-time deployment event (SSE).

    ``url`` and ``auth_token`` default to ``None``; their field descriptions
    mark them as populated on completion.
    """

    type: str = Field(
        description="Event type (e.g., deploy.validating, deploy.building)",
    )
    message: str = Field(
        description="Human-readable message",
    )
    url: str | None = Field(
        default=None,
        description="Deployment URL (on completion)",
    )
    auth_token: str | None = Field(
        default=None,
        description="Auth token (on completion, shown once)",
    )
class Deployment(BaseModel):
    """Deployment record.

    All fields are required except ``error``, which defaults to ``None``.
    """

    # Identity of the deployment and of the entity it deployed.
    id: str = Field(
        description="Deployment ID (UUID)",
    )
    entity_id: str = Field(
        description="Entity ID that was deployed",
    )

    # Azure target.
    resource_group: str = Field(
        description="Azure resource group",
    )
    app_name: str = Field(
        description="Azure Container App name",
    )
    region: str = Field(
        description="Azure region",
    )

    # Outcome.
    url: str = Field(
        description="Deployment URL",
    )
    status: str = Field(
        description="Deployment status (deploying, deployed, failed)",
    )
    created_at: str = Field(
        description="ISO 8601 timestamp",
    )
    error: str | None = Field(
        default=None,
        description="Error message if failed",
    )
@@ -80,16 +80,9 @@ class CustomResponseOutputItemDoneEvent(BaseModel):
class ResponseWorkflowEventComplete(BaseModel):
"""Complete workflow event data.
"""Complete workflow event data."""
DevUI extension for workflow execution events (debugging/observability).
Uses past-tense 'completed' to follow OpenAI's event naming pattern.
Workflow events are shown in the debug panel for monitoring execution flow,
not in main chat. Use response.output_item.added for user-facing content.
"""
type: Literal["response.workflow_event.completed"] = "response.workflow_event.completed"
type: Literal["response.workflow_event.complete"] = "response.workflow_event.complete"
data: dict[str, Any] # Complete event data, not delta
executor_id: str | None = None
item_id: str
@@ -98,17 +91,9 @@ class ResponseWorkflowEventComplete(BaseModel):
class ResponseTraceEventComplete(BaseModel):
"""Complete trace event data.
"""Complete trace event data."""
DevUI extension for non-displayable debugging/metadata events.
Uses past-tense 'completed' to follow OpenAI's event naming pattern
(e.g., response.completed, response.output_item.added).
Trace events are shown in the Traces debug panel, not in main chat.
Use response.output_item.added for user-facing content.
"""
type: Literal["response.trace.completed"] = "response.trace.completed"
type: Literal["response.trace.complete"] = "response.trace.complete"
data: dict[str, Any] # Complete trace data, not delta
span_id: str | None = None
item_id: str
@@ -139,139 +124,6 @@ class ResponseFunctionResultComplete(BaseModel):
timestamp: str | None = None # Optional timestamp for UI display
class ResponseRequestInfoEvent(BaseModel):
    """DevUI extension event: a workflow is asking for human input.

    Why this is not a stock OpenAI event:
    - The OpenAI Responses API has no notion of a workflow pausing for
      human-in-the-loop input.
    - Agent Framework workflows can pause via RequestInfoExecutor to collect
      external information.
    - Clients must render a form and submit a response for the workflow to
      continue.

    Emitting this event puts the workflow in the IDLE_WITH_PENDING_REQUESTS
    state. The client is expected to build a form from ``request_schema`` and
    answer through a new request carrying ``workflow_hil_response`` content.
    """

    type: Literal["response.request_info.requested"] = "response.request_info.requested"

    request_id: str
    """Unique identifier for correlating this request with the response."""

    source_executor_id: str
    """ID of the executor that is waiting for this response."""

    request_type: str
    """Fully qualified type name of the request (e.g., 'module.path:ClassName')."""

    request_data: dict[str, Any]
    """Current data from the RequestInfoMessage (may contain defaults/context)."""

    request_schema: dict[str, Any]
    """JSON schema describing the request data structure (what the workflow is asking about)."""

    response_schema: dict[str, Any] | None = None
    """JSON schema describing the expected response structure for form rendering (what user should provide)."""

    item_id: str
    """OpenAI item ID for correlation."""

    output_index: int = 0
    """Output index for OpenAI compatibility."""

    sequence_number: int
    """Sequence number for ordering events."""

    timestamp: str
    """ISO timestamp when the request was made."""
# DevUI Output Content Types - for agent-generated media/data
# These extend ResponseOutputItem to support rich content outputs that OpenAI's API doesn't natively support
class ResponseOutputImage(BaseModel):
    """DevUI extension output item: an image produced by an agent.

    Why this is not a stock OpenAI type:
    - ResponseOutputMessage.content in the OpenAI Responses API only carries
      text output.
    - ImageGenerationCall exists, but models a tool call that generates an
      image, not the return of an existing one.
    - Agent Framework agents can return images via DataContent/UriContent,
      and those need proper display.

    With this type, images render inline in chat instead of being hidden in
    trace logs.
    """

    id: str
    """The unique ID of the image output."""

    image_url: str
    """The URL or data URI of the image (e.g., data:image/png;base64,...)"""

    type: Literal["output_image"] = "output_image"
    """The type of the output. Always `output_image`."""

    alt_text: str | None = None
    """Optional alt text for accessibility."""

    mime_type: str = "image/png"
    """The MIME type of the image (e.g., image/png, image/jpeg)."""
class ResponseOutputFile(BaseModel):
    """DevUI extension output item: a file produced by an agent.

    Why this is not a stock OpenAI type:
    - ResponseOutputMessage.content in the OpenAI Responses API only carries
      text output.
    - Agent Framework agents can return files via DataContent/UriContent,
      and those need proper display.
    - It covers PDFs, audio files, and other media types.

    With this type, files render inline in chat with appropriate renderers.
    Both ``file_url`` and ``file_data`` are optional and default to ``None``.
    """

    id: str
    """The unique ID of the file output."""

    filename: str
    """The filename (used to determine rendering and download)."""

    type: Literal["output_file"] = "output_file"
    """The type of the output. Always `output_file`."""

    file_url: str | None = None
    """Optional URL to the file."""

    file_data: str | None = None
    """Optional base64-encoded file data."""

    mime_type: str = "application/octet-stream"
    """The MIME type of the file (e.g., application/pdf, audio/mp3)."""
class ResponseOutputData(BaseModel):
    """DevUI extension output item: generic data produced by an agent.

    Why this is not a stock OpenAI type:
    - ResponseOutputMessage.content in the OpenAI Responses API only carries
      text output.
    - Agent Framework agents can return arbitrary structured data that needs
      display.
    - It is useful for debugging and for showing non-text content.

    With this type, generic data renders inline in chat. Unlike the image and
    file output types, ``mime_type`` has no default here and must be supplied.
    """

    id: str
    """The unique ID of the data output."""

    data: str
    """The data payload (string representation)."""

    type: Literal["output_data"] = "output_data"
    """The type of the output. Always `output_data`."""

    mime_type: str
    """The MIME type of the data."""

    description: str | None = None
    """Optional description of the data."""
# Agent Framework extension fields
class AgentFrameworkExtraBody(BaseModel):
"""Agent Framework specific routing fields for OpenAI requests."""
@@ -292,7 +144,7 @@ class AgentFrameworkRequest(BaseModel):
"""
# All OpenAI fields from ResponseCreateParams
model: str | None = None
model: str # Used as entity_id in DevUI!
input: str | list[Any] | dict[str, Any] # ResponseInputParam + dict for workflow structured input
stream: bool | None = False
@@ -304,25 +156,20 @@ class AgentFrameworkRequest(BaseModel):
metadata: dict[str, Any] | None = None
temperature: float | None = None
max_output_tokens: int | None = None
top_p: float | None = None
tools: list[dict[str, Any]] | None = None
# Reasoning parameters (for o-series models)
reasoning: dict[str, Any] | None = None # {"effort": "low" | "medium" | "high" | "minimal"}
# Optional extra_body for advanced use cases
extra_body: dict[str, Any] | None = None
model_config = ConfigDict(extra="allow")
def get_entity_id(self) -> str | None:
"""Get entity_id from metadata.entity_id.
def get_entity_id(self) -> str:
"""Get entity_id from model field.
In DevUI, entity_id is specified in metadata for routing.
In DevUI, model IS the entity_id (agent/workflow name).
Simple and clean!
"""
if self.metadata:
return self.metadata.get("entity_id")
return None
return self.model
def get_conversation_id(self) -> str | None:
"""Extract conversation_id from conversation parameter.
@@ -371,40 +218,11 @@ class OpenAIError(BaseModel):
return self.model_dump_json()
class MetaResponse(BaseModel):
    """Payload returned by the /meta endpoint.

    Describes the DevUI server's configuration and capabilities. Only
    ``version`` is required; every other field has a default.
    """

    ui_mode: Literal["developer", "user"] = "developer"
    """UI interface mode - 'developer' shows debug tools, 'user' shows simplified interface."""

    version: str
    """DevUI version string."""

    framework: str = "agent_framework"
    """Backend framework identifier."""

    runtime: Literal["python", "dotnet"] = "python"
    """Backend runtime/language - 'python' or 'dotnet' for deployment guides and feature availability."""

    capabilities: dict[str, bool] = {}
    """Server capabilities (e.g., tracing, openai_proxy)."""

    auth_required: bool = False
    """Whether the server requires Bearer token authentication."""
# Export all custom types
__all__ = [
"AgentFrameworkRequest",
"MetaResponse",
"OpenAIError",
"ResponseFunctionResultComplete",
"ResponseOutputData",
"ResponseOutputFile",
"ResponseOutputImage",
"ResponseTraceEvent",
"ResponseTraceEventComplete",
"ResponseWorkflowEventComplete",
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long