Python: add RedisContextProvider (#716)

* Setting up

* Readme

* Add redis tests path to all-tests

* First pass integration

* Keep provider convention

* First pass integration

* add redis integration tests

* update README.md

* Add basic sample for redis integration

* Add partitioning, add partition-aware tests, improve sample script

* Fix code quality check

* Try to resolve pytest check

* Try to identify if pytest is the cause of failed checks

* Re-enable tests

* Rename redis test file

* Removing some tests to narrow down issue

* Revert, no difference

* Delete temp files

* Starting refactor of RedisProvider

* Build dynamic schema builder, still need to do dynamic embedding model config

* Add scope control

* Complete first pass functionality with OpenAI + HF vectors -> Tests, Samples, Demo to follow

* Fix code quality

* attempt to identify rootcause of failed test

* attempt to identify rootcause of failed test

* Attempt to resolve code quality fail

* Update pyproject.toml for foundry to pin     azure-ai-projects == 1.1.0b3,azure-ai-agents == 1.2.0b3

* Add tests for redisprovider

* Remove invalid tests

* Add API key handling for openai vectorizer

* Update uv.locl

* Use master uv.lock

* Begin sample file, add lazy index creation, fix faulty override

* Index drop and reinit depends on drop_redis_index not overwrite

* Add samples, threading included, escaped queries, verify threading works, sample README.md

* Refactor filters

* Opinionated vars

* Allow filter expression combination

* Try inline stubs for mypy

* Address mypy errors

* Better docstrings, tweaks for feedback

* Tweak example 3 in redis_threads.py sample

* adjust confusing name

* Enrich docstrings

* Add descriptions and comments to samples, externalize vectorizer choice, remove nltk and sentencetransformers dependnecy

* Add descriptions and comments to samples, externalize vectorizer choice, remove nltk and sentencetransformers dependnecy

* Incorporate initial feedback from dmytrostruk

* Fix uv.lock

* Attempt to resolve conflict

* Use remote .tomls

* Sanity check

* fix tests

* Remove hardcoded API key from samples

* Fix incorrect env var

* Make add and redis_search private

* Fix tests relying on private funcs

* Expand tests

* Explainer comments to each test

* Add a 'get_conversation_history' function to RedisProvider - This just returns messages in sequential order. Added 'created_at_*' timestamps to facilitate sequential recovery. function has to be manually invoked by user

* Add agent-framework-redis to  python/pyproject.toml

* Remove get_conversation_history

* improve redis context provider with pydantic techniques and safe index handling patterns

* add RedisChatMessageStore

* remove integration test :(

* fix mypy error

* Remove unused params

* Redo schema validation to be order-invariant, handle attrs (previously throwing errors due to strict ==)

* Expand explanation

* Add ChatMessageStore example

* Fix comments in redis_conversation.py

* Resolving uv.lock conflict, update to match main

* Fix test in redis provider

* Apply suggestion from @ekzhu

* Update python/packages/main/pyproject.toml

---------

Co-authored-by: Tyler Hutcherson <tyler.hutcherson@redis.com>
Co-authored-by: Eric Zhu <ekzhu@users.noreply.github.com>
This commit is contained in:
Han
2025-09-23 08:36:27 +08:00
committed by GitHub
Unverified
parent 1ef24d3e91
commit ca810076e8
23 changed files with 3406 additions and 10 deletions
@@ -6,6 +6,7 @@ from abc import ABC, abstractmethod
from collections.abc import MutableSequence, Sequence
from contextlib import AsyncExitStack
from types import TracebackType
from typing import ClassVar
from ._pydantic import AFBaseModel
from ._types import ChatMessage, Contents
@@ -47,6 +48,11 @@ class ContextProvider(AFBaseModel, ABC):
just before invocation.
"""
# Default prompt to be used by all context providers when assembling memories/instructions
DEFAULT_CONTEXT_PROMPT: ClassVar[str] = (
"## Memories\nConsider the following memories when answering user questions:"
)
async def thread_created(self, thread_id: str | None) -> None:
"""Called just after a new thread is created.
@@ -0,0 +1,24 @@
# Copyright (c) Microsoft. All rights reserved.
import importlib
from typing import Any
PACKAGE_NAME = "agent_framework_redis"
PACKAGE_EXTRA = "redis"
_IMPORTS = ["__version__", "RedisProvider", "RedisChatMessageStore"]
def __getattr__(name: str) -> Any:
if name in _IMPORTS:
try:
return getattr(importlib.import_module(PACKAGE_NAME), name)
except ModuleNotFoundError as exc:
raise ModuleNotFoundError(
f"The '{PACKAGE_EXTRA}' extra is not installed, "
f"please do `pip install agent-framework[{PACKAGE_EXTRA}]`"
) from exc
raise AttributeError(f"Module {PACKAGE_NAME} has no attribute {name}.")
def __dir__() -> list[str]:
return _IMPORTS
@@ -0,0 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
from agent_framework_redis import RedisChatMessageStore, RedisProvider, __version__
__all__ = ["RedisChatMessageStore", "RedisProvider", "__version__"]
+4
View File
@@ -44,6 +44,9 @@ azure = [
foundry = [
"agent-framework-foundry"
]
redis = [
"agent-framework-redis"
]
viz = [
"graphviz>=0.20.0",
]
@@ -61,6 +64,7 @@ all = [
"agent-framework-foundry",
"agent-framework-runtime",
"agent-framework-mem0",
"agent-framework-redis",
"agent-framework-devui"
]
@@ -2,7 +2,7 @@
import sys
from collections.abc import MutableSequence, Sequence
from typing import Any, Final
from typing import Any
from agent_framework import ChatMessage, Context, ContextProvider, TextContent
from agent_framework.exceptions import ServiceInitializationError
@@ -15,9 +15,6 @@ else:
from typing_extensions import Self # pragma: no cover
DEFAULT_CONTEXT_PROMPT: Final[str] = "## Memories\nConsider the following memories when answering user questions:"
class Mem0Provider(ContextProvider):
mem0_client: AsyncMemoryClient
api_key: str | None = None
@@ -26,7 +23,7 @@ class Mem0Provider(ContextProvider):
thread_id: str | None = None
user_id: str | None = None
scope_to_per_operation_thread_id: bool = False
context_prompt: str = DEFAULT_CONTEXT_PROMPT
context_prompt: str = ContextProvider.DEFAULT_CONTEXT_PROMPT
_should_close_client: bool = PrivateAttr(default=False) # Track whether we should close client connection
@@ -38,7 +35,7 @@ class Mem0Provider(ContextProvider):
thread_id: str | None = None,
user_id: str | None = None,
scope_to_per_operation_thread_id: bool = False,
context_prompt: str = DEFAULT_CONTEXT_PROMPT,
context_prompt: str = ContextProvider.DEFAULT_CONTEXT_PROMPT,
mem0_client: AsyncMemoryClient | None = None,
) -> None:
"""Initializes a new instance of the Mem0Provider class.
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) Microsoft Corporation.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
+53
View File
@@ -0,0 +1,53 @@
# Get Started with Microsoft Agent Framework Redis
Please install this package as the extra for `agent-framework`:
```bash
pip install agent-framework[redis]
```
## Components
### Memory Context Provider
The `RedisProvider` enables persistent context & memory capabilities for your agents, allowing them to remember user preferences and conversation context across sessions and threads.
#### Basic Usage Examples
Review the set of [getting started examples](../../samples/getting_started/context_providers/redis/README.md) for using the Redis context provider.
### Redis Chat Message Store
The `RedisChatMessageStore` provides persistent conversation storage using Redis Lists, enabling chat history to survive application restarts and support distributed applications.
#### Key Features
- **Persistent Storage**: Messages survive application restarts
- **Thread Isolation**: Each conversation thread has its own Redis key
- **Message Limits**: Configurable automatic trimming of old messages
- **Serialization Support**: Full compatibility with Agent Framework thread serialization
- **Production Ready**: Connection pooling, error handling, and performance optimized
#### Basic Usage Examples
See the complete [Redis chat message store examples](../../samples/getting_started/threads/redis_chat_message_store_thread.py) including:
- User session management
- Conversation persistence across restarts
- Thread serialization and deserialization
- Automatic message trimming
- Error handling patterns
### Installing and running Redis
You have 3 options to set-up Redis:
#### Option A: Local Redis with Docker
```bash
docker run --name redis -p 6379:6379 -d redis:8.0.3
```
#### Option B: Redis Cloud
Get a free db at https://redis.io/cloud/
#### Option C: Azure Managed Redis
Here's a quickstart guide to create **Azure Managed Redis** for as low as $12 monthly: https://learn.microsoft.com/en-us/azure/redis/quickstart-create-managed-redis
@@ -0,0 +1,16 @@
# Copyright (c) Microsoft. All rights reserved.
import importlib.metadata
from ._chat_message_store import RedisChatMessageStore
from ._provider import RedisProvider
try:
__version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0" # Fallback for development mode
__all__ = [
"RedisChatMessageStore",
"RedisProvider",
"__version__",
]
@@ -0,0 +1,507 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import json
from collections.abc import Sequence
from typing import Any
from uuid import uuid4
import redis.asyncio as redis
from agent_framework import ChatMessage
from agent_framework._pydantic import AFBaseModel
class RedisStoreState(AFBaseModel):
"""State model for serializing and deserializing Redis chat message store data."""
thread_id: str
redis_url: str | None = None
key_prefix: str = "chat_messages"
max_messages: int | None = None
class RedisChatMessageStore:
"""Redis-backed implementation of ChatMessageStore using Redis Lists.
This implementation provides persistent, thread-safe chat message storage using Redis Lists.
Messages are stored as JSON-serialized strings in chronological order, with each conversation
thread isolated by a unique Redis key.
Key Features:
============
- **Persistent Storage**: Messages survive application restarts and crashes
- **Thread Isolation**: Each conversation thread has its own Redis key namespace
- **Auto Message Limits**: Configurable automatic trimming of old messages using LTRIM
- **Performance Optimized**: Uses native Redis operations for efficiency
- **State Serialization**: Full compatibility with Agent Framework thread serialization
- **Initial Message Support**: Pre-load conversations with existing message history
- **Production Ready**: Atomic operations, error handling, connection pooling
Redis Operations:
- RPUSH: Add messages to the end of the list (chronological order)
- LRANGE: Retrieve messages in chronological order
- LTRIM: Maintain message limits by trimming old messages
- DELETE: Clear all messages for a thread
"""
def __init__(
self,
redis_url: str | None = None,
thread_id: str | None = None,
key_prefix: str = "chat_messages",
max_messages: int | None = None,
messages: Sequence[ChatMessage] | None = None,
) -> None:
"""Initialize the Redis chat message store.
Creates a Redis-backed chat message store for a specific conversation thread.
The store will automatically create a Redis connection and manage message
persistence using Redis List operations.
Args:
redis_url: Redis connection URL (e.g., "redis://localhost:6379").
Required for establishing Redis connection.
thread_id: Unique identifier for this conversation thread.
If not provided, a UUID will be auto-generated.
This becomes part of the Redis key: {key_prefix}:{thread_id}
key_prefix: Prefix for Redis keys to namespace different applications.
Defaults to 'chat_messages'. Useful for multi-tenant scenarios.
max_messages: Maximum number of messages to retain in Redis.
When exceeded, oldest messages are automatically trimmed using LTRIM.
None means unlimited storage.
messages: Initial messages to pre-populate the conversation.
These are added to Redis on first access if the Redis key is empty.
Useful for resuming conversations or seeding with context.
Raises:
ValueError: If redis_url is None (Redis connection is required).
redis.ConnectionError: If unable to connect to Redis server.
"""
# Validate required parameters
if redis_url is None:
raise ValueError("redis_url is required for Redis connection")
# Store configuration
self.redis_url = redis_url
self.thread_id = thread_id or f"thread_{uuid4()}"
self.key_prefix = key_prefix
self.max_messages = max_messages
# Initialize Redis client with connection pooling and async support
self._redis_client = redis.from_url(redis_url, decode_responses=True) # type: ignore[no-untyped-call]
# Handle initial messages (will be moved to Redis on first access)
self._initial_messages = list(messages) if messages else []
self._initial_messages_added = False
@property
def redis_key(self) -> str:
"""Get the Redis key for this thread's messages.
The key format is: {key_prefix}:{thread_id}
Returns:
Redis key string used for storing this thread's messages.
Example:
For key_prefix="chat_messages" and thread_id="user_123_session_456":
Returns "chat_messages:user_123_session_456"
"""
return f"{self.key_prefix}:{self.thread_id}"
async def _ensure_initial_messages_added(self) -> None:
"""Ensure initial messages are added to Redis if not already present.
This method is called before any Redis operations to guarantee that
initial messages provided during construction are persisted to Redis.
"""
if not self._initial_messages or self._initial_messages_added:
return
# Check if Redis key already has messages (prevents duplicate additions)
existing_count = await self._redis_client.llen(self.redis_key) # type: ignore[misc] # type: ignore[misc]
if existing_count == 0:
# Add initial messages using atomic pipeline operation
await self._add_redis_messages(self._initial_messages)
# Mark as completed and free memory
self._initial_messages_added = True
self._initial_messages.clear()
async def _add_redis_messages(self, messages: Sequence[ChatMessage]) -> None:
"""Add multiple messages to Redis using atomic pipeline operation.
This internal method efficiently adds multiple messages to the Redis list
using a single atomic transaction to ensure consistency.
Args:
messages: Sequence of ChatMessage objects to add to Redis.
"""
if not messages:
return
# Pre-serialize all messages for efficient pipeline operation
serialized_messages = [self._serialize_message(message) for message in messages]
# Use Redis pipeline for atomic batch operation
async with self._redis_client.pipeline(transaction=True) as pipe:
for serialized_message in serialized_messages:
await pipe.rpush(self.redis_key, serialized_message) # type: ignore[misc]
await pipe.execute()
async def add_messages(self, messages: Sequence[ChatMessage]) -> None:
"""Add messages to the Redis store (ChatMessageStore protocol method).
This method implements the required ChatMessageStore protocol for adding messages.
Messages are appended to the Redis list in chronological order, with automatic
trimming if message limits are configured.
Args:
messages: Sequence of ChatMessage objects to add to the store.
Can be empty (no-op) or contain multiple messages.
Thread Safety:
- Atomic pipeline ensures all messages are added together
- LTRIM operation is atomic for consistent message limits
Example:
```python
messages = [ChatMessage(role=Role.USER, text="Hello"), ChatMessage(role=Role.ASSISTANT, text="Hi there!")]
await store.add_messages(messages)
```
"""
if not messages:
return
# Ensure any initial messages are persisted first
await self._ensure_initial_messages_added()
# Add new messages using atomic pipeline operation
await self._add_redis_messages(messages)
# Apply message limit if configured (automatic cleanup)
if self.max_messages is not None:
current_count = await self._redis_client.llen(self.redis_key) # type: ignore[misc]
if current_count > self.max_messages:
# Keep only the most recent max_messages using LTRIM
await self._redis_client.ltrim(self.redis_key, -self.max_messages, -1) # type: ignore[misc]
async def list_messages(self) -> list[ChatMessage]:
"""Get all messages from the store in chronological order (ChatMessageStore protocol method).
This method implements the required ChatMessageStore protocol for retrieving messages.
Returns all messages stored in Redis, ordered from oldest (index 0) to newest (index -1).
Returns:
List of ChatMessage objects in chronological order (oldest first).
Returns empty list if no messages exist or if Redis connection fails.
Example:
```python
# Get all conversation history
messages = await store.list_messages()
```
"""
# Ensure any initial messages are persisted to Redis first
await self._ensure_initial_messages_added()
messages = []
# Retrieve all messages from Redis list (oldest to newest)
redis_messages = await self._redis_client.lrange(self.redis_key, 0, -1) # type: ignore[misc]
if redis_messages:
for serialized_message in redis_messages:
# Deserialize each JSON message back to ChatMessage
message = self._deserialize_message(serialized_message)
messages.append(message)
return messages
async def serialize_state(self, **kwargs: Any) -> Any:
"""Serialize the current store state for persistence (ChatMessageStore protocol method).
This method implements the required ChatMessageStore protocol for state serialization.
Captures the Redis connection configuration and thread information needed to
reconstruct the store and reconnect to the same conversation data.
Args:
**kwargs: Additional arguments passed to Pydantic model_dump() for serialization.
Common options: exclude_none=True, by_alias=True
Returns:
Dictionary containing serialized store configuration that can be persisted
to databases, files, or other storage mechanisms.
"""
state = RedisStoreState(
thread_id=self.thread_id,
redis_url=self.redis_url,
key_prefix=self.key_prefix,
max_messages=self.max_messages,
)
return state.model_dump(**kwargs)
async def deserialize_state(self, serialized_store_state: Any, **kwargs: Any) -> None:
"""Deserialize state data into this store instance (ChatMessageStore protocol method).
This method implements the required ChatMessageStore protocol for state deserialization.
Restores the store configuration from previously serialized data, allowing the store
to reconnect to the same conversation data in Redis.
Args:
serialized_store_state: Previously serialized state data from serialize_state().
Should be a dictionary with thread_id, redis_url, etc.
**kwargs: Additional arguments passed to Pydantic model validation.
"""
if not serialized_store_state:
return
# Validate and parse the serialized state using Pydantic
state = RedisStoreState.model_validate(serialized_store_state, **kwargs)
# Update store configuration from deserialized state
self.thread_id = state.thread_id
if state.redis_url is not None:
self.redis_url = state.redis_url
self.key_prefix = state.key_prefix
self.max_messages = state.max_messages
# Recreate Redis client if the URL changed
if state.redis_url and state.redis_url != getattr(self, "_last_redis_url", None):
self._redis_client = redis.from_url(state.redis_url, decode_responses=True) # type: ignore[no-untyped-call]
self._last_redis_url = state.redis_url
# Reset initial message state since we're connecting to existing data
self._initial_messages_added = False
async def clear(self) -> None:
"""Remove all messages from the store.
Permanently deletes all messages for this conversation thread by removing
the Redis key. This operation cannot be undone.
Warning:
- This permanently deletes all conversation history
- Consider exporting messages before clearing if backup is needed
Example:
```python
# Clear conversation history
await store.clear()
# Verify messages are gone
messages = await store.list_messages()
assert len(messages) == 0
```
"""
await self._redis_client.delete(self.redis_key)
def _serialize_message(self, message: ChatMessage) -> str:
"""Serialize a ChatMessage to JSON string.
Args:
message: ChatMessage to serialize.
Returns:
JSON string representation of the message.
"""
# Convert ChatMessage to dictionary using Pydantic serialization
message_dict = message.model_dump()
# Serialize to compact JSON (no extra whitespace for Redis efficiency)
return json.dumps(message_dict, separators=(",", ":"))
def _deserialize_message(self, serialized_message: str) -> ChatMessage:
"""Deserialize a JSON string to ChatMessage.
Args:
serialized_message: JSON string representation of a message.
Returns:
ChatMessage object.
"""
# Parse JSON string back to dictionary
message_dict = json.loads(serialized_message)
# Reconstruct ChatMessage using Pydantic validation
return ChatMessage.model_validate(message_dict)
# ============================================================================
# List-like Convenience Methods (Redis-optimized async versions)
# ============================================================================
def __bool__(self) -> bool:
"""Return True since the store always exists once created.
This method is called by Python's truthiness checks (if store:).
Since a RedisChatMessageStore instance always represents a valid store,
this always returns True.
Returns:
Always True - the store exists and is ready for operations.
Note:
This is used by the Agent Framework to check if a message store
is configured: `if thread.message_store:`
"""
return True
async def __len__(self) -> int:
"""Return the number of messages in the Redis store.
Provides efficient message counting using Redis LLEN command.
This is the async equivalent of Python's built-in len() function.
Returns:
The count of messages currently stored in Redis.
"""
await self._ensure_initial_messages_added()
return await self._redis_client.llen(self.redis_key) # type: ignore[misc,no-any-return]
async def getitem(self, index: int) -> ChatMessage:
"""Get a message by index using Redis LINDEX.
Args:
index: The index of the message to retrieve.
Returns:
The ChatMessage at the specified index.
Raises:
IndexError: If the index is out of range.
"""
await self._ensure_initial_messages_added()
# Use Redis LINDEX for efficient single-item access
serialized_message = await self._redis_client.lindex(self.redis_key, index) # type: ignore[misc]
if serialized_message is None:
raise IndexError("list index out of range")
return self._deserialize_message(serialized_message)
async def setitem(self, index: int, item: ChatMessage) -> None:
"""Set a message at the specified index using Redis LSET.
Args:
index: The index at which to set the message.
item: The ChatMessage to set at the specified index.
Raises:
IndexError: If the index is out of range.
"""
await self._ensure_initial_messages_added()
# Validate index exists using LLEN
current_count = await self._redis_client.llen(self.redis_key) # type: ignore[misc]
if index < 0:
index = current_count + index
if index < 0 or index >= current_count:
raise IndexError("list index out of range")
# Use Redis LSET for efficient single-item update
serialized_message = self._serialize_message(item)
await self._redis_client.lset(self.redis_key, index, serialized_message) # type: ignore[misc]
async def append(self, item: ChatMessage) -> None:
"""Append a message to the end of the store.
Args:
item: The ChatMessage to append.
"""
await self.add_messages([item])
async def count(self) -> int:
"""Return the number of messages in the Redis store.
Returns:
The count of messages currently stored in Redis.
"""
await self._ensure_initial_messages_added()
return await self._redis_client.llen(self.redis_key) # type: ignore[misc,no-any-return]
async def index(self, item: ChatMessage) -> int:
"""Return the index of the first occurrence of the specified message.
Uses Redis LINDEX to iterate through the list without loading all messages.
Still O(N) but more memory efficient for large lists.
Args:
item: The ChatMessage to find.
Returns:
The index of the first occurrence of the message.
Raises:
ValueError: If the message is not found in the store.
"""
await self._ensure_initial_messages_added()
target_serialized = self._serialize_message(item)
list_length = await self._redis_client.llen(self.redis_key) # type: ignore[misc]
# Iterate through Redis list using LINDEX
for i in range(list_length):
redis_message = await self._redis_client.lindex(self.redis_key, i) # type: ignore[misc]
if redis_message == target_serialized:
return i
raise ValueError("ChatMessage not found in store")
async def remove(self, item: ChatMessage) -> None:
"""Remove the first occurrence of the specified message from the store.
Uses Redis LREM command for efficient removal by value.
O(N) but performed natively in Redis without data transfer.
Args:
item: The ChatMessage to remove.
Raises:
ValueError: If the message is not found in the store.
"""
await self._ensure_initial_messages_added()
# Serialize the message to match Redis storage format
target_serialized = self._serialize_message(item)
# Use LREM to remove first occurrence (count=1)
removed_count = await self._redis_client.lrem(self.redis_key, 1, target_serialized) # type: ignore[misc]
if removed_count == 0:
raise ValueError("ChatMessage not found in store")
async def extend(self, items: Sequence[ChatMessage]) -> None:
"""Extend the store by appending all messages from the iterable.
Args:
items: Sequence of ChatMessage objects to append.
"""
await self.add_messages(items)
async def ping(self) -> bool:
"""Test the Redis connection.
Returns:
True if the connection is successful, False otherwise.
"""
try:
await self._redis_client.ping() # type: ignore[misc]
return True
except Exception:
return False
async def aclose(self) -> None:
"""Close the Redis connection.
This method provides a clean way to close the underlying Redis connection
when the store is no longer needed. This is particularly useful in samples
and applications where explicit resource cleanup is desired.
"""
await self._redis_client.aclose() # type: ignore[misc]
def __repr__(self) -> str:
"""String representation of the store."""
return (
f"RedisChatMessageStore(thread_id='{self.thread_id}', "
f"redis_key='{self.redis_key}', max_messages={self.max_messages})"
)
@@ -0,0 +1,547 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import sys
from collections.abc import MutableSequence, Sequence
from functools import reduce
from operator import and_
from typing import Any, Literal, cast
from agent_framework import ChatMessage, Context, ContextProvider, Role, TextContent
from agent_framework.exceptions import (
ServiceInitializationError,
ServiceInvalidRequestError,
)
if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
else:
from typing_extensions import Self # pragma: no cover
import json
import numpy as np
from redisvl.index import AsyncSearchIndex
from redisvl.query import FilterQuery, HybridQuery, TextQuery
from redisvl.query.filter import FilterExpression, Tag
from redisvl.utils.token_escaper import TokenEscaper
from redisvl.utils.vectorize import BaseVectorizer
class RedisProvider(ContextProvider):
"""Redis context provider with dynamic, filterable schema.
Stores context in Redis and retrieves scoped context.
Uses full-text or optional hybrid vector search to ground model responses.
"""
# Connection and indexing
redis_url: str = "redis://localhost:6379"
index_name: str = "context"
prefix: str = "context"
# Redis vectorizer configuration (optional, injected by client)
redis_vectorizer: BaseVectorizer | None = None
vector_field_name: str | None = None
vector_algorithm: Literal["flat", "hnsw"] | None = None
vector_distance_metric: Literal["cosine", "ip", "l2"] | None = None
# Partition fields (indexed for filtering)
application_id: str | None = None
agent_id: str | None = None
user_id: str | None = None
thread_id: str | None = None
scope_to_per_operation_thread_id: bool = False
# Prompt and runtime
context_prompt: str = ContextProvider.DEFAULT_CONTEXT_PROMPT
redis_index: Any = None
overwrite_index: bool = False
_per_operation_thread_id: str | None = None
_token_escaper: TokenEscaper = TokenEscaper()
_conversation_id: str | None = None
_index_initialized: bool = False
_schema_dict: dict[str, Any] | None = None
def model_post_init(self, __context: Any) -> None:
"""Post-initialization hook to set up computed fields after Pydantic initialization.
This is called automatically by Pydantic after the model is initialized.
"""
# Create Redis index using the cached schema_dict property
self.redis_index = AsyncSearchIndex.from_dict(self.schema_dict, redis_url=self.redis_url, validate_on_load=True)
@property
def schema_dict(self) -> dict[str, Any]:
"""Get the Redis schema dictionary, computing and caching it on first access."""
if self._schema_dict is None:
# Get vector configuration from vectorizer if available
vector_dims = self.redis_vectorizer.dims if self.redis_vectorizer is not None else None
vector_datatype = self.redis_vectorizer.dtype if self.redis_vectorizer is not None else None
self._schema_dict = self._build_schema_dict(
index_name=self.index_name,
prefix=self.prefix,
vector_field_name=self.vector_field_name,
vector_dims=vector_dims,
vector_datatype=vector_datatype,
vector_algorithm=self.vector_algorithm,
vector_distance_metric=self.vector_distance_metric,
)
return self._schema_dict
def _build_filter_from_dict(self, filters: dict[str, str | None]) -> Any | None:
"""Builds a combined filter expression from simple equality tags.
This ANDs non-empty tag filters and is used to scope all operations to app/agent/user/thread partitions.
Args:
filters: Mapping of field name to value; falsy values are ignored.
Returns:
A combined filter expression or None if no filters are provided.
"""
parts = [Tag(k) == v for k, v in filters.items() if v]
return reduce(and_, parts) if parts else None
def _build_schema_dict(
self,
*,
index_name: str,
prefix: str,
vector_field_name: str | None,
vector_dims: int | None,
vector_datatype: str | None,
vector_algorithm: Literal["flat", "hnsw"] | None,
vector_distance_metric: Literal["cosine", "ip", "l2"] | None,
) -> dict[str, Any]:
"""Builds the RediSearch schema configuration dictionary.
Defines text and tag fields for messages plus an optional vector field enabling KNN/hybrid search.
Args:
index_name: Index name.
prefix: Key prefix.
vector_field_name: Vector field name or None.
vector_dims: Vector dimensionality or None.
vector_datatype: Vector datatype or None.
vector_algorithm: Vector index algorithm or None.
vector_distance_metric: Vector distance metric or None.
Returns:
Dict representing the index and fields configuration.
"""
fields: list[dict[str, Any]] = [
{"name": "role", "type": "tag"},
{"name": "mime_type", "type": "tag"},
{"name": "content", "type": "text"},
# Conversation tracking
{"name": "conversation_id", "type": "tag"},
{"name": "message_id", "type": "tag"},
{"name": "author_name", "type": "tag"},
# Partition fields (TAG for fast filtering)
{"name": "application_id", "type": "tag"},
{"name": "agent_id", "type": "tag"},
{"name": "user_id", "type": "tag"},
{"name": "thread_id", "type": "tag"},
]
# Add vector field only if configured (keeps provider runnable with no params)
if vector_field_name is not None and vector_dims is not None:
fields.append({
"name": vector_field_name,
"type": "vector",
"attrs": {
"algorithm": (vector_algorithm or "hnsw"),
"dims": int(vector_dims),
"distance_metric": (vector_distance_metric or "cosine"),
"datatype": (vector_datatype or "float32"),
},
})
return {
"index": {
"name": index_name,
"prefix": prefix,
"key_separator": ":",
"storage_type": "hash",
},
"fields": fields,
}
async def _ensure_index(self) -> None:
"""Initialize the search index.
- Connect to existing index if it exists and schema matches
- Create new index if it doesn't exist
- Overwrite if requested via overwrite_index=True
- Validate schema compatibility to prevent accidental data loss
"""
if self._index_initialized:
return
# Check if index already exists
index_exists = await self.redis_index.exists()
if not self.overwrite_index and index_exists:
# Validate schema compatibility before connecting
await self._validate_schema_compatibility()
# Create the index (will connect to existing or create new)
await self.redis_index.create(overwrite=self.overwrite_index, drop=False)
self._index_initialized = True
async def _validate_schema_compatibility(self) -> None:
"""Validate that existing index schema matches current configuration.
Raises ServiceInitializationError if schemas don't match, with helpful guidance.
self._build_schema_dict returns a minimal schema while Redis returns an expanded
schema with all defaults filled in. To compare for incompatibilities, compare
significant parts of the schema by creating signatures with normalized default values.
"""
# Defaults for attr normalization
TAG_DEFAULTS = {"separator": ",", "case_sensitive": False, "withsuffixtrie": False}
TEXT_DEFAULTS = {"weight": 1.0, "no_stem": False}
def _significant_index(i: dict[str, Any]) -> dict[str, Any]:
return {k: i.get(k) for k in ("name", "prefix", "key_separator", "storage_type")}
def _sig_tag(attrs: dict[str, Any] | None) -> dict[str, Any]:
a = {**TAG_DEFAULTS, **(attrs or {})}
return {k: a[k] for k in ("separator", "case_sensitive", "withsuffixtrie")}
def _sig_text(attrs: dict[str, Any] | None) -> dict[str, Any]:
a = {**TEXT_DEFAULTS, **(attrs or {})}
return {k: a[k] for k in ("weight", "no_stem")}
def _sig_vector(attrs: dict[str, Any] | None) -> dict[str, Any]:
a = {**(attrs or {})}
# Require these to exist if vector field is present
return {k: a.get(k) for k in ("algorithm", "dims", "distance_metric", "datatype")}
def _schema_signature(schema: dict[str, Any]) -> dict[str, Any]:
# Order-independent, minimal signature
sig: dict[str, Any] = {"index": _significant_index(schema.get("index", {})), "fields": {}}
for f in schema.get("fields", []):
name, ftype = f.get("name"), f.get("type")
if not name:
continue
if ftype == "tag":
sig["fields"][name] = {"type": "tag", "attrs": _sig_tag(f.get("attrs"))}
elif ftype == "text":
sig["fields"][name] = {"type": "text", "attrs": _sig_text(f.get("attrs"))}
elif ftype == "vector":
sig["fields"][name] = {"type": "vector", "attrs": _sig_vector(f.get("attrs"))}
else:
# Unknown field types: compare by type only
sig["fields"][name] = {"type": ftype}
return sig
existing_index = await AsyncSearchIndex.from_existing(self.index_name, redis_url=self.redis_url)
existing_schema = existing_index.schema.to_dict()
current_schema = self.schema_dict
existing_sig = _schema_signature(existing_schema)
current_sig = _schema_signature(current_schema)
if existing_sig != current_sig:
# Add sigs to error message
raise ServiceInitializationError(
"Existing Redis index schema is incompatible with the current configuration.\n"
f"Existing (significant): {json.dumps(existing_sig, indent=2, sort_keys=True)}\n"
f"Current (significant): {json.dumps(current_sig, indent=2, sort_keys=True)}\n"
"Set overwrite_index=True to rebuild if this change is intentional."
)
async def _add(
self,
*,
data: dict[str, Any] | list[dict[str, Any]],
metadata: dict[str, Any] | None = None,
) -> None:
"""Inserts one or many documents with partition fields populated.
Fills default partition fields, optionally embeds content when configured, and loads documents in a batch.
Args:
data: Single document or list of documents to insert.
metadata: Optional metadata dictionary (unused placeholder).
Raises:
ServiceInvalidRequestError: If required fields are missing or invalid.
"""
# Ensure provider has at least one scope set (symmetry with Mem0Provider)
self._validate_filters()
await self._ensure_index()
docs = data if isinstance(data, list) else [data]
prepared: list[dict[str, Any]] = []
for doc in docs:
d = dict(doc) # shallow copy
# Partition defaults
d.setdefault("application_id", self.application_id)
d.setdefault("agent_id", self.agent_id)
d.setdefault("user_id", self.user_id)
d.setdefault("thread_id", self._effective_thread_id)
# Conversation defaults
d.setdefault("conversation_id", self._conversation_id)
# Logical requirement
if "content" not in d:
raise ServiceInvalidRequestError("add() requires a 'content' field in data")
# Vector field requirement (only if schema has one)
if self.vector_field_name:
d.setdefault(self.vector_field_name, None)
prepared.append(d)
# Batch embed contents for every message
if self.redis_vectorizer and self.vector_field_name:
text_list = [d["content"] for d in prepared]
embeddings = await self.redis_vectorizer.aembed_many(text_list, batch_size=len(text_list))
for i, d in enumerate(prepared):
vec = np.asarray(embeddings[i], dtype=np.float32).tobytes()
field_name: str = self.vector_field_name
d[field_name] = vec
# Load all at once if supported
await self.redis_index.load(prepared)
return
async def _redis_search(
self,
text: str,
*,
text_scorer: str = "BM25STD",
filter_expression: Any | None = None,
return_fields: list[str] | None = None,
num_results: int = 10,
alpha: float = 0.7,
) -> list[dict[str, Any]]:
"""Runs a text or hybrid vector-text search with optional filters.
Builds a TextQuery or HybridQuery and automatically ANDs partition filters to keep results scoped and safe.
Args:
text: Query text.
text_scorer: Scorer to use for text ranking.
filter_expression: Additional filter expression to AND with partition filters.
return_fields: Fields to return in results.
num_results: Maximum number of results.
alpha: Hybrid balancing parameter when vectors are enabled.
Returns:
List of result dictionaries.
Raises:
ServiceInvalidRequestError: If input is invalid or the query fails.
"""
# Enforce presence of at least one provider-level filter (symmetry with Mem0Provider)
await self._ensure_index()
self._validate_filters()
q = (text or "").strip()
if not q:
raise ServiceInvalidRequestError("text_search() requires non-empty text")
num_results = max(int(num_results or 10), 1)
combined_filter = self._build_filter_from_dict({
"application_id": self.application_id,
"agent_id": self.agent_id,
"user_id": self.user_id,
"thread_id": self._effective_thread_id,
"conversation_id": self._conversation_id,
})
if filter_expression is not None:
combined_filter = (combined_filter & filter_expression) if combined_filter else filter_expression
# Choose return fields
return_fields = (
return_fields
if return_fields is not None
else ["content", "role", "application_id", "agent_id", "user_id", "thread_id"]
)
try:
if self.redis_vectorizer and self.vector_field_name:
# Build hybrid query: combine full-text and vector similarity
vector = await self.redis_vectorizer.aembed(q)
query = HybridQuery(
text=q,
text_field_name="content",
vector=vector,
vector_field_name=self.vector_field_name,
text_scorer=text_scorer,
filter_expression=combined_filter,
alpha=alpha,
dtype=self.redis_vectorizer.dtype,
num_results=num_results,
return_fields=return_fields,
stopwords=None,
)
hybrid_results = await self.redis_index.query(query)
return cast(list[dict[str, Any]], hybrid_results)
# Text-only search
query = TextQuery(
text=q,
text_field_name="content",
text_scorer=text_scorer,
filter_expression=combined_filter,
num_results=num_results,
return_fields=return_fields,
stopwords=None,
)
text_results = await self.redis_index.query(query)
return cast(list[dict[str, Any]], text_results)
except Exception as exc: # pragma: no cover - surface as framework error
raise ServiceInvalidRequestError(f"Redis text search failed: {exc}") from exc
async def search_all(self, page_size: int = 200) -> list[dict[str, Any]]:
"""Returns all documents in the index.
Streams results via pagination to avoid excessive memory and response sizes.
Args:
page_size: Page size used for pagination under the hood.
Returns:
List of all documents.
"""
out: list[dict[str, Any]] = []
async for batch in self.redis_index.paginate(
FilterQuery(FilterExpression("*"), return_fields=[], num_results=page_size),
page_size=page_size,
):
out.extend(batch)
return out
@property
def _effective_thread_id(self) -> str | None:
"""Resolves the active thread id.
Returns per-operation thread id when scoping is enabled; otherwise the provider's thread id.
"""
return self._per_operation_thread_id if self.scope_to_per_operation_thread_id else self.thread_id
async def thread_created(self, thread_id: str | None) -> None:
"""Called when a new thread is created.
Captures the per-operation thread id when scoping is enabled to enforce single-thread usage.
Args:
thread_id: The ID of the thread or None.
"""
self._validate_per_operation_thread_id(thread_id)
self._per_operation_thread_id = self._per_operation_thread_id or thread_id
# Track current conversation id (Agent passes conversation_id here)
self._conversation_id = thread_id or self._conversation_id
async def messages_adding(self, thread_id: str | None, new_messages: ChatMessage | Sequence[ChatMessage]) -> None:
"""Called when a new message is being added to the thread.
Validates scope, normalizes allowed roles, and persists messages to Redis via add().
Args:
thread_id: The ID of the thread or None.
new_messages: New messages to add.
"""
self._validate_filters()
self._validate_per_operation_thread_id(thread_id)
self._per_operation_thread_id = self._per_operation_thread_id or thread_id
messages_list = [new_messages] if isinstance(new_messages, ChatMessage) else list(new_messages)
messages: list[dict[str, Any]] = []
for message in messages_list:
if (
message.role.value in {Role.USER.value, Role.ASSISTANT.value, Role.SYSTEM.value}
and message.text
and message.text.strip()
):
shaped: dict[str, Any] = {
"role": message.role.value,
"content": message.text,
"conversation_id": self._conversation_id,
"message_id": message.message_id,
"author_name": message.author_name,
}
messages.append(shaped)
if messages:
await self._add(data=messages)
async def model_invoking(self, messages: ChatMessage | MutableSequence[ChatMessage]) -> Context:
"""Called before invoking the model to provide scoped context.
Concatenates recent messages into a query, fetches matching memories from Redis.
Prepends them as instructions.
Args:
messages: List of new messages in the thread.
Returns:
Context: Context object containing instructions with memories.
"""
self._validate_filters()
messages_list = [messages] if isinstance(messages, ChatMessage) else list(messages)
input_text = "\n".join(msg.text for msg in messages_list if msg and msg.text and msg.text.strip())
memories = await self._redis_search(text=input_text)
line_separated_memories = "\n".join(
str(memory.get("content", "")) for memory in memories if memory.get("content")
)
content = TextContent(f"{self.context_prompt}\n{line_separated_memories}") if line_separated_memories else None
return Context(contents=[content] if content else None)
async def __aenter__(self) -> Self:
"""Async context manager entry.
No special setup is required; provided for symmetry with the Mem0 provider.
"""
return self
async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None:
"""Async context manager exit.
No cleanup is required; indexes/keys remain unless explicitly cleared.
"""
return
def _validate_filters(self) -> None:
"""Validates that at least one filter is provided.
Prevents unbounded operations by requiring a partition filter before reads or writes.
Raises:
ServiceInitializationError: If no filters are provided.
"""
if not self.agent_id and not self.user_id and not self.application_id and not self.thread_id:
raise ServiceInitializationError(
"At least one of the filters: agent_id, user_id, application_id, or thread_id is required."
)
def _validate_per_operation_thread_id(self, thread_id: str | None) -> None:
"""Validates that a new thread ID doesn't conflict when scoped.
Prevents cross-thread data leakage by enforcing single-thread usage when per-operation scoping is enabled.
Args:
thread_id: The new thread ID or None.
Raises:
ValueError: If a new thread ID conflicts with the existing one.
"""
if (
self.scope_to_per_operation_thread_id
and thread_id
and self._per_operation_thread_id
and thread_id != self._per_operation_thread_id
):
raise ValueError(
"RedisProvider can only be used with one thread, when scope_to_per_operation_thread_id is True."
)
+95
View File
@@ -0,0 +1,95 @@
[project]
name = "agent-framework-redis"
description = "Redis integration for Microsoft Agent Framework."
authors = [{ name = "Microsoft", email = "SK-Support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "0.0.0b1"
license-files = ["LICENSE"]
urls.homepage = "https://learn.microsoft.com/en-us/semantic-kernel/overview/"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true"
urls.issues = "https://github.com/microsoft/agent-framework/issues"
classifiers = [
"License :: OSI Approved :: MIT License",
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Framework :: Pydantic :: 2",
"Typing :: Typed",
]
dependencies = [
"agent-framework",
"redis>=6.4.0",
"redisvl>=0.8.2",
"numpy>=2.2.6"
]
[tool.uv]
prerelease = "if-necessary-or-explicit"
environments = [
"sys_platform == 'darwin'",
"sys_platform == 'linux'",
"sys_platform == 'win32'"
]
[tool.uv-dynamic-versioning]
fallback-version = "0.0.0"
[tool.pytest.ini_options]
testpaths = 'tests'
addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
"ignore:Support for class-based `config` is deprecated:DeprecationWarning:pydantic.*"
]
timeout = 120
[tool.ruff]
extend = "../../pyproject.toml"
[tool.coverage.run]
omit = [
"**/__init__.py"
]
[tool.pyright]
extend = "../../pyproject.toml"
exclude = ['tests']
[tool.mypy]
plugins = ['pydantic.mypy']
strict = true
python_version = "3.10"
ignore_missing_imports = true
disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
[tool.bandit]
targets = ["agent_framework_redis"]
exclude_dirs = ["tests"]
[tool.poe]
executor.type = "uv"
include = "../../shared_tasks.toml"
[tool.poe.tasks]
mypy = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_redis"
test = "pytest --cov=agent_framework_redis --cov-report=term-missing:skip-covered tests"
[tool.uv.build-backend]
module-name = "agent_framework_redis"
module-root = ""
[build-system]
requires = ["uv_build>=0.8.2,<0.9.0"]
build-backend = "uv_build"
@@ -0,0 +1,496 @@
# Copyright (c) Microsoft. All rights reserved.
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from agent_framework import ChatMessage, Role, TextContent
from agent_framework_redis import RedisChatMessageStore
class TestRedisChatMessageStore:
"""Unit tests for RedisChatMessageStore using mocked Redis client.
These tests use mocked Redis operations to verify the logic and behavior
of the RedisChatMessageStore without requiring a real Redis server.
"""
@pytest.fixture
def sample_messages(self):
"""Sample chat messages for testing."""
return [
ChatMessage(role=Role.USER, text="Hello", message_id="msg1"),
ChatMessage(role=Role.ASSISTANT, text="Hi there!", message_id="msg2"),
ChatMessage(role=Role.USER, text="How are you?", message_id="msg3"),
]
@pytest.fixture
def mock_redis_client(self):
"""Mock Redis client with all required methods."""
client = MagicMock()
# Core list operations
client.lrange = AsyncMock(return_value=[])
client.llen = AsyncMock(return_value=0)
client.lindex = AsyncMock(return_value=None)
client.lset = AsyncMock(return_value=True)
client.lrem = AsyncMock(return_value=0)
client.lpop = AsyncMock(return_value=None)
client.rpop = AsyncMock(return_value=None)
client.ltrim = AsyncMock(return_value=True)
client.delete = AsyncMock(return_value=1)
# Pipeline operations
mock_pipeline = AsyncMock()
mock_pipeline.rpush = AsyncMock()
mock_pipeline.execute = AsyncMock()
client.pipeline.return_value.__aenter__.return_value = mock_pipeline
return client
@pytest.fixture
def redis_store(self, mock_redis_client):
"""Redis chat message store with mocked client."""
with patch("agent_framework_redis._chat_message_store.redis.from_url") as mock_from_url:
mock_from_url.return_value = mock_redis_client
store = RedisChatMessageStore(redis_url="redis://localhost:6379", thread_id="test_thread_123")
store._redis_client = mock_redis_client
return store
def test_init_with_thread_id(self):
"""Test initialization with explicit thread ID."""
thread_id = "user123_session456"
with patch("agent_framework_redis._chat_message_store.redis.from_url"):
store = RedisChatMessageStore(redis_url="redis://localhost:6379", thread_id=thread_id)
assert store.thread_id == thread_id
assert store.redis_url == "redis://localhost:6379"
assert store.key_prefix == "chat_messages"
assert store.redis_key == f"chat_messages:{thread_id}"
def test_init_auto_generate_thread_id(self):
"""Test initialization with auto-generated thread ID."""
with patch("agent_framework_redis._chat_message_store.redis.from_url"):
store = RedisChatMessageStore(redis_url="redis://localhost:6379")
assert store.thread_id is not None
assert store.thread_id.startswith("thread_")
assert len(store.thread_id) > 10 # Should be a UUID
def test_init_with_custom_prefix(self):
"""Test initialization with custom key prefix."""
with patch("agent_framework_redis._chat_message_store.redis.from_url"):
store = RedisChatMessageStore(
redis_url="redis://localhost:6379", thread_id="test123", key_prefix="custom_messages"
)
assert store.redis_key == "custom_messages:test123"
def test_init_with_max_messages(self):
"""Test initialization with message limit."""
with patch("agent_framework_redis._chat_message_store.redis.from_url"):
store = RedisChatMessageStore(redis_url="redis://localhost:6379", thread_id="test123", max_messages=100)
assert store.max_messages == 100
def test_init_with_redis_url_required(self):
"""Test that redis_url is required for initialization."""
with pytest.raises(ValueError, match="redis_url is required for Redis connection"):
# Should raise an exception since redis_url is required
RedisChatMessageStore(thread_id="test123")
def test_init_with_initial_messages(self, sample_messages):
"""Test initialization with initial messages."""
with patch("agent_framework_redis._chat_message_store.redis.from_url"):
store = RedisChatMessageStore(
redis_url="redis://localhost:6379", thread_id="test123", messages=sample_messages
)
assert store._initial_messages == sample_messages
async def test_add_messages_single(self, redis_store, mock_redis_client, sample_messages):
"""Test adding a single message using pipeline operations."""
message = sample_messages[0]
await redis_store.add_messages([message])
# Verify pipeline operations were called
mock_redis_client.pipeline.assert_called_with(transaction=True)
# Get the pipeline mock and verify it was used correctly
pipeline_mock = mock_redis_client.pipeline.return_value.__aenter__.return_value
pipeline_mock.rpush.assert_called()
pipeline_mock.execute.assert_called()
async def test_add_messages_multiple(self, redis_store, mock_redis_client, sample_messages):
"""Test adding multiple messages using pipeline operations."""
await redis_store.add_messages(sample_messages)
# Verify pipeline operations
mock_redis_client.pipeline.assert_called_with(transaction=True)
# Verify rpush was called for each message
pipeline_mock = mock_redis_client.pipeline.return_value.__aenter__.return_value
assert pipeline_mock.rpush.call_count == len(sample_messages)
async def test_add_messages_with_max_limit(self, mock_redis_client):
"""Test adding messages with max limit triggers trimming."""
with patch("agent_framework_redis._chat_message_store.redis.from_url") as mock_from_url:
mock_from_url.return_value = mock_redis_client
# Mock llen to return count that exceeds limit after adding
mock_redis_client.llen.return_value = 5
store = RedisChatMessageStore(redis_url="redis://localhost:6379", thread_id="test123", max_messages=3)
store._redis_client = mock_redis_client
message = ChatMessage(role=Role.USER, text="Test")
await store.add_messages([message])
# Should trim after adding to keep only last 3 messages
mock_redis_client.ltrim.assert_called_once_with("chat_messages:test123", -3, -1)
async def test_list_messages_empty(self, redis_store, mock_redis_client):
"""Test listing messages when store is empty."""
mock_redis_client.lrange.return_value = []
messages = await redis_store.list_messages()
assert messages == []
mock_redis_client.lrange.assert_called_once_with("chat_messages:test_thread_123", 0, -1)
async def test_list_messages_with_data(self, redis_store, mock_redis_client, sample_messages):
"""Test listing messages with data in Redis."""
# Create proper serialized messages using the actual serialization method
test_messages = [
ChatMessage(role=Role.USER, text="Hello", message_id="msg1"),
ChatMessage(role=Role.ASSISTANT, text="Hi there!", message_id="msg2"),
]
serialized_messages = [redis_store._serialize_message(msg) for msg in test_messages]
mock_redis_client.lrange.return_value = serialized_messages
messages = await redis_store.list_messages()
assert len(messages) == 2
assert messages[0].role == Role.USER
assert messages[0].text == "Hello"
assert messages[1].role == Role.ASSISTANT
assert messages[1].text == "Hi there!"
async def test_list_messages_with_initial_messages(self, sample_messages):
"""Test that initial messages are added to Redis and retrieved correctly."""
with patch("agent_framework_redis._chat_message_store.redis.from_url") as mock_from_url:
mock_redis_client = MagicMock()
mock_redis_client.llen = AsyncMock(return_value=0) # Redis key is empty
mock_redis_client.lrange = AsyncMock(return_value=[])
# Mock pipeline for adding initial messages
mock_pipeline = AsyncMock()
mock_pipeline.rpush = AsyncMock()
mock_pipeline.execute = AsyncMock()
mock_redis_client.pipeline.return_value.__aenter__.return_value = mock_pipeline
mock_from_url.return_value = mock_redis_client
store = RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id="test123",
messages=sample_messages[:1], # One initial message
)
store._redis_client = mock_redis_client
# Mock Redis to return the initial message after it's added
initial_message_json = store._serialize_message(sample_messages[0])
mock_redis_client.lrange.return_value = [initial_message_json]
messages = await store.list_messages()
assert len(messages) == 1
assert messages[0].text == "Hello"
# Verify initial message was added to Redis via pipeline
mock_pipeline.rpush.assert_called()
async def test_initial_messages_not_added_if_key_exists(self, sample_messages):
"""Test that initial messages are not added if Redis key already has data."""
with patch("agent_framework_redis._chat_message_store.redis.from_url") as mock_from_url:
mock_redis_client = MagicMock()
mock_redis_client.llen = AsyncMock(return_value=5) # Key already has messages
mock_redis_client.lrange = AsyncMock(return_value=[])
# Pipeline should not be called since key already exists
mock_pipeline = AsyncMock()
mock_pipeline.rpush = AsyncMock()
mock_pipeline.execute = AsyncMock()
mock_redis_client.pipeline.return_value.__aenter__.return_value = mock_pipeline
mock_from_url.return_value = mock_redis_client
store = RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id="test123",
messages=sample_messages[:1], # One initial message
)
store._redis_client = mock_redis_client
await store.list_messages()
# Should check length but not add messages since key exists
mock_redis_client.llen.assert_called()
mock_pipeline.rpush.assert_not_called()
async def test_serialize_state(self, redis_store):
"""Test state serialization."""
state = await redis_store.serialize_state()
expected_state = {
"thread_id": "test_thread_123",
"redis_url": "redis://localhost:6379",
"key_prefix": "chat_messages",
"max_messages": None,
}
assert state == expected_state
async def test_deserialize_state(self, redis_store):
"""Test state deserialization."""
serialized_state = {
"thread_id": "restored_thread_456",
"redis_url": "redis://localhost:6380",
"key_prefix": "restored_messages",
"max_messages": 50,
}
await redis_store.deserialize_state(serialized_state)
assert redis_store.thread_id == "restored_thread_456"
assert redis_store.redis_url == "redis://localhost:6380"
assert redis_store.key_prefix == "restored_messages"
assert redis_store.max_messages == 50
async def test_deserialize_state_empty(self, redis_store):
"""Test deserializing empty state doesn't change anything."""
original_thread_id = redis_store.thread_id
await redis_store.deserialize_state(None)
assert redis_store.thread_id == original_thread_id
async def test_clear_messages(self, redis_store, mock_redis_client):
"""Test clearing all messages."""
await redis_store.clear()
mock_redis_client.delete.assert_called_once_with("chat_messages:test_thread_123")
async def test_message_serialization_roundtrip(self, sample_messages):
"""Test message serialization and deserialization roundtrip."""
with patch("agent_framework_redis._chat_message_store.redis.from_url"):
store = RedisChatMessageStore(redis_url="redis://localhost:6379", thread_id="test123")
message = sample_messages[0]
# Test serialization
serialized = store._serialize_message(message)
assert isinstance(serialized, str)
# Test deserialization
deserialized = store._deserialize_message(serialized)
assert deserialized.role == message.role
assert deserialized.text == message.text
assert deserialized.message_id == message.message_id
async def test_message_serialization_with_complex_content(self):
"""Test serialization of messages with complex content."""
with patch("agent_framework_redis._chat_message_store.redis.from_url"):
store = RedisChatMessageStore(redis_url="redis://localhost:6379", thread_id="test123")
# Message with multiple content types
message = ChatMessage(
role=Role.ASSISTANT,
contents=[TextContent(text="Hello"), TextContent(text="World")],
author_name="TestBot",
message_id="complex_msg",
additional_properties={"metadata": "test"},
)
serialized = store._serialize_message(message)
deserialized = store._deserialize_message(serialized)
assert deserialized.role == Role.ASSISTANT
assert deserialized.text == "Hello World"
assert deserialized.author_name == "TestBot"
assert deserialized.message_id == "complex_msg"
assert deserialized.additional_properties == {"metadata": "test"}
async def test_redis_connection_error_handling(self):
"""Test handling Redis connection errors in add_messages."""
with patch("agent_framework_redis._chat_message_store.redis.from_url") as mock_from_url:
mock_client = MagicMock()
# Mock pipeline to raise exception during execution
mock_pipeline = AsyncMock()
mock_pipeline.rpush = AsyncMock()
mock_pipeline.execute = AsyncMock(side_effect=Exception("Connection failed"))
mock_client.pipeline.return_value.__aenter__.return_value = mock_pipeline
mock_from_url.return_value = mock_client
store = RedisChatMessageStore(redis_url="redis://localhost:6379", thread_id="test123")
store._redis_client = mock_client
message = ChatMessage(role=Role.USER, text="Test")
# Should propagate Redis connection errors
with pytest.raises(Exception, match="Connection failed"):
await store.add_messages([message])
async def test_getitem(self, redis_store, mock_redis_client, sample_messages):
"""Test getitem method using Redis LINDEX."""
# Mock LINDEX to return specific messages
serialized_msg0 = redis_store._serialize_message(sample_messages[0])
serialized_msg1 = redis_store._serialize_message(sample_messages[1])
def mock_lindex(key, index):
if index == 0:
return serialized_msg0
if index == -1 or index == 1:
return serialized_msg1
return None
mock_redis_client.lindex = AsyncMock(side_effect=mock_lindex)
# Test positive index
message = await redis_store.getitem(0)
assert message.text == "Hello"
# Test negative index
message = await redis_store.getitem(-1)
assert message.text == "Hi there!"
async def test_getitem_index_error(self, redis_store, mock_redis_client):
"""Test getitem raises IndexError for invalid index."""
mock_redis_client.lindex = AsyncMock(return_value=None)
with pytest.raises(IndexError):
await redis_store.getitem(0)
async def test_setitem(self, redis_store, mock_redis_client, sample_messages):
"""Test setitem method using Redis LSET."""
mock_redis_client.llen.return_value = 2
mock_redis_client.lset = AsyncMock()
new_message = ChatMessage(role=Role.USER, text="Updated message")
await redis_store.setitem(0, new_message)
mock_redis_client.lset.assert_called_once()
call_args = mock_redis_client.lset.call_args
assert call_args[0][0] == "chat_messages:test_thread_123"
assert call_args[0][1] == 0
async def test_setitem_index_error(self, redis_store, mock_redis_client):
"""Test setitem raises IndexError for invalid index."""
mock_redis_client.llen.return_value = 0
new_message = ChatMessage(role=Role.USER, text="Test")
with pytest.raises(IndexError):
await redis_store.setitem(0, new_message)
async def test_append(self, redis_store, mock_redis_client):
"""Test append method delegates to add_messages."""
message = ChatMessage(role=Role.USER, text="Appended message")
await redis_store.append(message)
# Should call pipeline operations via add_messages
mock_redis_client.pipeline.assert_called_with(transaction=True)
# Verify the message was added via pipeline
pipeline_mock = mock_redis_client.pipeline.return_value.__aenter__.return_value
pipeline_mock.rpush.assert_called()
pipeline_mock.execute.assert_called()
async def test_count(self, redis_store, mock_redis_client):
"""Test count method."""
mock_redis_client.llen.return_value = 5
count = await redis_store.count()
assert count == 5
mock_redis_client.llen.assert_called_with("chat_messages:test_thread_123")
async def test_len_method(self, redis_store, mock_redis_client):
"""Test async __len__ method."""
mock_redis_client.llen.return_value = 3
length = await redis_store.__len__()
assert length == 3
mock_redis_client.llen.assert_called_with("chat_messages:test_thread_123")
def test_bool_method(self, redis_store):
"""Test __bool__ method always returns True."""
# Store should always be truthy
assert bool(redis_store) is True
assert redis_store.__bool__() is True
# Should work in if statements (this is what Agent Framework uses)
if redis_store:
assert True # Should reach this
else:
raise AssertionError("Store should be truthy")
async def test_index_found(self, redis_store, mock_redis_client, sample_messages):
"""Test index method when message is found using Redis LINDEX."""
mock_redis_client.llen.return_value = 2
# Mock LINDEX to return messages at each position
serialized_msg0 = redis_store._serialize_message(sample_messages[0])
serialized_msg1 = redis_store._serialize_message(sample_messages[1])
def mock_lindex(key, index):
if index == 0:
return serialized_msg0
if index == 1:
return serialized_msg1
return None
mock_redis_client.lindex = AsyncMock(side_effect=mock_lindex)
index = await redis_store.index(sample_messages[1])
assert index == 1
# Should have called lindex twice (index 0, then index 1)
assert mock_redis_client.lindex.call_count == 2
async def test_index_not_found(self, redis_store, mock_redis_client, sample_messages):
"""Test index method when message is not found."""
mock_redis_client.llen.return_value = 1
mock_redis_client.lindex = AsyncMock(return_value="different_message")
with pytest.raises(ValueError, match="ChatMessage not found in store"):
await redis_store.index(sample_messages[0])
async def test_remove(self, redis_store, mock_redis_client, sample_messages):
"""Test remove method using Redis LREM."""
mock_redis_client.lrem = AsyncMock(return_value=1) # 1 element removed
await redis_store.remove(sample_messages[0])
# Should use LREM to remove the message
expected_serialized = redis_store._serialize_message(sample_messages[0])
mock_redis_client.lrem.assert_called_once_with("chat_messages:test_thread_123", 1, expected_serialized)
async def test_remove_not_found(self, redis_store, mock_redis_client, sample_messages):
"""Test remove method when message is not found."""
mock_redis_client.lrem = AsyncMock(return_value=0) # 0 elements removed
with pytest.raises(ValueError, match="ChatMessage not found in store"):
await redis_store.remove(sample_messages[0])
async def test_extend(self, redis_store, mock_redis_client, sample_messages):
"""Test extend method delegates to add_messages."""
await redis_store.extend(sample_messages[:2])
# Should call pipeline operations via add_messages
mock_redis_client.pipeline.assert_called_with(transaction=True)
# Verify rpush was called for each message
pipeline_mock = mock_redis_client.pipeline.return_value.__aenter__.return_value
assert pipeline_mock.rpush.call_count >= 2
@@ -0,0 +1,28 @@
# Copyright (c) Microsoft. All rights reserved.
def test_self_through_main() -> None:
try:
from agent_framework.redis import __version__
except ImportError:
__version__ = None
assert __version__ is not None
def test_self() -> None:
try:
from agent_framework_redis import __version__
except ImportError:
__version__ = None
assert __version__ is not None
def test_agent_framework() -> None:
try:
from agent_framework import __version__
except ImportError:
__version__ = None
assert __version__ is not None
@@ -0,0 +1,448 @@
# Copyright (c) Microsoft. All rights reserved.
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
import numpy as np
import pytest
from agent_framework import ChatMessage, Role
from agent_framework.exceptions import ServiceInitializationError
from pydantic import ValidationError
from redisvl.utils.vectorize import CustomTextVectorizer
from agent_framework_redis import RedisProvider
CUSTOM_VECTORIZER = CustomTextVectorizer(embed=lambda x: [1.0, 2.0, 3.0], dtype="float32")
@pytest.fixture
def mock_index() -> AsyncMock:
idx = AsyncMock()
idx.create = AsyncMock()
idx.load = AsyncMock()
idx.query = AsyncMock()
idx.exists = AsyncMock(return_value=False)
async def _paginate_generator(*_args: Any, **_kwargs: Any):
# Default empty generator; override per-test as needed
if False: # pragma: no cover
yield []
return
idx.paginate = _paginate_generator
return idx
@pytest.fixture
def patch_index_from_dict(mock_index: AsyncMock):
with patch("agent_framework_redis._provider.AsyncSearchIndex") as mock_cls:
mock_cls.from_dict = MagicMock(return_value=mock_index)
# Mock from_existing to return a mock with matching schema by default
# This prevents schema validation errors in tests that don't specifically test schema validation
async def mock_from_existing(index_name, redis_url):
mock_existing = AsyncMock()
# Return a schema that will match whatever the provider generates
# This is a bit of a hack, but allows existing tests to continue working
mock_existing.schema.to_dict = MagicMock(
side_effect=lambda: mock_cls.from_dict.call_args[0][0] if mock_cls.from_dict.call_args else {}
)
return mock_existing
mock_cls.from_existing = AsyncMock(side_effect=mock_from_existing)
yield mock_cls
@pytest.fixture
def patch_queries():
calls: dict[str, Any] = {"TextQuery": [], "HybridQuery": [], "FilterExpression": []}
def _mk_query(kind: str):
class _Q: # simple marker object with captured kwargs
def __init__(self, **kwargs):
self.kind = kind
self.kwargs = kwargs
return _Q
with (
patch(
"agent_framework_redis._provider.TextQuery",
side_effect=lambda **k: calls["TextQuery"].append(k) or _mk_query("text")(**k),
) as text_q,
patch(
"agent_framework_redis._provider.HybridQuery",
side_effect=lambda **k: calls["HybridQuery"].append(k) or _mk_query("hybrid")(**k),
) as hybrid_q,
patch(
"agent_framework_redis._provider.FilterExpression",
side_effect=lambda s: calls["FilterExpression"].append(s) or ("FE", s),
) as filt,
):
yield {"calls": calls, "TextQuery": text_q, "HybridQuery": hybrid_q, "FilterExpression": filt}
class TestRedisProviderInitialization:
# Verifies the provider can be imported from the package
def test_import(self):
from agent_framework_redis._provider import RedisProvider
assert RedisProvider is not None
# Constructing without filters should not raise; filters are enforced at call-time
def test_init_without_filters_ok(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider()
assert provider.user_id is None
assert provider.agent_id is None
assert provider.application_id is None
assert provider.thread_id is None
# Schema should omit vector field when no vector configuration is provided
def test_schema_without_vector_field(self, patch_index_from_dict):
RedisProvider(user_id="u1")
# Inspect schema passed to from_dict
args, kwargs = patch_index_from_dict.from_dict.call_args
schema = args[0]
assert isinstance(schema, dict)
names = [f["name"] for f in schema["fields"]]
types = [f["type"] for f in schema["fields"]]
assert "content" in names
assert "text" in types
assert "vector" not in types
class TestRedisProviderMessages:
@pytest.fixture
def sample_messages(self) -> list[ChatMessage]:
return [
ChatMessage(role=Role.USER, text="Hello, how are you?"),
ChatMessage(role=Role.ASSISTANT, text="I'm doing well, thank you!"),
ChatMessage(role=Role.SYSTEM, text="You are a helpful assistant"),
]
@pytest.mark.asyncio
# Writes require at least one scoping filter to avoid unbounded operations
async def test_messages_adding_requires_filters(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider()
with pytest.raises(ServiceInitializationError):
await provider.messages_adding("thread123", ChatMessage(role=Role.USER, text="Hello"))
@pytest.mark.asyncio
# Captures the per-operation thread id when provided
async def test_thread_created_sets_per_operation_id(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider(user_id="u1")
await provider.thread_created("t1")
assert provider._per_operation_thread_id == "t1"
@pytest.mark.asyncio
# Enforces single-thread usage when scope_to_per_operation_thread_id is True
async def test_thread_created_conflict_when_scoped(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider(user_id="u1", scope_to_per_operation_thread_id=True)
provider._per_operation_thread_id = "t1"
with pytest.raises(ValueError) as exc:
await provider.thread_created("t2")
assert "only be used with one thread" in str(exc.value)
@pytest.mark.asyncio
# Aggregates all results from the async paginator into a flat list
async def test_search_all_paginates(self, mock_index: AsyncMock, patch_index_from_dict): # noqa: ARG002
async def gen(_q, page_size: int = 200): # noqa: ARG001, ANN001
yield [{"id": 1}]
yield [{"id": 2}, {"id": 3}]
mock_index.paginate = gen
provider = RedisProvider(user_id="u1")
res = await provider.search_all(page_size=2)
assert res == [{"id": 1}, {"id": 2}, {"id": 3}]
class TestRedisProviderModelInvoking:
@pytest.mark.asyncio
# Reads require at least one scoping filter to avoid unbounded operations
async def test_model_invoking_requires_filters(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider()
with pytest.raises(ServiceInitializationError):
await provider.model_invoking(ChatMessage(role=Role.USER, text="Hi"))
@pytest.mark.asyncio
# Ensures text-only search path is used and context is composed from hits
async def test_textquery_path_and_context_contents(
self, mock_index: AsyncMock, patch_index_from_dict, patch_queries
): # noqa: ARG002
# Arrange: text-only search
mock_index.query = AsyncMock(return_value=[{"content": "A"}, {"content": "B"}])
provider = RedisProvider(user_id="u1")
# Act
ctx = await provider.model_invoking([ChatMessage(role=Role.USER, text="q1")])
# Assert: TextQuery used (not HybridQuery), filter_expression included
assert patch_queries["TextQuery"].call_count == 1
assert patch_queries["HybridQuery"].call_count == 0
kwargs = patch_queries["calls"]["TextQuery"][0]
assert kwargs["text"] == "q1"
assert kwargs["text_field_name"] == "content"
assert kwargs["num_results"] == 10
assert "filter_expression" in kwargs
# Context contains memories joined after the default prompt
assert ctx.contents is not None and len(ctx.contents) == 1
text = ctx.contents[0].text
assert text.endswith("A\nB")
@pytest.mark.asyncio
# When no results are returned, Context should have no contents
async def test_model_invoking_empty_results_returns_empty_context(
self, mock_index: AsyncMock, patch_index_from_dict, patch_queries
): # noqa: ARG002
mock_index.query = AsyncMock(return_value=[])
provider = RedisProvider(user_id="u1")
ctx = await provider.model_invoking([ChatMessage(role=Role.USER, text="any")])
assert ctx.contents is None
@pytest.mark.asyncio
# Ensures hybrid vector-text search is used when a vectorizer and vector field are configured
async def test_hybridquery_path_with_vectorizer(self, mock_index: AsyncMock, patch_index_from_dict, patch_queries): # noqa: ARG002
mock_index.query = AsyncMock(return_value=[{"content": "Hit"}])
provider = RedisProvider(user_id="u1", redis_vectorizer=CUSTOM_VECTORIZER, vector_field_name="vec")
ctx = await provider.model_invoking([ChatMessage(role=Role.USER, text="hello")])
# Assert: HybridQuery used with vector and vector field
assert patch_queries["HybridQuery"].call_count == 1
k = patch_queries["calls"]["HybridQuery"][0]
assert k["text"] == "hello"
assert k["vector_field_name"] == "vec"
assert k["vector"] == [1.0, 2.0, 3.0]
assert k["dtype"] == "float32"
assert k["num_results"] == 10
assert "filter_expression" in k
# Context assembled from returned memories
assert ctx.contents and "Hit" in ctx.contents[0].text
class TestRedisProviderContextManager:
@pytest.mark.asyncio
# Verifies async context manager returns self for chaining
async def test_async_context_manager_returns_self(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider(user_id="u1")
async with provider as ctx:
assert ctx is provider
@pytest.mark.asyncio
# Exit should be a no-op and not raise
async def test_aexit_noop(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider(user_id="u1")
assert await provider.__aexit__(None, None, None) is None
class TestMessagesAddingBehavior:
@pytest.mark.asyncio
# Adds messages while injecting partition defaults and preserving allowed roles
async def test_messages_adding_adds_partition_defaults_and_roles(
self, mock_index: AsyncMock, patch_index_from_dict
): # noqa: ARG002
provider = RedisProvider(
application_id="app",
agent_id="agent",
user_id="u1",
scope_to_per_operation_thread_id=True,
)
msgs = [
ChatMessage(role=Role.USER, text="u"),
ChatMessage(role=Role.ASSISTANT, text="a"),
ChatMessage(role=Role.SYSTEM, text="s"),
]
await provider.messages_adding("t1", msgs)
# Ensure load invoked with shaped docs containing defaults
assert mock_index.load.await_count == 1
(loaded_args, _kwargs) = mock_index.load.call_args
docs = loaded_args[0]
assert isinstance(docs, list) and len(docs) == 3
for d in docs:
assert d["role"] in {"user", "assistant", "system"}
assert d["content"] in {"u", "a", "s"}
assert d["application_id"] == "app"
assert d["agent_id"] == "agent"
assert d["user_id"] == "u1"
assert d["thread_id"] == "t1" # scoped via per-operation thread id
@pytest.mark.asyncio
# Skips blank text and disallowed roles (e.g., TOOL) when adding messages
async def test_messages_adding_ignores_blank_and_disallowed_roles(
self, mock_index: AsyncMock, patch_index_from_dict
): # noqa: ARG002
provider = RedisProvider(user_id="u1", scope_to_per_operation_thread_id=True)
msgs = [
ChatMessage(role=Role.USER, text=" "),
ChatMessage(role=Role.TOOL, text="tool output"),
]
await provider.messages_adding("tid", msgs)
# No valid messages -> no load
assert mock_index.load.await_count == 0
class TestIndexCreationPublicCalls:
@pytest.mark.asyncio
# Ensures index is created only once when drop=True on first public write call
async def test_messages_adding_triggers_index_create_once_when_drop_true(
self, mock_index: AsyncMock, patch_index_from_dict
): # noqa: ARG002
provider = RedisProvider(user_id="u1", drop_redis_index=True)
await provider.messages_adding("t1", ChatMessage(role=Role.USER, text="m1"))
await provider.messages_adding("t1", ChatMessage(role=Role.USER, text="m2"))
# create only on first call
assert mock_index.create.await_count == 1
@pytest.mark.asyncio
# Ensures index is created when drop=False and the index does not exist on first read
async def test_model_invoking_triggers_create_when_drop_false_and_not_exists(
self, mock_index: AsyncMock, patch_index_from_dict
): # noqa: ARG002
mock_index.exists = AsyncMock(return_value=False)
provider = RedisProvider(user_id="u1", drop_redis_index=False)
mock_index.query = AsyncMock(return_value=[{"content": "C"}])
await provider.model_invoking([ChatMessage(role=Role.USER, text="q")])
assert mock_index.create.await_count == 1
class TestThreadCreatedAdditional:
@pytest.mark.asyncio
# Allows None or same thread id repeatedly; different id raises when scoped
async def test_thread_created_allows_none_and_same_id(self, patch_index_from_dict): # noqa: ARG002
provider = RedisProvider(user_id="u1", scope_to_per_operation_thread_id=True)
# None is allowed
await provider.thread_created(None)
# Same id is allowed repeatedly
await provider.thread_created("t1")
await provider.thread_created("t1")
# Different id should raise
with pytest.raises(ValueError):
await provider.thread_created("t2")
class TestVectorPopulation:
@pytest.mark.asyncio
# When vectorizer configured, messages_adding should embed content and populate the vector field
async def test_messages_adding_populates_vector_field_when_vectorizer_present(
self, mock_index: AsyncMock, patch_index_from_dict
): # noqa: ARG002
provider = RedisProvider(
user_id="u1",
scope_to_per_operation_thread_id=True,
redis_vectorizer=CUSTOM_VECTORIZER,
vector_field_name="vec",
)
await provider.messages_adding("t1", ChatMessage(role=Role.USER, text="hello"))
assert mock_index.load.await_count == 1
(loaded_args, _kwargs) = mock_index.load.call_args
docs = loaded_args[0]
assert isinstance(docs, list) and len(docs) == 1
vec = docs[0].get("vec")
assert isinstance(vec, (bytes, bytearray))
assert len(vec) == 3 * np.dtype(np.float32).itemsize
class TestRedisProviderSchemaVectors:
# Adds a vector field when vectorizer supplies dims implicitly
def test_schema_with_vector_field_and_dims_inferred(self, patch_index_from_dict): # noqa: ARG002
RedisProvider(user_id="u1", redis_vectorizer=CUSTOM_VECTORIZER, vector_field_name="vec")
args, _ = patch_index_from_dict.from_dict.call_args
schema = args[0]
names = [f["name"] for f in schema["fields"]]
types = {f["name"]: f["type"] for f in schema["fields"]}
assert "vec" in names
assert types["vec"] == "vector"
# Raises when redis_vectorizer is not the correct type
def test_init_invalid_vectorizer(self, patch_index_from_dict): # noqa: ARG002
class DummyVectorizer:
pass
with pytest.raises(ValidationError):
RedisProvider(user_id="u1", redis_vectorizer=DummyVectorizer(), vector_field_name="vec")
class TestEnsureIndex:
@pytest.mark.asyncio
# Creates index once and marks _index_initialized to prevent duplicate calls
async def test_ensure_index_creates_once(self, mock_index: AsyncMock, patch_index_from_dict): # noqa: ARG002
# Mock index doesn't exist, so it will be created
mock_index.exists = AsyncMock(return_value=False)
provider = RedisProvider(user_id="u1", overwrite_index=False)
assert provider._index_initialized is False
await provider._ensure_index()
assert mock_index.create.await_count == 1
assert provider._index_initialized is True
# Second call should not create again due to _index_initialized flag
await provider._ensure_index()
assert mock_index.create.await_count == 1
@pytest.mark.asyncio
# Creates index with overwrite=True when overwrite_index=True
async def test_ensure_index_with_overwrite_true(self, mock_index: AsyncMock, patch_index_from_dict): # noqa: ARG002
mock_index.exists = AsyncMock(return_value=True)
provider = RedisProvider(user_id="u1", overwrite_index=True)
await provider._ensure_index()
# Should call create with overwrite=True, drop=False
mock_index.create.assert_called_once_with(overwrite=True, drop=False)
@pytest.mark.asyncio
# Creates index with overwrite=False when index doesn't exist
async def test_ensure_index_create_if_missing(self, mock_index: AsyncMock, patch_index_from_dict): # noqa: ARG002
mock_index.exists = AsyncMock(return_value=False)
provider = RedisProvider(user_id="u1", overwrite_index=False)
await provider._ensure_index()
# Should call create with overwrite=False, drop=False
mock_index.create.assert_called_once_with(overwrite=False, drop=False)
@pytest.mark.asyncio
# Validates schema compatibility when index exists and overwrite=False
async def test_ensure_index_schema_validation_success(self, mock_index: AsyncMock, patch_index_from_dict): # noqa: ARG002
mock_index.exists = AsyncMock(return_value=True)
provider = RedisProvider(user_id="u1", overwrite_index=False)
# Mock existing index with matching schema
expected_schema = provider.schema_dict
patch_index_from_dict.from_existing.return_value.schema.to_dict.return_value = expected_schema
await provider._ensure_index()
# Should validate schema and proceed to create
patch_index_from_dict.from_existing.assert_called_once_with("context", redis_url="redis://localhost:6379")
mock_index.create.assert_called_once_with(overwrite=False, drop=False)
@pytest.mark.asyncio
# Raises ServiceInitializationError when schemas don't match
async def test_ensure_index_schema_validation_failure(self, mock_index: AsyncMock, patch_index_from_dict): # noqa: ARG002
mock_index.exists = AsyncMock(return_value=True)
provider = RedisProvider(user_id="u1", overwrite_index=False)
# Override the mock to return a different schema after provider is created
async def mock_from_existing_different(index_name, redis_url):
mock_existing = AsyncMock()
mock_existing.schema.to_dict = MagicMock(return_value={"different": "schema"})
return mock_existing
patch_index_from_dict.from_existing = AsyncMock(side_effect=mock_from_existing_different)
with pytest.raises(ServiceInitializationError) as exc:
await provider._ensure_index()
assert "incompatible with the current configuration" in str(exc.value)
assert "overwrite_index=True" in str(exc.value)
# Should not call create when schema validation fails
mock_index.create.assert_not_called()
+4 -1
View File
@@ -9,6 +9,7 @@ dependencies = [
"agent-framework-copilotstudio",
"agent-framework-foundry",
"agent-framework-mem0",
"agent-framework-redis",
"agent-framework-devui",
"agent-framework-lab-gaia",
]
@@ -59,6 +60,7 @@ agent-framework-copilotstudio = { workspace = true }
agent-framework-foundry = { workspace = true }
agent-framework-lab-gaia = { workspace = true }
agent-framework-mem0 = { workspace = true }
agent-framework-redis = { workspace = true }
agent-framework-runtime = { workspace = true }
agent-framework-devui = { workspace = true }
@@ -185,7 +187,8 @@ build = "python run_tasks_in_packages_if_exists.py build"
# combined checks
check = ["fmt", "lint", "pyright", "mypy", "test", "markdown-code-lint", "samples-code-check"]
pre-commit-check = ["fmt", "lint", "pyright", "markdown-code-lint", "samples-code-check"]
all-tests = "pytest --import-mode=importlib --cov=agent_framework --cov=agent_framework_azure --cov=agent_framework_copilotstudio --cov=agent_framework_foundry --cov=agent_framework_mem0 --cov-report=term-missing:skip-covered packages/azure/tests packages/copilotstudio/tests packages/foundry/tests packages/main/tests packages/mem0/tests"
all-tests = "pytest --import-mode=importlib --cov=agent_framework --cov=agent_framework_azure --cov=agent_framework_copilotstudio --cov=agent_framework_foundry --cov=agent_framework_mem0 --cov-report=term-missing:skip-covered packages/azure/tests packages/copilotstudio/tests packages/foundry/tests packages/main/tests packages/mem0/tests packages/redis/tests"
[tool.poe.tasks.venv]
cmd = "uv venv --clear --python $python"
@@ -0,0 +1,112 @@
# Redis Context Provider Examples
The Redis context provider enables persistent, searchable memory for your agents using Redis (RediSearch). It supports fulltext search and optional hybrid search with vector embeddings, letting agents remember and retrieve user context across sessions and threads.
This folder contains an example demonstrating how to use the Redis context provider with the Agent Framework.
## Examples
| File | Description |
|------|-------------|
| [`redis_basics.py`](redis_basics.py) | Shows standalone provider usage and agent integration. Demonstrates writing messages to Redis, retrieving context via fulltext or hybrid vector search, and persisting preferences across threads. Also includes a simple tool example whose outputs are remembered. |
| [`redis_threads.py`](redis_threads.py) | Demonstrates thread scoping. Includes: (1) global thread scope with a fixed `thread_id` shared across operations; (2) peroperation thread scope where `scope_to_per_operation_thread_id=True` binds memory to a single thread for the providers lifetime; and (3) multiple agents with isolated memory via different `agent_id` values. |
## Prerequisites
### Required resources
1. A running Redis with RediSearch (Redis Stack or a managed service)
2. Python environment with Agent Framework Redis extra installed
3. Optional: OpenAI API key if using vector embeddings
### Install the package
```bash
pip install "agent-framework[redis]"
```
## Running Redis
Pick one option:
### Option A: Docker (local Redis Stack)
```bash
docker run --name redis -p 6379:6379 -d redis:8.0.3
```
### Option B: Redis Cloud
Create a free database and get the connection URL at `https://redis.io/cloud/`.
### Option C: Azure Managed Redis
See quickstart: `https://learn.microsoft.com/azure/redis/quickstart-create-managed-redis`
## Configuration
### Environment variables
- `OPENAI_API_KEY` (optional): Required only if you set `vectorizer_choice="openai"` to enable hybrid search.
### Provider configuration highlights
The provider supports both fulltext only and hybrid vector search:
- Set `vectorizer_choice` to `"openai"` or `"hf"` to enable embeddings and hybrid search.
- When using a vectorizer, also set `vector_field_name` (e.g., `"vector"`).
- Partition fields for scoping memory: `application_id`, `agent_id`, `user_id`, `thread_id`.
- Thread scoping: `scope_to_per_operation_thread_id=True` isolates memory per operation thread.
- Index management: `index_name`, `overwrite_redis_index`, `drop_redis_index`.
## What the example does
`redis_basics.py` walks through three scenarios:
1. Standalone provider usage: adds messages and retrieves context via `model_invoking`.
2. Agent integration: teaches the agent a preference and verifies it is remembered across turns.
3. Agent + tool: calls a sample tool (flight search) and then asks the agent to recall details remembered from the tool output.
It uses OpenAI for both chat (via `OpenAIChatClient`) and, in some steps, optional embeddings for hybrid search.
## How to run
1) Start Redis (see options above). For local default, ensure it's reachable at `redis://localhost:6379`.
2) Set your OpenAI key if using embeddings and for the chat client used in the sample:
```bash
export OPENAI_API_KEY="<your key>"
```
3) Run the example:
```bash
python redis_basics.py
```
You should see the agent responses and, when using embeddings, context retrieved from Redis. The example includes commented debug helpers you can print, such as index info or all stored docs.
## Key concepts
### Memory scoping
- Global scope: set `application_id`, `agent_id`, `user_id`, or `thread_id` on the provider to filter memory.
- Peroperation thread scope: set `scope_to_per_operation_thread_id=True` to isolate memory to the current thread created by the framework.
### Hybrid vector search (optional)
- Enable by setting `vectorizer_choice` to `"openai"` (requires `OPENAI_API_KEY`) or `"hf"` (offline model).
- Provide `vector_field_name` (e.g., `"vector"`); other vector settings have sensible defaults.
### Index lifecycle controls
- `overwrite_redis_index` and `drop_redis_index` help recreate indexes during iteration.
## Troubleshooting
- Ensure at least one of `application_id`, `agent_id`, `user_id`, or `thread_id` is set; the provider requires a scope.
- If using embeddings, verify `OPENAI_API_KEY` is set and reachable.
- Make sure Redis exposes RediSearch (Redis Stack image or managed service with search enabled).
@@ -0,0 +1,233 @@
# Copyright (c) Microsoft. All rights reserved.
"""Redis Context Provider: Basic usage and agent integration
This example demonstrates how to use the Redis context provider to persist and
retrieve conversational memory for agents. It covers three progressively more
realistic scenarios:
1) Standalone provider usage ("basic cache")
- Write messages to Redis and retrieve relevant context using full-text or
hybrid vector search.
2) Agent + provider
- Connect the provider to an agent so the agent can store user preferences
and recall them across turns.
3) Agent + provider + tool memory
- Expose a simple tool to the agent, then verify that details from the tool
outputs are captured and retrievable as part of the agent's memory.
Requirements:
- A Redis instance with RediSearch enabled (e.g., Redis Stack)
- agent-framework with the Redis extra installed: pip install "agent-framework[redis]"
- Optionally an OpenAI API key if enabling embeddings for hybrid search
Run:
python redis_basics.py
"""
import os
import asyncio
from agent_framework import ChatMessage, Role
from agent_framework_redis._provider import RedisProvider
from agent_framework.openai import OpenAIChatClient
from redisvl.utils.vectorize import OpenAITextVectorizer
from redisvl.extensions.cache.embeddings import EmbeddingsCache
def search_flights(
origin_airport_code: str,
destination_airport_code: str,
detailed: bool = False
) -> str:
"""Simulated flight-search tool to demonstrate tool memory.
The agent can call this function, and the returned details can be stored
by the Redis context provider. We later ask the agent to recall facts from
these tool results to verify memory is working as expected.
"""
# Minimal static catalog used to simulate a tool's structured output
flights = {
("JFK", "LAX"): {"airline": "SkyJet", "duration": "6h 15m", "price": 325, "cabin": "Economy", "baggage": "1 checked bag"},
("SFO", "SEA"): {"airline": "Pacific Air", "duration": "2h 5m", "price": 129, "cabin": "Economy", "baggage": "Carry-on only"},
("LHR", "DXB"): {"airline": "EuroWings", "duration": "6h 50m", "price": 499, "cabin": "Business", "baggage": "2 bags included"},
}
route = (origin_airport_code.upper(), destination_airport_code.upper())
if route not in flights:
return f"No flights found between {origin_airport_code} and {destination_airport_code}"
flight = flights[route]
if not detailed:
return f"Flights available from {origin_airport_code} to {destination_airport_code}."
return (
f"{flight['airline']} operates flights from {origin_airport_code} to {destination_airport_code}. "
f"Duration: {flight['duration']}. "
f"Price: ${flight['price']}. "
f"Cabin: {flight['cabin']}. "
f"Baggage policy: {flight['baggage']}."
)
async def main() -> None:
"""Walk through provider-only, agent integration, and tool-memory scenarios.
Helpful debugging (uncomment when iterating):
- print(await provider.redis_index.info())
- print(await provider.search_all())
"""
print("1. Standalone provider usage:")
print("-" * 40)
# Create a provider with partition scope and OpenAI embeddings
# Please set the OPENAI_API_KEY and OPENAI_CHAT_MODEL_ID environment variables to use the OpenAI vectorizer
# Recommend default for OPENAI_CHAT_MODEL_ID is gpt-4o-mini
# We attach an embedding vectorizer so the provider can perform hybrid (text + vector)
# retrieval. If you prefer text-only retrieval, instantiate RedisProvider without the
# 'vectorizer' and vector_* parameters.
vectorizer = OpenAITextVectorizer(
model="text-embedding-ada-002",
api_config={"api_key": os.getenv("OPENAI_API_KEY")},
cache=EmbeddingsCache(name="openai_embeddings_cache", redis_url="redis://localhost:6379"),
)
# The provider manages persistence and retrieval. application_id/agent_id/user_id
# scope data for multi-tenant separation; thread_id (set later) narrows to a
# specific conversation.
provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_basics",
application_id="matrix_of_kermits",
agent_id="agent_kermit",
user_id="kermit",
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
)
# Build sample chat messages to persist to Redis
messages = [
ChatMessage(role=Role.USER, text="runA CONVO: User Message"),
ChatMessage(role=Role.ASSISTANT, text="runA CONVO: Assistant Message"),
ChatMessage(role=Role.SYSTEM, text="runA CONVO: System Message"),
]
# Declare/start a conversation/thread and write messages under 'runA'.
# Threads are logical boundaries used by the provider to group and retrieve
# conversation-specific context.
await provider.thread_created(thread_id="runA")
await provider.messages_adding(thread_id="runA", new_messages=messages)
# Retrieve relevant memories for a hypothetical model call. The provider uses
# the current request messages as the retrieval query and returns context to
# be injected into the model's instructions.
ctx = await provider.model_invoking([
ChatMessage(role=Role.SYSTEM, text="B: Assistant Message")
])
# Inspect retrieved memories that would be injected into instructions
# (Debug-only output so you can verify retrieval works as expected.)
print("Model Invoking Result:")
print(ctx)
# Drop / delete the provider index in Redis
await provider.redis_index.delete()
# --- Agent + provider: teach and recall a preference ---
print("\n2. Agent + provider: teach and recall a preference")
print("-" * 40)
# Fresh provider for the agent demo (recreates index)
vectorizer = OpenAITextVectorizer(
model="text-embedding-ada-002",
api_config={"api_key": os.getenv("OPENAI_API_KEY")},
cache=EmbeddingsCache(name="openai_embeddings_cache", redis_url="redis://localhost:6379"),
)
# Recreate a clean index so the next scenario starts fresh
provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_basics_2",
prefix="context_2",
application_id="matrix_of_kermits",
agent_id="agent_kermit",
user_id="kermit",
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
)
# Create chat client for the agent
client = OpenAIChatClient(ai_model_id=os.getenv("OPENAI_CHAT_MODEL_ID"), api_key=os.getenv("OPENAI_API_KEY"))
# Create agent wired to the Redis context provider. The provider automatically
# persists conversational details and surfaces relevant context on each turn.
agent = client.create_agent(
name="MemoryEnhancedAssistant",
instructions=(
"You are a helpful assistant. Personalize replies using provided context. "
"Before answering, always check for stored context"
),
tools=[],
context_providers=provider)
# Teach a user preference; the agent writes this to the provider's memory
query = "Remember that I enjoy glugenflorgle"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Ask the agent to recall the stored preference; it should retrieve from memory
query = "What do I enjoy?"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Drop / delete the provider index in Redis
await provider.redis_index.delete()
# --- Agent + provider + tool: store and recall tool-derived context ---
print("\n3. Agent + provider + tool: store and recall tool-derived context")
print("-" * 40)
# Text-only provider (full-text search only). Omits vectorizer and related params.
provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_basics_3",
prefix="context_3",
application_id="matrix_of_kermits",
agent_id="agent_kermit",
user_id="kermit"
)
# Create agent exposing the flight search tool. Tool outputs are captured by the
# provider and become retrievable context for later turns.
client = OpenAIChatClient(ai_model_id=os.getenv("OPENAI_CHAT_MODEL_ID"), api_key=os.getenv("OPENAI_API_KEY"))
agent = client.create_agent(
name="MemoryEnhancedAssistant",
instructions=(
"You are a helpful assistant. Personalize replies using provided context. "
"Before answering, always check for stored context"
),
tools=search_flights,
context_providers=provider)
# Invoke the tool; outputs become part of memory/context
query = "Are there any flights from new york city (jfk) to la? Give me details"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Verify the agent can recall tool-derived context
query = "Which flight did I ask about?"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Drop / delete the provider index in Redis
await provider.redis_index.delete()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,113 @@
# Copyright (c) Microsoft. All rights reserved.
"""Redis Context Provider: Basic usage and agent integration
This example demonstrates how to use the Redis ChatMessageStore to persist
conversational details. Pass it as a constructor argument to create_agent.
Requirements:
- A Redis instance with RediSearch enabled (e.g., Redis Stack)
- agent-framework with the Redis extra installed: pip install "agent-framework[redis]"
- Optionally an OpenAI API key if enabling embeddings for hybrid search
Run:
python redis_conversation.py
"""
import os
import asyncio
from agent_framework_redis._provider import RedisProvider
from agent_framework_redis._chat_message_store import RedisChatMessageStore
from agent_framework.openai import OpenAIChatClient
from redisvl.utils.vectorize import OpenAITextVectorizer
from redisvl.extensions.cache.embeddings import EmbeddingsCache
async def main() -> None:
"""Walk through provider and chat message store usage.
Helpful debugging (uncomment when iterating):
- print(await provider.redis_index.info())
- print(await provider.search_all())
"""
vectorizer = OpenAITextVectorizer(
model="text-embedding-ada-002",
api_config={"api_key": os.getenv("OPENAI_API_KEY")},
cache=EmbeddingsCache(name="openai_embeddings_cache", redis_url="redis://localhost:6379"),
)
thread_id = "test_thread"
provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_conversation",
prefix="redis_conversation",
application_id="matrix_of_kermits",
agent_id="agent_kermit",
user_id="kermit",
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
thread_id=thread_id,
)
chat_message_store_factory = lambda: RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id=thread_id,
key_prefix="chat_messages",
max_messages=100,
)
# Create chat client for the agent
client = OpenAIChatClient(ai_model_id=os.getenv("OPENAI_CHAT_MODEL_ID"), api_key=os.getenv("OPENAI_API_KEY"))
# Create agent wired to the Redis context provider. The provider automatically
# persists conversational details and surfaces relevant context on each turn.
agent = client.create_agent(
name="MemoryEnhancedAssistant",
instructions=(
"You are a helpful assistant. Personalize replies using provided context. "
"Before answering, always check for stored context"
),
tools=[],
context_providers=provider,
chat_message_store_factory=chat_message_store_factory,
)
# Teach a user preference; the agent writes this to the provider's memory
query = "Remember that I enjoy gumbo"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Ask the agent to recall the stored preference; it should retrieve from memory
query = "What do I enjoy?"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
query = "What did I say to you just now?"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
query = "Remember that anyone who does not clean shrimp will be eaten by a shark"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
query = "Tulips are red"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
query = "What was the first thing I said to you this conversation?"
result = await agent.run(query)
print("User: ", query)
print("Agent: ", result)
# Drop / delete the provider index in Redis
await provider.redis_index.delete()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,250 @@
# Copyright (c) Microsoft. All rights reserved.
"""Redis Context Provider: Thread scoping examples
This sample demonstrates how conversational memory can be scoped when using the
Redis context provider. It covers three scenarios:
1) Global thread scope
- Provide a fixed thread_id to share memories across operations/threads.
2) Per-operation thread scope
- Enable scope_to_per_operation_thread_id to bind the provider to a single
thread for the lifetime of that provider instance. Use the same thread
object for reads/writes with that provider.
3) Multiple agents with isolated memory
- Use different agent_id values to keep memories separated for different
agent personas, even when the user_id is the same.
Requirements:
- A Redis instance with RediSearch enabled (e.g., Redis Stack)
- agent-framework with the Redis extra installed: pip install "agent-framework[redis]"
- Optionally an OpenAI API key for the chat client in this demo
Run:
python redis_threads.py
"""
import asyncio
import os
import uuid
from agent_framework_redis._provider import RedisProvider
from agent_framework.openai import OpenAIChatClient
from redisvl.utils.vectorize import OpenAITextVectorizer
from redisvl.extensions.cache.embeddings import EmbeddingsCache
# Please set the OPENAI_API_KEY and OPENAI_CHAT_MODEL_ID environment variables to use the OpenAI vectorizer
# Recommend default for OPENAI_CHAT_MODEL_ID is gpt-4o-mini
async def example_global_thread_scope() -> None:
"""Example 1: Global thread_id scope (memories shared across all operations)."""
print("1. Global Thread Scope Example:")
print("-" * 40)
global_thread_id = str(uuid.uuid4())
client = OpenAIChatClient(
ai_model_id=os.getenv("OPENAI_CHAT_MODEL_ID", "gpt-4o-mini"),
api_key=os.getenv("OPENAI_API_KEY"),
)
provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_threads_global",
# overwrite_redis_index=True,
# drop_redis_index=True,
application_id="threads_demo_app",
agent_id="threads_demo_agent",
user_id="threads_demo_user",
thread_id=global_thread_id,
scope_to_per_operation_thread_id=False, # Share memories across all threads
)
agent = client.create_agent(
name="GlobalMemoryAssistant",
instructions=(
"You are a helpful assistant. Personalize replies using provided context. "
"Before answering, always check for stored context containing information"
),
tools=[],
context_providers=provider)
# Store a preference in the global scope
query = "Remember that I prefer technical responses with code examples when discussing programming."
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result}\n")
# Create a new thread - memories should still be accessible due to global scope
new_thread = agent.get_new_thread()
query = "What technical responses do I prefer?"
print(f"User (new thread): {query}")
result = await agent.run(query, thread=new_thread)
print(f"Agent: {result}\n")
# Clean up the Redis index
await provider.redis_index.delete()
async def example_per_operation_thread_scope() -> None:
"""Example 2: Per-operation thread scope (memories isolated per thread).
Note: When scope_to_per_operation_thread_id=True, the provider is bound to a single thread
throughout its lifetime. Use the same thread object for all operations with that provider.
"""
print("2. Per-Operation Thread Scope Example:")
print("-" * 40)
client = OpenAIChatClient(
ai_model_id=os.getenv("OPENAI_CHAT_MODEL_ID", "gpt-4o-mini"),
api_key=os.getenv("OPENAI_API_KEY"),
)
vectorizer = OpenAITextVectorizer(
model="text-embedding-ada-002",
api_config={"api_key": os.getenv("OPENAI_API_KEY")},
cache=EmbeddingsCache(name="openai_embeddings_cache", redis_url="redis://localhost:6379"),
)
provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_threads_dynamic",
# overwrite_redis_index=True,
# drop_redis_index=True,
application_id="threads_demo_app",
agent_id="threads_demo_agent",
user_id="threads_demo_user",
scope_to_per_operation_thread_id=True, # Isolate memories per thread
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
)
agent = client.create_agent(
name="ScopedMemoryAssistant",
instructions="You are an assistant with thread-scoped memory.",
context_providers=provider,
)
# Create a specific thread for this scoped provider
dedicated_thread = agent.get_new_thread()
# Store some information in the dedicated thread
query = "Remember that for this conversation, I'm working on a Python project about data analysis."
print(f"User (dedicated thread): {query}")
result = await agent.run(query, thread=dedicated_thread)
print(f"Agent: {result}\n")
# Test memory retrieval in the same dedicated thread
query = "What project am I working on?"
print(f"User (same dedicated thread): {query}")
result = await agent.run(query, thread=dedicated_thread)
print(f"Agent: {result}\n")
# Store more information in the same thread
query = "Also remember that I prefer using pandas and matplotlib for this project."
print(f"User (same dedicated thread): {query}")
result = await agent.run(query, thread=dedicated_thread)
print(f"Agent: {result}\n")
# Test comprehensive memory retrieval
query = "What do you know about my current project and preferences?"
print(f"User (same dedicated thread): {query}")
result = await agent.run(query, thread=dedicated_thread)
print(f"Agent: {result}\n")
# Clean up the Redis index
await provider.redis_index.delete()
async def example_multiple_agents() -> None:
"""Example 3: Multiple agents with different thread configurations (isolated via agent_id) but within 1 index."""
print("3. Multiple Agents with Different Thread Configurations:")
print("-" * 40)
client = OpenAIChatClient(
ai_model_id=os.getenv("OPENAI_CHAT_MODEL_ID", "gpt-4o-mini"),
api_key=os.getenv("OPENAI_API_KEY"),
)
vectorizer = OpenAITextVectorizer(
model="text-embedding-ada-002",
api_config={"api_key": os.getenv("OPENAI_API_KEY")},
cache=EmbeddingsCache(name="openai_embeddings_cache", redis_url="redis://localhost:6379"),
)
personal_provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_threads_agents",
application_id="threads_demo_app",
agent_id="agent_personal",
user_id="threads_demo_user",
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
)
personal_agent = client.create_agent(
name="PersonalAssistant",
instructions="You are a personal assistant that helps with personal tasks.",
context_providers=personal_provider,
)
work_provider = RedisProvider(
redis_url="redis://localhost:6379",
index_name="redis_threads_agents",
application_id="threads_demo_app",
agent_id="agent_work",
user_id="threads_demo_user",
redis_vectorizer=vectorizer,
vector_field_name="vector",
vector_algorithm="hnsw",
vector_distance_metric="cosine",
)
work_agent = client.create_agent(
name="WorkAssistant",
instructions="You are a work assistant that helps with professional tasks.",
context_providers=work_provider,
)
# Store personal information
query = "Remember that I like to exercise at 6 AM and prefer outdoor activities."
print(f"User to Personal Agent: {query}")
result = await personal_agent.run(query)
print(f"Personal Agent: {result}\n")
# Store work information
query = "Remember that I have team meetings every Tuesday at 2 PM."
print(f"User to Work Agent: {query}")
result = await work_agent.run(query)
print(f"Work Agent: {result}\n")
# Test memory isolation
query = "What do you know about my schedule?"
print(f"User to Personal Agent: {query}")
result = await personal_agent.run(query)
print(f"Personal Agent: {result}\n")
print(f"User to Work Agent: {query}")
result = await work_agent.run(query)
print(f"Work Agent: {result}\n")
# Clean up the Redis index (shared)
await work_provider.redis_index.delete()
async def main() -> None:
print("=== Redis Thread Scoping Examples ===\n")
await example_global_thread_scope()
await example_per_operation_thread_scope()
await example_multiple_agents()
if __name__ == "__main__":
asyncio.run(main())
@@ -8,3 +8,4 @@ This folder contains examples demonstrating different ways to manage conversatio
|------|-------------|
| [`custom_chat_message_store_thread.py`](custom_chat_message_store_thread.py) | Demonstrates how to implement a custom `ChatMessageStore` for persisting conversation history. Shows how to create a custom store with serialization/deserialization capabilities and integrate it with agents for thread management across multiple sessions. |
| [`suspend_resume_thread.py`](suspend_resume_thread.py) | Shows how to suspend and resume conversation threads, allowing you to save the state of a conversation and continue it later. This is useful for long-running conversations or when you need to persist conversation state across application restarts. |
| [`redis_chat_message_store_thread.py`](redis_chat_message_store_thread.py) | Comprehensive examples of using the Redis-backed `RedisChatMessageStore` for persistent conversation storage. Covers basic usage, user session management, conversation persistence across app restarts, thread serialization, and automatic message trimming. Requires Redis server and demonstrates production-ready patterns for scalable chat applications. |
@@ -0,0 +1,318 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from uuid import uuid4
from agent_framework import AgentThread
from agent_framework._threads import deserialize_thread_state
from agent_framework.openai import OpenAIChatClient
from agent_framework.redis import RedisChatMessageStore
async def example_basic_redis_store() -> None:
"""Basic example of using Redis chat message store."""
print("=== Basic Redis Chat Message Store Example ===")
# Create Redis store with auto-generated thread ID
redis_store = RedisChatMessageStore(
redis_url="redis://localhost:6379",
# thread_id will be auto-generated if not provided
)
print(f"Created store with thread ID: {redis_store.thread_id}")
# Create thread with Redis store
thread = AgentThread(message_store=redis_store)
# Create agent
agent = OpenAIChatClient().create_agent(
name="RedisBot",
instructions="You are a helpful assistant that remembers our conversation using Redis.",
)
# Have a conversation
print("\n--- Starting conversation ---")
query1 = "Hello! My name is Alice and I love pizza."
print(f"User: {query1}")
response1 = await agent.run(query1, thread=thread)
print(f"Agent: {response1.text}")
query2 = "What do you remember about me?"
print(f"User: {query2}")
response2 = await agent.run(query2, thread=thread)
print(f"Agent: {response2.text}")
# Show messages are stored in Redis
messages = await redis_store.list_messages()
print(f"\nTotal messages in Redis: {len(messages)}")
# Cleanup
await redis_store.clear()
await redis_store.aclose()
print("Cleaned up Redis data\n")
async def example_user_session_management() -> None:
"""Example of managing user sessions with Redis."""
print("=== User Session Management Example ===")
user_id = "alice_123"
session_id = f"session_{uuid4()}"
# Create Redis store for specific user session
def create_user_session_store():
return RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id=f"user_{user_id}_{session_id}",
max_messages=10 # Keep only last 10 messages
)
# Create agent with factory pattern
agent = OpenAIChatClient().create_agent(
name="SessionBot",
instructions="You are a helpful assistant. Keep track of user preferences.",
chat_message_store_factory=create_user_session_store,
)
# Start conversation
thread = agent.get_new_thread()
print(f"Started session for user {user_id}")
if hasattr(thread.message_store, 'thread_id'):
print(f"Thread ID: {thread.message_store.thread_id}") # type: ignore[union-attr]
# Simulate conversation
queries = [
"Hi, I'm Alice and I prefer vegetarian food.",
"What restaurants would you recommend?",
"I also love Italian cuisine.",
"Can you remember my food preferences?"
]
for i, query in enumerate(queries, 1):
print(f"\n--- Message {i} ---")
print(f"User: {query}")
response = await agent.run(query, thread=thread)
print(f"Agent: {response.text}")
# Show persistent storage
if thread.message_store:
messages = await thread.message_store.list_messages() # type: ignore[union-attr]
print(f"\nMessages stored for user {user_id}: {len(messages)}")
# Cleanup
if thread.message_store:
await thread.message_store.clear() # type: ignore[union-attr]
await thread.message_store.aclose() # type: ignore[union-attr]
print("Cleaned up session data\n")
async def example_conversation_persistence() -> None:
"""Example of conversation persistence across application restarts."""
print("=== Conversation Persistence Example ===")
conversation_id = "persistent_chat_001"
# Phase 1: Start conversation
print("--- Phase 1: Starting conversation ---")
store1 = RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id=conversation_id,
)
thread1 = AgentThread(message_store=store1)
agent = OpenAIChatClient().create_agent(
name="PersistentBot",
instructions="You are a helpful assistant. Remember our conversation history.",
)
# Start conversation
query1 = "Hello! I'm working on a Python project about machine learning."
print(f"User: {query1}")
response1 = await agent.run(query1, thread=thread1)
print(f"Agent: {response1.text}")
query2 = "I'm specifically interested in neural networks."
print(f"User: {query2}")
response2 = await agent.run(query2, thread=thread1)
print(f"Agent: {response2.text}")
print(f"Stored {len(await store1.list_messages())} messages in Redis")
await store1.aclose()
# Phase 2: Resume conversation (simulating app restart)
print("\n--- Phase 2: Resuming conversation (after 'restart') ---")
store2 = RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id=conversation_id, # Same thread ID
)
thread2 = AgentThread(message_store=store2)
# Continue conversation - agent should remember context
query3 = "What was I working on before?"
print(f"User: {query3}")
response3 = await agent.run(query3, thread=thread2)
print(f"Agent: {response3.text}")
query4 = "Can you suggest some Python libraries for neural networks?"
print(f"User: {query4}")
response4 = await agent.run(query4, thread=thread2)
print(f"Agent: {response4.text}")
print(f"Total messages after resuming: {len(await store2.list_messages())}")
# Cleanup
await store2.clear()
await store2.aclose()
print("Cleaned up persistent data\n")
async def example_thread_serialization() -> None:
"""Example of thread state serialization and deserialization."""
print("=== Thread Serialization Example ===")
# Create initial thread with Redis store
original_store = RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id="serialization_test",
max_messages=50,
)
original_thread = AgentThread(message_store=original_store)
agent = OpenAIChatClient().create_agent(
name="SerializationBot",
instructions="You are a helpful assistant.",
)
# Have initial conversation
print("--- Initial conversation ---")
query1 = "Hello! I'm testing serialization."
print(f"User: {query1}")
response1 = await agent.run(query1, thread=original_thread)
print(f"Agent: {response1.text}")
# Serialize thread state
serialized_thread = await original_thread.serialize()
print(f"\nSerialized thread state: {serialized_thread}")
# Close original connection
await original_store.aclose()
# Deserialize thread state (simulating loading from database/file)
print("\n--- Deserializing thread state ---")
# Create a new thread with the same Redis store type
# This ensures the correct store type is used for deserialization
restored_store = RedisChatMessageStore(redis_url="redis://localhost:6379")
restored_thread = AgentThread(message_store=restored_store)
# Deserialize the thread state into the properly typed thread
await deserialize_thread_state(restored_thread, serialized_thread)
# Continue conversation with restored thread
query2 = "Do you remember what I said about testing?"
print(f"User: {query2}")
response2 = await agent.run(query2, thread=restored_thread)
print(f"Agent: {response2.text}")
# Cleanup
if restored_thread.message_store:
await restored_thread.message_store.clear() # type: ignore[union-attr]
await restored_thread.message_store.aclose() # type: ignore[union-attr]
print("Cleaned up serialization test data\n")
async def example_message_limits() -> None:
"""Example of automatic message trimming with limits."""
print("=== Message Limits Example ===")
# Create store with small message limit
store = RedisChatMessageStore(
redis_url="redis://localhost:6379",
thread_id="limits_test",
max_messages=3, # Keep only 3 most recent messages
)
thread = AgentThread(message_store=store)
agent = OpenAIChatClient().create_agent(
name="LimitBot",
instructions="You are a helpful assistant with limited memory.",
)
# Send multiple messages to test trimming
messages = [
"Message 1: Hello!",
"Message 2: How are you?",
"Message 3: What's the weather?",
"Message 4: Tell me a joke.",
"Message 5: This should trigger trimming.",
]
for i, query in enumerate(messages, 1):
print(f"\n--- Sending message {i} ---")
print(f"User: {query}")
response = await agent.run(query, thread=thread)
print(f"Agent: {response.text}")
stored_messages = await store.list_messages()
print(f"Messages in store: {len(stored_messages)}")
if len(stored_messages) > 0:
print(f"Oldest message: {stored_messages[0].text[:30]}...")
# Final check
final_messages = await store.list_messages()
print(f"\nFinal message count: {len(final_messages)} (should be <= 6: 3 messages × 2 per exchange)")
# Cleanup
await store.clear()
await store.aclose()
print("Cleaned up limits test data\n")
async def main() -> None:
"""Run all Redis chat message store examples."""
print("Redis Chat Message Store Examples")
print("=" * 50)
print("Prerequisites:")
print("- Redis server running on localhost:6379")
print("- OPENAI_API_KEY environment variable set")
print("=" * 50)
# Check prerequisites
if not os.getenv("OPENAI_API_KEY"):
print("ERROR: OPENAI_API_KEY environment variable not set")
return
try:
# Test Redis connection
test_store = RedisChatMessageStore(redis_url="redis://localhost:6379")
connection_ok = await test_store.ping()
await test_store.aclose()
if not connection_ok:
raise Exception("Redis ping failed")
print("✓ Redis connection successful\n")
except Exception as e:
print(f"ERROR: Cannot connect to Redis: {e}")
print("Please ensure Redis is running on localhost:6379")
return
try:
# Run all examples
await example_basic_redis_store()
await example_user_session_management()
await example_conversation_persistence()
await example_thread_serialization()
await example_message_limits()
print("All examples completed successfully!")
except Exception as e:
print(f"Error running examples: {e}")
raise
if __name__ == "__main__":
asyncio.run(main())
+122 -3
View File
@@ -33,6 +33,7 @@ members = [
"agent-framework-lab-tau2",
"agent-framework-mem0",
"agent-framework-project",
"agent-framework-redis",
"agent-framework-runtime",
]
@@ -70,6 +71,7 @@ all = [
{ name = "agent-framework-devui", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-foundry", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-mem0", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-redis", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-runtime", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
azure = [
@@ -84,6 +86,9 @@ foundry = [
mem0 = [
{ name = "agent-framework-mem0", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
redis = [
{ name = "agent-framework-redis", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
runtime = [
{ name = "agent-framework-runtime", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
@@ -101,6 +106,8 @@ requires-dist = [
{ name = "agent-framework-foundry", marker = "extra == 'foundry'", editable = "packages/foundry" },
{ name = "agent-framework-mem0", marker = "extra == 'all'", editable = "packages/mem0" },
{ name = "agent-framework-mem0", marker = "extra == 'mem0'", editable = "packages/mem0" },
{ name = "agent-framework-redis", marker = "extra == 'all'", editable = "packages/redis" },
{ name = "agent-framework-redis", marker = "extra == 'redis'", editable = "packages/redis" },
{ name = "agent-framework-runtime", marker = "extra == 'all'", editable = "packages/runtime" },
{ name = "agent-framework-runtime", marker = "extra == 'runtime'", editable = "packages/runtime" },
{ name = "aiofiles", specifier = ">=24.1.0" },
@@ -117,7 +124,7 @@ requires-dist = [
{ name = "pydantic-settings", specifier = ">=2.10.1" },
{ name = "typing-extensions", specifier = ">=4.14.0" },
]
provides-extras = ["azure", "foundry", "viz", "runtime", "mem0", "devui", "all"]
provides-extras = ["azure", "foundry", "redis", "viz", "runtime", "mem0", "devui", "all"]
[[package]]
name = "agent-framework-azure"
@@ -286,8 +293,8 @@ dependencies = [
{ name = "agent-framework-copilotstudio", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-devui", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-foundry", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-lab-gaia", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-mem0", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-redis", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
[package.dev-dependencies]
@@ -321,8 +328,8 @@ requires-dist = [
{ name = "agent-framework-copilotstudio", editable = "packages/copilotstudio" },
{ name = "agent-framework-devui", editable = "packages/devui" },
{ name = "agent-framework-foundry", editable = "packages/foundry" },
{ name = "agent-framework-lab-gaia", editable = "packages/lab/gaia" },
{ name = "agent-framework-mem0", editable = "packages/mem0" },
{ name = "agent-framework-redis", editable = "packages/redis" },
]
[package.metadata.requires-dev]
@@ -349,6 +356,26 @@ docs = [
{ name = "py2docfx", specifier = ">=0.1.20" },
]
[[package]]
name = "agent-framework-redis"
version = "0.0.0b1"
source = { editable = "packages/redis" }
dependencies = [
{ name = "agent-framework", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version < '3.11' and sys_platform == 'darwin') or (python_full_version < '3.11' and sys_platform == 'linux') or (python_full_version < '3.11' and sys_platform == 'win32')" },
{ name = "numpy", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.11' and sys_platform == 'darwin') or (python_full_version >= '3.11' and sys_platform == 'linux') or (python_full_version >= '3.11' and sys_platform == 'win32')" },
{ name = "redis", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "redisvl", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
[package.metadata]
requires-dist = [
{ name = "agent-framework", editable = "packages/main" },
{ name = "numpy", specifier = ">=2.2.6" },
{ name = "redis", specifier = ">=6.4.0" },
{ name = "redisvl", specifier = ">=0.8.2" },
]
[[package]]
name = "agent-framework-runtime"
version = "0.1.0b1"
@@ -1987,6 +2014,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/e8/685f47e0d754320684db4425a0967f7d3fa70126bffd76110b7009a0090f/joblib-1.5.2-py3-none-any.whl", hash = "sha256:4e1f0bdbb987e6d843c70cf43714cb276623def372df3c22fe5266b2670bc241", size = 308396, upload-time = "2025-08-27T12:15:45.188Z" },
]
[[package]]
name = "jsonpath-ng"
version = "1.7.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "ply", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/6d/86/08646239a313f895186ff0a4573452038eed8c86f54380b3ebac34d32fb2/jsonpath-ng-1.7.0.tar.gz", hash = "sha256:f6f5f7fd4e5ff79c785f1573b394043b39849fb2bb47bcead935d12b00beab3c", size = 37838, upload-time = "2024-10-11T15:41:42.404Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/35/5a/73ecb3d82f8615f32ccdadeb9356726d6cae3a4bbc840b437ceb95708063/jsonpath_ng-1.7.0-py3-none-any.whl", hash = "sha256:f3d7f9e848cba1b6da28c55b1c26ff915dc9e0b1ba7e752a53d6da8d5cbd00b6", size = 30105, upload-time = "2024-11-20T17:58:30.418Z" },
]
[[package]]
name = "jsonschema"
version = "4.25.1"
@@ -2424,6 +2463,48 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/9d/d4/4e0af83b1461ca71c8756efe79b294e10e03f38d36b99c631cf477269033/microsoft_agents_hosting_core-0.3.2-py3-none-any.whl", hash = "sha256:4ad78b4856c59f99319bb92277bb55f61cfc9c37990a4bd614ca9eec7f311614", size = 94546, upload-time = "2025-09-17T20:13:32.946Z" },
]
[[package]]
name = "ml-dtypes"
version = "0.5.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version < '3.11' and sys_platform == 'darwin') or (python_full_version < '3.11' and sys_platform == 'linux') or (python_full_version < '3.11' and sys_platform == 'win32')" },
{ name = "numpy", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.11' and sys_platform == 'darwin') or (python_full_version >= '3.11' and sys_platform == 'linux') or (python_full_version >= '3.11' and sys_platform == 'win32')" },
]
sdist = { url = "https://files.pythonhosted.org/packages/78/a7/aad060393123cfb383956dca68402aff3db1e1caffd5764887ed5153f41b/ml_dtypes-0.5.3.tar.gz", hash = "sha256:95ce33057ba4d05df50b1f3cfefab22e351868a843b3b15a46c65836283670c9", size = 692316, upload-time = "2025-07-29T18:39:19.454Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ac/bb/1f32124ab6d3a279ea39202fe098aea95b2d81ef0ce1d48612b6bf715e82/ml_dtypes-0.5.3-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:0a1d68a7cb53e3f640b2b6a34d12c0542da3dd935e560fdf463c0c77f339fc20", size = 667409, upload-time = "2025-07-29T18:38:17.321Z" },
{ url = "https://files.pythonhosted.org/packages/1d/ac/e002d12ae19136e25bb41c7d14d7e1a1b08f3c0e99a44455ff6339796507/ml_dtypes-0.5.3-cp310-cp310-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0cd5a6c711b5350f3cbc2ac28def81cd1c580075ccb7955e61e9d8f4bfd40d24", size = 4960702, upload-time = "2025-07-29T18:38:19.616Z" },
{ url = "https://files.pythonhosted.org/packages/dd/12/79e9954e6b3255a4b1becb191a922d6e2e94d03d16a06341ae9261963ae8/ml_dtypes-0.5.3-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bdcf26c2dbc926b8a35ec8cbfad7eff1a8bd8239e12478caca83a1fc2c400dc2", size = 4933471, upload-time = "2025-07-29T18:38:21.809Z" },
{ url = "https://files.pythonhosted.org/packages/d5/aa/d1eff619e83cd1ddf6b561d8240063d978e5d887d1861ba09ef01778ec3a/ml_dtypes-0.5.3-cp310-cp310-win_amd64.whl", hash = "sha256:aecbd7c5272c82e54d5b99d8435fd10915d1bc704b7df15e4d9ca8dc3902be61", size = 206330, upload-time = "2025-07-29T18:38:23.663Z" },
{ url = "https://files.pythonhosted.org/packages/af/f1/720cb1409b5d0c05cff9040c0e9fba73fa4c67897d33babf905d5d46a070/ml_dtypes-0.5.3-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:4a177b882667c69422402df6ed5c3428ce07ac2c1f844d8a1314944651439458", size = 667412, upload-time = "2025-07-29T18:38:25.275Z" },
{ url = "https://files.pythonhosted.org/packages/6a/d5/05861ede5d299f6599f86e6bc1291714e2116d96df003cfe23cc54bcc568/ml_dtypes-0.5.3-cp311-cp311-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9849ce7267444c0a717c80c6900997de4f36e2815ce34ac560a3edb2d9a64cd2", size = 4964606, upload-time = "2025-07-29T18:38:27.045Z" },
{ url = "https://files.pythonhosted.org/packages/db/dc/72992b68de367741bfab8df3b3fe7c29f982b7279d341aa5bf3e7ef737ea/ml_dtypes-0.5.3-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c3f5ae0309d9f888fd825c2e9d0241102fadaca81d888f26f845bc8c13c1e4ee", size = 4938435, upload-time = "2025-07-29T18:38:29.193Z" },
{ url = "https://files.pythonhosted.org/packages/81/1c/d27a930bca31fb07d975a2d7eaf3404f9388114463b9f15032813c98f893/ml_dtypes-0.5.3-cp311-cp311-win_amd64.whl", hash = "sha256:58e39349d820b5702bb6f94ea0cb2dc8ec62ee81c0267d9622067d8333596a46", size = 206334, upload-time = "2025-07-29T18:38:30.687Z" },
{ url = "https://files.pythonhosted.org/packages/1a/d8/6922499effa616012cb8dc445280f66d100a7ff39b35c864cfca019b3f89/ml_dtypes-0.5.3-cp311-cp311-win_arm64.whl", hash = "sha256:66c2756ae6cfd7f5224e355c893cfd617fa2f747b8bbd8996152cbdebad9a184", size = 157584, upload-time = "2025-07-29T18:38:32.187Z" },
{ url = "https://files.pythonhosted.org/packages/0d/eb/bc07c88a6ab002b4635e44585d80fa0b350603f11a2097c9d1bfacc03357/ml_dtypes-0.5.3-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:156418abeeda48ea4797db6776db3c5bdab9ac7be197c1233771e0880c304057", size = 663864, upload-time = "2025-07-29T18:38:33.777Z" },
{ url = "https://files.pythonhosted.org/packages/cf/89/11af9b0f21b99e6386b6581ab40fb38d03225f9de5f55cf52097047e2826/ml_dtypes-0.5.3-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1db60c154989af253f6c4a34e8a540c2c9dce4d770784d426945e09908fbb177", size = 4951313, upload-time = "2025-07-29T18:38:36.45Z" },
{ url = "https://files.pythonhosted.org/packages/d8/a9/b98b86426c24900b0c754aad006dce2863df7ce0bb2bcc2c02f9cc7e8489/ml_dtypes-0.5.3-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:1b255acada256d1fa8c35ed07b5f6d18bc21d1556f842fbc2d5718aea2cd9e55", size = 4928805, upload-time = "2025-07-29T18:38:38.29Z" },
{ url = "https://files.pythonhosted.org/packages/50/c1/85e6be4fc09c6175f36fb05a45917837f30af9a5146a5151cb3a3f0f9e09/ml_dtypes-0.5.3-cp312-cp312-win_amd64.whl", hash = "sha256:da65e5fd3eea434ccb8984c3624bc234ddcc0d9f4c81864af611aaebcc08a50e", size = 208182, upload-time = "2025-07-29T18:38:39.72Z" },
{ url = "https://files.pythonhosted.org/packages/9e/17/cf5326d6867be057f232d0610de1458f70a8ce7b6290e4b4a277ea62b4cd/ml_dtypes-0.5.3-cp312-cp312-win_arm64.whl", hash = "sha256:8bb9cd1ce63096567f5f42851f5843b5a0ea11511e50039a7649619abfb4ba6d", size = 161560, upload-time = "2025-07-29T18:38:41.072Z" },
{ url = "https://files.pythonhosted.org/packages/2d/87/1bcc98a66de7b2455dfb292f271452cac9edc4e870796e0d87033524d790/ml_dtypes-0.5.3-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:5103856a225465371fe119f2fef737402b705b810bd95ad5f348e6e1a6ae21af", size = 663781, upload-time = "2025-07-29T18:38:42.984Z" },
{ url = "https://files.pythonhosted.org/packages/fd/2c/bd2a79ba7c759ee192b5601b675b180a3fd6ccf48ffa27fe1782d280f1a7/ml_dtypes-0.5.3-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:4cae435a68861660af81fa3c5af16b70ca11a17275c5b662d9c6f58294e0f113", size = 4956217, upload-time = "2025-07-29T18:38:44.65Z" },
{ url = "https://files.pythonhosted.org/packages/14/f3/091ba84e5395d7fe5b30c081a44dec881cd84b408db1763ee50768b2ab63/ml_dtypes-0.5.3-cp313-cp313-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:6936283b56d74fbec431ca57ce58a90a908fdbd14d4e2d22eea6d72bb208a7b7", size = 4933109, upload-time = "2025-07-29T18:38:46.405Z" },
{ url = "https://files.pythonhosted.org/packages/bc/24/054036dbe32c43295382c90a1363241684c4d6aaa1ecc3df26bd0c8d5053/ml_dtypes-0.5.3-cp313-cp313-win_amd64.whl", hash = "sha256:d0f730a17cf4f343b2c7ad50cee3bd19e969e793d2be6ed911f43086460096e4", size = 208187, upload-time = "2025-07-29T18:38:48.24Z" },
{ url = "https://files.pythonhosted.org/packages/a6/3d/7dc3ec6794a4a9004c765e0c341e32355840b698f73fd2daff46f128afc1/ml_dtypes-0.5.3-cp313-cp313-win_arm64.whl", hash = "sha256:2db74788fc01914a3c7f7da0763427280adfc9cd377e9604b6b64eb8097284bd", size = 161559, upload-time = "2025-07-29T18:38:50.493Z" },
{ url = "https://files.pythonhosted.org/packages/12/91/e6c7a0d67a152b9330445f9f0cf8ae6eee9b83f990b8c57fe74631e42a90/ml_dtypes-0.5.3-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:93c36a08a6d158db44f2eb9ce3258e53f24a9a4a695325a689494f0fdbc71770", size = 689321, upload-time = "2025-07-29T18:38:52.03Z" },
{ url = "https://files.pythonhosted.org/packages/9e/6c/b7b94b84a104a5be1883305b87d4c6bd6ae781504474b4cca067cb2340ec/ml_dtypes-0.5.3-cp313-cp313t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0e44a3761f64bc009d71ddb6d6c71008ba21b53ab6ee588dadab65e2fa79eafc", size = 5274495, upload-time = "2025-07-29T18:38:53.797Z" },
{ url = "https://files.pythonhosted.org/packages/5b/38/6266604dffb43378055394ea110570cf261a49876fc48f548dfe876f34cc/ml_dtypes-0.5.3-cp313-cp313t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:bdf40d2aaabd3913dec11840f0d0ebb1b93134f99af6a0a4fd88ffe924928ab4", size = 5285422, upload-time = "2025-07-29T18:38:56.603Z" },
{ url = "https://files.pythonhosted.org/packages/7c/88/8612ff177d043a474b9408f0382605d881eeb4125ba89d4d4b3286573a83/ml_dtypes-0.5.3-cp314-cp314-macosx_10_13_universal2.whl", hash = "sha256:aec640bd94c4c85c0d11e2733bd13cbb10438fb004852996ec0efbc6cacdaf70", size = 661182, upload-time = "2025-07-29T18:38:58.414Z" },
{ url = "https://files.pythonhosted.org/packages/6f/2b/0569a5e88b29240d373e835107c94ae9256fb2191d3156b43b2601859eff/ml_dtypes-0.5.3-cp314-cp314-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:bda32ce212baa724e03c68771e5c69f39e584ea426bfe1a701cb01508ffc7035", size = 4956187, upload-time = "2025-07-29T18:39:00.611Z" },
{ url = "https://files.pythonhosted.org/packages/51/66/273c2a06ae44562b104b61e6b14444da00061fd87652506579d7eb2c40b1/ml_dtypes-0.5.3-cp314-cp314-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c205cac07d24a29840c163d6469f61069ce4b065518519216297fc2f261f8db9", size = 4930911, upload-time = "2025-07-29T18:39:02.405Z" },
{ url = "https://files.pythonhosted.org/packages/93/ab/606be3e87dc0821bd360c8c1ee46108025c31a4f96942b63907bb441b87d/ml_dtypes-0.5.3-cp314-cp314-win_amd64.whl", hash = "sha256:cd7c0bb22d4ff86d65ad61b5dd246812e8993fbc95b558553624c33e8b6903ea", size = 216664, upload-time = "2025-07-29T18:39:03.927Z" },
{ url = "https://files.pythonhosted.org/packages/30/a2/e900690ca47d01dffffd66375c5de8c4f8ced0f1ef809ccd3b25b3e6b8fa/ml_dtypes-0.5.3-cp314-cp314-win_arm64.whl", hash = "sha256:9d55ea7f7baf2aed61bf1872116cefc9d0c3693b45cae3916897ee27ef4b835e", size = 160203, upload-time = "2025-07-29T18:39:05.671Z" },
{ url = "https://files.pythonhosted.org/packages/53/21/783dfb51f40d2660afeb9bccf3612b99f6a803d980d2a09132b0f9d216ab/ml_dtypes-0.5.3-cp314-cp314t-macosx_10_13_universal2.whl", hash = "sha256:e12e29764a0e66a7a31e9b8bf1de5cc0423ea72979f45909acd4292de834ccd3", size = 689324, upload-time = "2025-07-29T18:39:07.567Z" },
{ url = "https://files.pythonhosted.org/packages/09/f7/a82d249c711abf411ac027b7163f285487f5e615c3e0716c61033ce996ab/ml_dtypes-0.5.3-cp314-cp314t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:19f6c3a4f635c2fc9e2aa7d91416bd7a3d649b48350c51f7f715a09370a90d93", size = 5275917, upload-time = "2025-07-29T18:39:09.339Z" },
{ url = "https://files.pythonhosted.org/packages/7f/3c/541c4b30815ab90ebfbb51df15d0b4254f2f9f1e2b4907ab229300d5e6f2/ml_dtypes-0.5.3-cp314-cp314t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5ab039ffb40f3dc0aeeeba84fd6c3452781b5e15bef72e2d10bcb33e4bbffc39", size = 5285284, upload-time = "2025-07-29T18:39:11.532Z" },
]
[[package]]
name = "msal"
version = "1.33.0"
@@ -3431,6 +3512,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
]
[[package]]
name = "ply"
version = "3.11"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/e5/69/882ee5c9d017149285cab114ebeab373308ef0f874fcdac9beb90e0ac4da/ply-3.11.tar.gz", hash = "sha256:00c7c1aaa88358b9c765b6d3000c6eec0ba42abca5351b095321aef446081da3", size = 159130, upload-time = "2018-02-15T19:01:31.097Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a3/58/35da89ee790598a0700ea49b2a66594140f44dec458c07e8e3d4979137fc/ply-3.11-py2.py3-none-any.whl", hash = "sha256:096f9b8350b65ebd2fd1346b12452efe5b9607f7482813ffca50c22722a807ce", size = 49567, upload-time = "2018-02-15T19:01:27.172Z" },
]
[[package]]
name = "poethepoet"
version = "0.37.0"
@@ -3935,6 +4025,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/45/58/38b5afbc1a800eeea951b9285d3912613f2603bdf897a4ab0f4bd7f405fc/python_multipart-0.0.20-py3-none-any.whl", hash = "sha256:8a62d3a8335e06589fe01f2a3e178cdcc632f3fbe0d492ad9ee0ec35aab1f104", size = 24546, upload-time = "2024-12-16T19:45:44.423Z" },
]
[[package]]
name = "python-ulid"
version = "3.1.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/40/7e/0d6c82b5ccc71e7c833aed43d9e8468e1f2ff0be1b3f657a6fcafbb8433d/python_ulid-3.1.0.tar.gz", hash = "sha256:ff0410a598bc5f6b01b602851a3296ede6f91389f913a5d5f8c496003836f636", size = 93175, upload-time = "2025-08-18T16:09:26.305Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/6c/a0/4ed6632b70a52de845df056654162acdebaf97c20e3212c559ac43e7216e/python_ulid-3.1.0-py3-none-any.whl", hash = "sha256:e2cdc979c8c877029b4b7a38a6fba3bc4578e4f109a308419ff4d3ccf0a46619", size = 11577, upload-time = "2025-08-18T16:09:25.047Z" },
]
[[package]]
name = "pytz"
version = "2025.2"
@@ -4041,6 +4140,26 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/02/89e2ed7e85db6c93dfa9e8f691c5087df4e3551ab39081a4d7c6d1f90e05/redis-6.4.0-py3-none-any.whl", hash = "sha256:f0544fa9604264e9464cdf4814e7d4830f74b165d52f2a330a760a88dd248b7f", size = 279847, upload-time = "2025-08-07T08:10:09.84Z" },
]
[[package]]
name = "redisvl"
version = "0.8.2"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "jsonpath-ng", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "ml-dtypes", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version < '3.11' and sys_platform == 'darwin') or (python_full_version < '3.11' and sys_platform == 'linux') or (python_full_version < '3.11' and sys_platform == 'win32')" },
{ name = "numpy", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "(python_full_version >= '3.11' and sys_platform == 'darwin') or (python_full_version >= '3.11' and sys_platform == 'linux') or (python_full_version >= '3.11' and sys_platform == 'win32')" },
{ name = "pydantic", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "python-ulid", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "pyyaml", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "redis", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "tenacity", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/d6/26/f3a5128d96eeeb5af0fc345156e48971ce0ce99689b62ba01dc855744c61/redisvl-0.8.2.tar.gz", hash = "sha256:3938ddcd093507c4c427cb431ac9faaa8bb999bb2ca116cbd57e4b7334fe18eb", size = 573106, upload-time = "2025-08-26T15:23:40.356Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/0e/16/a9eb70249c518b9b6a19efb32089bda8ecc146bafee360abd375eae7053e/redisvl-0.8.2-py3-none-any.whl", hash = "sha256:67b413387d72849d571723c95fa1183539d6fa60d6ac533513ee8e3e31874600", size = 152593, upload-time = "2025-08-26T15:23:38.393Z" },
]
[[package]]
name = "referencing"
version = "0.36.2"