Python: Add Foundry Memory Context Provider (#3943)

* Initial plan

* Add FoundryMemoryProvider and tests

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Add sample and documentation for FoundryMemoryProvider

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Address code review feedback for FoundryMemoryProvider

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Address PR review comments: Add DEFAULT_SOURCE_ID, use logging.getLogger, move state to session.state

Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>

* Fix Foundry memory ItemParam usage and exports

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Refactor provider hook state and standardize source IDs

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Support endpoint-based Foundry memory init

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* updated implementation and sample

* updated code and samples

* Fix foundry memory provider tests: mock structure and field names

- Use Mock objects with memory_item.content for memory mocks
- Assert 'content' instead of 'text' on SDK message items
- Update exception types from ServiceInitializationError to ValueError
- Remove unused ServiceInitializationError import

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix mypy errors in foundry memory provider

Add type: ignore[arg-type] for scope (str | None vs str) and items
(list variance) passed to Azure SDK methods.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix import

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: eavanvalkenburg <13749212+eavanvalkenburg@users.noreply.github.com>
Co-authored-by: eavanvalkenburg <github@vanvalkenburg.eu>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Copilot
2026-02-24 07:05:53 +01:00
committed by GitHub
Unverified
parent de612c47f5
commit 7b24d9160d
8 changed files with 956 additions and 0 deletions
@@ -1114,6 +1114,7 @@ Defaults introduced by this change:
- `RedisContextProvider.DEFAULT_SOURCE_ID = "redis"`
- `RedisHistoryProvider.DEFAULT_SOURCE_ID = "redis_memory"`
- `AzureAISearchContextProvider.DEFAULT_SOURCE_ID = "azure_ai_search"`
- `FoundryMemoryProvider.DEFAULT_SOURCE_ID = "foundry_memory"`
## Comparison to .NET Implementation
+17
View File
@@ -6,4 +6,21 @@ Please install this package via pip:
pip install agent-framework-azure-ai --pre
```
## Foundry Memory Context Provider
The Foundry Memory context provider enables semantic memory capabilities for your agents using Azure AI Foundry Memory Store. It automatically:
- Retrieves static (user profile) memories on first run
- Searches for contextual memories based on conversation
- Updates the memory store with new conversation messages
### Basic Usage Example
See the [Foundry Memory example](../../samples/02-agents/context_providers/azure_ai_foundry_memory.py) which demonstrates:
- Creating a memory store using Azure AI Projects client
- Setting up an agent with FoundryMemoryProvider
- Teaching the agent user preferences
- Retrieving information using remembered context across conversations
- Automatic memory updates with configurable delays
and see the [README](https://github.com/microsoft/agent-framework/tree/main/python/README.md) for more information.
@@ -5,6 +5,7 @@ import importlib.metadata
from ._agent_provider import AzureAIAgentsProvider
from ._chat_client import AzureAIAgentClient, AzureAIAgentOptions
from ._client import AzureAIClient, AzureAIProjectAgentOptions, RawAzureAIClient
from ._foundry_memory_provider import FoundryMemoryProvider
from ._project_provider import AzureAIProjectAgentProvider
from ._shared import AzureAISettings
@@ -21,6 +22,7 @@ __all__ = [
"AzureAIProjectAgentOptions",
"AzureAIProjectAgentProvider",
"AzureAISettings",
"FoundryMemoryProvider",
"RawAzureAIClient",
"__version__",
]
@@ -0,0 +1,256 @@
# Copyright (c) Microsoft. All rights reserved.
"""Foundry Memory Context Provider using BaseContextProvider.
This module provides ``FoundryMemoryProvider``, built on
:class:`BaseContextProvider`.
"""
from __future__ import annotations
import logging
import sys
from contextlib import AbstractAsyncContextManager
from typing import TYPE_CHECKING, Any, ClassVar
from agent_framework import AGENT_FRAMEWORK_USER_AGENT, Message
from agent_framework._sessions import AgentSession, BaseContextProvider, SessionContext
from agent_framework._settings import load_settings
from agent_framework.azure._entra_id_authentication import AzureCredentialTypes
from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import ItemParam, ResponsesAssistantMessageItemParam, ResponsesUserMessageItemParam
from ._shared import AzureAISettings
if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
else:
from typing_extensions import Self # pragma: no cover
if TYPE_CHECKING:
from agent_framework._agents import SupportsAgentRun
logger = logging.getLogger(__name__)
class FoundryMemoryProvider(BaseContextProvider):
"""Foundry Memory context provider using the new BaseContextProvider hooks pattern.
Integrates Azure AI Foundry Memory Store for persistent semantic memory,
searching and storing memories via the Azure AI Projects SDK.
Args:
source_id: Unique identifier for this provider instance.
project_client: Azure AI Project client for memory operations.
memory_store_name: The name of the memory store to use.
scope: The namespace that logically groups and isolates memories (e.g., user ID).
context_prompt: The prompt to prepend to retrieved memories.
update_delay: Timeout period before processing memory update in seconds.
Defaults to 300 (5 minutes). Set to 0 to immediately trigger updates.
"""
DEFAULT_SOURCE_ID: ClassVar[str] = "foundry_memory"
DEFAULT_CONTEXT_PROMPT = "## Memories\nConsider the following memories when answering user questions:"
def __init__(
self,
source_id: str = DEFAULT_SOURCE_ID,
*,
project_client: AIProjectClient | None = None,
project_endpoint: str | None = None,
credential: AzureCredentialTypes | None = None,
memory_store_name: str,
scope: str | None = None,
context_prompt: str | None = None,
update_delay: int = 300,
env_file_path: str | None = None,
env_file_encoding: str | None = None,
) -> None:
"""Initialize the Foundry Memory context provider.
Args:
source_id: Unique identifier for this provider instance.
project_client: Azure AI Project client for memory operations.
project_endpoint: Azure AI project endpoint URL. Used when project_client is not provided.
credential: Azure credential for authentication. Accepts a TokenCredential,
AsyncTokenCredential, or a callable token provider.
Required when project_client is not provided.
memory_store_name: The name of the memory store to use.
scope: The namespace that logically groups and isolates memories (e.g., user ID).
If None, `session_id` will be used.
context_prompt: The prompt to prepend to retrieved memories.
update_delay: Timeout period before processing memory update in seconds.
env_file_path: Path to environment file for loading settings.
env_file_encoding: Encoding of the environment file.
"""
super().__init__(source_id)
azure_ai_settings = load_settings(
AzureAISettings,
env_prefix="AZURE_AI_",
project_endpoint=project_endpoint,
env_file_path=env_file_path,
env_file_encoding=env_file_encoding,
)
if project_client is None:
resolved_endpoint = azure_ai_settings.get("project_endpoint")
if not resolved_endpoint:
raise ValueError(
"Azure AI project endpoint is required. Set via 'project_endpoint' parameter "
"or 'AZURE_AI_PROJECT_ENDPOINT' environment variable."
)
if not credential:
raise ValueError("Azure credential is required when project_client is not provided.")
project_client = AIProjectClient(
endpoint=resolved_endpoint,
credential=credential, # type: ignore[arg-type]
user_agent=AGENT_FRAMEWORK_USER_AGENT,
)
if not memory_store_name:
raise ValueError("memory_store_name is required")
if not scope:
raise ValueError("scope is required")
self.project_client = project_client
self.memory_store_name = memory_store_name
self.scope = scope
self.context_prompt = context_prompt or self.DEFAULT_CONTEXT_PROMPT
self.update_delay = update_delay
async def __aenter__(self) -> Self:
"""Async context manager entry."""
if self.project_client and isinstance(self.project_client, AbstractAsyncContextManager):
await self.project_client.__aenter__()
return self
async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None:
"""Async context manager exit."""
if self.project_client and isinstance(self.project_client, AbstractAsyncContextManager):
await self.project_client.__aexit__(exc_type, exc_val, exc_tb)
# -- Hooks pattern ---------------------------------------------------------
async def before_run(
self,
*,
agent: SupportsAgentRun,
session: AgentSession,
context: SessionContext,
state: dict[str, Any],
) -> None:
"""Search Foundry Memory for relevant memories and add to the session context.
This method:
1. Retrieves static memories (user profile) on first call per session
2. Searches for contextual memories based on input messages
3. Combines and injects memories into the context
"""
# On first run, retrieve static memories (user profile memories)
if not state.get("initialized"):
try:
static_search_result = await self.project_client.memory_stores.search_memories(
name=self.memory_store_name,
scope=self.scope or context.session_id, # type: ignore[arg-type]
)
static_memories = [{"content": memory.memory_item.content} for memory in static_search_result.memories]
state["static_memories"] = static_memories
except Exception as e:
# Log but don't fail - memory retrieval is non-critical
logger.warning(f"Failed to retrieve static memories: {e}")
state["static_memories"] = []
finally:
# Mark as initialized regardless of success to avoid repeated attempts
state["initialized"] = True
# Search for contextual memories based on input messages
# Check if there are any non-empty input messages
has_input = any(msg and msg.text and msg.text.strip() for msg in context.input_messages)
if not has_input:
return
# Convert input messages to ItemParam format for search
items = [
ItemParam({"type": "text", "text": msg.text})
for msg in context.input_messages
if msg and msg.text and msg.text.strip()
]
try:
search_result = await self.project_client.memory_stores.search_memories(
name=self.memory_store_name,
scope=self.scope or context.session_id, # type: ignore[arg-type]
items=items,
previous_search_id=state.get("previous_search_id"),
)
# Extract search_id for next incremental search
if search_result.memories:
state["previous_search_id"] = search_result.search_id
# Combine static and contextual memories
contextual_memories = [{"content": memory.memory_item.content} for memory in search_result.memories]
all_memories = state.get("static_memories", []) + contextual_memories
# Inject memories into context
if all_memories:
line_separated_memories = "\n".join(
str(memory.get("content", "")) for memory in all_memories if memory.get("content")
)
if line_separated_memories:
context.extend_messages(
self.source_id,
[Message(role="user", text=f"{self.context_prompt}\n{line_separated_memories}")],
)
except Exception as e:
# Log but don't fail - memory retrieval is non-critical
logger.warning(f"Failed to search contextual memories: {e}")
async def after_run(
self,
*,
agent: SupportsAgentRun,
session: AgentSession,
context: SessionContext,
state: dict[str, Any],
) -> None:
"""Store request/response messages to Foundry Memory for future retrieval.
This method updates the memory store with conversation messages.
The update is debounced by the configured update_delay.
"""
messages_to_store: list[Message] = list(context.input_messages)
if context.response and context.response.messages:
messages_to_store.extend(context.response.messages)
# Filter and convert messages to ItemParam format
items: list[ResponsesUserMessageItemParam | ResponsesAssistantMessageItemParam] = []
for message in messages_to_store:
if message.role in {"user", "assistant", "system"} and message.text and message.text.strip():
if message.role == "user":
items.append(ResponsesUserMessageItemParam(content=message.text))
elif message.role == "assistant":
items.append(ResponsesAssistantMessageItemParam(content=message.text))
if not items:
return
try:
# Fire and forget - don't wait for the update to complete
update_poller = await self.project_client.memory_stores.begin_update_memories(
name=self.memory_store_name,
scope=self.scope or context.session_id, # type: ignore[arg-type]
items=items, # type: ignore[arg-type]
previous_update_id=state.get("previous_update_id"),
update_delay=self.update_delay,
)
# Store the update_id for next incremental update
state["previous_update_id"] = update_poller.update_id
except Exception as e:
# Log but don't fail - memory storage is non-critical
logger.warning(f"Failed to update memories: {e}")
__all__ = ["FoundryMemoryProvider"]
@@ -0,0 +1,504 @@
# Copyright (c) Microsoft. All rights reserved.
# pyright: reportPrivateUsage=false
from __future__ import annotations
import os
from unittest.mock import AsyncMock, Mock, patch
import pytest
from agent_framework import AGENT_FRAMEWORK_USER_AGENT, AgentResponse, Message
from agent_framework._sessions import AgentSession, SessionContext
from agent_framework_azure_ai._foundry_memory_provider import FoundryMemoryProvider
@pytest.fixture
def mock_project_client() -> AsyncMock:
"""Create a mock AIProjectClient."""
mock_client = AsyncMock()
mock_client.memory_stores = AsyncMock()
mock_client.memory_stores.search_memories = AsyncMock()
mock_client.memory_stores.begin_update_memories = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
return mock_client
@pytest.fixture
def mock_credential() -> Mock:
"""Create a mock Azure credential."""
return Mock()
# -- Initialization tests ------------------------------------------------------
class TestInit:
"""Test FoundryMemoryProvider initialization."""
def test_init_with_all_params(self, mock_project_client: AsyncMock) -> None:
provider = FoundryMemoryProvider(
source_id="custom_source",
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
context_prompt="Custom prompt",
update_delay=60,
)
assert provider.source_id == "custom_source"
assert provider.project_client is mock_project_client
assert provider.memory_store_name == "test_store"
assert provider.scope == "user_123"
assert provider.context_prompt == "Custom prompt"
assert provider.update_delay == 60
def test_init_default_source_id(self, mock_project_client: AsyncMock) -> None:
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
assert provider.source_id == FoundryMemoryProvider.DEFAULT_SOURCE_ID
def test_init_default_context_prompt(self, mock_project_client: AsyncMock) -> None:
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
assert provider.context_prompt == FoundryMemoryProvider.DEFAULT_CONTEXT_PROMPT
def test_init_default_update_delay(self, mock_project_client: AsyncMock) -> None:
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
assert provider.update_delay == 300
def test_init_with_project_endpoint_and_credential(
self, mock_project_client: AsyncMock, mock_credential: Mock
) -> None:
with patch("agent_framework_azure_ai._foundry_memory_provider.AIProjectClient") as mock_ai_project_client:
mock_ai_project_client.return_value = mock_project_client
provider = FoundryMemoryProvider(
project_endpoint="https://test.project.endpoint",
credential=mock_credential, # type: ignore[arg-type]
memory_store_name="test_store",
scope="user_123",
)
assert provider.project_client is mock_project_client
mock_ai_project_client.assert_called_once_with(
endpoint="https://test.project.endpoint",
credential=mock_credential,
user_agent=AGENT_FRAMEWORK_USER_AGENT,
)
def test_init_requires_project_endpoint_without_project_client(self) -> None:
with (
patch("agent_framework_azure_ai._foundry_memory_provider.load_settings") as mock_load_settings,
patch.dict(os.environ, {}, clear=True),
pytest.raises(ValueError, match="project endpoint is required"),
):
mock_load_settings.return_value = {"project_endpoint": None}
FoundryMemoryProvider(
memory_store_name="test_store",
scope="user_123",
)
def test_init_requires_credential_without_project_client(self) -> None:
with pytest.raises(ValueError, match="Azure credential is required"):
FoundryMemoryProvider(
project_endpoint="https://test.project.endpoint",
memory_store_name="test_store",
scope="user_123",
)
def test_init_requires_memory_store_name(self, mock_project_client: AsyncMock) -> None:
with pytest.raises(ValueError, match="memory_store_name is required"):
FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="",
scope="user_123",
)
def test_init_requires_scope(self, mock_project_client: AsyncMock) -> None:
with pytest.raises(ValueError, match="scope is required"):
FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="",
)
# -- before_run tests ----------------------------------------------------------
class TestBeforeRun:
"""Test before_run hook."""
async def test_retrieves_static_memories_on_first_run(self, mock_project_client: AsyncMock) -> None:
"""First call retrieves static (user profile) memories."""
mem1 = Mock()
mem1.memory_item.content = "User prefers Python"
mem2 = Mock()
mem2.memory_item.content = "User is based in Seattle"
mock_search_result = Mock()
mock_search_result.memories = [mem1, mem2]
mock_project_client.memory_stores.search_memories.return_value = mock_search_result
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="Hello")], session_id="s1")
await provider.before_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
# Should call search_memories twice: once for static, once for contextual
assert mock_project_client.memory_stores.search_memories.call_count == 2
# Static memories should be cached
assert len(session.state[provider.source_id]["static_memories"]) == 2
assert session.state[provider.source_id]["initialized"] is True
async def test_contextual_memories_added_to_context(self, mock_project_client: AsyncMock) -> None:
"""Contextual search returns memories → messages added to context with prompt."""
# Mock static search (first call)
static_mem = Mock()
static_mem.memory_item.content = "User prefers Python"
static_result = Mock()
static_result.memories = [static_mem]
# Mock contextual search (second call)
contextual_mem = Mock()
contextual_mem.memory_item.content = "Last discussed async patterns"
contextual_result = Mock()
contextual_result.memories = [contextual_mem]
contextual_result.search_id = "search-123"
mock_project_client.memory_stores.search_memories.side_effect = [static_result, contextual_result]
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="Hello")], session_id="s1")
await provider.before_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
# Check that memories were added to context
assert provider.source_id in ctx.context_messages
added = ctx.context_messages[provider.source_id]
assert len(added) == 1
assert "User prefers Python" in added[0].text # type: ignore[operator]
assert "Last discussed async patterns" in added[0].text # type: ignore[operator]
assert provider.context_prompt in added[0].text # type: ignore[operator]
assert session.state[provider.source_id]["previous_search_id"] == "search-123"
async def test_empty_input_skips_contextual_search(self, mock_project_client: AsyncMock) -> None:
"""Empty input messages → only static search performed, no contextual search."""
static_result = Mock()
static_result.memories = []
mock_project_client.memory_stores.search_memories.return_value = static_result
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="")], session_id="s1")
await provider.before_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
# Should only call search_memories once for static memories
assert mock_project_client.memory_stores.search_memories.call_count == 1
assert provider.source_id not in ctx.context_messages
async def test_empty_search_results_no_messages(self, mock_project_client: AsyncMock) -> None:
"""Empty search results → no messages added."""
mock_search_result = Mock()
mock_search_result.memories = []
mock_project_client.memory_stores.search_memories.return_value = mock_search_result
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="test")], session_id="s1")
await provider.before_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
assert provider.source_id not in ctx.context_messages
async def test_static_memories_only_retrieved_once(self, mock_project_client: AsyncMock) -> None:
"""Static memories are only retrieved on the first call."""
static_mem = Mock()
static_mem.memory_item.content = "Static memory"
static_result = Mock()
static_result.memories = [static_mem]
contextual_result = Mock()
contextual_result.memories = []
mock_project_client.memory_stores.search_memories.side_effect = [static_result, contextual_result]
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="Hello")], session_id="s1")
# First call
await provider.before_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
assert mock_project_client.memory_stores.search_memories.call_count == 2
# Reset mock for second call
mock_project_client.memory_stores.search_memories.reset_mock()
contextual_result2 = Mock()
contextual_result2.memories = []
mock_project_client.memory_stores.search_memories.return_value = contextual_result2
# Second call - should only search contextual, not static
ctx2 = SessionContext(input_messages=[Message(role="user", text="World")], session_id="s1")
await provider.before_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx2, state=session.state.setdefault(provider.source_id, {})
)
assert mock_project_client.memory_stores.search_memories.call_count == 1
async def test_handles_search_exception_gracefully(self, mock_project_client: AsyncMock) -> None:
"""Search exception is logged but doesn't fail the operation."""
mock_project_client.memory_stores.search_memories.side_effect = Exception("API error")
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="Hello")], session_id="s1")
# Should not raise exception
await provider.before_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
# No memories added
assert provider.source_id not in ctx.context_messages
# -- after_run tests -----------------------------------------------------------
class TestAfterRun:
"""Test after_run hook."""
async def test_stores_input_and_response(self, mock_project_client: AsyncMock) -> None:
"""Stores input+response messages via begin_update_memories."""
mock_poller = Mock()
mock_poller.update_id = "update-456"
mock_project_client.memory_stores.begin_update_memories.return_value = mock_poller
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="question")], session_id="s1")
ctx._response = AgentResponse(messages=[Message(role="assistant", text="answer")])
await provider.after_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
mock_project_client.memory_stores.begin_update_memories.assert_awaited_once()
call_kwargs = mock_project_client.memory_stores.begin_update_memories.call_args.kwargs
assert call_kwargs["name"] == "test_store"
assert call_kwargs["scope"] == "user_123"
assert len(call_kwargs["items"]) == 2
assert call_kwargs["items"][0]["content"] == "question"
assert call_kwargs["items"][1]["content"] == "answer"
assert session.state[provider.source_id]["previous_update_id"] == "update-456"
async def test_only_stores_user_assistant_system(self, mock_project_client: AsyncMock) -> None:
"""Only stores user/assistant/system messages with text."""
mock_poller = Mock()
mock_project_client.memory_stores.begin_update_memories.return_value = mock_poller
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(
input_messages=[
Message(role="user", text="hello"),
Message(role="tool", text="tool output"),
],
session_id="s1",
)
ctx._response = AgentResponse(messages=[Message(role="assistant", text="reply")])
await provider.after_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
call_kwargs = mock_project_client.memory_stores.begin_update_memories.call_args.kwargs
items = call_kwargs["items"]
assert len(items) == 2
assert items[0]["content"] == "hello"
assert items[1]["content"] == "reply"
async def test_skips_empty_messages(self, mock_project_client: AsyncMock) -> None:
"""Skips messages with empty text."""
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(
input_messages=[
Message(role="user", text=""),
Message(role="user", text=" "),
],
session_id="s1",
)
ctx._response = AgentResponse(messages=[])
await provider.after_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
mock_project_client.memory_stores.begin_update_memories.assert_not_awaited()
async def test_uses_configured_update_delay(self, mock_project_client: AsyncMock) -> None:
"""Uses the configured update_delay parameter."""
mock_poller = Mock()
mock_project_client.memory_stores.begin_update_memories.return_value = mock_poller
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
update_delay=60,
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="hi")], session_id="s1")
ctx._response = AgentResponse(messages=[Message(role="assistant", text="hey")])
await provider.after_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
call_kwargs = mock_project_client.memory_stores.begin_update_memories.call_args.kwargs
assert call_kwargs["update_delay"] == 60
async def test_uses_previous_update_id_for_incremental_updates(self, mock_project_client: AsyncMock) -> None:
"""Uses previous_update_id for incremental updates."""
mock_poller1 = Mock()
mock_poller1.update_id = "update-1"
mock_poller2 = Mock()
mock_poller2.update_id = "update-2"
mock_project_client.memory_stores.begin_update_memories.side_effect = [mock_poller1, mock_poller2]
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx1 = SessionContext(input_messages=[Message(role="user", text="first")], session_id="s1")
ctx1._response = AgentResponse(messages=[Message(role="assistant", text="response1")])
# First update
await provider.after_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx1, state=session.state.setdefault(provider.source_id, {})
)
assert session.state[provider.source_id]["previous_update_id"] == "update-1"
# Second update should use previous_update_id
ctx2 = SessionContext(input_messages=[Message(role="user", text="second")], session_id="s1")
ctx2._response = AgentResponse(messages=[Message(role="assistant", text="response2")])
await provider.after_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx2, state=session.state.setdefault(provider.source_id, {})
)
call_kwargs = mock_project_client.memory_stores.begin_update_memories.call_args.kwargs
assert call_kwargs["previous_update_id"] == "update-1"
assert session.state[provider.source_id]["previous_update_id"] == "update-2"
async def test_handles_update_exception_gracefully(self, mock_project_client: AsyncMock) -> None:
"""Update exception is logged but doesn't fail the operation."""
mock_project_client.memory_stores.begin_update_memories.side_effect = Exception("API error")
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
session = AgentSession(session_id="test-session")
ctx = SessionContext(input_messages=[Message(role="user", text="hi")], session_id="s1")
ctx._response = AgentResponse(messages=[Message(role="assistant", text="hey")])
# Should not raise exception
await provider.after_run( # type: ignore[arg-type]
agent=None, session=session, context=ctx, state=session.state.setdefault(provider.source_id, {})
)
# -- Context manager tests -----------------------------------------------------
class TestContextManager:
"""Test __aenter__/__aexit__ delegation."""
async def test_aenter_delegates_to_client(self, mock_project_client: AsyncMock) -> None:
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
result = await provider.__aenter__()
assert result is provider
mock_project_client.__aenter__.assert_awaited_once()
async def test_aexit_delegates_to_client(self, mock_project_client: AsyncMock) -> None:
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
await provider.__aexit__(None, None, None)
mock_project_client.__aexit__.assert_awaited_once()
async def test_async_with_syntax(self, mock_project_client: AsyncMock) -> None:
provider = FoundryMemoryProvider(
project_client=mock_project_client,
memory_store_name="test_store",
scope="user_123",
)
async with provider as p:
assert p is provider
@@ -32,6 +32,7 @@ _IMPORTS: dict[str, tuple[str, str]] = {
"AzureAIAgentsProvider": ("agent_framework_azure_ai", "agent-framework-azure-ai"),
"AzureCredentialTypes": ("agent_framework.azure._entra_id_authentication", "agent-framework-core"),
"AzureTokenProvider": ("agent_framework.azure._entra_id_authentication", "agent-framework-core"),
"FoundryMemoryProvider": ("agent_framework_azure_ai", "agent-framework-azure-ai"),
"AzureOpenAIAssistantsClient": ("agent_framework.azure._assistants_client", "agent-framework-core"),
"AzureOpenAIAssistantsOptions": ("agent_framework.azure._assistants_client", "agent-framework-core"),
"AzureOpenAIChatClient": ("agent_framework.azure._chat_client", "agent-framework-core"),
@@ -7,6 +7,7 @@ from agent_framework_azure_ai import (
AzureAIProjectAgentOptions,
AzureAIProjectAgentProvider,
AzureAISettings,
FoundryMemoryProvider,
)
from agent_framework_azure_ai_search import AzureAISearchContextProvider, AzureAISearchSettings
from agent_framework_azurefunctions import AgentFunctionApp
@@ -47,4 +48,5 @@ __all__ = [
"DurableAIAgentClient",
"DurableAIAgentOrchestrationContext",
"DurableAIAgentWorker",
"FoundryMemoryProvider",
]
@@ -0,0 +1,173 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from datetime import datetime, timezone
from agent_framework import Agent, InMemoryHistoryProvider
from agent_framework.azure import AzureOpenAIResponsesClient, FoundryMemoryProvider
from azure.ai.projects.aio import AIProjectClient
from azure.ai.projects.models import (
MemoryStoreDefaultDefinition,
MemoryStoreDefaultOptions,
)
from azure.identity.aio import AzureCliCredential
from dotenv import load_dotenv
"""
Azure AI Agent with Foundry Memory Context Provider Example
This sample demonstrates using the FoundryMemoryProvider as a context provider
to add semantic memory capabilities to your agents. The provider automatically:
1. Retrieves static (user profile) memories on first run
2. Searches for contextual memories based on conversation
3. Updates the memory store with new conversation messages
The sample creates a temporary memory store with user profile enabled (and chat summary
disabled), scopes memories to a specific user ID ("user_123"), and sets update_delay=0
so memories are stored immediately (in production, use a delay to batch updates and
reduce costs). Conversation history is intentionally not stored (neither service-side
via ``store=False`` nor client-side via ``load_messages=False`` on the history provider),
so that follow-up responses demonstrate the agent relying solely on Foundry Memory
rather than chat history. The memory store is deleted at the end of the run.
Prerequisites:
1. Set AZURE_AI_PROJECT_ENDPOINT environment variable
2. Set AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME for the chat/responses model
3. Set AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME for the embedding model
4. Deploy both a chat model (e.g. gpt-4) and an embedding model (e.g. text-embedding-3-small)
"""
load_dotenv()
async def main() -> None:
endpoint = os.environ["AZURE_AI_PROJECT_ENDPOINT"]
async with (
AzureCliCredential() as credential,
AIProjectClient(endpoint=endpoint, credential=credential) as project_client,
):
# Generate a unique memory store name to avoid conflicts
memory_store_name = f"agent_framework_memory_{datetime.now(timezone.utc).strftime('%Y%m%d')}"
# Specify memory store options
options = MemoryStoreDefaultOptions(
chat_summary_enabled=False,
user_profile_enabled=True,
user_profile_details="Avoid irrelevant or sensitive data, such as age, financials, precise location, and credentials",
)
memory_store_definition = MemoryStoreDefaultDefinition(
chat_model=os.environ["AZURE_OPENAI_RESPONSES_DEPLOYMENT_NAME"],
embedding_model=os.environ["AZURE_OPENAI_EMBEDDING_DEPLOYMENT_NAME"],
options=options,
)
print(f"Creating memory store '{memory_store_name}'...")
try:
# Create a memory store
memory_store = await project_client.memory_stores.create(
name=memory_store_name,
description="Memory store for Agent Framework with FoundryMemoryProvider",
definition=memory_store_definition,
)
except Exception as e:
print(f"Failed to create memory store: {e}")
return
print(f"Created memory store: {memory_store.name} ({memory_store.id})")
print(f"Description: {memory_store.description}\n")
print("==========================================")
# Create the chat client
client = AzureOpenAIResponsesClient(project_client=project_client)
# Create the Foundry Memory context provider
memory_provider = FoundryMemoryProvider(
project_client=project_client,
memory_store_name=memory_store.name,
scope="user_123", # Scope memories to a specific user, if not set, the session_id
# will be used as scope, which means memories are only shared within the same session
update_delay=0, # Do not wait to update memories after each interaction (for demo purposes)
# In production, consider setting a delay to batch updates and reduce costs
)
# Create an agent with the memory context provider
async with Agent(
name="MemoryAgent",
client=client,
instructions="""You are a helpful assistant that remembers past conversations.
The memories from previous interactions are automatically provided to you.""",
context_providers=[memory_provider, InMemoryHistoryProvider(load_messages=False)],
default_options={"store": False},
) as agent:
try:
# note that we will use the service side storage, nor load messsages from the history provider,
# but we include it to demonstrate that it can be used alongside the Foundry provider for other use cases.
session = agent.create_session()
# First interaction - establish some preferences
print("=== First conversation ===")
query1 = "I prefer dark roast coffee and I'm allergic to nuts"
print(f"User: {query1}")
result1 = await agent.run(query1, session=session)
print(f"Agent: {result1}\n")
# Wait for memories to be processed
print("Waiting for memories to be stored...")
await asyncio.sleep(8)
# Second interaction - test memory recall
print("=== Second conversation ===")
query2 = "Can you recommend a coffee and snack for me?"
print(f"User: {query2}")
result2 = await agent.run(query2, session=session)
print(f"Agent: {result2}\n")
# Third interaction - continue the conversation
print("=== Third conversation ===")
query3 = "What do you remember about my preferences?"
print(f"User: {query3}")
result3 = await agent.run(query3, session=session)
print(f"Agent: {result3}\n")
print(f"Stored memories from: {memory_store.name} ({memory_store.id})")
res = await project_client.memory_stores.search_memories(name=memory_store.name, scope="user_123")
for memory in res.memories:
print(f"Memory: {memory.memory_item.content}")
except Exception as e:
print(f"An error occurred: {e}")
finally:
await project_client.memory_stores.delete(memory_store_name)
print("==========================================")
print("Memory store deleted")
if __name__ == "__main__":
asyncio.run(main())
"""
Example output:
Creating memory store 'agent_framework_memory_20260223'...
Created memory store: agent_framework_memory_20260223 (memstore_57c1f95bb4040c6d00RVOP71Q8tS23opIc4G4ZE8DuALiBFx44)
Description: Memory store for Agent Framework with FoundryMemoryProvider
==========================================
=== First conversation ===
User: I prefer dark roast coffee and I'm allergic to nuts
Agent: Got it—Ill remember: you prefer dark roast coffee, and youre allergic to nuts.
Waiting for memories to be stored...
=== Second conversation ===
User: Can you recommend a coffee and snack for me?
Agent: For coffee: **dark roast drip or Americano** (choose a **dark roast** like French/Italian roast). If you like it smoother, try a **dark-roast cold brew**.
For a snack (nut-free): **Greek yogurt with berries**, or a **cheese stick + whole-grain crackers**. If you want something sweet: **dark chocolate (check “may contain nuts” warnings)**.
=== Third conversation ===
User: What do you remember about my preferences?
Agent: - Youre allergic to nuts.
- You prefer dark roast coffee.
Stored memories from: agent_framework_memory_20260223 (memstore_57c1f95bb4040c6d00RVOP71Q8tS23opIc4G4ZE8DuALiBFx44)
Memory: The user is allergic to nuts.
Memory: The user prefers dark roast coffee.
==========================================
Memory store deleted
"""