Python: Implement annotation-based context compaction (#4469)

* Implement annotation-based context compaction

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Handle missing compaction attributes in BaseChatClient

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix CI typing and bandit issues

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Optimize incremental compaction annotation pass

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* refinement

* Python: add ToolResultCompactionStrategy and CompactionProvider

Add ToolResultCompactionStrategy that collapses older tool-call groups
into short summary messages (e.g. [Tool calls: get_weather]) while
keeping the most recent groups verbatim. This mirrors the .NET
ToolResultCompactionStrategy from PR #4533.

Add CompactionProvider as a context-provider that auto-applies compaction
before each agent turn and stores compacted history in session state
after each turn.

Includes tests and samples for both features.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* refinement and alignment with dotnet PR

* updated tool result compaction

* updated tool result compaction

* Python: add ToolResultCompactionStrategy, CompactionProvider, and skip_excluded

- ToolResultCompactionStrategy collapses older tool-call groups into
  [Tool results: func_name: result] summaries with bidirectional tracing
  (same pattern as SummarizationStrategy).
- CompactionProvider as BaseContextProvider with separate before_strategy
  and after_strategy parameters. before_strategy compacts loaded context;
  after_strategy compacts stored history via history_source_id.
- InMemoryHistoryProvider gains skip_excluded flag to filter out messages
  marked as excluded by compaction strategies.
- Tests, samples, and exports updated.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fixed checks

* fix mypy

* Fix: ensure summary messages from both strategies get full compaction annotations

SummarizationStrategy was not calling annotate_message_groups after
inserting its summary message, so the summary lacked core group
annotations (id, kind, index, has_reasoning, _excluded). Added the
missing call. ToolResultCompactionStrategy already had it.

Added tests verifying both strategies produce fully annotated summaries.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* updated propagation

* fix mypy

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-03-11 20:23:00 +01:00
committed by GitHub
Unverified
parent 565c0b1623
commit 3e03a305f6
29 changed files with 4397 additions and 205 deletions
@@ -0,0 +1,23 @@
# Context Compaction Samples
This folder demonstrates context compaction patterns introduced by ADR-0019.
## Files
- `basics.py` — builds a local message list and applies each built-in strategy one at a time.
- `advanced.py` — composes multiple strategies with `TokenBudgetComposedStrategy`.
- `agent_client_overrides.py` — shows client defaults, agent-level overrides, and per-run compaction overrides.
- `custom.py` — defines a custom strategy implementing the `CompactionStrategy` protocol.
- `tiktoken_tokenizer.py` — shows a `TokenizerProtocol` implementation backed by `tiktoken`.
- `compaction_provider.py` — uses `CompactionProvider` with an agent and `InMemoryHistoryProvider`.
Run samples with:
```bash
uv run samples/02-agents/compaction/basics.py
uv run samples/02-agents/compaction/advanced.py
uv run samples/02-agents/compaction/agent_client_overrides.py
uv run samples/02-agents/compaction/custom.py
uv run samples/02-agents/compaction/tiktoken_tokenizer.py
uv run samples/02-agents/compaction/compaction_provider.py # requires OPENAI_API_KEY
```
@@ -0,0 +1,115 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import (
CharacterEstimatorTokenizer,
ChatResponse,
Message,
SelectiveToolCallCompactionStrategy,
SlidingWindowStrategy,
SummarizationStrategy,
TokenBudgetComposedStrategy,
annotate_message_groups,
apply_compaction,
included_token_count,
)
"""This sample demonstrates composed in-run compaction with a token budget.
Key components:
- TokenBudgetComposedStrategy
- Sequential strategy composition
- Summarization with a SupportsChatGetResponse-compatible summarizer client
"""
class BudgetSummaryClient:
async def get_response(
self,
messages: list[Message],
*,
stream: bool = False,
options: dict[str, Any] | None = None,
**kwargs: Any,
) -> ChatResponse:
summary_text = f"Budget summary generated from {len(messages)} prompt messages."
return ChatResponse(messages=[Message(role="assistant", text=summary_text)])
def _build_long_history() -> list[Message]:
history = [Message(role="system", text="You are a migration copilot.")]
for i in range(1, 8):
history.append(
Message(
role="user",
text=f"Iteration {i}: capture migration requirements and edge cases.",
)
)
history.append(
Message(
role="assistant",
text=(
f"Iteration {i}: detailed plan with dependencies, rollback guidance, and testing details. "
"This sentence is intentionally long to create token pressure."
),
)
)
return history
async def main() -> None:
# 1. Build synthetic history representing long-running in-run growth.
messages = _build_long_history()
# 2. Configure tokenizer and measure token count before compaction.
tokenizer = CharacterEstimatorTokenizer()
annotate_message_groups(messages, tokenizer=tokenizer)
budget_before = included_token_count(messages)
# 3. Configure composed strategy stack.
composed = TokenBudgetComposedStrategy(
token_budget=200,
tokenizer=tokenizer,
strategies=[
SelectiveToolCallCompactionStrategy(keep_last_tool_call_groups=0),
SummarizationStrategy(
client=BudgetSummaryClient(),
target_count=3,
threshold=3,
),
SlidingWindowStrategy(keep_last_groups=4),
],
)
# 4. Apply compaction and inspect the budget result.
projected = await apply_compaction(messages, strategy=composed, tokenizer=tokenizer)
budget_after = included_token_count(messages)
print(f"Projected messages after compaction: {len(projected)}")
print(f"Included token count before compaction: {budget_before}")
print(f"Included token count after compaction: {budget_after}")
print("Projected roles:", [m.role for m in projected])
print("Projected messages with token counts:")
for msg in projected:
group = msg.additional_properties.get("_group")
token_count = group.get("token_count") if isinstance(group, dict) else None
text_preview = msg.text[:80] if msg.text else "<non-text>"
print(f"- [{msg.role}] {text_preview} ({token_count} tokens)")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Projected messages after compaction: 3
Included token count before compaction: 793
Included token count after compaction: 144
Projected roles: ['system', 'user', 'assistant']
Projected messages with token counts:
- [system] You are a migration copilot. (35 tokens)
- [user] Iteration 7: capture migration requirements and edge cases. (43 tokens)
- [assistant] Iteration 7: detailed plan with dependencies, rollback guidance, and testing det (66 tokens)
"""
@@ -0,0 +1,144 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
import asyncio
from collections.abc import Awaitable, Mapping, Sequence
from typing import Any
from agent_framework import (
GROUP_ANNOTATION_KEY,
GROUP_TOKEN_COUNT_KEY,
Agent,
BaseChatClient,
ChatResponse,
Message,
SlidingWindowStrategy,
TruncationStrategy,
)
"""This sample demonstrates client defaults, agent overrides, and run-level overrides for in-run compaction.
Key components:
- A shared client with default `compaction_strategy` and `tokenizer`
- An agent-level override that takes precedence over the shared client defaults
- A run-level override passed through `agent.run(...)`
"""
class FixedTokenizer:
"""Simple tokenizer used to make token annotations easy to inspect."""
def __init__(self, token_count: int) -> None:
self._token_count = token_count
def count_tokens(self, text: str) -> int:
return self._token_count
class InspectingChatClient(BaseChatClient[Any]):
"""Chat client that records the messages it receives after compaction."""
def __init__(self, **kwargs: Any) -> None:
super().__init__(**kwargs)
self.last_messages: list[Message] = []
def _inner_get_response(
self,
*,
messages: Sequence[Message],
stream: bool,
options: Mapping[str, Any],
**kwargs: Any,
) -> Awaitable[ChatResponse]:
if stream:
raise ValueError("This sample only demonstrates non-streaming responses.")
self.last_messages = list(messages)
async def _get_response() -> ChatResponse:
return ChatResponse(messages=[Message(role="assistant", text="done")])
return _get_response()
def _build_messages() -> list[Message]:
return [
Message(role="user", text="Collect the deployment requirements."),
Message(role="assistant", text="I will gather the constraints first."),
Message(role="user", text="Summarize the rollout risks."),
Message(role="assistant", text="The main risks are drift, downtime, and rollback gaps."),
]
def _token_count(message: Message) -> int | None:
group_annotation = message.additional_properties.get(GROUP_ANNOTATION_KEY)
if not isinstance(group_annotation, dict):
return None
value = group_annotation.get(GROUP_TOKEN_COUNT_KEY)
return value if isinstance(value, int) else None
def _print_model_input(title: str, client: InspectingChatClient) -> None:
print(f"\n{title}")
print(f"Model receives {len(client.last_messages)} message(s):")
for message in client.last_messages:
print(f"- [{message.role}] {message.text} ({_token_count(message)} tokens)")
async def main() -> None:
# 1. Create one shared client with default compaction settings.
shared_client = InspectingChatClient(
compaction_strategy=TruncationStrategy(max_n=3, compact_to=2),
tokenizer=FixedTokenizer(7),
)
# 2. Create one agent that relies on the client defaults.
client_default_agent = Agent(client=shared_client, name="ClientDefaultAgent")
# 3. Create another agent that overrides the shared client's defaults.
agent_override = Agent(
client=shared_client,
name="AgentOverrideAgent",
compaction_strategy=SlidingWindowStrategy(keep_last_groups=3),
tokenizer=FixedTokenizer(11),
)
# 4. Run the first agent; the client defaults are applied.
await client_default_agent.run(_build_messages())
_print_model_input("1. Client default compaction", shared_client)
# 5. Run the second agent; the agent-level override wins over the client defaults.
await agent_override.run(_build_messages())
_print_model_input("2. Agent-level override", shared_client)
# 6. Override both settings for a single run; the per-run values win over both.
await agent_override.run(
_build_messages(),
compaction_strategy=TruncationStrategy(max_n=2, compact_to=1),
tokenizer=FixedTokenizer(23),
)
_print_model_input("3. Per-run override", shared_client)
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
1. Client default compaction
Model receives 2 message(s):
- [user] Summarize the rollout risks. (7 tokens)
- [assistant] The main risks are drift, downtime, and rollback gaps. (7 tokens)
2. Agent-level override
Model receives 3 message(s):
- [assistant] I will gather the constraints first. (11 tokens)
- [user] Summarize the rollout risks. (11 tokens)
- [assistant] The main risks are drift, downtime, and rollback gaps. (11 tokens)
3. Per-run override
Model receives 1 message(s):
- [assistant] The main risks are drift, downtime, and rollback gaps. (23 tokens)
"""
@@ -0,0 +1,241 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from agent_framework import (
CharacterEstimatorTokenizer,
ChatResponse,
Content,
Message,
SelectiveToolCallCompactionStrategy,
SlidingWindowStrategy,
SummarizationStrategy,
TokenBudgetComposedStrategy,
ToolResultCompactionStrategy,
TruncationStrategy,
apply_compaction,
)
"""This sample demonstrates selecting one compaction strategy at a time.
How to use this sample:
- Keep one ``selected_strategy`` block active in ``main``.
- Comment the active block and uncomment one of the alternatives to switch strategies.
- Run again to compare behavior against the same "before" message list shown once.
"""
SUMMARY_OF_MESSAGE_IDS_KEY = "_summary_of_message_ids"
SUMMARIZED_BY_SUMMARY_ID_KEY = "_summarized_by_summary_id"
# Keep optional strategy classes imported for quick uncomment/switch in main().
AVAILABLE_STRATEGY_TYPES = (
TruncationStrategy,
CharacterEstimatorTokenizer,
SlidingWindowStrategy,
SelectiveToolCallCompactionStrategy,
ToolResultCompactionStrategy,
SummarizationStrategy,
TokenBudgetComposedStrategy,
)
class LocalSummaryClient:
"""Simple local summarizer compatible with SupportsChatGetResponse."""
async def get_response(
self,
messages: list[Message],
*,
stream: bool = False,
options: dict[str, Any] | None = None,
**kwargs: Any,
) -> ChatResponse:
return ChatResponse(messages=[Message(role="assistant", text=f"Summary for {len(messages)} messages.")])
async def main() -> None:
# 1. Build one baseline history and print it once.
messages = [
Message(role="system", text="You are a helpful assistant."),
Message(role="user", text="Plan a data migration."),
Message(role="assistant", text="I will gather requirements."),
Message(
role="assistant",
contents=[
Content.from_function_call(
call_id="call_1",
name="list_tables",
arguments='{"db":"legacy"}',
)
],
),
Message(
role="tool",
contents=[
Content.from_function_result(
call_id="call_1",
result="users, orders, events",
)
],
),
Message(role="assistant", text="I found three core tables."),
Message(role="user", text="Estimate effort and risks."),
Message(role="assistant", text="Primary risk is schema drift."),
]
print("\n--- Before compaction ---")
print(f"Message count: {len(messages)}")
for index, message in enumerate(messages, start=1):
message_text = message.text or ", ".join(content.type for content in message.contents)
print(f"{index:02d}. [{message.role}] {message_text}")
# 2. Select exactly one strategy (default shown below).
# Truncate when included history exceeds 5 messages, then keep 4.
# System remains anchored, so the oldest non-system messages are removed first.
# selected_strategy_name = "TruncationStrategy"
# selected_strategy = TruncationStrategy(max_n=5, compact_to=4, preserve_system=True)
# Keep the most recent 4 non-system groups and preserve the system anchor.
# A group represents a user turn (and related assistant/tool follow-up).
# selected_strategy_name = "SlidingWindowStrategy"
# selected_strategy = SlidingWindowStrategy(keep_last_groups=4, preserve_system=True)
# This means all tool-call groups are removed (assistant function_call message
# plus matching tool result messages). In this example, setting to 0 removes
# the single assistant+tool pair.
selected_strategy_name = "SelectiveToolCallCompactionStrategy"
selected_strategy = SelectiveToolCallCompactionStrategy(keep_last_tool_call_groups=0)
# Collapse older tool-call groups into short "[Tool results: tool_name]" summaries
# while keeping the most recent group verbatim. Unlike SelectiveToolCallCompactionStrategy
# which fully excludes groups, this preserves a readable trace of tool usage.
# selected_strategy_name = "ToolResultCompactionStrategy"
# selected_strategy = ToolResultCompactionStrategy(keep_last_tool_call_groups=0)
# Summarize older messages so only recent context remains, and attach summary
# trace metadata linking summary -> originals and originals -> summary.
# summary_client = LocalSummaryClient()
# selected_strategy_name = "SummarizationStrategy"
# selected_strategy = SummarizationStrategy(
# client=summary_client, target_count=3, threshold=2
# )
# tokenizer = CharacterEstimatorTokenizer()
# selected_strategy_name = "TokenBudgetComposedStrategy"
# selected_strategy = TokenBudgetComposedStrategy(
# token_budget=150,
# tokenizer=tokenizer,
# strategies=[
# SelectiveToolCallCompactionStrategy(keep_last_tool_call_groups=0),
# SlidingWindowStrategy(keep_last_groups=2),
# ],
# )
# 3. Apply the selected strategy and print projected output.
projected = await apply_compaction(messages, strategy=selected_strategy)
print(f"\n--- After compaction ({selected_strategy_name}) ---")
print(f"Message count: {len(projected)}")
for index, message in enumerate(projected, start=1):
message_text = message.text or ", ".join(content.type for content in message.contents)
print(f"{index:02d}. [{message.role}] {message_text}")
summaries = []
summarized = []
for message in messages:
group_annotation = message.additional_properties.get("_group")
if not isinstance(group_annotation, dict):
continue
if group_annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY):
summaries.append(message)
if group_annotation.get(SUMMARIZED_BY_SUMMARY_ID_KEY):
summarized.append(message)
if summaries or summarized:
print("Summary trace metadata present:")
for message in summaries:
group_annotation = message.additional_properties.get("_group")
summarized_ids = (
group_annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY) if isinstance(group_annotation, dict) else None
)
print(f" summary_id={message.message_id} summarizes={summarized_ids}")
for message in summarized:
group_annotation = message.additional_properties.get("_group")
summarized_by = (
group_annotation.get(SUMMARIZED_BY_SUMMARY_ID_KEY) if isinstance(group_annotation, dict) else None
)
print(f" original_id={message.message_id} summarized_by={summarized_by}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output (always present):
--- Before compaction ---
Message count: 8
01. [system] You are a helpful assistant.
02. [user] Plan a data migration.
03. [assistant] I will gather requirements.
04. [assistant] function_call
05. [tool] function_result
06. [assistant] I found three core tables.
07. [user] Estimate effort and risks.
08. [assistant] Primary risk is schema drift.
"""
"""
Sample output (varies based on selected strategy):
--- After compaction (TruncationStrategy) ---
Message count: 4
01. [system] You are a helpful assistant.
02. [assistant] I found three core tables.
03. [user] Estimate effort and risks.
04. [assistant] Primary risk is schema drift.
--- After compaction (SlidingWindowStrategy) ---
Message count: 6
01. [system] You are a helpful assistant.
02. [assistant] function_call
03. [tool] function_result
04. [assistant] I found three core tables.
05. [user] Estimate effort and risks.
06. [assistant] Primary risk is schema drift.
--- After compaction (SelectiveToolCallCompactionStrategy) ---
Message count: 6
01. [system] You are a helpful assistant.
02. [user] Plan a data migration.
03. [assistant] I will gather requirements.
04. [assistant] I found three core tables.
05. [user] Estimate effort and risks.
06. [assistant] Primary risk is schema drift.
--- After compaction (ToolResultCompactionStrategy) ---
Message count: 7
01. [system] You are a helpful assistant.
02. [assistant] [Tool results: list_tables]
03. [user] Plan a data migration.
04. [assistant] I will gather requirements.
05. [assistant] I found three core tables.
06. [user] Estimate effort and risks.
07. [assistant] Primary risk is schema drift.
--- After compaction (SummarizationStrategy) ---
Message count: 5
01. [system] You are a helpful assistant.
02. [assistant] Summary for 2 messages.
03. [assistant] I found three core tables.
04. [user] Estimate effort and risks.
05. [assistant] Primary risk is schema drift.
Summary trace metadata present:
summary_id=summary_8 summarizes=['msg_1', 'msg_2', 'msg_3', 'msg_4']
original_id=msg_1 summarized_by=summary_8
original_id=msg_2 summarized_by=summary_8
original_id=msg_3 summarized_by=summary_8
original_id=msg_4 summarized_by=summary_8
--- After compaction (TokenBudgetComposedStrategy) ---
Message count: 3
01. [system] You are a helpful assistant.
02. [user] Estimate effort and risks.
03. [assistant] Primary risk is schema drift.
"""
@@ -0,0 +1,249 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from collections.abc import Sequence
from typing import Any
from agent_framework import (
Agent,
ChatContext,
CompactionProvider,
InMemoryHistoryProvider,
Message,
SlidingWindowStrategy,
ToolResultCompactionStrategy,
chat_middleware,
tool,
)
from agent_framework.openai import OpenAIChatClient
from dotenv import load_dotenv
load_dotenv()
"""
CompactionProvider with Agent Example
Demonstrates ``CompactionProvider`` as part of a real agent's context-provider
pipeline alongside ``InMemoryHistoryProvider``.
The compaction provider uses two separate strategies:
- ``before_strategy``: Applied to the loaded history before the model sees it.
Here a ``SlidingWindowStrategy`` keeps only the last 3 message groups, so
older turns get dropped as the conversation grows.
- ``after_strategy``: Applied to the stored history after each turn.
Here a ``ToolResultCompactionStrategy`` collapses all but the most recent
tool-call group into short ``[Tool results: ...]`` summaries.
A chat middleware logs the messages the model actually receives (after context
providers and compaction have run) so you can see the effect of compaction.
This sample intentionally is too aggressive in excluding content, because you can see
that the last turn actually does not have the full context any longer and is therefore
only comparing the results from Paris and Tokyo and not from London.
Run with:
uv run samples/02-agents/compaction/compaction_provider.py
"""
@tool(approval_mode="never_require")
def get_weather(city: str) -> str:
"""Get the current weather for a city."""
weather_data = {
"London": "cloudy, 12°C",
"Paris": "sunny, 18°C",
"Tokyo": "rainy, 22°C",
}
return weather_data.get(city, f"No data for {city}")
@chat_middleware
async def log_model_input(context: ChatContext, call_next: Any) -> None:
"""Chat middleware that logs the messages sent to the model (after compaction)."""
msgs: Sequence[Message] = context.messages
print(f"\n Model receives {len(msgs)} messages:")
for i, m in enumerate(msgs, 1):
text = m.text or ", ".join(c.type for c in m.contents)
print(f" {i:02d}. [{m.role}] {text[:70]}")
await call_next()
async def main() -> None:
client = OpenAIChatClient(model_id="gpt-4o-mini")
# History provider loads/stores conversation messages in session.state.
# skip_excluded=True means get_messages() will omit messages that were
# marked as excluded by the CompactionProvider's after_strategy.
history = InMemoryHistoryProvider(skip_excluded=True)
compaction = CompactionProvider(
# BEFORE each turn: SlidingWindow drops older message groups from
# the loaded context so the model's input stays bounded. With
# keep_last_groups=3, only the 3 most recent non-system groups are
# sent to the model — older turns are not shown to the model.
before_strategy=SlidingWindowStrategy(keep_last_groups=3, preserve_system=True),
# AFTER each turn: ToolResultCompaction marks older tool-call groups
# (assistant function_call + tool result messages) as excluded and
# inserts a short "[Tool results: ...]" summary. The original messages
# stay in storage with _excluded=True; skip_excluded on the history
# provider ensures they won't be loaded on the next turn.
after_strategy=ToolResultCompactionStrategy(keep_last_tool_call_groups=1),
history_source_id=history.source_id,
)
# Provider order matters:
# before_run: history loads → compaction trims (forward order)
# after_run: compaction marks exclusions → history stores (reverse order)
agent = Agent(
client=client,
name="WeatherAssistant",
instructions="You are a helpful weather assistant. Use the get_weather tool when asked about weather.",
tools=[get_weather],
context_providers=[history, compaction],
middleware=[log_model_input],
)
session = agent.create_session()
queries = [
"What is the weather in London?",
"How about Paris?",
"And Tokyo?",
"Which city is the warmest?",
]
for turn, query in enumerate(queries, 1):
print(f"\n{'=' * 60}")
print(f"Turn {turn} — User: {query}")
# ── What is in the persistent store right now? ──
# This shows ALL messages the history provider has accumulated,
# including any that were marked as excluded by the after_strategy
# on the previous turn. Messages marked ✗ are excluded and won't
# be loaded because skip_excluded=True on the history provider.
stored = session.state.get(history.source_id, {}).get("messages", [])
if stored:
excluded_count = sum(1 for m in stored if m.additional_properties.get("_excluded", False))
print(f"\n Stored history: {len(stored)} messages ({excluded_count} excluded)")
for i, m in enumerate(stored, 1):
text = m.text or ", ".join(c.type for c in m.contents)
excluded = m.additional_properties.get("_excluded", False)
reason = m.additional_properties.get("_exclude_reason", "")
if excluded:
marker = f" ✗ ({reason})"
elif (m.text or "").startswith("[Tool results:"):
marker = " ← summary"
else:
marker = ""
print(f" {i:02d}. [{m.role}]{marker} {text[:65]}")
# ── What the model actually sees ──
# The chat middleware fires AFTER the full context pipeline:
# 1. InMemoryHistoryProvider loads non-excluded stored messages
# 2. CompactionProvider.before_strategy (SlidingWindow) drops
# older groups so only the last 3 non-system groups survive
# 3. The agent prepends instructions and appends the new user input
# So this list is shorter than what's in storage.
result = await agent.run(query, session=session)
# ── What happens after the turn ──
# The agent's after_run pipeline runs in reverse provider order:
# 1. CompactionProvider.after_strategy (ToolResultCompaction) marks
# older tool-call groups as excluded in the stored messages —
# their assistant+tool messages get ✗ and a summary is inserted
# 2. InMemoryHistoryProvider appends the new input + response
# On the NEXT turn, skip_excluded=True means the ✗ messages won't load.
print(f"\n Agent: {result.text}")
print(f"\n{'=' * 60}")
print("Done.")
"""
Example output:
============================================================
Turn 1 — User: What is the weather in London?
Model receives 1 messages:
01. [user] What is the weather in London?
Agent: The weather in London is cloudy with a temperature of 12°C.
============================================================
Turn 2 — User: How about Paris?
Stored history: 4 messages (0 excluded)
01. [user] What is the weather in London?
02. [assistant] function_call
03. [tool] function_result
04. [assistant] The weather in London is cloudy with a temperature of 12°C.
Model receives 5 messages:
01. [user] What is the weather in London?
02. [assistant] function_call
03. [tool] function_result
04. [assistant] The weather in London is cloudy with a temperature of 12°C.
05. [user] How about Paris?
Agent: The weather in Paris is sunny with a temperature of 18°C.
============================================================
Turn 3 — User: And Tokyo?
Stored history: 8 messages (0 excluded)
01. [user] What is the weather in London?
02. [assistant] function_call
03. [tool] function_result
04. [assistant] The weather in London is cloudy with a temperature of 12°C.
05. [user] How about Paris?
06. [assistant] function_call
07. [tool] function_result
08. [assistant] The weather in Paris is sunny with a temperature of 18°C.
Model receives 5 messages:
01. [assistant] The weather in London is cloudy with a temperature of 12°C.
02. [assistant] function_call
03. [tool] function_result
04. [assistant] The weather in Paris is sunny with a temperature of 18°C.
05. [user] And Tokyo?
Agent: The weather in Tokyo is rainy with a temperature of 22°C.
============================================================
Turn 4 — User: Which city is the warmest?
Stored history: 13 messages (3 excluded)
01. [user] What is the weather in London?
02. [assistant] ← summary [Tool results: get_weather: cloudy, 12°C]
03. [assistant] ✗ (tool_result_compaction) function_call
04. [tool] ✗ (tool_result_compaction) function_result
05. [assistant] The weather in London is cloudy with a temperature of 12°C.
06. [user] ✗ (tool_result_compaction) How about Paris?
07. [assistant] function_call
08. [tool] function_result
09. [assistant] The weather in Paris is sunny with a temperature of 18°C.
10. [user] And Tokyo?
11. [assistant] function_call
12. [tool] function_result
13. [assistant] The weather in Tokyo is rainy with a temperature of 22°C.
Model receives 8 messages:
01. [assistant] function_call
02. [tool] function_result
03. [assistant] The weather in Paris is sunny with a temperature of 18°C.
04. [user] And Tokyo?
05. [assistant] function_call
06. [tool] function_result
07. [assistant] The weather in Tokyo is rainy with a temperature of 22°C.
08. [user] Which city is the warmest?
Agent: Tokyo is the warmest city with a temperature of 22°C, compared to Paris, which is at 18°C.
============================================================
Done.
"""
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,89 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework import (
Message,
annotate_message_groups,
apply_compaction,
included_messages,
)
"""This sample demonstrates authoring a custom compaction strategy.
The custom strategy keeps system messages and the most recent user turn while
excluding older non-system groups.
"""
EXCLUDED_KEY = "_excluded"
GROUP_ANNOTATION_KEY = "_group"
class KeepLastUserTurnStrategy:
async def __call__(self, messages: list[Message]) -> bool:
group_ids = annotate_message_groups(messages)
group_kinds: dict[str, str] = {}
for message in messages:
group_annotation = message.additional_properties.get(GROUP_ANNOTATION_KEY)
group_id = group_annotation.get("id") if isinstance(group_annotation, dict) else None
kind = group_annotation.get("kind") if isinstance(group_annotation, dict) else None
if (
isinstance(group_id, str)
and isinstance(kind, str)
and group_id not in group_kinds
):
group_kinds[group_id] = kind
user_group_ids = [
group_id for group_id in group_ids if group_kinds.get(group_id) == "user"
]
if not user_group_ids:
return False
keep_user_group_id = user_group_ids[-1]
changed = False
for message in messages:
group_annotation = message.additional_properties.get(GROUP_ANNOTATION_KEY)
group_id = group_annotation.get("id") if isinstance(group_annotation, dict) else None
if message.role == "system":
continue
if group_id == keep_user_group_id:
continue
if message.additional_properties.get(EXCLUDED_KEY) is not True:
changed = True
message.additional_properties[EXCLUDED_KEY] = True
return changed
def _messages() -> list[Message]:
return [
Message(role="system", text="You are concise."),
Message(role="user", text="first request"),
Message(role="assistant", text="first response"),
Message(role="user", text="second request"),
Message(role="assistant", text="second response"),
]
async def main() -> None:
# 1. Build a short conversation.
messages = _messages()
print(f"Number of messages before compaction: {len(messages)}")
# 2. Apply custom strategy.
await apply_compaction(messages, strategy=KeepLastUserTurnStrategy())
# 3. Print projected messages.
projected = included_messages(messages)
print(f"Number of messages after compaction: {len(projected)}")
for msg in projected:
print(f"[{msg.role}] {msg.text}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Number of messages before compaction: 5
Number of messages after compaction: 2
[system] You are concise.
[user] second request
"""
@@ -0,0 +1,124 @@
# /// script
# requires-python = ">=3.10"
# dependencies = [
# "tiktoken",
# ]
# ///
# Run with: uv run samples/02-agents/compaction/tiktoken_tokenizer.py
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
import tiktoken
from agent_framework import (
Message,
TokenizerProtocol,
TruncationStrategy,
annotate_message_groups,
apply_compaction,
included_token_count,
)
"""This sample demonstrates a custom TokenizerProtocol implementation with tiktoken.
Key components:
- `TiktokenTokenizer` backed by `tiktoken`
- Token-based `TruncationStrategy` (`max_n` / `compact_to`)
- Inspecting projected roles and remaining included token count
"""
class TiktokenTokenizer(TokenizerProtocol):
"""TokenizerProtocol implementation backed by tiktoken's o200k_base (gpt-4.1 and up default) encoding."""
def __init__(
self, *, encoding_name: str = "o200k_base", model_name: str | None = None
) -> None:
if model_name is not None:
self._encoding = tiktoken.encoding_for_model(model_name)
else:
self._encoding: Any = tiktoken.get_encoding(encoding_name)
def count_tokens(self, text: str) -> int:
return len(self._encoding.encode(text))
def _build_messages() -> list[Message]:
return [
Message(role="system", text="You are a migration assistant."),
Message(
role="user",
text="List all migration risks and include detailed mitigations for each risk category.",
),
Message(
role="assistant",
text=(
"Primary risks include schema drift, missing foreign key constraints, "
"and data quality regressions. Mitigations include staged validation, "
"shadow writes, and replay-based verification."
),
),
Message(
role="user",
text=(
"Now provide a detailed checklist with owners, rollback "
"gates, and validation criteria."
),
),
Message(
role="assistant",
text=(
"Checklist: baseline snapshots, migration dry-run, production "
"canary, progressive deployment, automated integrity checks, and "
"post-migration reconciliation."
),
),
]
async def main() -> None:
# 1. Create a tokenizer implementation that uses tiktoken.
tokenizer = TiktokenTokenizer()
# 2. Configure token-based truncation.
strategy = TruncationStrategy(
max_n=250,
compact_to=150,
tokenizer=tokenizer,
preserve_system=True,
)
# 3. Build conversation and measure token count before compaction.
messages = _build_messages()
annotate_message_groups(messages, tokenizer=tokenizer)
token_count_before = included_token_count(messages)
# 4. Apply compaction and measure token count after compaction.
projected = await apply_compaction(messages, strategy=strategy, tokenizer=tokenizer)
token_count_after = included_token_count(messages)
# 5. Print before/after token counts and projected conversation.
print(f"Projected messages: {len(projected)}")
print(f"Included token count before compaction: {token_count_before}")
print(f"Included token count after compaction: {token_count_after}")
print("Projected roles:", [message.role for message in projected])
for message in projected:
token_count = message.additional_properties.get("_group", {}).get("token_count")
print(f"- [{message.role}] {message.text} ({token_count} tokens)")
if __name__ == "__main__":
asyncio.run(main())
"""
Projected messages: 3
Included token count before compaction: 263
Included token count after compaction: 149
Projected roles: ['system', 'user', 'assistant']
- [system] You are a migration assistant. (40 tokens)
- [user] Now provide a detailed checklist with owners, rollback gates, and validation criteria. (49 tokens)
- [assistant] Checklist: baseline snapshots, migration dry-run, production canary,
progressive deployment, automated integrity checks, and post-migration reconciliation. (60 tokens)
"""