Files
Eduard van Valkenburg 1e350ea22f Python: [BREAKING] PR2 — Wire context provider pipeline, remove old types, update all consumers (#3850)
* PR2: Wire context provider pipeline and update all internal consumers

- Replace AgentThread with AgentSession across all packages
- Replace ContextProvider with BaseContextProvider across all packages
- Replace context_provider param with context_providers (Sequence)
- Replace thread= with session= in run() signatures
- Replace get_new_thread() with create_session()
- Add get_session(service_session_id) to agent interface
- DurableAgentThread -> DurableAgentSession
- Remove _notify_thread_of_new_messages from WorkflowAgent
- Wire before_run/after_run context provider pipeline in RawAgent
- Auto-inject InMemoryHistoryProvider when no providers configured

* fix: update all tests for context provider pipeline, fix lazy-loaders, remove old test files

* refactor: update all sample files for context provider pipeline (AgentThread→AgentSession, ContextProvider→BaseContextProvider)

* fix: update remaining ag-ui references (client docstring, getting_started sample)

* fix: make get_session service_session_id keyword-only to avoid confusion with session_id

* refactor: rename _RunContext.thread_messages to session_messages

* refactor: remove _threads.py, _memory.py, and old provider files; migrate devui to use plain message lists

* rename: remove _new_ prefix from test files

* refactor: rewrite SlidingWindowChatMessageStore as SlidingWindowHistoryProvider(InMemoryHistoryProvider)

* fix: read full history from session state directly instead of reaching into provider internals

* fix: update stale .pyi stubs, sample imports, and README references for new provider types

* fix: remove stale message_store, _notify_thread_of_new_messages, and session_id.key references in samples

* refactor: merge context_providers and sessions sample folders into sessions, remove aggregate_context_provider

* refactor: UserInfoMemory stores state in session.state instead of instance attributes

* feat: add Pydantic BaseModel support to session state serialization

Pydantic models stored in session.state are now automatically serialized
via model_dump() and restored via model_validate() during to_dict()/from_dict()
round-trips. Models are auto-registered on first serialization; use
register_state_type() for cold-start deserialization.

Also export register_state_type as a public API.

* fix mem0

* Update sample README links and descriptions for session terminology

- Replace 'thread' with 'session' in sample descriptions across all READMEs
- Update file links for renamed samples (mem0_sessions, redis_sessions, etc.)
- Fix Threads section → Sessions section in main samples/README.md
- Update tools, middleware, workflows, durabletask, azure_functions READMEs
- Update architecture diagrams in concepts/tools/README.md
- Update migration guides (autogen, semantic-kernel)

* Fix broken Redis README link to renamed sample

* Fix Mem0 OSS client search: pass scoping params as direct kwargs

AsyncMemory (OSS) expects user_id/agent_id/run_id as direct kwargs,
while AsyncMemoryClient (Platform) expects them in a filters dict.
Adds tests for both client types.

Port of fix from #3844 to new Mem0ContextProvider.

* Fix rebase issues: restore missing _conversation_state.py and checkpoint decode logic

- Add back _conversation_state.py (encode/decode_chat_messages) lost in rebase
- Fix on_checkpoint_restore to decode cache/conversation with decode_chat_messages
- Fix on_checkpoint_restore to use decode_checkpoint_value for pending requests
- Add tests/workflow/__init__.py for relative import support
- Fix test_agent_executor checkpoint selection (checkpoints[1] not superstep)

* Add STORES_BY_DEFAULT ClassVar to skip redundant InMemoryHistoryProvider injection

Chat clients that store history server-side by default (OpenAI Responses API,
Azure AI Agent) now declare STORES_BY_DEFAULT = True. The agent checks this
during auto-injection and skips InMemoryHistoryProvider unless the user
explicitly sets store=False.

* Fix broken markdown links in azure_ai and redis READMEs

* Fix getting-started samples to use session API instead of removed thread/ContextProvider API

* updates to workflow as agent

* fix group chat import

* Rename Thread→Session throughout, fix service_session_id propagation, remove stale AGUIThread

- Fix: Propagate conversation_id from ChatResponse back to session.service_session_id
  in both streaming and non-streaming paths in _agents.py
- Rename AgentThreadException → AgentSessionException
- Remove stale AGUIThread from ag_ui lazy-loader
- Rename use_service_thread → use_service_session in ag-ui package
- Rename test functions from *_thread_* to *_session_*
- Rename sample files from *_thread* to *_session*
- Update docstrings and comments: thread → session
- Update _mcp.py kwargs filter: add 'session' alongside 'thread'
- Fix ContinuationToken docstring example: thread=thread → session=session
- Fix _clients.py docstring: 'Agent threads' → 'Agent sessions'

* Fix broken markdown links after thread→session file renames

* fix azure ai test
2026-02-12 21:00:32 +00:00

257 lines
9.8 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Redis Context Provider: Basic usage and agent integration
This example demonstrates how to use the Redis context provider to persist and
retrieve conversational memory for agents. It covers three progressively more
realistic scenarios:
1) Standalone provider usage ("basic cache")
- Write messages to Redis and retrieve relevant context using full-text or
hybrid vector search.
2) Agent + provider
- Connect the provider to an agent so the agent can store user preferences
and recall them across turns.
3) Agent + provider + tool memory
- Expose a simple tool to the agent, then verify that details from the tool
outputs are captured and retrievable as part of the agent's memory.
Requirements:
- A Redis instance with RediSearch enabled (e.g., Redis Stack)
- agent-framework with the Redis extra installed: pip install "agent-framework-redis"
- Optionally an OpenAI API key if enabling embeddings for hybrid search
Run:
python redis_basics.py
"""
import asyncio
import os
from agent_framework import Message, tool
from agent_framework.openai import OpenAIChatClient
from agent_framework.redis import RedisContextProvider
from redisvl.extensions.cache.embeddings import EmbeddingsCache
from redisvl.utils.vectorize import OpenAITextVectorizer
# NOTE: approval_mode="never_require" is for sample brevity. Use "always_require" in production; see samples/02-agents/tools/function_tool_with_approval.py and samples/02-agents/tools/function_tool_with_approval_and_sessions.py.
@tool(approval_mode="never_require")
def search_flights(origin_airport_code: str, destination_airport_code: str, detailed: bool = False) -> str:
"""Simulated flight-search tool to demonstrate tool memory.
The agent can call this function, and the returned details can be stored
by the Redis context provider. We later ask the agent to recall facts from
these tool results to verify memory is working as expected.
"""
# Minimal static catalog used to simulate a tool's structured output
flights = {
("JFK", "LAX"): {
"airline": "SkyJet",
"duration": "6h 15m",
"price": 325,
"cabin": "Economy",
"baggage": "1 checked bag",
},
("SFO", "SEA"): {
"airline": "Pacific Air",
"duration": "2h 5m",
"price": 129,
"cabin": "Economy",
"baggage": "Carry-on only",
},
("LHR", "DXB"): {
"airline": "EuroWings",
"duration": "6h 50m",
"price": 499,
"cabin": "Business",
"baggage": "2 bags included",
},
}
route = (origin_airport_code.upper(), destination_airport_code.upper())
if route not in flights:
return f"No flights found between {origin_airport_code} and {destination_airport_code}"
flight = flights[route]
if not detailed:
return f"Flights available from {origin_airport_code} to {destination_airport_code}."
return (
f"{flight['airline']} operates flights from {origin_airport_code} to {destination_airport_code}. "
f"Duration: {flight['duration']}. "
f"Price: ${flight['price']}. "
f"Cabin: {flight['cabin']}. "
f"Baggage policy: {flight['baggage']}."
)
async def main() -> None:
"""Walk through provider-only, agent integration, and tool-memory scenarios.
Helpful debugging (uncomment when iterating):
- print(await provider.redis_index.info())
- print(await provider.search_all())
"""
print("1. Standalone provider usage:")
print("-" * 40)
# Create a provider with partition scope and OpenAI embeddings
# Please set the OPENAI_API_KEY and OPENAI_CHAT_MODEL_ID environment variables to use the OpenAI vectorizer
# Recommend default for OPENAI_CHAT_MODEL_ID is gpt-4o-mini
# We attach an embedding vectorizer so the provider can perform hybrid (text + vector)
# retrieval. If you prefer text-only retrieval, instantiate RedisContextProvider without the
# 'vectorizer' and vector_* parameters.
vectorizer = OpenAITextVectorizer(
model="text-embedding-ada-002",
api_config={"api_key": os.getenv("OPENAI_API_KEY")},
cache=EmbeddingsCache(name="openai_embeddings_cache", redis_url="redis://localhost:6379"),
)
# The provider manages persistence and retrieval. application_id/agent_id/user_id
# scope data for multi-tenant separation; thread_id (set later) narrows to a
# specific conversation.
provider = RedisContextProvider(
redis_url="redis://localhost:6379",
index_name="redis_basics",
application_id="matrix_of_kermits",
agent_id="agent_kermit",
user_id="kermit",
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
)
# Build sample chat messages to persist to Redis
messages = [
Message("user", ["runA CONVO: User Message"]),
Message("assistant", ["runA CONVO: Assistant Message"]),
Message("system", ["runA CONVO: System Message"]),
]
# Use the provider's before_run/after_run API to store and retrieve messages.
# In practice, the agent handles this automatically; this shows the low-level API.
from agent_framework import AgentSession, SessionContext
session = AgentSession(session_id="runA")
context = SessionContext()
context.extend_messages("input", messages)
state = session.state
# Store messages via after_run
await provider.after_run(agent=None, session=session, context=context, state=state)
# Retrieve relevant memories via before_run
query_context = SessionContext()
query_context.extend_messages("input", [Message("system", ["B: Assistant Message"])])
await provider.before_run(agent=None, session=session, context=query_context, state=state)
# Inspect retrieved memories that would be injected into instructions
# (Debug-only output so you can verify retrieval works as expected.)
print("Before Run Result:")
print(query_context)
# Drop / delete the provider index in Redis
await provider.redis_index.delete()
# --- Agent + provider: teach and recall a preference ---
print("\n2. Agent + provider: teach and recall a preference")
print("-" * 40)
# Fresh provider for the agent demo (recreates index)
vectorizer = OpenAITextVectorizer(
model="text-embedding-ada-002",
api_config={"api_key": os.getenv("OPENAI_API_KEY")},
cache=EmbeddingsCache(name="openai_embeddings_cache", redis_url="redis://localhost:6379"),
)
# Recreate a clean index so the next scenario starts fresh
provider = RedisContextProvider(
redis_url="redis://localhost:6379",
index_name="redis_basics_2",
prefix="context_2",
application_id="matrix_of_kermits",
agent_id="agent_kermit",
user_id="kermit",
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
)
# Create chat client for the agent
client = OpenAIChatClient(model_id=os.getenv("OPENAI_CHAT_MODEL_ID"), api_key=os.getenv("OPENAI_API_KEY"))
# Create agent wired to the Redis context provider. The provider automatically
# persists conversational details and surfaces relevant context on each turn.
agent = client.as_agent(
name="MemoryEnhancedAssistant",
instructions=(
"You are a helpful assistant. Personalize replies using provided context. "
"Before answering, always check for stored context"
),
tools=[],
context_providers=[provider],
)
# Teach a user preference; the agent writes this to the provider's memory
query = "Remember that I enjoy glugenflorgle"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Ask the agent to recall the stored preference; it should retrieve from memory
query = "What do I enjoy?"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Drop / delete the provider index in Redis
await provider.redis_index.delete()
# --- Agent + provider + tool: store and recall tool-derived context ---
print("\n3. Agent + provider + tool: store and recall tool-derived context")
print("-" * 40)
# Text-only provider (full-text search only). Omits vectorizer and related params.
provider = RedisContextProvider(
redis_url="redis://localhost:6379",
index_name="redis_basics_3",
prefix="context_3",
application_id="matrix_of_kermits",
agent_id="agent_kermit",
user_id="kermit",
)
# Create agent exposing the flight search tool. Tool outputs are captured by the
# provider and become retrievable context for later turns.
client = OpenAIChatClient(model_id=os.getenv("OPENAI_CHAT_MODEL_ID"), api_key=os.getenv("OPENAI_API_KEY"))
agent = client.as_agent(
name="MemoryEnhancedAssistant",
instructions=(
"You are a helpful assistant. Personalize replies using provided context. "
"Before answering, always check for stored context"
),
tools=search_flights,
context_providers=[provider],
)
# Invoke the tool; outputs become part of memory/context
query = "Are there any flights from new york city (jfk) to la? Give me details"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Verify the agent can recall tool-derived context
query = "Which flight did I ask about?"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Drop / delete the provider index in Redis
await provider.redis_index.delete()
if __name__ == "__main__":
asyncio.run(main())