Python: Fix compaction message-id collisions and tool-loop summary persistence (#6299)

* Fix compaction message-id collisions and tool-loop summary persistence

Fixes two bugs in the compaction strategies:

- #5237: incremental group annotation assigned message ids by position
  within the re-annotated slice, so moving the re-annotation start back to
  a previous group start restarted ids at 0 and produced collisions
  (e.g. a user message reusing an assistant message's id), merging groups
  and causing tool-result compaction to wrongly exclude messages.
  group_messages/_ensure_message_ids now take an id_offset and guard
  against existing-id collisions; annotate_message_groups threads the
  slice start index through as the offset.

- #4991: the function-invocation loop copied the message list each
  iteration, so summaries inserted by compaction landed in a throwaway
  copy and were lost across tool-loop iterations (only the persistent
  excluded flags survived). _prepare_messages_for_model_call now compacts
  the list in place when messages is a list, so inserted summaries persist.

Adds regression tests (incremental id uniqueness, existing-id collision
avoidance, idempotency, and tool-loop summary persistence including
streaming and conversation-id modes).

Also adds a summarization.py sample demonstrating SummarizationStrategy
directly with a real client, and reworks advanced.py with tool-call
groups and a real summarizer.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Guard incremental message-id assignment against prefix-id collisions

Addresses PR review on #5237: _ensure_message_ids only guarded against
collisions within the re-annotated slice. A preexisting (e.g. user-supplied)
id in the preserved prefix could still be reassigned in the suffix when the
id was numerically out of position, merging groups across the re-annotation
boundary again.

group_messages/_ensure_message_ids now accept reserved_ids, and
annotate_message_groups passes the preserved prefix's ids so auto-assigned
suffix ids never collide across the full list. Adds a regression test
reproducing the out-of-position prefix-id collision.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-06-04 10:37:59 +02:00
committed by GitHub
Unverified
parent f29bae8fbc
commit f970a699d8
7 changed files with 633 additions and 54 deletions
@@ -5,7 +5,8 @@ This folder demonstrates context compaction patterns introduced by ADR-0019.
## Files
- `basics.py` — builds a local message list and applies each built-in strategy one at a time.
- `advanced.py` — composes multiple strategies with `TokenBudgetComposedStrategy`.
- `summarization.py` — runs `SummarizationStrategy` directly with a real summarizing chat client.
- `advanced.py` — composes multiple strategies with `TokenBudgetComposedStrategy`, including a real summarizer and tool-call groups.
- `agent_client_overrides.py` — shows client defaults, agent-level overrides, and per-run compaction overrides.
- `custom.py` — defines a custom strategy implementing the `CompactionStrategy` protocol.
- `tiktoken_tokenizer.py` — shows a `TokenizerProtocol` implementation backed by `tiktoken`.
@@ -15,7 +16,8 @@ Run samples with:
```bash
uv run samples/02-agents/compaction/basics.py
uv run samples/02-agents/compaction/advanced.py
uv run samples/02-agents/compaction/summarization.py # requires OPENAI_API_KEY
uv run samples/02-agents/compaction/advanced.py # requires OPENAI_API_KEY
uv run samples/02-agents/compaction/agent_client_overrides.py
uv run samples/02-agents/compaction/custom.py
uv run samples/02-agents/compaction/tiktoken_tokenizer.py
+138 -44
View File
@@ -1,11 +1,14 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any
from typing import Any, cast
from agent_framework import (
GROUP_ANNOTATION_KEY,
GROUP_TOKEN_COUNT_KEY,
SUMMARY_OF_MESSAGE_IDS_KEY,
CharacterEstimatorTokenizer,
ChatResponse,
Content,
Message,
SelectiveToolCallCompactionStrategy,
SlidingWindowStrategy,
@@ -15,36 +18,48 @@ from agent_framework import (
apply_compaction,
included_token_count,
)
from agent_framework.openai import OpenAIChatClient
from dotenv import load_dotenv
"""This sample demonstrates composed in-run compaction with a token budget.
load_dotenv()
"""This sample demonstrates composed in-run compaction under a token budget.
A long, tool-using conversation is compacted with a single
``TokenBudgetComposedStrategy`` that runs three strategies in order until the
included-token count fits the budget:
1. ``SelectiveToolCallCompactionStrategy`` — drop older tool-call groups
(assistant ``function_call`` + ``tool`` result messages) that are expensive
and rarely needed verbatim once acted upon.
2. ``SummarizationStrategy`` — use a *real* chat client to summarize the oldest
remaining turns into a single linked summary message.
3. ``SlidingWindowStrategy`` — as a final guard, keep only the most recent
groups if the budget is still exceeded.
Key components:
- TokenBudgetComposedStrategy
- Sequential strategy composition
- Summarization with a SupportsChatGetResponse-compatible summarizer client
- TokenBudgetComposedStrategy with ordered, escalating strategies
- A real OpenAIChatClient used as the summarizer (not a stub)
- Tool-call groups in the history so tool-call compaction is meaningful
- Token accounting before/after via a TokenizerProtocol
Run with:
uv run samples/02-agents/compaction/advanced.py # requires OPENAI_API_KEY
"""
class BudgetSummaryClient:
async def get_response(
self,
messages: list[Message],
*,
stream: bool = False,
options: dict[str, Any] | None = None,
**kwargs: Any,
) -> ChatResponse:
summary_text = f"Budget summary generated from {len(messages)} prompt messages."
return ChatResponse(messages=[Message(role="assistant", contents=[summary_text])])
def _build_long_history() -> list[Message]:
history = [Message(role="system", contents=["You are a migration copilot."])]
for i in range(1, 8):
"""Build a long, tool-using migration conversation to create token pressure."""
history: list[Message] = [
Message(role="system", contents=["You are a migration copilot that plans and executes database migrations."]),
]
# A few verbose planning turns to build up token pressure.
for i in range(1, 5):
history.append(
Message(
role="user",
contents=[f"Iteration {i}: capture migration requirements and edge cases."],
contents=[f"Iteration {i}: capture migration requirements, constraints, and edge cases in detail."],
)
)
history.append(
@@ -52,17 +67,62 @@ def _build_long_history() -> list[Message]:
role="assistant",
contents=[
(
f"Iteration {i}: detailed plan with dependencies, rollback guidance, and testing details. "
"This sentence is intentionally long to create token pressure."
f"Iteration {i}: produced a detailed plan covering dependencies, rollback guidance, data "
"backfill, and a full testing matrix. This response is intentionally verbose to add pressure."
)
],
)
)
# A tool-call group: the assistant inspects the schema via a tool.
history.append(
Message(
role="assistant",
contents=[Content.from_function_call(call_id="call_1", name="inspect_schema", arguments='{"db":"legacy"}')],
)
)
history.append(
Message(
role="tool",
contents=[Content.from_function_result(call_id="call_1", result="tables: users, orders, invoices, events")],
)
)
history.append(Message(role="assistant", contents=["Schema inspection found four core tables to migrate."]))
# The most recent turn — this should survive compaction verbatim.
history.append(Message(role="user", contents=["What is the safest order to migrate these tables?"]))
history.append(
Message(
role="assistant",
contents=["Migrate reference tables (users) first, then orders, then invoices, and events last."],
)
)
return history
def _annotation(message: Message) -> dict[str, Any] | None:
annotation = message.additional_properties.get(GROUP_ANNOTATION_KEY)
return cast("dict[str, Any]", annotation) if isinstance(annotation, dict) else None
def _token_count(message: Message) -> int | None:
annotation = _annotation(message)
return annotation.get(GROUP_TOKEN_COUNT_KEY) if annotation else None
def _relation(message: Message) -> str:
"""Describe how a projected message relates to the original messages."""
annotation = _annotation(message)
if annotation is None:
return ""
summarizes = annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY)
if summarizes:
return f" <- summary of {summarizes}"
return ""
async def main() -> None:
# 1. Build synthetic history representing long-running in-run growth.
# 1. Build synthetic history representing long-running, tool-using growth.
messages = _build_long_history()
# 2. Configure tokenizer and measure token count before compaction.
@@ -70,22 +130,35 @@ async def main() -> None:
annotate_message_groups(messages, tokenizer=tokenizer)
budget_before = included_token_count(messages)
# 3. Configure composed strategy stack.
print("Before compaction message set:")
for msg in messages:
text_preview = msg.text[:80] if msg.text else "<non-text>"
print(f"- [{msg.role}] {text_preview} ({msg.message_id}, {_token_count(msg)} tokens)")
print()
# 3. Create a real summarizer client. SummarizationStrategy only requires a
# SupportsChatGetResponse-compatible client.
summarizer = OpenAIChatClient(model="gpt-4o-mini")
# 4. Configure the composed strategy stack. Strategies run in order and the
# composed strategy stops as soon as the included-token budget is met.
# The budget is set high enough that the generated summary fits within it:
# a tighter budget would trip the composed fallback, which excludes the
# oldest group first (the summary) once the included set exceeds the
# budget. SlidingWindowStrategy remains as a recency safety net for longer
# histories; for this sample summarization alone reaches budget, so the
# window does not need to fire.
composed = TokenBudgetComposedStrategy(
token_budget=200,
token_budget=400,
tokenizer=tokenizer,
strategies=[
SelectiveToolCallCompactionStrategy(keep_last_tool_call_groups=0),
SummarizationStrategy(
client=BudgetSummaryClient(),
target_count=3,
threshold=3,
),
SummarizationStrategy(client=summarizer, target_count=3, threshold=2),
SlidingWindowStrategy(keep_last_groups=4),
],
)
# 4. Apply compaction and inspect the budget result.
# 5. Apply compaction and inspect the budget result.
projected = await apply_compaction(messages, strategy=composed, tokenizer=tokenizer)
budget_after = included_token_count(messages)
@@ -95,23 +168,44 @@ async def main() -> None:
print("Projected roles:", [m.role for m in projected])
print("Projected messages with token counts:")
for msg in projected:
group = msg.additional_properties.get("_group")
token_count = group.get("token_count") if isinstance(group, dict) else None
text_preview = msg.text[:80] if msg.text else "<non-text>"
print(f"- [{msg.role}] {text_preview} ({token_count} tokens)")
print(f"- [{msg.role}] {text_preview} ({msg.message_id}, {_token_count(msg)} tokens){_relation(msg)}")
# 6. Surface the model-generated summary, if summarization fired.
for msg in messages:
annotation = _annotation(msg)
if annotation and annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY):
print("\nGenerated summary:")
print(f" {msg.text}")
print(f" summarizes: {annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY)}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output:
Projected messages after compaction: 3
Included token count before compaction: 793
Included token count after compaction: 144
Projected roles: ['system', 'user', 'assistant']
Sample output (summary text and token counts vary because the summary is generated by the model):
Before compaction message set:
- [system] You are a migration copilot that plans and executes database migrations. (msg_0, 46 tokens)
- [user] Iteration 1: capture migration requirements, constraints, and edge cases in deta (msg_1, 48 tokens)
- [assistant] Iteration 1: produced a detailed plan covering dependencies, rollback guidance, (msg_2, 73 tokens)
...
- [user] What is the safest order to migrate these tables? (msg_12, 40 tokens)
- [assistant] Migrate reference tables (users) first, then orders, then invoices, and events l (msg_13, 50 tokens)
Projected messages after compaction: 5
Included token count before compaction: 757
Included token count after compaction: 274
Projected roles: ['system', 'assistant', 'assistant', 'user', 'assistant']
Projected messages with token counts:
- [system] You are a migration copilot. (35 tokens)
- [user] Iteration 7: capture migration requirements and edge cases. (43 tokens)
- [assistant] Iteration 7: detailed plan with dependencies, rollback guidance, and testing det (66 tokens)
- [system] You are a migration copilot that plans and executes database migrations. (msg_0, 46 tokens)
- [assistant] Across four planning turns the user and assistant... (summary_14, 96 tokens) <- summary of [msg_1..8]
- [assistant] Schema inspection found four core tables to migrate. (msg_11, 42 tokens)
- [user] What is the safest order to migrate these tables? (msg_12, 40 tokens)
- [assistant] Migrate reference tables (users) first, then orders, then invoices, and events l (msg_13, 50 tokens)
Generated summary:
Across four planning turns the user and assistant defined the migration requirements...
summarizes: ['msg_1', 'msg_2', 'msg_3', 'msg_4', 'msg_5', 'msg_6', 'msg_7', 'msg_8']
"""
@@ -0,0 +1,159 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from typing import Any, cast
from agent_framework import (
GROUP_ANNOTATION_KEY,
SUMMARIZED_BY_SUMMARY_ID_KEY,
SUMMARY_OF_MESSAGE_IDS_KEY,
Message,
SummarizationStrategy,
apply_compaction,
)
from agent_framework.openai import OpenAIChatClient
from dotenv import load_dotenv
load_dotenv()
"""This sample demonstrates the SummarizationStrategy directly.
Unlike SlidingWindow/Truncation strategies that simply drop older groups,
``SummarizationStrategy`` calls a real chat client to *summarize* the oldest
message groups, replaces them with a single linked summary message, and keeps
the most recent turns verbatim. This preserves long-range context (decisions,
goals, unresolved items) while bounding the prompt size.
Key components:
- SummarizationStrategy with a real OpenAIChatClient summarizer
- ``apply_compaction`` to run the strategy over a message list
- Bidirectional summary trace metadata (summary -> originals, original -> summary)
Run with:
uv run samples/02-agents/compaction/summarization.py # requires OPENAI_API_KEY
"""
def _annotation(message: Message) -> dict[str, Any] | None:
annotation = message.additional_properties.get(GROUP_ANNOTATION_KEY)
return cast("dict[str, Any]", annotation) if isinstance(annotation, dict) else None
def _build_history() -> list[Message]:
"""Build a multi-turn conversation long enough to trigger summarization."""
return [
Message(role="system", contents=["You are a project planning assistant."]),
Message(role="user", contents=["We are migrating a monolith to microservices. Where do we start?"]),
Message(
role="assistant",
contents=["Start by mapping bounded contexts and identifying the highest-churn modules to extract first."],
),
Message(role="user", contents=["The billing module changes most often. What are the risks of extracting it?"]),
Message(
role="assistant",
contents=["Main risks: distributed transactions, invoices-table ownership, and latency on hot paths."],
),
Message(role="user", contents=["How should we handle the shared invoices table?"]),
Message(
role="assistant",
contents=["Use the strangler-fig pattern: dual-write during transition, then make billing the owner."],
),
Message(role="user", contents=["What is the most recent decision we made?"]),
Message(role="assistant", contents=["We decided to extract billing first using the strangler-fig pattern."]),
]
def _print_messages(label: str, messages: list[Message]) -> None:
print(f"\n--- {label} ---")
print(f"Message count: {len(messages)}")
for index, message in enumerate(messages, start=1):
text = message.text or ", ".join(content.type for content in message.contents)
print(f"{index:02d}. [{message.role}] {text[:90]}")
async def main() -> None:
# 1. Create a real summarizing client. SummarizationStrategy only requires a
# SupportsChatGetResponse-compatible client, so any chat client works.
summarizer = OpenAIChatClient(model="gpt-4o-mini")
# 2. Build a conversation and show it before compaction.
messages = _build_history()
_print_messages("Before compaction", messages)
# 3. Configure the strategy. It triggers once the included non-system message
# count exceeds ``target_count + threshold`` (here 4 + 2 = 6), summarizing
# the oldest groups down toward ``target_count`` while keeping recent turns.
strategy = SummarizationStrategy(
client=summarizer,
target_count=4,
threshold=2,
)
# 4. Apply the strategy. The oldest groups are summarized into a single
# assistant message; the projected list is what the model would receive.
projected = await apply_compaction(messages, strategy=strategy)
_print_messages("After compaction (SummarizationStrategy)", projected)
# 5. Inspect the generated summary and its bidirectional trace metadata.
print("\n--- Summary trace ---")
for message in messages:
annotation = _annotation(message)
if annotation is None:
continue
summarizes = annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY)
if summarizes:
print(f"Generated summary ({message.message_id}):")
print(f" {message.text}")
print(f" summarizes original ids: {summarizes}")
summarized_by: dict[str | None, Any] = {}
for message in messages:
annotation = _annotation(message)
if annotation is None:
continue
summary_id = annotation.get(SUMMARIZED_BY_SUMMARY_ID_KEY)
if summary_id:
summarized_by[message.message_id] = summary_id
if summarized_by:
print("Originals replaced by the summary:")
for original_id, summary_id in summarized_by.items():
print(f" {original_id} -> {summary_id}")
if __name__ == "__main__":
asyncio.run(main())
"""
Sample output (summary text varies because it is generated by the model):
--- Before compaction ---
Message count: 9
01. [system] You are a project planning assistant.
02. [user] We are migrating a monolith to microservices. Where do we start?
03. [assistant] Start by mapping bounded contexts and identifying the highest-churn modules to ex
04. [user] The billing module changes most often. What are the risks of extracting it?
05. [assistant] Main risks: distributed transactions, data ownership of the invoices table, and lat
06. [user] How should we handle the shared invoices table?
07. [assistant] Use the strangler-fig pattern: dual-write during transition, then make billing the
08. [user] What is the most recent decision we made?
09. [assistant] We decided to extract billing first using the strangler-fig pattern.
--- After compaction (SummarizationStrategy) ---
Message count: 6
01. [system] You are a project planning assistant.
02. [assistant] The user is migrating a monolith to microservices and decided to extract the billin
03. [user] How should we handle the shared invoices table?
04. [assistant] Use the strangler-fig pattern: dual-write during transition, then make billing the
05. [user] What is the most recent decision we made?
06. [assistant] We decided to extract billing first using the strangler-fig pattern.
--- Summary trace ---
Generated summary (summary_9):
The user is migrating a monolith to microservices and decided to extract the billing module first...
summarizes original ids: ['msg_1', 'msg_2', 'msg_3', 'msg_4', 'msg_5']
Originals replaced by the summary:
msg_1 -> summary_9
msg_2 -> summary_9
msg_3 -> summary_9
msg_4 -> summary_9
msg_5 -> summary_9
"""