Agent Framework AG-UI Integration
AG-UI protocol integration for Agent Framework. It connects agents and workflows to AG-UI's web interface and streaming protocol.
Installation
pip install agent-framework-ag-ui
Quick Start
Server (Host an AI Agent)
from fastapi import FastAPI
from agent_framework import Agent
from agent_framework.azure import AzureOpenAIChatClient
from agent_framework.ag_ui import add_agent_framework_fastapi_endpoint

# Create your agent
agent = Agent(
    name="my_agent",
    instructions="You are a helpful assistant.",
    client=AzureOpenAIChatClient(
        endpoint="https://your-resource.openai.azure.com/",
        deployment_name="gpt-4o-mini",
        api_key="your-api-key",
    ),
)

# Create FastAPI app and add AG-UI endpoint
app = FastAPI()
add_agent_framework_fastapi_endpoint(app, agent, "/")

# Run with: uvicorn main:app --reload
Server (Host a Workflow)
from fastapi import FastAPI
from agent_framework import WorkflowBuilder, WorkflowContext, executor
from agent_framework.ag_ui import add_agent_framework_fastapi_endpoint


@executor(id="start")
async def start(message: str, ctx: WorkflowContext) -> None:
    await ctx.yield_output(f"Workflow received: {message}")


workflow = WorkflowBuilder(start_executor=start).build()

app = FastAPI()
add_agent_framework_fastapi_endpoint(app, workflow, "/")
Server (Thread-Scoped WorkflowBuilder)
Use workflow_factory when your workflow keeps runtime state (for example pending request_info interrupts) and must be isolated per AG-UI thread:
from fastapi import FastAPI
from agent_framework import Workflow, WorkflowBuilder
from agent_framework.ag_ui import AgentFrameworkWorkflow, add_agent_framework_fastapi_endpoint


def build_workflow_for_thread(thread_id: str) -> Workflow:
    # Build a fresh workflow instance for each thread id.
    return WorkflowBuilder(start_executor=...).build()


app = FastAPI()
thread_scoped_workflow = AgentFrameworkWorkflow(
    workflow_factory=build_workflow_for_thread,
    name="my_workflow",
)
add_agent_framework_fastapi_endpoint(app, thread_scoped_workflow, "/")
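To illustrate why a per-thread factory isolates runtime state, here is a minimal stdlib-only sketch. It is not the package's implementation: `ThreadScopedRegistry` and `FakeWorkflow` are hypothetical names, and the sketch assumes the factory is called once per distinct thread id, with the result reused for later requests on that thread.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, Generic, List, TypeVar

T = TypeVar("T")


@dataclass
class FakeWorkflow:
    """Hypothetical stand-in for a workflow that keeps runtime state."""

    thread_id: str
    pending_requests: List[str] = field(default_factory=list)


class ThreadScopedRegistry(Generic[T]):
    """Hypothetical cache: one instance per thread id, built lazily."""

    def __init__(self, factory: Callable[[str], T]) -> None:
        self._factory = factory
        self._instances: Dict[str, T] = {}

    def get(self, thread_id: str) -> T:
        # Build on first use, then reuse so pending state survives across requests.
        if thread_id not in self._instances:
            self._instances[thread_id] = self._factory(thread_id)
        return self._instances[thread_id]


registry = ThreadScopedRegistry(lambda thread_id: FakeWorkflow(thread_id))
registry.get("thread-a").pending_requests.append("approve-refund")

print(registry.get("thread-a").pending_requests)  # state kept for thread-a
print(registry.get("thread-b").pending_requests)  # thread-b starts empty
```

With a single shared workflow instance, the pending request recorded for one thread would be visible to every other thread; a factory keyed by thread id avoids that.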
Client (Connect to an AG-UI Server)
import asyncio
from agent_framework.ag_ui import AGUIChatClient


async def main():
    async with AGUIChatClient(endpoint="http://localhost:8000/") as client:
        # Stream responses
        async for update in client.get_response("Hello!", stream=True):
            for content in update.contents:
                if content.type == "text" and content.text:
                    print(content.text, end="", flush=True)
        print()


asyncio.run(main())
The AGUIChatClient supports:
- Streaming and non-streaming responses
- Hybrid tool execution (client-side + server-side tools)
- Automatic thread management for conversation continuity
- Integration with Agent for client-side history management
- Interrupt metadata passthrough (availableInterrupts and resume)
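When streaming, text arrives in fragments, so a caller that needs the full reply has to join them. The sketch below shows one way to do that without a running server. `FakeContent`, `FakeUpdate`, and `fake_stream` are hypothetical stand-ins that only mirror the attributes used in the client example above (`update.contents`, `content.type`, `content.text`); they are not types from the package.

```python
import asyncio
from dataclasses import dataclass, field
from typing import AsyncIterator, List, Optional


@dataclass
class FakeContent:
    """Hypothetical stand-in for one content item in a streamed update."""

    type: str
    text: Optional[str] = None


@dataclass
class FakeUpdate:
    """Hypothetical stand-in for one streamed update."""

    contents: List[FakeContent] = field(default_factory=list)


async def fake_stream() -> AsyncIterator[FakeUpdate]:
    # Simulates a server that sends text in two fragments around a non-text item.
    yield FakeUpdate([FakeContent("text", "Hel")])
    yield FakeUpdate([FakeContent("function_call")])
    yield FakeUpdate([FakeContent("text", "lo!")])


async def collect_text(stream: AsyncIterator[FakeUpdate]) -> str:
    """Join every non-empty text fragment, skipping other content types."""
    parts: List[str] = []
    async for update in stream:
        for content in update.contents:
            if content.type == "text" and content.text:
                parts.append(content.text)
    return "".join(parts)


full_text = asyncio.run(collect_text(fake_stream()))
print(full_text)
```

The same `collect_text` loop should work against a real stream as long as its updates expose the attributes shown in the client example.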
Documentation
- Getting Started Tutorial - Step-by-step guide to building AG-UI servers and clients
  - Server setup with FastAPI
  - Client examples using AGUIChatClient
  - Hybrid tool execution (client-side + server-side)
  - Thread management and conversation continuity
- Examples - Complete examples for AG-UI features
Features
This integration supports all 7 AG-UI features:
- Agentic Chat: Basic streaming chat with tool calling support
- Backend Tool Rendering: Tools executed on backend with results streamed to client
- Human in the Loop: Function approval requests for user confirmation before tool execution
- Agentic Generative UI: Async tools for long-running operations with progress updates
- Tool-based Generative UI: Custom UI components rendered on frontend based on tool calls
- Shared State: Bidirectional state sync between client and server
- Predictive State Updates: Stream tool arguments as optimistic state updates during execution
Additional compatibility and draft support:
- Native Workflow endpoint registration via add_agent_framework_fastapi_endpoint(...)
- Workflow-to-AG-UI event mapping (run/step/activity/tool/custom events)
- Custom event compatibility for inbound CUSTOM, CUSTOM_EVENT, and custom_event
- Pragmatic multimodal input parsing for both legacy (binary) and draft media-part shapes
- Pragmatic interrupt/resume handling (availableInterrupts, resume, and RUN_FINISHED.interrupt)
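Accepting several spellings of the custom event type usually comes down to normalizing them before dispatch. The following is a minimal sketch of that idea, not the package's code: `normalize_event_type` and `CUSTOM_EVENT_ALIASES` are hypothetical names, and choosing CUSTOM as the canonical form is an assumption made for illustration.

```python
# Hypothetical alias set: the three inbound spellings listed above.
CUSTOM_EVENT_ALIASES = frozenset({"CUSTOM", "CUSTOM_EVENT", "custom_event"})


def normalize_event_type(event_type: str) -> str:
    """Map any accepted custom-event spelling to "CUSTOM"; pass others through."""
    return "CUSTOM" if event_type in CUSTOM_EVENT_ALIASES else event_type


for raw in ("CUSTOM", "CUSTOM_EVENT", "custom_event", "RUN_FINISHED"):
    print(raw, "->", normalize_event_type(raw))
```

Normalizing once at the boundary keeps the rest of the event handling free of alias checks.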
Security: Authentication & Authorization
The AG-UI endpoint does not enforce authentication by default. For production deployments, you should add authentication using FastAPI's dependency injection system via the dependencies parameter.
API Key Authentication Example
import os

from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import APIKeyHeader
from agent_framework import Agent
from agent_framework.ag_ui import add_agent_framework_fastapi_endpoint

# Configure API key authentication
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)
EXPECTED_API_KEY = os.environ.get("AG_UI_API_KEY")


async def verify_api_key(api_key: str | None = Security(API_KEY_HEADER)) -> None:
    """Verify the API key provided in the request header."""
    if not api_key or api_key != EXPECTED_API_KEY:
        raise HTTPException(status_code=401, detail="Invalid or missing API key")


# Create agent and app
agent = Agent(name="my_agent", instructions="...", client=...)
app = FastAPI()

# Register endpoint WITH authentication
add_agent_framework_fastapi_endpoint(
    app,
    agent,
    "/",
    dependencies=[Depends(verify_api_key)],  # Authentication enforced here
)
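The `!=` comparison in verify_api_key can, in principle, leak timing information about how much of a guessed key is correct. One possible hardening is a constant-time comparison from the standard library. This is a sketch of that substitution, not something the package requires; `api_key_matches` is a hypothetical helper name.

```python
import secrets
from typing import Optional


def api_key_matches(provided: Optional[str], expected: Optional[str]) -> bool:
    """Return True only when both keys are set and equal."""
    if not provided or not expected:
        # Reject when the header is missing or the server key is not configured.
        return False
    # compare_digest avoids short-circuiting on the first differing byte.
    return secrets.compare_digest(provided.encode("utf-8"), expected.encode("utf-8"))


print(api_key_matches("s3cret", "s3cret"))
print(api_key_matches("guess", "s3cret"))
print(api_key_matches(None, "s3cret"))
```

Inside verify_api_key, the condition would then read `if not api_key_matches(api_key, EXPECTED_API_KEY): raise HTTPException(...)`. The explicit check on `expected` also rejects every request when the AG_UI_API_KEY environment variable is unset.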
Other Authentication Options
The dependencies parameter accepts any FastAPI dependency, enabling integration with:
- OAuth 2.0 / OpenID Connect - Use fastapi.security.OAuth2PasswordBearer
- JWT Tokens - Validate tokens with libraries like python-jose
- Azure AD / Entra ID - Use azure-identity for Microsoft identity platform
- Rate Limiting - Add request throttling dependencies
- Custom Authentication - Implement your organization's auth requirements
For a complete authentication example, see getting_started/server.py.
Architecture
The package uses an orchestrator-based architecture with the following components:
- AgentFrameworkAgent: Lightweight wrapper that delegates to orchestrators
- Orchestrators: Handle different execution flows (default, human-in-the-loop, etc.)
- Confirmation Strategies: Domain-specific confirmation messages (extensible)
- AgentFrameworkEventBridge: Converts Agent Framework events to AG-UI events
- Message Adapters: Bidirectional conversion between AG-UI and Agent Framework message formats
- FastAPI Endpoint: Streaming HTTP endpoint with Server-Sent Events (SSE)
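To make the last component concrete, the sketch below shows how a sequence of events could be framed as Server-Sent Events: each event becomes a `data:` line followed by a blank line. It uses only the standard library and is not the package's encoder. `encode_sse` and `stream_events` are hypothetical names, and the event payloads (field names such as threadId, runId, messageId, and delta) are illustrative assumptions rather than a definitive schema.

```python
import json
from typing import Any, Dict, Iterable, Iterator


def encode_sse(event: Dict[str, Any]) -> str:
    """Frame one event as an SSE message: a data line plus a blank line."""
    payload = json.dumps(event, separators=(",", ":"))
    return f"data: {payload}\n\n"


def stream_events(events: Iterable[Dict[str, Any]]) -> Iterator[str]:
    """Yield SSE frames one at a time, as a streaming response body would."""
    for event in events:
        yield encode_sse(event)


# Illustrative events only; the exact payload shape is an assumption.
events = [
    {"type": "RUN_STARTED", "threadId": "t1", "runId": "r1"},
    {"type": "TEXT_MESSAGE_CONTENT", "messageId": "m1", "delta": "Hello"},
    {"type": "RUN_FINISHED", "threadId": "t1", "runId": "r1"},
]

frames = list(stream_events(events))
print("".join(frames), end="")
```

Because each frame is self-delimiting, a client can parse and render events as they arrive instead of waiting for the whole response.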
Next Steps
- New to AG-UI? Start with the Getting Started Tutorial
- Want to see examples? Check out the Examples for AG-UI features
License
MIT