Single Agent with Reliable Streaming
This sample demonstrates how to use Redis Streams with agent response callbacks to enable reliable, resumable streaming for durable agents. Streaming responses are persisted to Redis, allowing clients to disconnect and reconnect without losing messages.
Key Concepts Demonstrated
- Using AgentResponseCallbackProtocol to capture streaming agent responses.
- Persisting streaming chunks to Redis Streams for reliable delivery.
- Non-blocking agent execution with options={"wait_for_response": False} (fire-and-forget mode).
- Cursor-based resumption for disconnected clients.
- Decoupling agent execution from response streaming.
Prerequisites
In addition to the common setup in the parent README.md, this sample requires Redis:
docker run -d --name redis -p 6379:6379 redis:latest
Environment Setup
See the README.md file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies.
Additional environment variables for this sample:
# Optional: Redis Configuration
REDIS_CONNECTION_STRING=redis://localhost:6379
REDIS_STREAM_TTL_MINUTES=10
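As a minimal sketch of how a script might consume these two variables, the helper below reads them and falls back to the values shown above when they are unset. The names `RedisSettings` and `load_redis_settings` are hypothetical and not part of the sample, and treating the listed values as defaults is an assumption.

```python
import os
from dataclasses import dataclass
from datetime import timedelta
from typing import Mapping, Optional


@dataclass(frozen=True)
class RedisSettings:
    """Hypothetical container for the two optional settings."""

    connection_string: str
    stream_ttl: timedelta


def load_redis_settings(env: Optional[Mapping[str, str]] = None) -> RedisSettings:
    """Read the Redis settings, using assumed defaults when a variable is unset."""
    env = os.environ if env is None else env
    connection_string = env.get("REDIS_CONNECTION_STRING", "redis://localhost:6379")
    ttl_minutes = int(env.get("REDIS_STREAM_TTL_MINUTES", "10"))
    if ttl_minutes <= 0:
        raise ValueError("REDIS_STREAM_TTL_MINUTES must be a positive integer")
    return RedisSettings(connection_string, timedelta(minutes=ttl_minutes))


settings = load_redis_settings({"REDIS_STREAM_TTL_MINUTES": "5"})
print(settings.connection_string, settings.stream_ttl)
```

Passing a mapping instead of reading `os.environ` directly keeps the helper easy to exercise without touching the real environment.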
Running the Sample
With the environment set up, you can run the sample either as a single combined process or as separate worker and client processes:
Option 1: Combined (Recommended for Testing)
cd samples/04-hosting/durabletask/03_single_agent_streaming
python sample.py
Option 2: Separate Processes
Start the worker in one terminal:
python worker.py
In a new terminal, run the client:
python client.py
The client sends a travel-planning request to the TravelPlanner agent and streams the response from Redis in real time:
================================================================================
TravelPlanner Agent - Redis Streaming Demo
================================================================================
You: Plan a 3-day trip to Tokyo with emphasis on culture and food
TravelPlanner (streaming from Redis):
--------------------------------------------------------------------------------
# Your Amazing 3-Day Tokyo Adventure! 🗾
Let me create the perfect cultural and culinary journey through Tokyo...
## Day 1: Traditional Tokyo & First Impressions
...
(continues streaming)
...
✓ Response complete!
How It Works
Redis Streaming Callback
The RedisStreamCallback class implements AgentResponseCallbackProtocol to capture streaming updates and persist them to Redis:
class RedisStreamCallback(AgentResponseCallbackProtocol):
    async def on_streaming_response_update(self, update, context):
        # Write chunk to Redis Stream
        async with await get_stream_handler() as handler:
            await handler.write_chunk(thread_id, update.text, sequence)

    async def on_agent_response(self, response, context):
        # Write end-of-stream marker
        async with await get_stream_handler() as handler:
            await handler.write_completion(thread_id, sequence)
Worker Registration
The worker registers the agent with the Redis streaming callback:
redis_callback = RedisStreamCallback()
agent_worker = DurableAIAgentWorker(worker, callback=redis_callback)
agent_worker.add_agent(create_travel_agent())
Client Streaming
The client uses fire-and-forget mode to start the agent and streams from Redis:
# Start agent run with wait_for_response=False for non-blocking execution
travel_planner.run(user_message, thread=thread, options={"wait_for_response": False})

# Stream response from Redis while the agent is processing
async with await get_stream_handler() as stream_handler:
    async for chunk in stream_handler.read_stream(thread_id):
        if chunk.text:
            print(chunk.text, end="", flush=True)
        elif chunk.is_done:
            break
Fire-and-Forget Mode: Use options={"wait_for_response": False} to enable non-blocking execution. The run() method signals the agent and returns immediately, allowing the client to stream from Redis without blocking.
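The decoupling described above can be illustrated without Redis or the agent framework. The sketch below is an in-memory stand-in only: an `asyncio.Queue` plays the role of the Redis stream, `fake_agent` is a hypothetical producer, and `None` is an assumed end-of-stream marker. None of these names come from the sample.

```python
import asyncio
from typing import List, Optional


async def fake_agent(stream: "asyncio.Queue[Optional[str]]") -> None:
    """Stand-in for the agent: emits chunks, then None as an end-of-stream marker."""
    for chunk in ["Day 1: ", "Asakusa ", "and Ueno."]:
        await asyncio.sleep(0)  # yield control, as real I/O would
        await stream.put(chunk)
    await stream.put(None)


async def run_fire_and_forget() -> List[str]:
    stream: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    # Start the producer without waiting for it to finish (the fire-and-forget step).
    producer = asyncio.create_task(fake_agent(stream))
    received: List[str] = []
    # Consume chunks as they arrive, independently of the producer.
    while (chunk := await stream.get()) is not None:
        received.append(chunk)
    await producer  # surface any exception raised by the producer
    return received


chunks = asyncio.run(run_fire_and_forget())
print("".join(chunks))
```

The point of the pattern is that the caller starts the work and moves straight on to reading from the shared channel, so the reader is never blocked on the full response.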
Cursor-Based Resumption
Clients can resume streaming from any point after disconnection:
cursor = "1734649123456-0"  # Entry ID from previous stream
async with await get_stream_handler() as stream_handler:
    async for chunk in stream_handler.read_stream(thread_id, cursor=cursor):
        ...  # Process chunk
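To make the cursor semantics concrete, here is a small in-memory sketch, assuming the cursor is a Redis Streams entry ID of the form `<milliseconds>-<sequence>` and that resuming means "deliver entries with an ID strictly greater than the cursor". The helpers `parse_entry_id` and `read_after` are hypothetical and only mimic that behavior; the sample itself reads from Redis.

```python
from typing import Iterator, List, Optional, Tuple

Entry = Tuple[str, str]  # (entry ID, chunk text)


def parse_entry_id(entry_id: str) -> Tuple[int, int]:
    """Split a Redis-style "<milliseconds>-<sequence>" ID into a comparable tuple."""
    ms, seq = entry_id.split("-")
    return int(ms), int(seq)


def read_after(entries: List[Entry], cursor: Optional[str] = None) -> Iterator[Entry]:
    """Yield entries whose ID is strictly greater than the cursor (all if None)."""
    floor = parse_entry_id(cursor) if cursor is not None else (-1, -1)
    for entry_id, text in entries:
        if parse_entry_id(entry_id) > floor:
            yield entry_id, text


stream = [
    ("1734649123456-0", "Day 1: "),
    ("1734649123456-1", "Asakusa. "),
    ("1734649123470-0", "Day 2: Tsukiji."),
]

# First connection: the client sees one chunk, remembers its ID, then disconnects.
first_id, _ = next(read_after(stream))

# Reconnection: only the entries after the saved cursor are delivered.
resumed = [text for _, text in read_after(stream, cursor=first_id)]
print(resumed)
```

Comparing IDs as integer pairs rather than as strings matters, because the sequence part is not zero-padded and would otherwise sort incorrectly.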
Viewing Agent State
You can view the state of the TravelPlanner agent in the Durable Task Scheduler dashboard:
- Open your browser and navigate to http://localhost:8082
- In the dashboard, you can view:
  - The state of the TravelPlanner agent entity (dafx-TravelPlanner)
  - Conversation history and current state
  - How the durable agents extension manages conversation context with streaming