Files
agent-framework/python/samples/04-hosting/azure_functions/03_reliable_streaming
T
Eduard van Valkenburg 5e056b672e Python: [BREAKING] Python: Provider-leading client design & OpenAI package extraction (#4818)
* Python: Provider-leading client design & OpenAI package extraction

Major refactoring of the Python Agent Framework client architecture:

- Extract OpenAI clients into new `agent-framework-openai` package
- Core package no longer depends on openai, azure-identity, azure-ai-projects
- Rename clients for discoverability: OpenAIResponsesClient → OpenAIChatClient,
  OpenAIChatClient → OpenAIChatCompletionClient
- Unify `model_id`/`deployment_name`/`model_deployment_name` → `model` param
- New FoundryChatClient for Azure AI Foundry Responses API
- New FoundryAgent/FoundryAgentClient for connecting to pre-configured Foundry agents
- Remove OpenAIBase/OpenAIConfigMixin from non-deprecated client MRO
- Deprecate AzureOpenAI* clients, AzureAIClient, OpenAIAssistantsClient
- Reorganize samples: azure_openai+azure_ai+azure_ai_agent → azure/
- ADR-0020: Provider-Leading Client Design

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: missing Agent imports in samples, .model_id → .model in foundry_local sample

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: CI failures — mypy errors, coverage targets, sample imports

- azure-ai mypy: add type ignores for TypedDict total=, model arg, forward ref
- Coverage: replace core.azure/openai targets with openai package target
- project_provider: add type annotation for opts dict

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: populate openai .pyi stub, fix broken README links, coverage targets

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fixes

* updated observabilitty

* reset azure init.pyi

* fix errors

* updated adr number

* fix foundry local

* fixed not renamed docstrings and comments, and added deprecated markers to old classes

* fix tests and pyprojects

* fix test vars

* updated function tests

* update durable

* updated test setup for functions

* Fix Foundry auth in workflow samples

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Stabilize Python integration workflows

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Update hosting samples for Foundry

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Trigger full CI rerun

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Trigger CI rerun again

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* trigger rerun

* trigger rerun

* fix for litellm

* undo durabletask changes

* Move Foundry APIs into foundry namespace

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix Foundry pyproject formatting

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Split provider samples by Foundry surface

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Restore hosting sample requirements

Also fix the Foundry Local sample link after the provider sample move.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* updated tests

* udpated foundry integration tests

* removed dist from azurefunctions tests

* Use separate Foundry clients for concurrent agents

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix client setup in azfunc and durable

* disabled two tests

* updated setup for some function and durable tests

* improved azure openai setup with new clients

* ignore deprecated

* fixes

* skip 11

* remove openai assistants int tests

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
5e056b672e · 2026-03-25 09:56:29 +00:00
History
..

Agent Response Callbacks with Redis Streaming

This sample demonstrates how to use Redis Streams with agent response callbacks to enable reliable, resumable streaming for durable agents. Clients can disconnect and reconnect without losing messages by using cursor-based pagination.

Key Concepts Demonstrated

  • Using AgentResponseCallbackProtocol to capture streaming agent responses
  • Persisting streaming chunks to Redis Streams for reliable delivery
  • Building a custom HTTP endpoint to read from Redis with Server-Sent Events (SSE) format
  • Supporting cursor-based resumption for disconnected clients
  • Managing Redis client lifecycle with async context managers

Prerequisites

In addition to the common setup steps in ../README.md, this sample requires Redis:

# Start Redis
docker run -d --name redis -p 6379:6379 redis:latest

Update local.settings.json with your Redis connection string:

{
  "Values": {
    "REDIS_CONNECTION_STRING": "redis://localhost:6379"
  }
}

Running the Sample

Start the agent run

The agent executes in the background via durable orchestration. The RedisStreamCallback persists streaming chunks to Redis:

curl -X POST http://localhost:7071/api/agents/TravelPlanner/run \
  -H "Content-Type: text/plain" \
  -d "Plan a 3-day trip to Tokyo"

Response (202 Accepted):

{
  "status": "accepted",
  "response": "Agent request accepted",
  "conversation_id": "abc-123-def-456",
  "correlation_id": "xyz-789"
}

Stream the response from Redis

Use the custom /api/agent/stream/{conversation_id} endpoint to read persisted chunks:

curl http://localhost:7071/api/agent/stream/abc-123-def-456 \
  -H "Accept: text/event-stream"

Response (SSE format):

id: 1734649123456-0
event: message
data: Here's a wonderful 3-day Tokyo itinerary...

id: 1734649123789-0
event: message
data: Day 1: Arrival and Shibuya...

id: 1734649124012-0
event: done
data: [DONE]

Resume from a cursor

Use a cursor ID from an SSE event to skip already-processed messages:

curl "http://localhost:7071/api/agent/stream/abc-123-def-456?cursor=1734649123456-0" \
  -H "Accept: text/event-stream"

How It Works

1. Redis Callback

The RedisStreamCallback class implements AgentResponseCallbackProtocol to capture streaming updates:

class RedisStreamCallback(AgentResponseCallbackProtocol):
    async def on_streaming_response_update(self, update, context):
        # Write chunk to Redis Stream
        async with await get_stream_handler() as handler:
            await handler.write_chunk(thread_id, update.text, sequence)

    async def on_agent_response(self, response, context):
        # Write end-of-stream marker
        async with await get_stream_handler() as handler:
            await handler.write_completion(thread_id, sequence)

2. Custom Streaming Endpoint

The /api/agent/stream/{conversation_id} endpoint reads from Redis:

@app.route(route="agent/stream/{conversation_id}", methods=["GET"])
async def stream(req):
    conversation_id = req.route_params.get("conversation_id")
    cursor = req.params.get("cursor")  # Optional

    async with await get_stream_handler() as handler:
        async for chunk in handler.read_stream(conversation_id, cursor):
            # Format and return chunks

3. Redis Streams

Messages are stored in Redis Streams with automatic TTL (default: 10 minutes):

Stream Key: agent-stream:{conversation_id}
Entry: {
  "text": "chunk content",
  "sequence": "0",
  "timestamp": "1734649123456"
}