Python: Add Function Approval UI to DevUI (#1401)

* ensure function aproval is parsed correctly

* udpate ui, add deployment guide button, other debug panel fixes

* feat(devui): Implement lazy loading architecture with enhanced security and state management

Major architectural improvements to DevUI for better performance, security, and developer experience:

Performance & Architecture:
- Implement lazy loading for entity discovery - entities loaded on-demand instead of at startup
- Add hot reload capability for development workflow via new reload endpoint
- Reduce startup time and memory footprint by deferring module imports

Security Enhancements:
- Remove remote entity loading capabilities (POST /v1/entities/add, DELETE endpoints)
- DevUI now strictly local development tool - no remote code execution
- Add explicit security documentation and best practices in README

Frontend Improvements:
- Migrate to Zustand for centralized state management (replacing prop drilling)
- Add lightweight zero-dependency markdown renderer with code block copy support
- Improve gallery UX with setup instructions modal instead of direct URL loading
- Enhanced message UI with copy functionality and better token usage display

Testing & Quality:
- Expand test coverage for lazy loading, type detection, and cache invalidation
- Add comprehensive tests for new behaviors (+231 lines of test code)
- Improve type safety and documentation throughout

Breaking Changes:
- Remote entity loading via URLs is no longer supported
- Entities must be loaded from local filesystem only

* update ui issues, uupdate test descripion
This commit is contained in:
Victor Dibia
2025-10-15 14:36:29 -07:00
committed by GitHub
Unverified
parent 331c750515
commit b64358df7e
38 changed files with 3726 additions and 1814 deletions
@@ -4,7 +4,6 @@
from __future__ import annotations
import hashlib
import importlib
import importlib.util
import logging
@@ -13,17 +12,12 @@ import uuid
from pathlib import Path
from typing import Any
import httpx
from dotenv import load_dotenv
from .models._discovery_models import EntityInfo
logger = logging.getLogger(__name__)
# Constants for remote entity fetching
REMOTE_FETCH_TIMEOUT_SECONDS = 30.0
REMOTE_FETCH_MAX_SIZE_MB = 10
class EntityDiscovery:
"""Discovery for Agent Framework entities - agents and workflows."""
@@ -37,7 +31,6 @@ class EntityDiscovery:
self.entities_dir = entities_dir
self._entities: dict[str, EntityInfo] = {}
self._loaded_objects: dict[str, Any] = {}
self._remote_cache_dir = Path.home() / ".agent_framework_devui" / "remote_cache"
async def discover_entities(self) -> list[EntityInfo]:
"""Scan for Agent Framework entities.
@@ -77,6 +70,115 @@ class EntityDiscovery:
"""
return self._loaded_objects.get(entity_id)
async def load_entity(self, entity_id: str) -> Any:
"""Load entity on-demand (lazy loading).
This method implements lazy loading by importing the entity module only when needed.
In-memory entities are returned from cache immediately.
Args:
entity_id: Entity identifier
Returns:
Loaded entity object
Raises:
ValueError: If entity not found or cannot be loaded
"""
# Check if already loaded (includes in-memory entities)
if entity_id in self._loaded_objects:
logger.debug(f"Entity {entity_id} already loaded (cache hit)")
return self._loaded_objects[entity_id]
# Get entity metadata
entity_info = self._entities.get(entity_id)
if not entity_info:
raise ValueError(f"Entity {entity_id} not found in registry")
# In-memory entities should never reach here (they're pre-loaded)
if entity_info.source == "in_memory":
raise ValueError(f"In-memory entity {entity_id} missing from loaded objects cache")
logger.info(f"Lazy loading entity: {entity_id} (source: {entity_info.source})")
# Load based on source - only directory and in-memory are supported
if entity_info.source == "directory":
entity_obj = await self._load_directory_entity(entity_id, entity_info)
else:
raise ValueError(
f"Unsupported entity source: {entity_info.source}. "
f"Only 'directory' and 'in_memory' sources are supported."
)
# Enrich metadata with actual entity data
# Don't pass entity_type if it's "unknown" - let inference determine the real type
enriched_info = await self.create_entity_info_from_object(
entity_obj,
entity_type=entity_info.type if entity_info.type != "unknown" else None,
source=entity_info.source,
)
# IMPORTANT: Preserve the original entity_id (enrichment generates a new one)
enriched_info.id = entity_id
# Preserve the original path from sparse metadata
if "path" in entity_info.metadata:
enriched_info.metadata["path"] = entity_info.metadata["path"]
enriched_info.metadata["lazy_loaded"] = True
self._entities[entity_id] = enriched_info
# Cache the loaded object
self._loaded_objects[entity_id] = entity_obj
logger.info(f"✅ Successfully loaded entity: {entity_id} (type: {enriched_info.type})")
return entity_obj
async def _load_directory_entity(self, entity_id: str, entity_info: EntityInfo) -> Any:
"""Load entity from directory (imports module).
Args:
entity_id: Entity identifier
entity_info: Entity metadata
Returns:
Loaded entity object
"""
# Get directory path from metadata
dir_path = Path(entity_info.metadata.get("path", ""))
if not dir_path.exists(): # noqa: ASYNC240
raise ValueError(f"Entity directory not found: {dir_path}")
# Load .env if it exists
if dir_path.is_dir(): # noqa: ASYNC240
self._load_env_for_entity(dir_path)
else:
self._load_env_for_entity(dir_path.parent)
# Import the module
if dir_path.is_dir(): # noqa: ASYNC240
# Directory-based entity - try different import patterns
import_patterns = [
entity_id,
f"{entity_id}.agent",
f"{entity_id}.workflow",
]
for pattern in import_patterns:
module = self._load_module_from_pattern(pattern)
if module:
# Find entity in module - pass entity_id so registration uses correct ID
entity_obj = await self._find_entity_in_module(module, entity_id, str(dir_path))
if entity_obj:
return entity_obj
raise ValueError(f"No valid entity found in {dir_path}")
# File-based entity
module = self._load_module_from_file(dir_path, entity_id)
if module:
entity_obj = await self._find_entity_in_module(module, entity_id, str(dir_path))
if entity_obj:
return entity_obj
raise ValueError(f"No valid entity found in {dir_path}")
def list_entities(self) -> list[EntityInfo]:
"""List all discovered entities.
@@ -85,6 +187,48 @@ class EntityDiscovery:
"""
return list(self._entities.values())
def invalidate_entity(self, entity_id: str) -> None:
"""Invalidate (clear cache for) an entity to enable hot reload.
This removes the entity from the loaded objects cache and clears its module
from Python's sys.modules cache. The entity metadata remains, so it will be
reimported on next access.
Args:
entity_id: Entity identifier to invalidate
"""
# Remove from loaded objects cache
if entity_id in self._loaded_objects:
del self._loaded_objects[entity_id]
logger.info(f"Cleared loaded object cache for: {entity_id}")
# Clear from Python's module cache (including submodules)
keys_to_delete = [
module_name
for module_name in sys.modules
if module_name == entity_id or module_name.startswith(f"{entity_id}.")
]
for key in keys_to_delete:
del sys.modules[key]
logger.debug(f"Cleared module cache: {key}")
# Reset lazy_loaded flag in metadata
entity_info = self._entities.get(entity_id)
if entity_info and "lazy_loaded" in entity_info.metadata:
entity_info.metadata["lazy_loaded"] = False
logger.info(f"♻️ Entity invalidated: {entity_id} (will reload on next access)")
def invalidate_all(self) -> None:
"""Invalidate all cached entities.
Useful for forcing a complete reload of all entities.
"""
entity_ids = list(self._loaded_objects.keys())
for entity_id in entity_ids:
self.invalidate_entity(entity_id)
logger.info(f"Invalidated {len(entity_ids)} entities")
def register_entity(self, entity_id: str, entity_info: EntityInfo, entity_object: Any) -> None:
"""Register an entity with both metadata and object.
@@ -187,7 +331,10 @@ class EntityDiscovery:
)
async def _scan_entities_directory(self, entities_dir: Path) -> None:
"""Scan the entities directory for Agent Framework entities.
"""Scan the entities directory for Agent Framework entities (lazy loading).
This method scans the filesystem WITHOUT importing modules, creating sparse
metadata that will be enriched on-demand when entities are accessed.
Args:
entities_dir: Directory to scan for entities
@@ -196,78 +343,120 @@ class EntityDiscovery:
logger.warning(f"Entities directory not found: {entities_dir}")
return
logger.info(f"Scanning {entities_dir} for Agent Framework entities...")
logger.info(f"Scanning {entities_dir} for Agent Framework entities (lazy mode)...")
# Add entities directory to Python path if not already there
entities_dir_str = str(entities_dir)
if entities_dir_str not in sys.path:
sys.path.insert(0, entities_dir_str)
# Scan for directories and Python files
# Scan for directories and Python files WITHOUT importing
for item in entities_dir.iterdir(): # noqa: ASYNC240
if item.name.startswith(".") or item.name == "__pycache__":
continue
if item.is_dir():
# Directory-based entity
await self._discover_entities_in_directory(item)
if item.is_dir() and self._looks_like_entity(item):
# Directory-based entity - create sparse metadata
self._register_sparse_entity(item)
elif item.is_file() and item.suffix == ".py" and not item.name.startswith("_"):
# Single file entity
await self._discover_entities_in_file(item)
# Single file entity - create sparse metadata
self._register_sparse_file_entity(item)
async def _discover_entities_in_directory(self, dir_path: Path) -> None:
"""Discover entities in a directory using module import.
def _looks_like_entity(self, dir_path: Path) -> bool:
"""Check if directory contains an entity (without importing).
Args:
dir_path: Directory containing entity
dir_path: Directory to check
Returns:
True if directory appears to contain an entity
"""
return (
(dir_path / "agent.py").exists()
or (dir_path / "workflow.py").exists()
or (dir_path / "__init__.py").exists()
)
def _detect_entity_type(self, dir_path: Path) -> str:
"""Detect entity type from directory structure (without importing).
Uses filename conventions to determine entity type:
- workflow.py → "workflow"
- agent.py → "agent"
- both or neither → "unknown"
Args:
dir_path: Directory to analyze
Returns:
Entity type: "workflow", "agent", or "unknown"
"""
has_agent = (dir_path / "agent.py").exists()
has_workflow = (dir_path / "workflow.py").exists()
if has_agent and has_workflow:
# Both files exist - ambiguous, mark as unknown
return "unknown"
if has_workflow:
return "workflow"
if has_agent:
return "agent"
# Has __init__.py but no specific file
return "unknown"
def _register_sparse_entity(self, dir_path: Path) -> None:
"""Register entity with sparse metadata (no import).
Args:
dir_path: Entity directory
"""
entity_id = dir_path.name
logger.debug(f"Scanning directory: {entity_id}")
entity_type = self._detect_entity_type(dir_path)
try:
# Load environment variables for this entity first
self._load_env_for_entity(dir_path)
entity_info = EntityInfo(
id=entity_id,
name=entity_id.replace("_", " ").title(),
type=entity_type,
framework="agent_framework",
tools=[], # Sparse - will be populated on load
description="", # Sparse - will be populated on load
source="directory",
metadata={
"path": str(dir_path),
"discovered": True,
"lazy_loaded": False,
},
)
# Try different import patterns
import_patterns = [
entity_id, # Direct module import
f"{entity_id}.agent", # agent.py submodule
f"{entity_id}.workflow", # workflow.py submodule
]
self._entities[entity_id] = entity_info
logger.debug(f"Registered sparse entity: {entity_id} (type: {entity_type})")
for pattern in import_patterns:
module = self._load_module_from_pattern(pattern)
if module:
entities_found = await self._find_entities_in_module(module, entity_id, str(dir_path))
if entities_found:
logger.debug(f"Found {len(entities_found)} entities in {pattern}")
break
except Exception as e:
logger.warning(f"Error scanning directory {entity_id}: {e}")
async def _discover_entities_in_file(self, file_path: Path) -> None:
"""Discover entities in a single Python file.
def _register_sparse_file_entity(self, file_path: Path) -> None:
"""Register file-based entity with sparse metadata (no import).
Args:
file_path: Python file to scan
file_path: Entity Python file
"""
try:
# Load environment variables for this entity's directory first
self._load_env_for_entity(file_path.parent)
entity_id = file_path.stem
# Create module name from file path
base_name = file_path.stem
# File-based entities are typically agents, but we can't know for sure without importing
entity_info = EntityInfo(
id=entity_id,
name=entity_id.replace("_", " ").title(),
type="unknown", # Will be determined on load
framework="agent_framework",
tools=[],
description="",
source="directory",
metadata={
"path": str(file_path),
"discovered": True,
"lazy_loaded": False,
},
)
# Load the module directly from file
module = self._load_module_from_file(file_path, base_name)
if module:
entities_found = await self._find_entities_in_module(module, base_name, str(file_path))
if entities_found:
logger.debug(f"Found {len(entities_found)} entities in {file_path.name}")
except Exception as e:
logger.warning(f"Error scanning file {file_path}: {e}")
self._entities[entity_id] = entity_info
logger.debug(f"Registered sparse file entity: {entity_id}")
def _load_env_for_entity(self, entity_path: Path) -> bool:
"""Load .env file for an entity.
@@ -359,19 +548,17 @@ class EntityDiscovery:
logger.warning(f"Error loading module from {file_path}: {e}")
return None
async def _find_entities_in_module(self, module: Any, base_id: str, module_path: str) -> list[str]:
"""Find agent and workflow entities in a loaded module.
async def _find_entity_in_module(self, module: Any, entity_id: str, module_path: str) -> Any:
"""Find agent or workflow entity in a loaded module.
Args:
module: Loaded Python module
base_id: Base identifier for entities
entity_id: Expected entity identifier to register with
module_path: Path to module for metadata
Returns:
List of entity IDs that were found and registered
Loaded entity object, or None if not found
"""
entities_found = []
# Look for explicit variable names first
candidates = [
("agent", getattr(module, "agent", None)),
@@ -383,11 +570,12 @@ class EntityDiscovery:
continue
if self._is_valid_entity(obj, obj_type):
# Pass source as "directory" for directory-discovered entities
await self._register_entity_from_object(obj, obj_type, module_path, source="directory")
entities_found.append(obj_type)
# Register with the correct entity_id (from directory name)
# Store the object directly in _loaded_objects so we can return it
self._loaded_objects[entity_id] = obj
return obj
return entities_found
return None
def _is_valid_entity(self, obj: Any, expected_type: str) -> bool:
"""Check if object is a valid agent or workflow using duck typing.
@@ -602,173 +790,3 @@ class EntityDiscovery:
full_uuid = uuid.uuid4().hex
return f"{entity_type}_{source}_{base_name}_{full_uuid}"
async def fetch_remote_entity(
self, url: str, metadata: dict[str, Any] | None = None
) -> tuple[EntityInfo | None, str | None]:
"""Fetch and register entity from URL.
Args:
url: URL to Python file containing entity
metadata: Additional metadata (source, sampleId, etc.)
Returns:
Tuple of (EntityInfo if successful, error_message if failed)
"""
try:
normalized_url = self._normalize_url(url)
logger.info(f"Normalized URL: {normalized_url}")
content = await self._fetch_url_content(normalized_url)
if not content:
error_msg = "Failed to fetch content from URL. The file may not exist or is not accessible."
logger.warning(error_msg)
return None, error_msg
if not self._validate_python_syntax(content):
error_msg = "Invalid Python syntax in the file. Please check the file contains valid Python code."
logger.warning(error_msg)
return None, error_msg
entity_object = await self._load_entity_from_content(content, url)
if not entity_object:
error_msg = (
"No valid agent or workflow found in the file. "
"Make sure the file contains an 'agent' or 'workflow' variable."
)
logger.warning(error_msg)
return None, error_msg
entity_info = await self.create_entity_info_from_object(
entity_object,
entity_type=None, # Auto-detect
source="remote",
)
entity_info.source = metadata.get("source", "remote_gallery") if metadata else "remote_gallery"
entity_info.original_url = url
if metadata:
entity_info.metadata.update(metadata)
self.register_entity(entity_info.id, entity_info, entity_object)
logger.info(f"Successfully added remote entity: {entity_info.id}")
return entity_info, None
except Exception as e:
error_msg = f"Unexpected error: {e!s}"
logger.error(f"Error fetching remote entity from {url}: {e}", exc_info=True)
return None, error_msg
def _normalize_url(self, url: str) -> str:
"""Convert various Git hosting URLs to raw content URLs."""
# GitHub: blob -> raw
if "github.com" in url and "/blob/" in url:
return url.replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")
# GitLab: blob -> raw
if "gitlab.com" in url and "/-/blob/" in url:
return url.replace("/-/blob/", "/-/raw/")
# Bitbucket: src -> raw
if "bitbucket.org" in url and "/src/" in url:
return url.replace("/src/", "/raw/")
return url
async def _fetch_url_content(self, url: str, max_size_mb: int = REMOTE_FETCH_MAX_SIZE_MB) -> str | None:
"""Fetch content from URL with size and timeout limits."""
try:
async with httpx.AsyncClient(timeout=REMOTE_FETCH_TIMEOUT_SECONDS) as client:
response = await client.get(url)
if response.status_code != 200:
logger.warning(f"HTTP {response.status_code} for {url}")
return None
# Check content length
content_length = response.headers.get("content-length")
if content_length and int(content_length) > max_size_mb * 1024 * 1024:
logger.warning(f"File too large: {content_length} bytes")
return None
# Read with size limit
content = response.text
if len(content.encode("utf-8")) > max_size_mb * 1024 * 1024:
logger.warning("Content too large after reading")
return None
return content
except Exception as e:
logger.error(f"Error fetching {url}: {e}")
return None
def _validate_python_syntax(self, content: str) -> bool:
"""Validate that content is valid Python code."""
try:
compile(content, "<remote>", "exec")
return True
except SyntaxError as e:
logger.warning(f"Python syntax error: {e}")
return False
async def _load_entity_from_content(self, content: str, source_url: str) -> Any | None:
"""Load entity object from Python content string using disk-based import.
This method caches remote entities to disk and uses importlib for loading,
making it consistent with local entity discovery and avoiding exec() security warnings.
"""
try:
# Create cache directory if it doesn't exist
self._remote_cache_dir.mkdir(parents=True, exist_ok=True)
# Generate a unique filename based on URL hash
url_hash = hashlib.sha256(source_url.encode()).hexdigest()[:16]
module_name = f"remote_entity_{url_hash}"
cached_file = self._remote_cache_dir / f"{module_name}.py"
# Write content to cache file
cached_file.write_text(content, encoding="utf-8")
logger.debug(f"Cached remote entity to {cached_file}")
# Load module from cached file using importlib (same as local scanning)
module = self._load_module_from_file(cached_file, module_name)
if not module:
logger.warning(f"Failed to load module from cached file: {cached_file}")
return None
# Look for agent or workflow objects in the loaded module
for name in dir(module):
if name.startswith("_"):
continue
obj = getattr(module, name)
# Check for explicitly named entities first
if name in ["agent", "workflow"] and self._is_valid_entity(obj, name):
return obj
# Also check if any object looks like an agent/workflow
if self._is_valid_agent(obj) or self._is_valid_workflow(obj):
return obj
return None
except Exception as e:
logger.error(f"Error loading entity from content: {e}")
return None
def remove_remote_entity(self, entity_id: str) -> bool:
"""Remove a remote entity by ID."""
if entity_id in self._entities:
entity_info = self._entities[entity_id]
if entity_info.source in ["remote_gallery", "remote"]:
del self._entities[entity_id]
if entity_id in self._loaded_objects:
del self._loaded_objects[entity_id]
logger.info(f"Removed remote entity: {entity_id}")
return True
logger.warning(f"Cannot remove local entity: {entity_id}")
return False
return False
@@ -169,9 +169,11 @@ class AgentFrameworkExecutor:
Raw Agent Framework events and trace events
"""
try:
# Get entity info and object
# Get entity info
entity_info = self.get_entity_info(entity_id)
entity_obj = self.entity_discovery.get_entity_object(entity_id)
# Trigger lazy loading (will return from cache if already loaded)
entity_obj = await self.entity_discovery.load_entity(entity_id)
if not entity_obj:
raise EntityNotFoundError(f"Entity object for '{entity_id}' not found")
@@ -524,21 +524,15 @@ class MessageMapper:
async def _map_function_result_content(
self, content: Any, context: dict[str, Any]
) -> ResponseFunctionResultComplete:
"""Map FunctionResultContent to custom DevUI event.
"""Map FunctionResultContent to DevUI custom event.
This is a DevUI extension - OpenAI doesn't stream function execution results
because in their model, applications execute functions, not the API.
Agent Framework executes functions, so we emit this event for debugging visibility.
IMPORTANT: Always use Agent Framework's call_id from the content.
Do NOT generate a new call_id - it must match the one from the function call event.
DevUI extension: The OpenAI Responses API doesn't stream function execution results
(in OpenAI's model, the application executes functions, not the API).
"""
# Get call_id from content - this MUST match the call_id from the function call
# Get call_id from content
call_id = getattr(content, "call_id", None)
if not call_id:
logger.warning("FunctionResultContent missing call_id - this will break call/result pairing")
call_id = f"call_{uuid.uuid4().hex[:8]}" # Fallback only if truly missing
call_id = f"call_{uuid.uuid4().hex[:8]}"
# Extract result
result = getattr(content, "result", None)
@@ -547,16 +541,19 @@ class MessageMapper:
# Convert result to string
output = result if isinstance(result, str) else json.dumps(result) if result is not None else ""
# Determine status
# Determine status based on exception
status = "incomplete" if exception else "completed"
# Return custom DevUI event
# Generate item_id
item_id = f"item_{uuid.uuid4().hex[:8]}"
# Return DevUI custom event
return ResponseFunctionResultComplete(
type="response.function_result.complete",
call_id=call_id,
output=output,
status=status,
item_id=context["item_id"],
item_id=item_id,
output_index=context["output_index"],
sequence_number=self._next_sequence(context),
)
@@ -663,15 +660,24 @@ class MessageMapper:
async def _map_approval_request_content(self, content: Any, context: dict[str, Any]) -> dict[str, Any]:
"""Map FunctionApprovalRequestContent to custom event."""
# Parse arguments to ensure they're always a dict, not a JSON string
# This prevents double-escaping when the frontend calls JSON.stringify()
arguments: dict[str, Any] = {}
if hasattr(content, "function_call"):
if hasattr(content.function_call, "parse_arguments"):
# Use parse_arguments() to convert string arguments to dict
arguments = content.function_call.parse_arguments() or {}
else:
# Fallback to direct access if parse_arguments doesn't exist
arguments = getattr(content.function_call, "arguments", {})
return {
"type": "response.function_approval.requested",
"request_id": getattr(content, "id", "unknown"),
"function_call": {
"id": getattr(content.function_call, "call_id", "") if hasattr(content, "function_call") else "",
"name": getattr(content.function_call, "name", "") if hasattr(content, "function_call") else "",
"arguments": getattr(content.function_call, "arguments", {})
if hasattr(content, "function_call")
else {},
"arguments": arguments,
},
"item_id": context["item_id"],
"output_index": context["output_index"],
@@ -174,7 +174,7 @@ class DevServer:
@app.get("/v1/entities/{entity_id}/info", response_model=EntityInfo)
async def get_entity_info(entity_id: str) -> EntityInfo:
"""Get detailed information about a specific entity."""
"""Get detailed information about a specific entity (triggers lazy loading)."""
try:
executor = await self._ensure_executor()
entity_info = executor.get_entity_info(entity_id)
@@ -182,90 +182,96 @@ class DevServer:
if not entity_info:
raise HTTPException(status_code=404, detail=f"Entity {entity_id} not found")
# Trigger lazy loading if entity not yet loaded
# This will import the module and enrich metadata
entity_obj = await executor.entity_discovery.load_entity(entity_id)
# Get updated entity info (may have been enriched during load)
entity_info = executor.get_entity_info(entity_id) or entity_info
# For workflows, populate additional detailed information
if entity_info.type == "workflow":
entity_obj = executor.entity_discovery.get_entity_object(entity_id)
if entity_obj:
# Get workflow structure
workflow_dump = None
if hasattr(entity_obj, "to_dict") and callable(getattr(entity_obj, "to_dict", None)):
try:
workflow_dump = entity_obj.to_dict() # type: ignore[attr-defined]
except Exception:
workflow_dump = None
elif hasattr(entity_obj, "to_json") and callable(getattr(entity_obj, "to_json", None)):
try:
raw_dump = entity_obj.to_json() # type: ignore[attr-defined]
except Exception:
workflow_dump = None
else:
if isinstance(raw_dump, (bytes, bytearray)):
try:
raw_dump = raw_dump.decode()
except Exception:
raw_dump = raw_dump.decode(errors="replace")
if isinstance(raw_dump, str):
try:
parsed_dump = json.loads(raw_dump)
except Exception:
workflow_dump = raw_dump
else:
workflow_dump = parsed_dump if isinstance(parsed_dump, dict) else raw_dump
else:
workflow_dump = raw_dump
elif hasattr(entity_obj, "__dict__"):
workflow_dump = {k: v for k, v in entity_obj.__dict__.items() if not k.startswith("_")}
# Get input schema information
input_schema = {}
input_type_name = "Unknown"
start_executor_id = ""
if entity_info.type == "workflow" and entity_obj:
# Entity object already loaded by load_entity() above
# Get workflow structure
workflow_dump = None
if hasattr(entity_obj, "to_dict") and callable(getattr(entity_obj, "to_dict", None)):
try:
from ._utils import (
extract_executor_message_types,
generate_input_schema,
select_primary_input_type,
workflow_dump = entity_obj.to_dict() # type: ignore[attr-defined]
except Exception:
workflow_dump = None
elif hasattr(entity_obj, "to_json") and callable(getattr(entity_obj, "to_json", None)):
try:
raw_dump = entity_obj.to_json() # type: ignore[attr-defined]
except Exception:
workflow_dump = None
else:
if isinstance(raw_dump, (bytes, bytearray)):
try:
raw_dump = raw_dump.decode()
except Exception:
raw_dump = raw_dump.decode(errors="replace")
if isinstance(raw_dump, str):
try:
parsed_dump = json.loads(raw_dump)
except Exception:
workflow_dump = raw_dump
else:
workflow_dump = parsed_dump if isinstance(parsed_dump, dict) else raw_dump
else:
workflow_dump = raw_dump
elif hasattr(entity_obj, "__dict__"):
workflow_dump = {k: v for k, v in entity_obj.__dict__.items() if not k.startswith("_")}
# Get input schema information
input_schema = {}
input_type_name = "Unknown"
start_executor_id = ""
try:
from ._utils import (
extract_executor_message_types,
generate_input_schema,
select_primary_input_type,
)
start_executor = entity_obj.get_start_executor()
except Exception as e:
logger.debug(f"Could not extract input info for workflow {entity_id}: {e}")
else:
if start_executor:
start_executor_id = getattr(start_executor, "executor_id", "") or getattr(
start_executor, "id", ""
)
start_executor = entity_obj.get_start_executor()
except Exception as e:
logger.debug(f"Could not extract input info for workflow {entity_id}: {e}")
else:
if start_executor:
start_executor_id = getattr(start_executor, "executor_id", "") or getattr(
start_executor, "id", ""
)
message_types = extract_executor_message_types(start_executor)
input_type = select_primary_input_type(message_types)
message_types = extract_executor_message_types(start_executor)
input_type = select_primary_input_type(message_types)
if input_type:
input_type_name = getattr(input_type, "__name__", str(input_type))
if input_type:
input_type_name = getattr(input_type, "__name__", str(input_type))
# Generate schema using comprehensive schema generation
input_schema = generate_input_schema(input_type)
# Generate schema using comprehensive schema generation
input_schema = generate_input_schema(input_type)
if not input_schema:
input_schema = {"type": "string"}
if input_type_name == "Unknown":
input_type_name = "string"
if not input_schema:
input_schema = {"type": "string"}
if input_type_name == "Unknown":
input_type_name = "string"
# Get executor list
executor_list = []
if hasattr(entity_obj, "executors") and entity_obj.executors:
executor_list = [getattr(ex, "executor_id", str(ex)) for ex in entity_obj.executors]
# Get executor list
executor_list = []
if hasattr(entity_obj, "executors") and entity_obj.executors:
executor_list = [getattr(ex, "executor_id", str(ex)) for ex in entity_obj.executors]
# Create copy of entity info and populate workflow-specific fields
update_payload: dict[str, Any] = {
"workflow_dump": workflow_dump,
"input_schema": input_schema,
"input_type_name": input_type_name,
"start_executor_id": start_executor_id,
}
if executor_list:
update_payload["executors"] = executor_list
return entity_info.model_copy(update=update_payload)
# Create copy of entity info and populate workflow-specific fields
update_payload: dict[str, Any] = {
"workflow_dump": workflow_dump,
"input_schema": input_schema,
"input_type_name": input_type_name,
"start_executor_id": start_executor_id,
}
if executor_list:
update_payload["executors"] = executor_list
return entity_info.model_copy(update=update_payload)
# For non-workflow entities, return as-is
return entity_info
@@ -276,70 +282,34 @@ class DevServer:
logger.error(f"Error getting entity info for {entity_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to get entity info: {e!s}") from e
@app.post("/v1/entities/add")
async def add_entity(request: dict[str, Any]) -> dict[str, Any]:
"""Add entity from URL."""
@app.post("/v1/entities/{entity_id}/reload")
async def reload_entity(entity_id: str) -> dict[str, Any]:
"""Hot reload entity (clears cache, will reimport on next access).
This enables hot reload during development - edit entity code, call this endpoint,
and the next execution will use the updated code without server restart.
"""
try:
url = request.get("url")
metadata = request.get("metadata", {})
if not url:
raise HTTPException(status_code=400, detail="URL is required")
logger.info(f"Attempting to add entity from URL: {url}")
executor = await self._ensure_executor()
entity_info, error_msg = await executor.entity_discovery.fetch_remote_entity(url, metadata)
# Check if entity exists
entity_info = executor.get_entity_info(entity_id)
if not entity_info:
# Sanitize error message - only return safe, user-friendly errors
logger.error(f"Failed to fetch or validate entity from {url}: {error_msg}")
safe_error = error_msg if error_msg else "Failed to fetch or validate entity"
raise HTTPException(status_code=400, detail=safe_error)
raise HTTPException(status_code=404, detail=f"Entity {entity_id} not found")
logger.info(f"Successfully added entity: {entity_info.id}")
return {"success": True, "entity": entity_info.model_dump()}
# Invalidate cache
executor.entity_discovery.invalidate_entity(entity_id)
return {
"success": True,
"message": f"Entity '{entity_id}' cache cleared. Will reload on next access.",
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error adding entity: {e}", exc_info=True)
# Don't expose internal error details to client
raise HTTPException(
status_code=500, detail="An unexpected error occurred while adding the entity"
) from e
@app.delete("/v1/entities/{entity_id}")
async def remove_entity(entity_id: str) -> dict[str, Any]:
"""Remove entity by ID."""
try:
executor = await self._ensure_executor()
# Cleanup entity resources before removal
try:
entity_obj = executor.entity_discovery.get_entity_object(entity_id)
if entity_obj and hasattr(entity_obj, "chat_client"):
client = entity_obj.chat_client
if hasattr(client, "close") and callable(client.close):
if inspect.iscoroutinefunction(client.close):
await client.close()
else:
client.close()
logger.info(f"Closed client for entity: {entity_id}")
except Exception as e:
logger.warning(f"Error closing entity {entity_id} during removal: {e}")
# Remove entity from registry
success = executor.entity_discovery.remove_remote_entity(entity_id)
if success:
return {"success": True}
raise HTTPException(status_code=404, detail="Entity not found or cannot be removed")
except HTTPException:
raise
except Exception as e:
logger.error(f"Error removing entity {entity_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to remove entity: {e!s}") from e
logger.error(f"Error reloading entity {entity_id}: {e}")
raise HTTPException(status_code=500, detail=f"Failed to reload entity: {e!s}") from e
@app.post("/v1/responses")
async def create_response(request: AgentFrameworkRequest, raw_request: Request) -> Any:
@@ -31,8 +31,7 @@ class EntityInfo(BaseModel):
metadata: dict[str, Any] = Field(default_factory=dict)
# Source information
source: str = "directory" # "directory", "in_memory", "remote_gallery"
original_url: str | None = None
source: str = "directory" # "directory" or "in_memory"
# Environment variable requirements
required_env_vars: list[EnvVarRequirement] | None = None
@@ -38,11 +38,16 @@ class ResponseTraceEventComplete(BaseModel):
class ResponseFunctionResultComplete(BaseModel):
"""Custom DevUI event for function execution results.
"""DevUI extension: Stream function execution results.
This is a DevUI extension - OpenAI doesn't stream function execution results
because in their model, the application executes functions, not the API.
Agent Framework executes functions, so we emit this event for debugging visibility.
This is a DevUI extension because:
- OpenAI Responses API doesn't stream function results (clients execute functions)
- Agent Framework executes functions server-side, so we stream results for debugging visibility
- ResponseFunctionToolCallOutputItem exists in OpenAI SDK but isn't in ResponseOutputItem union
(it's for Conversations API input, not Responses API streaming output)
This event provides the same structure as OpenAI's function output items but wrapped
in a custom event type since standard events don't support streaming function results.
"""
type: Literal["response.function_result.complete"] = "response.function_result.complete"
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
File diff suppressed because one or more lines are too long
@@ -5,8 +5,8 @@
<link rel="icon" type="image/svg+xml" href="/agentframework.svg" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Agent Framework Dev UI</title>
<script type="module" crossorigin src="/assets/index-ZIs_B0ln.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-BhFnsoso.css">
<script type="module" crossorigin src="/assets/index-DmL7WSFa.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-CE4pGoXh.css">
</head>
<body>
<div id="root"></div>