mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
Python: add experimental file history provider (#5248)
* add experimental file history provider * Improve file history provider writes Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * typo * cleanup * cleanup * fix in readme * added security messages * Refine file history provider locking Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * added additional sample --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
Unverified
parent
eab7f09d03
commit
ff05c22c58
@@ -63,6 +63,8 @@ agent_framework/
|
||||
- **`SessionContext`** - Context object for session-scoped data during agent runs
|
||||
- **`ContextProvider`** - Base class for context providers (RAG, memory systems)
|
||||
- **`HistoryProvider`** - Base class for conversation history storage
|
||||
- **`InMemoryHistoryProvider`** - Built-in session-state history provider for local runs
|
||||
- **`FileHistoryProvider`** - JSON Lines file-backed history provider storing one file per session with one message record per line
|
||||
|
||||
### Skills (`_skills.py`)
|
||||
|
||||
|
||||
@@ -103,6 +103,7 @@ from ._middleware import (
|
||||
from ._sessions import (
|
||||
AgentSession,
|
||||
ContextProvider,
|
||||
FileHistoryProvider,
|
||||
HistoryProvider,
|
||||
InMemoryHistoryProvider,
|
||||
SessionContext,
|
||||
@@ -318,6 +319,7 @@ __all__ = [
|
||||
"FanInEdgeGroup",
|
||||
"FanOutEdgeGroup",
|
||||
"FileCheckpointStorage",
|
||||
"FileHistoryProvider",
|
||||
"FinalT",
|
||||
"FinishReason",
|
||||
"FinishReasonLiteral",
|
||||
|
||||
@@ -47,6 +47,7 @@ class ExperimentalFeature(str, Enum):
|
||||
"""
|
||||
|
||||
EVALS = "EVALS"
|
||||
FILE_HISTORY = "FILE_HISTORY"
|
||||
SKILLS = "SKILLS"
|
||||
|
||||
|
||||
|
||||
@@ -8,16 +8,24 @@ This module provides the core types for the context provider pipeline:
|
||||
- HistoryProvider: Base class for history storage providers
|
||||
- AgentSession: Lightweight session state container
|
||||
- InMemoryHistoryProvider: Built-in in-memory history provider
|
||||
- FileHistoryProvider: Built-in JSON Lines file history provider
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import copy
|
||||
import json
|
||||
import threading
|
||||
import uuid
|
||||
import weakref
|
||||
from abc import abstractmethod
|
||||
from base64 import urlsafe_b64encode
|
||||
from collections.abc import Awaitable, Callable, Mapping, Sequence
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, TypeGuard, cast
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Any, ClassVar, TypeAlias, TypeGuard, cast
|
||||
|
||||
from ._feature_stage import ExperimentalFeature, experimental
|
||||
from ._middleware import ChatContext, ChatMiddleware
|
||||
from ._types import AgentResponse, ChatResponse, Message, ResponseStream
|
||||
from .exceptions import ChatClientInvalidResponseException
|
||||
@@ -30,6 +38,17 @@ if TYPE_CHECKING:
|
||||
# Registry of known types for state deserialization
|
||||
_STATE_TYPE_REGISTRY: dict[str, type] = {}
|
||||
|
||||
JsonDumps: TypeAlias = Callable[[Any], str | bytes]
|
||||
JsonLoads: TypeAlias = Callable[[str | bytes], Any]
|
||||
|
||||
|
||||
def _default_json_dumps(value: Any) -> str:
|
||||
return json.dumps(value, ensure_ascii=False)
|
||||
|
||||
|
||||
def _default_json_loads(value: str | bytes) -> Any:
|
||||
return json.loads(value)
|
||||
|
||||
|
||||
def _is_middleware_sequence(
|
||||
middleware: MiddlewareTypes | Sequence[MiddlewareTypes],
|
||||
@@ -837,3 +856,247 @@ class InMemoryHistoryProvider(HistoryProvider):
|
||||
return
|
||||
existing = state.get("messages", [])
|
||||
state["messages"] = [*existing, *messages]
|
||||
|
||||
|
||||
@experimental(feature_id=ExperimentalFeature.FILE_HISTORY)
|
||||
class FileHistoryProvider(HistoryProvider):
|
||||
"""File-backed history provider that stores one JSON Lines file per session.
|
||||
|
||||
Each persisted message is written as a single JSON object per line. The
|
||||
provider does not serialize full session snapshots into the file. By default
|
||||
it uses the standard library ``json`` module, but callers can inject
|
||||
alternative ``dumps`` and ``loads`` callables compatible with the JSON
|
||||
Lines format.
|
||||
|
||||
Security posture:
|
||||
Persisted history is stored as plaintext JSONL on the local filesystem.
|
||||
Treat ``storage_path`` as trusted application storage, not as a secret
|
||||
store. Encoded fallback filenames and resolved-path validation help
|
||||
prevent path traversal via ``session_id``, but they do not encrypt file
|
||||
contents or provide cross-process / cross-host locking. Use OS-level
|
||||
file permissions, trusted directories, and carefully review what agent
|
||||
or tool output is allowed to be persisted.
|
||||
"""
|
||||
|
||||
DEFAULT_SOURCE_ID: ClassVar[str] = "file_history"
|
||||
DEFAULT_SESSION_FILE_STEM: ClassVar[str] = "default"
|
||||
FILE_EXTENSION: ClassVar[str] = ".jsonl"
|
||||
_FILE_LOCK_STRIPE_COUNT: ClassVar[int] = 64
|
||||
_ENCODED_SESSION_PREFIX: ClassVar[str] = "~session-"
|
||||
_FILE_WRITE_LOCKS: ClassVar[tuple[threading.Lock, ...]] = tuple(
|
||||
threading.Lock() for _ in range(_FILE_LOCK_STRIPE_COUNT)
|
||||
)
|
||||
_WINDOWS_RESERVED_FILE_STEMS: ClassVar[frozenset[str]] = frozenset({
|
||||
"CON",
|
||||
"PRN",
|
||||
"AUX",
|
||||
"NUL",
|
||||
"COM1",
|
||||
"COM2",
|
||||
"COM3",
|
||||
"COM4",
|
||||
"COM5",
|
||||
"COM6",
|
||||
"COM7",
|
||||
"COM8",
|
||||
"COM9",
|
||||
"LPT1",
|
||||
"LPT2",
|
||||
"LPT3",
|
||||
"LPT4",
|
||||
"LPT5",
|
||||
"LPT6",
|
||||
"LPT7",
|
||||
"LPT8",
|
||||
"LPT9",
|
||||
})
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
storage_path: str | Path,
|
||||
*,
|
||||
source_id: str = DEFAULT_SOURCE_ID,
|
||||
load_messages: bool = True,
|
||||
store_inputs: bool = True,
|
||||
store_context_messages: bool = False,
|
||||
store_context_from: set[str] | None = None,
|
||||
store_outputs: bool = True,
|
||||
skip_excluded: bool = False,
|
||||
dumps: JsonDumps | None = None,
|
||||
loads: JsonLoads | None = None,
|
||||
) -> None:
|
||||
"""Initialize the file history provider.
|
||||
|
||||
Args:
|
||||
storage_path: Directory path where session history files will be stored.
|
||||
|
||||
Keyword Args:
|
||||
source_id: Unique identifier for this provider instance.
|
||||
load_messages: Whether to load messages before invocation.
|
||||
store_inputs: Whether to store input messages.
|
||||
store_context_messages: Whether to store context from other providers.
|
||||
store_context_from: If set, only store context from these source_ids.
|
||||
store_outputs: Whether to store response messages.
|
||||
skip_excluded: When True, ``get_messages`` omits messages whose
|
||||
``additional_properties["_excluded"]`` is truthy.
|
||||
dumps: Callable that serializes a message payload dict to JSON text
|
||||
or UTF-8 bytes. The returned JSON must fit on a single line.
|
||||
loads: Callable that deserializes JSON text or bytes back to a
|
||||
message payload dict.
|
||||
"""
|
||||
super().__init__(
|
||||
source_id=source_id,
|
||||
load_messages=load_messages,
|
||||
store_inputs=store_inputs,
|
||||
store_context_messages=store_context_messages,
|
||||
store_context_from=store_context_from,
|
||||
store_outputs=store_outputs,
|
||||
)
|
||||
self.storage_path = Path(storage_path)
|
||||
self.storage_path.mkdir(parents=True, exist_ok=True)
|
||||
self._storage_root = self.storage_path.resolve()
|
||||
self.skip_excluded = skip_excluded
|
||||
self.dumps = dumps or _default_json_dumps
|
||||
self.loads = loads or _default_json_loads
|
||||
self._async_write_locks_by_loop: weakref.WeakKeyDictionary[
|
||||
asyncio.AbstractEventLoop,
|
||||
tuple[asyncio.Lock, ...],
|
||||
] = weakref.WeakKeyDictionary()
|
||||
|
||||
async def get_messages(
|
||||
self,
|
||||
session_id: str | None,
|
||||
*,
|
||||
state: dict[str, Any] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> list[Message]:
|
||||
"""Retrieve messages from the session's JSON Lines file."""
|
||||
del state, kwargs
|
||||
file_path = self._session_file_path(session_id)
|
||||
async_lock = self._session_async_write_lock(file_path)
|
||||
thread_lock = self._session_write_lock(file_path)
|
||||
|
||||
def _read_messages() -> list[Message]:
|
||||
with thread_lock:
|
||||
if not file_path.exists():
|
||||
return []
|
||||
|
||||
messages: list[Message] = []
|
||||
with file_path.open(encoding="utf-8") as file_handle:
|
||||
for line_number, line in enumerate(file_handle, start=1):
|
||||
serialized = line.strip()
|
||||
if not serialized:
|
||||
continue
|
||||
try:
|
||||
payload = self.loads(serialized)
|
||||
except (TypeError, ValueError) as exc:
|
||||
raise ValueError(
|
||||
f"Failed to deserialize history line {line_number} from '{file_path}'."
|
||||
) from exc
|
||||
if not isinstance(payload, Mapping):
|
||||
raise ValueError(
|
||||
f"History line {line_number} in '{file_path}' did not deserialize to a mapping."
|
||||
)
|
||||
|
||||
try:
|
||||
message = Message.from_dict(dict(cast(Mapping[str, Any], payload)))
|
||||
except ValueError as exc:
|
||||
raise ValueError(
|
||||
f"History line {line_number} in '{file_path}' is not a valid Message payload."
|
||||
) from exc
|
||||
messages.append(message)
|
||||
return messages
|
||||
|
||||
async with async_lock:
|
||||
messages = await asyncio.to_thread(_read_messages)
|
||||
if self.skip_excluded:
|
||||
messages = [m for m in messages if not m.additional_properties.get("_excluded", False)]
|
||||
return messages
|
||||
|
||||
async def save_messages(
|
||||
self,
|
||||
session_id: str | None,
|
||||
messages: Sequence[Message],
|
||||
*,
|
||||
state: dict[str, Any] | None = None,
|
||||
**kwargs: Any,
|
||||
) -> None:
|
||||
"""Append messages to the session's JSON Lines file."""
|
||||
del state, kwargs
|
||||
if not messages:
|
||||
return
|
||||
|
||||
file_path = self._session_file_path(session_id)
|
||||
async_lock = self._session_async_write_lock(file_path)
|
||||
file_lock = self._session_write_lock(file_path)
|
||||
|
||||
def _append_messages() -> None:
|
||||
with file_lock, file_path.open("a", encoding="utf-8") as file_handle:
|
||||
for message in messages:
|
||||
file_handle.write(f"{self._serialize_message(message)}\n")
|
||||
|
||||
async with async_lock:
|
||||
await asyncio.to_thread(_append_messages)
|
||||
|
||||
def _serialize_message(self, message: Message) -> str:
|
||||
"""Serialize a message payload to a single JSON Lines record."""
|
||||
serialized = self.dumps(message.to_dict())
|
||||
if isinstance(serialized, bytes):
|
||||
serialized_text = serialized.decode("utf-8")
|
||||
elif isinstance(serialized, str):
|
||||
serialized_text = serialized
|
||||
else:
|
||||
raise TypeError("FileHistoryProvider.dumps must return str or bytes.")
|
||||
|
||||
if "\n" in serialized_text or "\r" in serialized_text:
|
||||
raise ValueError("FileHistoryProvider.dumps must return single-line JSON for JSON Lines storage.")
|
||||
return serialized_text
|
||||
|
||||
def _session_file_path(self, session_id: str | None) -> Path:
|
||||
"""Resolve the on-disk history file path for a session."""
|
||||
file_path = (self._storage_root / f"{self._session_file_stem(session_id)}{self.FILE_EXTENSION}").resolve()
|
||||
if not file_path.is_relative_to(self._storage_root):
|
||||
raise ValueError(f"Session history path escaped storage directory: {session_id!r}")
|
||||
return file_path
|
||||
|
||||
def _session_file_stem(self, session_id: str | None) -> str:
|
||||
"""Return the filename stem for a session."""
|
||||
raw_session_id = session_id or self.DEFAULT_SESSION_FILE_STEM
|
||||
if self._is_literal_session_file_stem_safe(raw_session_id):
|
||||
return raw_session_id
|
||||
|
||||
encoded_session_id = urlsafe_b64encode(raw_session_id.encode("utf-8")).decode("ascii").rstrip("=")
|
||||
return f"{self._ENCODED_SESSION_PREFIX}{encoded_session_id or self.DEFAULT_SESSION_FILE_STEM}"
|
||||
|
||||
def _session_async_write_lock(self, file_path: Path) -> asyncio.Lock:
|
||||
"""Return the event-loop-local async lock for a session history file."""
|
||||
loop = asyncio.get_running_loop()
|
||||
locks = self._async_write_locks_by_loop.get(loop)
|
||||
if locks is None:
|
||||
locks = tuple(asyncio.Lock() for _ in range(self._FILE_LOCK_STRIPE_COUNT))
|
||||
self._async_write_locks_by_loop[loop] = locks
|
||||
return locks[self._lock_index(file_path)]
|
||||
|
||||
@classmethod
|
||||
def _session_write_lock(cls, file_path: Path) -> threading.Lock:
|
||||
"""Return the process-local thread lock for a session history file."""
|
||||
return cls._FILE_WRITE_LOCKS[cls._lock_index(file_path)]
|
||||
|
||||
@classmethod
|
||||
def _lock_index(cls, file_path: Path) -> int:
|
||||
"""Map a session history file to a bounded lock stripe."""
|
||||
return hash(file_path) % cls._FILE_LOCK_STRIPE_COUNT
|
||||
|
||||
@classmethod
|
||||
def _is_literal_session_file_stem_safe(cls, session_id: str) -> bool:
|
||||
"""Return whether the session ID can be used directly as a filename stem."""
|
||||
if (
|
||||
not session_id
|
||||
or session_id.startswith(".")
|
||||
or session_id.endswith((" ", "."))
|
||||
or session_id.upper() in cls._WINDOWS_RESERVED_FILE_STEMS
|
||||
):
|
||||
return False
|
||||
if any(ord(character) < 32 for character in session_id):
|
||||
return False
|
||||
return all(character.isalnum() or character in "._-" for character in session_id)
|
||||
|
||||
@@ -1,7 +1,12 @@
|
||||
# Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import threading
|
||||
import time
|
||||
from collections.abc import Awaitable, Callable, Sequence
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import pytest
|
||||
|
||||
@@ -10,6 +15,8 @@ from agent_framework import (
|
||||
AgentSession,
|
||||
ChatContext,
|
||||
ContextProvider,
|
||||
ExperimentalFeature,
|
||||
FileHistoryProvider,
|
||||
HistoryProvider,
|
||||
InMemoryHistoryProvider,
|
||||
Message,
|
||||
@@ -505,3 +512,217 @@ class TestInMemoryHistoryProvider:
|
||||
ctx = SessionContext(session_id="s1", input_messages=[])
|
||||
ctx.extend_messages("custom-source", [Message(role="user", contents=["test"])])
|
||||
assert "custom-source" in ctx.context_messages
|
||||
|
||||
|
||||
class TestFileHistoryProvider:
|
||||
def test_is_marked_experimental(self) -> None:
|
||||
assert FileHistoryProvider.__feature_stage__ == "experimental"
|
||||
assert FileHistoryProvider.__feature_id__ == ExperimentalFeature.FILE_HISTORY.value
|
||||
assert FileHistoryProvider.__doc__ is not None
|
||||
assert ".. warning:: Experimental" in FileHistoryProvider.__doc__
|
||||
|
||||
async def test_stores_and_loads_messages(self, tmp_path: Path) -> None:
|
||||
from agent_framework import AgentResponse
|
||||
|
||||
provider = FileHistoryProvider(tmp_path)
|
||||
session = AgentSession(session_id="s1")
|
||||
|
||||
input_message = Message(role="user", contents=["hello"])
|
||||
response_message = Message(role="assistant", contents=["hi there"])
|
||||
first_context = SessionContext(session_id=session.session_id, input_messages=[input_message])
|
||||
|
||||
await provider.before_run( # type: ignore[arg-type]
|
||||
agent=None,
|
||||
session=session,
|
||||
context=first_context,
|
||||
state={},
|
||||
)
|
||||
first_context._response = AgentResponse(messages=[response_message])
|
||||
await provider.after_run( # type: ignore[arg-type]
|
||||
agent=None,
|
||||
session=session,
|
||||
context=first_context,
|
||||
state={},
|
||||
)
|
||||
|
||||
session_file = provider._session_file_path(session.session_id)
|
||||
assert session_file.name == "s1.jsonl"
|
||||
assert session_file.exists()
|
||||
raw_lines = (await asyncio.to_thread(session_file.read_text, encoding="utf-8")).splitlines()
|
||||
assert len(raw_lines) == 2
|
||||
payloads = [json.loads(line) for line in raw_lines]
|
||||
assert all(payload["type"] == "message" for payload in payloads)
|
||||
assert all("session_id" not in payload for payload in payloads)
|
||||
|
||||
second_context = SessionContext(
|
||||
session_id=session.session_id, input_messages=[Message(role="user", contents=["again"])]
|
||||
)
|
||||
await provider.before_run( # type: ignore[arg-type]
|
||||
agent=None,
|
||||
session=session,
|
||||
context=second_context,
|
||||
state={},
|
||||
)
|
||||
loaded = second_context.context_messages.get(provider.source_id, [])
|
||||
assert len(loaded) == 2
|
||||
assert loaded[0].text == "hello"
|
||||
assert loaded[1].text == "hi there"
|
||||
|
||||
def test_creates_storage_directory(self, tmp_path: Path) -> None:
|
||||
nested_path = tmp_path / "nested" / "history"
|
||||
provider = FileHistoryProvider(nested_path)
|
||||
assert provider.storage_path == nested_path
|
||||
assert nested_path.exists()
|
||||
assert nested_path.is_dir()
|
||||
|
||||
async def test_uses_encoded_filename_for_unsafe_session_id(self, tmp_path: Path) -> None:
|
||||
provider = FileHistoryProvider(tmp_path)
|
||||
unsafe_session_id = "../unsafe/session"
|
||||
|
||||
await provider.save_messages(unsafe_session_id, [Message(role="user", contents=["hello"])])
|
||||
|
||||
session_file = provider._session_file_path(unsafe_session_id)
|
||||
assert session_file.parent == provider.storage_path
|
||||
assert session_file.name.startswith("~session-")
|
||||
assert session_file.suffix == ".jsonl"
|
||||
assert session_file.exists()
|
||||
jsonl_files = await asyncio.to_thread(
|
||||
lambda: sorted(path.name for path in provider.storage_path.glob("*.jsonl"))
|
||||
)
|
||||
assert jsonl_files == [session_file.name]
|
||||
|
||||
async def test_allows_custom_serializers_returning_bytes(self, tmp_path: Path) -> None:
|
||||
calls: list[str] = []
|
||||
|
||||
def dumps(payload: object) -> bytes:
|
||||
calls.append("dumps")
|
||||
return json.dumps(payload).encode("utf-8")
|
||||
|
||||
def loads(payload: str | bytes) -> object:
|
||||
calls.append("loads")
|
||||
if isinstance(payload, bytes):
|
||||
payload = payload.decode("utf-8")
|
||||
return json.loads(payload)
|
||||
|
||||
provider = FileHistoryProvider(tmp_path, dumps=dumps, loads=loads)
|
||||
|
||||
await provider.save_messages("custom-serializer", [Message(role="user", contents=["hello"])])
|
||||
loaded = await provider.get_messages("custom-serializer")
|
||||
|
||||
assert calls == ["dumps", "loads"]
|
||||
assert len(loaded) == 1
|
||||
assert loaded[0].text == "hello"
|
||||
|
||||
async def test_invalid_jsonl_line_raises(self, tmp_path: Path) -> None:
|
||||
provider = FileHistoryProvider(tmp_path)
|
||||
await asyncio.to_thread(provider._session_file_path("broken").write_text, "{not-json}\n", encoding="utf-8")
|
||||
|
||||
with pytest.raises(ValueError, match="Failed to deserialize history line 1"):
|
||||
await provider.get_messages("broken")
|
||||
|
||||
async def test_missing_session_file_returns_empty_messages(self, tmp_path: Path) -> None:
|
||||
provider = FileHistoryProvider(tmp_path)
|
||||
|
||||
loaded = await provider.get_messages("missing")
|
||||
|
||||
assert loaded == []
|
||||
|
||||
async def test_none_session_id_uses_default_jsonl_file(self, tmp_path: Path) -> None:
|
||||
provider = FileHistoryProvider(tmp_path)
|
||||
|
||||
await provider.save_messages(None, [Message(role="user", contents=["hello"])])
|
||||
|
||||
session_file = provider._session_file_path(None)
|
||||
assert session_file.name == "default.jsonl"
|
||||
loaded = await provider.get_messages(None)
|
||||
assert [message.text for message in loaded] == ["hello"]
|
||||
|
||||
async def test_non_mapping_jsonl_line_raises(self, tmp_path: Path) -> None:
|
||||
provider = FileHistoryProvider(tmp_path)
|
||||
await asyncio.to_thread(provider._session_file_path("non-mapping").write_text, "[1, 2, 3]\n", encoding="utf-8")
|
||||
|
||||
with pytest.raises(ValueError, match="did not deserialize to a mapping"):
|
||||
await provider.get_messages("non-mapping")
|
||||
|
||||
async def test_skip_excluded_omits_excluded_messages(self, tmp_path: Path) -> None:
|
||||
provider = FileHistoryProvider(tmp_path, skip_excluded=True)
|
||||
|
||||
await provider.save_messages(
|
||||
"skip-excluded",
|
||||
[
|
||||
Message(role="user", contents=["keep"]),
|
||||
Message(role="assistant", contents=["skip"], additional_properties={"_excluded": True}),
|
||||
],
|
||||
)
|
||||
|
||||
loaded = await provider.get_messages("skip-excluded")
|
||||
|
||||
assert [message.text for message in loaded] == ["keep"]
|
||||
|
||||
async def test_serializer_must_return_single_line_json(self, tmp_path: Path) -> None:
|
||||
def dumps(payload: object) -> str:
|
||||
return json.dumps(payload, indent=2)
|
||||
|
||||
provider = FileHistoryProvider(tmp_path, dumps=dumps)
|
||||
|
||||
with pytest.raises(ValueError, match="single-line JSON"):
|
||||
await provider.save_messages("pretty-json", [Message(role="user", contents=["hello"])])
|
||||
|
||||
async def test_concurrent_writes_for_same_session_are_locked(
|
||||
self,
|
||||
tmp_path: Path,
|
||||
monkeypatch: pytest.MonkeyPatch,
|
||||
) -> None:
|
||||
provider = FileHistoryProvider(tmp_path)
|
||||
session_id = "shared-session"
|
||||
file_path = provider._session_file_path(session_id)
|
||||
real_open = Path.open
|
||||
write_started = threading.Event()
|
||||
active_writes = 0
|
||||
overlap_detected = False
|
||||
|
||||
class _TrackingFile:
|
||||
def __init__(self, wrapped: Any) -> None:
|
||||
self._wrapped = wrapped
|
||||
|
||||
def __enter__(self) -> "_TrackingFile":
|
||||
self._wrapped.__enter__()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
|
||||
self._wrapped.__exit__(exc_type, exc_val, exc_tb)
|
||||
|
||||
def write(self, data: str) -> int:
|
||||
nonlocal active_writes, overlap_detected
|
||||
write_started.set()
|
||||
active_writes += 1
|
||||
overlap_detected = overlap_detected or active_writes > 1
|
||||
try:
|
||||
time.sleep(0.05)
|
||||
return int(self._wrapped.write(data))
|
||||
finally:
|
||||
active_writes -= 1
|
||||
|
||||
def __getattr__(self, name: str) -> Any:
|
||||
return getattr(self._wrapped, name)
|
||||
|
||||
def tracked_open(path: Path, *args: Any, **kwargs: Any) -> Any:
|
||||
handle = real_open(path, *args, **kwargs)
|
||||
if path == file_path and args and args[0] == "a":
|
||||
return _TrackingFile(handle)
|
||||
return handle
|
||||
|
||||
monkeypatch.setattr(Path, "open", tracked_open)
|
||||
|
||||
first_save = asyncio.create_task(provider.save_messages(session_id, [Message(role="user", contents=["first"])]))
|
||||
started = await asyncio.to_thread(write_started.wait, 1.0)
|
||||
assert started
|
||||
|
||||
second_save = asyncio.create_task(
|
||||
provider.save_messages(session_id, [Message(role="assistant", contents=["second"])])
|
||||
)
|
||||
await asyncio.gather(first_save, second_save)
|
||||
|
||||
assert not overlap_detected
|
||||
loaded = await provider.get_messages(session_id)
|
||||
assert [message.text for message in loaded] == ["first", "second"]
|
||||
|
||||
@@ -8,6 +8,8 @@ These samples demonstrate different approaches to managing conversation history
|
||||
|------|-------------|
|
||||
| [`suspend_resume_session.py`](suspend_resume_session.py) | Suspend and resume conversation sessions, comparing service-managed sessions (Azure AI Foundry) with in-memory sessions (OpenAI). |
|
||||
| [`custom_history_provider.py`](custom_history_provider.py) | Implement a custom history provider by extending `HistoryProvider`, enabling conversation persistence in your preferred storage backend. |
|
||||
| [`file_history_provider.py`](file_history_provider.py) | Use the experimental `FileHistoryProvider` with `FoundryChatClient` and a function tool so the local JSON Lines file shows the full tool-calling loop. |
|
||||
| [`file_history_provider_conversation_persistence.py`](file_history_provider_conversation_persistence.py) | Persist a tool-driven weather conversation with `FileHistoryProvider`, inspect the stored JSONL records, and continue with another city. |
|
||||
| [`cosmos_history_provider.py`](cosmos_history_provider.py) | Use Azure Cosmos DB as a history provider for durable conversation storage with `CosmosHistoryProvider`. |
|
||||
| [`cosmos_history_provider_conversation_persistence.py`](cosmos_history_provider_conversation_persistence.py) | Persist and resume conversations across application restarts using `CosmosHistoryProvider` — serialize session state, restore it, and continue with full Cosmos DB history. |
|
||||
| [`cosmos_history_provider_messages.py`](cosmos_history_provider_messages.py) | Direct message history operations — retrieve stored messages as a transcript, clear session history, and verify data deletion. |
|
||||
@@ -25,6 +27,20 @@ These samples demonstrate different approaches to managing conversation history
|
||||
**For `custom_history_provider.py`:**
|
||||
- `OPENAI_API_KEY`: Your OpenAI API key
|
||||
|
||||
**For `file_history_provider.py`:**
|
||||
- `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint
|
||||
- `FOUNDRY_MODEL`: The Foundry model deployment name
|
||||
- Azure CLI authentication (`az login`)
|
||||
- The sample writes plaintext JSONL conversation logs to disk; use a trusted
|
||||
local directory and avoid treating the history files as secure secret storage
|
||||
|
||||
**For `file_history_provider_conversation_persistence.py`:**
|
||||
- `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint
|
||||
- `FOUNDRY_MODEL`: The Foundry model deployment name
|
||||
- Azure CLI authentication (`az login`)
|
||||
- The sample writes plaintext JSONL conversation logs to disk; use a trusted
|
||||
local directory and avoid treating the history files as secure secret storage
|
||||
|
||||
**For Cosmos DB samples (`cosmos_history_provider*.py`):**
|
||||
- `FOUNDRY_PROJECT_ENDPOINT`: Your Azure AI Foundry project endpoint
|
||||
- `FOUNDRY_MODEL`: The Foundry model deployment name
|
||||
|
||||
@@ -0,0 +1,157 @@
|
||||
# Copyright (c) Microsoft. All rights reserved.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
# Uncomment this filter to suppress the experimental FileHistoryProvider warning
|
||||
# before running the sample.
|
||||
# import warnings # isort: skip
|
||||
# warnings.filterwarnings("ignore", message=r"\[FILE_HISTORY\].*", category=FutureWarning)
|
||||
from agent_framework import Agent, FileHistoryProvider, tool
|
||||
from agent_framework.foundry import FoundryChatClient
|
||||
from azure.identity import AzureCliCredential
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import Field
|
||||
|
||||
try:
|
||||
import orjson
|
||||
except ImportError:
|
||||
orjson = None
|
||||
|
||||
|
||||
# Load environment variables from .env file.
|
||||
load_dotenv()
|
||||
|
||||
"""
|
||||
File History Provider
|
||||
|
||||
This sample demonstrates how to use the experimental `FileHistoryProvider` with
|
||||
`FoundryChatClient` and a function tool so the persisted JSON Lines file shows
|
||||
the tool-calling loop as well as the regular chat turns.
|
||||
|
||||
Environment variables:
|
||||
FOUNDRY_PROJECT_ENDPOINT: Azure AI Foundry project endpoint.
|
||||
FOUNDRY_MODEL: Foundry model deployment name.
|
||||
|
||||
Key components:
|
||||
- `FileHistoryProvider`: Stores one message JSON object per line in a local
|
||||
`.jsonl` file for each session.
|
||||
- `lookup_weather`: A function tool that makes the persisted file show the
|
||||
assistant function call and tool result lines.
|
||||
- `json.dumps(..., indent=2)`: Pretty-prints selected records in the sample
|
||||
output while keeping the on-disk JSONL file compact and valid.
|
||||
- `USE_TEMP_DIRECTORY`: Toggle between a temporary directory and a persistent
|
||||
`sessions/` folder next to this sample file.
|
||||
|
||||
Security posture:
|
||||
- The history files are plaintext JSONL on disk, so use a trusted storage
|
||||
directory and treat the files as conversation logs, not as secure secret
|
||||
storage.
|
||||
- Path safety checks protect the filename derived from the session id, but they
|
||||
do not redact message contents or encrypt the file.
|
||||
"""
|
||||
|
||||
USE_TEMP_DIRECTORY = False
|
||||
"""When True, store JSONL files in a temporary directory for this run only."""
|
||||
|
||||
LOCAL_SESSIONS_DIRECTORY_NAME = "sessions"
|
||||
"""Folder name used when persisting history next to this sample file."""
|
||||
|
||||
|
||||
@tool(approval_mode="never_require")
|
||||
def lookup_weather(
|
||||
location: Annotated[str, Field(description="The city to look up weather for.")],
|
||||
) -> str:
|
||||
"""Return a deterministic weather report for a city."""
|
||||
weather_reports = {
|
||||
"Seattle": "Seattle is rainy with a high of 13C.",
|
||||
"Amsterdam": "Amsterdam is cloudy with a high of 16C.",
|
||||
}
|
||||
return weather_reports.get(location, f"{location} is sunny with a high of 20C.")
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _resolve_storage_directory() -> Iterator[Path]:
|
||||
"""Yield the configured storage directory for the sample run."""
|
||||
if USE_TEMP_DIRECTORY:
|
||||
with tempfile.TemporaryDirectory(prefix="af-file-history-") as temp_directory:
|
||||
yield Path(temp_directory)
|
||||
return
|
||||
|
||||
storage_directory = Path(__file__).resolve().parent / LOCAL_SESSIONS_DIRECTORY_NAME
|
||||
storage_directory.mkdir(parents=True, exist_ok=True)
|
||||
yield storage_directory
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
"""Run the file history provider sample."""
|
||||
|
||||
with _resolve_storage_directory() as storage_directory:
|
||||
print(f"Using temporary directory: {USE_TEMP_DIRECTORY}")
|
||||
print(f"Storage directory: {storage_directory}\n")
|
||||
|
||||
# 2. Create the agent with a tool so the JSONL file includes tool-calling messages.
|
||||
agent = Agent(
|
||||
client=FoundryChatClient(
|
||||
project_endpoint=os.getenv("FOUNDRY_PROJECT_ENDPOINT"),
|
||||
model=os.getenv("FOUNDRY_MODEL"),
|
||||
credential=AzureCliCredential(),
|
||||
),
|
||||
name="FileHistoryAgent",
|
||||
instructions=(
|
||||
"You are a helpful assistant, use the lookup_weather tool for weather questions and "
|
||||
"answer with the tool result in one sentence."
|
||||
),
|
||||
tools=[lookup_weather],
|
||||
# if orjson is available, use it for faster JSON serialization in the FileHistoryProvider,
|
||||
# otherwise fall back to the default json module.
|
||||
context_providers=[
|
||||
FileHistoryProvider(
|
||||
storage_directory,
|
||||
dumps=orjson.dumps if orjson else None,
|
||||
loads=orjson.loads if orjson else None,
|
||||
)
|
||||
],
|
||||
default_options={"store": False},
|
||||
)
|
||||
|
||||
# 3. Let Agent create the default UUID session id for this conversation.
|
||||
session = agent.create_session()
|
||||
|
||||
# 4. Ask a question that triggers the weather tool.
|
||||
print("=== Run with tool calling ===")
|
||||
query = "Use the lookup_weather tool for Seattle and tell me the weather."
|
||||
response = await agent.run(query, session=session)
|
||||
print(f"User: {query}")
|
||||
print(f"Assistant: {response.text}\n")
|
||||
|
||||
# 5. Ask a follow-up question that triggers the weather tool as well
|
||||
print("=== Follow-up question ===")
|
||||
query = "And what about Amsterdam?"
|
||||
response = await agent.run(query, session=session)
|
||||
print(f"User: {query}")
|
||||
print(f"Assistant: {response.text}\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
"""
|
||||
Sample output:
|
||||
Using temporary directory: False
|
||||
Storage directory: /path/to/samples/02-agents/conversations/sessions
|
||||
|
||||
=== Run with tool calling ===
|
||||
User: Use the lookup_weather tool for Seattle and tell me the weather.
|
||||
Assistant: <model response varies>
|
||||
=== Follow-up question ===
|
||||
User: And what about Amsterdam?
|
||||
Assistant: <model response varies>
|
||||
"""
|
||||
+185
@@ -0,0 +1,185 @@
|
||||
# Copyright (c) Microsoft. All rights reserved.
|
||||
# ruff: noqa: T201
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import tempfile
|
||||
from collections.abc import Iterator
|
||||
from contextlib import contextmanager
|
||||
from pathlib import Path
|
||||
from typing import Annotated
|
||||
|
||||
# Uncomment this filter to suppress the experimental FileHistoryProvider warning
|
||||
# before running the sample.
|
||||
# import warnings # isort: skip
|
||||
# warnings.filterwarnings("ignore", message=r"\[FILE_HISTORY\].*", category=FutureWarning)
|
||||
from agent_framework import Agent, FileHistoryProvider, tool
|
||||
from agent_framework.foundry import FoundryChatClient
|
||||
from azure.identity.aio import AzureCliCredential
|
||||
from dotenv import load_dotenv
|
||||
from pydantic import Field
|
||||
|
||||
try:
|
||||
import orjson
|
||||
except ImportError:
|
||||
orjson = None
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
"""
|
||||
File History Provider Conversation Persistence
|
||||
|
||||
This sample demonstrates persisting a tool-driven conversation with the
|
||||
experimental `FileHistoryProvider`, reading the stored JSONL file back from
|
||||
disk, and then continuing the same conversation with another city.
|
||||
|
||||
Environment variables:
|
||||
FOUNDRY_PROJECT_ENDPOINT: Azure AI Foundry project endpoint.
|
||||
FOUNDRY_MODEL: Foundry model deployment name.
|
||||
|
||||
Key components:
|
||||
- `FileHistoryProvider`: Stores one message JSON object per line in a local
|
||||
`.jsonl` file for each session.
|
||||
- `get_weather`: A function tool that makes the persisted file show the
|
||||
assistant function call and tool result records.
|
||||
- `json.dumps(..., indent=2)`: Pretty-prints a few persisted JSONL records
|
||||
while keeping the on-disk file compact and valid.
|
||||
- `load_dotenv()`: Loads `.env` values up front so the sample can stay focused
|
||||
on history persistence instead of manual environment variable plumbing.
|
||||
- Optional `orjson`: Uses `orjson.dumps` / `orjson.loads` automatically when
|
||||
available, otherwise falls back to the standard library `json` module.
|
||||
|
||||
Security posture:
|
||||
- The history file is plaintext JSONL on disk, so use a trusted storage
|
||||
directory and treat it as conversation logging, not as secure secret storage.
|
||||
- Path safety checks protect the filename derived from the session id, but they
|
||||
do not redact message contents or encrypt the file.
|
||||
"""
|
||||
|
||||
USE_TEMP_DIRECTORY = False
|
||||
"""When True, store JSONL files in a temporary directory for this run only."""
|
||||
|
||||
LOCAL_SESSIONS_DIRECTORY_NAME = "sessions"
|
||||
"""Folder name used when persisting history next to this sample file."""
|
||||
|
||||
|
||||
@tool(approval_mode="never_require")
|
||||
def get_weather(
|
||||
city: Annotated[str, Field(description="The city to get the weather for.")],
|
||||
) -> str:
|
||||
"""Return a deterministic weather report for a city."""
|
||||
weather_reports = {
|
||||
"Seattle": "Seattle is rainy with a high of 13C.",
|
||||
"Amsterdam": "Amsterdam is cloudy with a high of 16C.",
|
||||
}
|
||||
return weather_reports.get(city, f"{city} is sunny with a high of 20C.")
|
||||
|
||||
|
||||
@contextmanager
|
||||
def _resolve_storage_directory() -> Iterator[Path]:
|
||||
"""Yield the configured storage directory for the sample run."""
|
||||
if USE_TEMP_DIRECTORY:
|
||||
with tempfile.TemporaryDirectory(prefix="af-file-history-resume-") as temp_directory:
|
||||
yield Path(temp_directory)
|
||||
return
|
||||
|
||||
storage_directory = Path(__file__).resolve().parent / LOCAL_SESSIONS_DIRECTORY_NAME
|
||||
storage_directory.mkdir(parents=True, exist_ok=True)
|
||||
yield storage_directory
|
||||
|
||||
|
||||
async def main() -> None:
|
||||
"""Run the file history provider conversation persistence sample."""
|
||||
|
||||
with _resolve_storage_directory() as storage_directory:
|
||||
print(f"Using temporary directory: {USE_TEMP_DIRECTORY}")
|
||||
print(f"Storage directory: {storage_directory}\n")
|
||||
|
||||
# 1. Create the client, history provider, and tool-enabled agent.
|
||||
agent = Agent(
|
||||
client=FoundryChatClient(
|
||||
credential=AzureCliCredential(),
|
||||
),
|
||||
name="WeatherHistoryAgent",
|
||||
instructions=(
|
||||
"You are a helpful assistant. Use the get_weather tool for weather questions "
|
||||
"and answer in one sentence using the tool result."
|
||||
),
|
||||
tools=[get_weather],
|
||||
context_providers=[
|
||||
FileHistoryProvider(
|
||||
storage_directory,
|
||||
dumps=orjson.dumps if orjson else None,
|
||||
loads=orjson.loads if orjson else None,
|
||||
)
|
||||
],
|
||||
default_options={"store": False},
|
||||
)
|
||||
|
||||
# 2. Ask about the first city so the JSONL file is created on disk.
|
||||
session = agent.create_session()
|
||||
history_file = storage_directory / f"{session.session_id}.jsonl"
|
||||
print("=== First weather question ===\n")
|
||||
first_query = "Use the get_weather tool and tell me the weather in Seattle."
|
||||
first_response = await agent.run(first_query, session=session)
|
||||
print(f"User: {first_query}")
|
||||
print(f"Assistant: {first_response.text}\n")
|
||||
|
||||
# 3. Read the stored JSONL records back from disk and pretty-print a few of them.
|
||||
raw_lines = (await asyncio.to_thread(history_file.read_text, encoding="utf-8")).splitlines()
|
||||
print(f"Stored message lines after first question: {len(raw_lines)}")
|
||||
print(f"History file: {history_file}\n")
|
||||
print("=== JSONL preview from disk ===\n")
|
||||
for index, line in enumerate(raw_lines[:4], start=1):
|
||||
print(f"Record {index}:")
|
||||
print(json.dumps(json.loads(line), indent=2))
|
||||
print()
|
||||
|
||||
# 4. Continue the same persisted conversation with another city.
|
||||
print("=== Second weather question ===\n")
|
||||
second_query = "Now use the get_weather tool for Amsterdam."
|
||||
second_response = await agent.run(second_query, session=session)
|
||||
print(f"User: {second_query}")
|
||||
print(f"Assistant: {second_response.text}\n")
|
||||
|
||||
updated_lines = (await asyncio.to_thread(history_file.read_text, encoding="utf-8")).splitlines()
|
||||
print(f"Stored message lines after second question: {len(updated_lines)}")
|
||||
print(f"History file: {history_file}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
|
||||
"""
|
||||
Sample output:
|
||||
Using temporary directory: False
|
||||
Storage directory: /path/to/samples/02-agents/conversations/sessions
|
||||
|
||||
=== First weather question ===
|
||||
|
||||
User: Use the get_weather tool and tell me the weather in Seattle.
|
||||
Assistant: <model response varies>
|
||||
|
||||
Stored message lines after first question: 4
|
||||
History file: /path/to/samples/02-agents/conversations/sessions/<session-uuid>.jsonl
|
||||
|
||||
=== JSONL preview from disk ===
|
||||
|
||||
Record 1:
|
||||
{
|
||||
"type": "message",
|
||||
"role": "user",
|
||||
...
|
||||
}
|
||||
|
||||
=== Second weather question ===
|
||||
|
||||
User: Now use the get_weather tool for Amsterdam.
|
||||
Assistant: <model response varies>
|
||||
|
||||
Stored message lines after second question: 8
|
||||
History file: /path/to/samples/02-agents/conversations/sessions/<session-uuid>.jsonl
|
||||
"""
|
||||
Reference in New Issue
Block a user