Files
agent-framework/python/samples/04-hosting/durabletask/03_single_agent_streaming
T
Copilot b05fc9e849 Python: Add load_dotenv() to samples for .env file support (#4043)
* Initial plan

* Add load_dotenv() to 303 Python samples for environment variable loading

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update SAMPLE_GUIDELINES.md to document load_dotenv() requirement

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Update samples README.md to document .env file usage

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Run ruff format on all changed sample files

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Clarify load_dotenv() usage in README - local dev vs production

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove deprecated getting_started folder as requested

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Document env_file_path parameter for per-client configuration

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge main branch to resolve conflicts

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py file that was empty in merge commit

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove dotnet changes from merge - out of scope for this PR

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove package and test changes from merge - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove test_func_utils.py - only sample changes needed

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert sample files not in original changeset - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Move load_dotenv() outside snippet tag in 06_host_your_agent.py

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix comment placement - move load_dotenv before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix load_dotenv() placement across all samples - after docstring, before code comments

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Merge latest main branch with load_dotenv changes

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Remove non-sample changes from merge - keep only load_dotenv additions

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Revert non-load_dotenv sample changes from merge

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Fix run_evaluation.py - use main's improved version (file already had load_dotenv)

Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>

* Manual update

* Manual update 2

* Fix Role usage and load_dotenv placement per PR review feedback

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix Role usage - use string literals not enum attributes

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix SAMPLE_GUIDELINES.md example - load_dotenv before docstring per guidance

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Move load_dotenv() before docstrings in all samples per SAMPLE_GUIDELINES ordering

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Address PR review: rename files, fix placement, add session usage, remove note

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Update Redis README to reference renamed file redis_history_provider.py

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: TaoChenOSU <12570346+TaoChenOSU@users.noreply.github.com>
Co-authored-by: Tao Chen <taochen@microsoft.com>
Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>
Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>
b05fc9e849 · 2026-02-19 10:55:13 +00:00
History
..

Single Agent with Reliable Streaming

This sample demonstrates how to use Redis Streams with agent response callbacks to enable reliable, resumable streaming for durable agents. Streaming responses are persisted to Redis, allowing clients to disconnect and reconnect without losing messages.

Key Concepts Demonstrated

  • Using AgentResponseCallbackProtocol to capture streaming agent responses.
  • Persisting streaming chunks to Redis Streams for reliable delivery.
  • Non-blocking agent execution with options={"wait_for_response": False} (fire-and-forget mode).
  • Cursor-based resumption for disconnected clients.
  • Decoupling agent execution from response streaming.

Prerequisites

In addition to the common setup in the parent README.md, this sample requires Redis:

docker run -d --name redis -p 6379:6379 redis:latest

Environment Setup

See the README.md file in the parent directory for more information on how to configure the environment, including how to install and run common sample dependencies.

Additional environment variables for this sample:

# Optional: Redis Configuration
REDIS_CONNECTION_STRING=redis://localhost:6379
REDIS_STREAM_TTL_MINUTES=10

Running the Sample

With the environment setup, you can run the sample using the combined approach or separate worker and client processes:

Option 1: Combined (Recommended for Testing)

cd samples/04-hosting/durabletask/03_single_agent_streaming
python sample.py

Option 2: Separate Processes

Start the worker in one terminal:

python worker.py

In a new terminal, run the client:

python client.py

The client will send a travel planning request to the TravelPlanner agent and stream the response from Redis in real-time:

================================================================================
TravelPlanner Agent - Redis Streaming Demo
================================================================================

You: Plan a 3-day trip to Tokyo with emphasis on culture and food

TravelPlanner (streaming from Redis):
--------------------------------------------------------------------------------
# Your Amazing 3-Day Tokyo Adventure! 🗾

Let me create the perfect cultural and culinary journey through Tokyo...

## Day 1: Traditional Tokyo & First Impressions
...
(continues streaming)
...

✓ Response complete!

How It Works

Redis Streaming Callback

The RedisStreamCallback class implements AgentResponseCallbackProtocol to capture streaming updates and persist them to Redis:

class RedisStreamCallback(AgentResponseCallbackProtocol):
    async def on_streaming_response_update(self, update, context):
        # Write chunk to Redis Stream
        async with await get_stream_handler() as handler:
            await handler.write_chunk(thread_id, update.text, sequence)

    async def on_agent_response(self, response, context):
        # Write end-of-stream marker
        async with await get_stream_handler() as handler:
            await handler.write_completion(thread_id, sequence)

Worker Registration

The worker registers the agent with the Redis streaming callback:

redis_callback = RedisStreamCallback()
agent_worker = DurableAIAgentWorker(worker, callback=redis_callback)
agent_worker.add_agent(create_travel_agent())

Client Streaming

The client uses fire-and-forget mode to start the agent and streams from Redis:

# Start agent run with wait_for_response=False for non-blocking execution
travel_planner.run(user_message, thread=thread, options={"wait_for_response": False})

# Stream response from Redis while the agent is processing
async with await get_stream_handler() as stream_handler:
    async for chunk in stream_handler.read_stream(thread_id):
        if chunk.text:
            print(chunk.text, end="", flush=True)
        elif chunk.is_done:
            break

Fire-and-Forget Mode: Use options={"wait_for_response": False} to enable non-blocking execution. The run() method signals the agent and returns immediately, allowing the client to stream from Redis without blocking.

Cursor-Based Resumption

Clients can resume streaming from any point after disconnection:

cursor = "1734649123456-0"  # Entry ID from previous stream
async with await get_stream_handler() as stream_handler:
    async for chunk in stream_handler.read_stream(thread_id, cursor=cursor):
        # Process chunk

Viewing Agent State

You can view the state of the TravelPlanner agent in the Durable Task Scheduler dashboard:

  1. Open your browser and navigate to http://localhost:8082
  2. In the dashboard, you can view:
    • The state of the TravelPlanner agent entity (dafx-TravelPlanner)
    • Conversation history and current state
    • How the durable agents extension manages conversation context with streaming