diff --git a/python/packages/core/agent_framework/_clients.py b/python/packages/core/agent_framework/_clients.py index ddd765e654..fd004003b4 100644 --- a/python/packages/core/agent_framework/_clients.py +++ b/python/packages/core/agent_framework/_clients.py @@ -380,8 +380,15 @@ class BaseChatClient(SerializationMixin, ABC, Generic[OptionsCoT]): return prepared_messages from ._compaction import apply_compaction + # Compact the caller's list in place when possible. A compaction operation has + # two halves: exclusion flags (mutated on shared Message objects) and inserted + # summary messages. Operating on the original list keeps both halves on the list + # the function-invocation tool loop reuses across iterations; otherwise inserted + # summaries would be lost on a throwaway copy while exclusions persisted, silently + # dropping older groups (issue #4991). + working_messages = messages if isinstance(messages, list) else prepared_messages return await apply_compaction( - prepared_messages, + working_messages, strategy=compaction_strategy, tokenizer=tokenizer, ) diff --git a/python/packages/core/agent_framework/_compaction.py b/python/packages/core/agent_framework/_compaction.py index 69e35726ea..3644bf4a9c 100644 --- a/python/packages/core/agent_framework/_compaction.py +++ b/python/packages/core/agent_framework/_compaction.py @@ -4,7 +4,7 @@ from __future__ import annotations import json import logging -from collections.abc import Mapping, Sequence +from collections.abc import Iterable, Mapping, Sequence from typing import ( TYPE_CHECKING, Any, @@ -92,10 +92,23 @@ def _is_reasoning_only_assistant(message: Message) -> bool: return all(content.type == "text_reasoning" for content in message.contents) -def _ensure_message_ids(messages: list[Message]) -> None: +def _ensure_message_ids( + messages: list[Message], *, id_offset: int = 0, reserved_ids: Iterable[str] | None = None +) -> None: + existing_ids: set[str] = set(reserved_ids) if reserved_ids is not None else set() + existing_ids.update(message.message_id for message in messages if message.message_id) for index, message in enumerate(messages): - if not message.message_id: - message.message_id = f"msg_{index}" + if message.message_id: + continue + candidate = f"msg_{id_offset + index}" + if candidate in existing_ids: + counter = id_offset + len(messages) + candidate = f"msg_{counter}" + while candidate in existing_ids: + counter += 1 + candidate = f"msg_{counter}" + message.message_id = candidate + existing_ids.add(candidate) def _group_id_for(message: Message, group_index: int) -> str: @@ -104,14 +117,27 @@ def _group_id_for(message: Message, group_index: int) -> str: return f"group_index_{group_index}" -def group_messages(messages: list[Message]) -> list[dict[str, Any]]: +def group_messages( + messages: list[Message], *, id_offset: int = 0, reserved_ids: Iterable[str] | None = None +) -> list[dict[str, Any]]: """Compute group spans and metadata for annotation. + Args: + messages: The messages (or a slice of them) to group. + + Keyword Args: + id_offset: Absolute starting index used when auto-assigning ``message_id`` + values, so incremental annotation of a list slice produces ids that + stay unique across the full list. + reserved_ids: Message ids that already exist outside ``messages`` (for + example in a preserved prefix). Auto-assigned ids are guaranteed not + to collide with these, preventing duplicate ids across the full list. + Returns: Ordered list of lightweight span dicts with keys: ``group_id``, ``kind``, ``start_index``, ``end_index``, ``has_reasoning``. """ - _ensure_message_ids(messages) + _ensure_message_ids(messages, id_offset=id_offset, reserved_ids=reserved_ids) spans: list[dict[str, Any]] = [] i = 0 group_index = 0 @@ -439,7 +465,8 @@ def annotate_message_groups( if previous_group_index is not None: group_index_offset = previous_group_index + 1 - spans = group_messages(messages[start_index:]) + reserved_ids = {message.message_id for message in messages[:start_index] if message.message_id} + spans = group_messages(messages[start_index:], id_offset=start_index, reserved_ids=reserved_ids) for span_index, span in enumerate(spans): group_id = str(span["group_id"]) kind = _coerce_group_kind(span["kind"]) diff --git a/python/packages/core/tests/core/test_clients.py b/python/packages/core/tests/core/test_clients.py index 7657993d56..ac110a4a17 100644 --- a/python/packages/core/tests/core/test_clients.py +++ b/python/packages/core/tests/core/test_clients.py @@ -11,10 +11,14 @@ from agent_framework import ( GROUP_TOKEN_COUNT_KEY, BaseChatClient, ChatResponse, + ChatResponseUpdate, + Content, Message, SlidingWindowStrategy, SupportsChatGetResponse, + ToolResultCompactionStrategy, TruncationStrategy, + tool, ) @@ -258,6 +262,196 @@ async def test_base_client_default_tokenizer_without_strategy_annotates_messages assert captured_token_counts == [[19, 19]] +def _tool_call_response(call_id: str, location: str) -> ChatResponse: + return ChatResponse( + messages=Message( + role="assistant", + contents=[ + Content.from_function_call( + call_id=call_id, + name="lookup_weather", + arguments=f'{{"location": "{location}"}}', + ) + ], + ), + response_id=f"resp_{call_id}", + ) + + +def _is_tool_result_summary(message: Message) -> bool: + text = message.text or "" + return message.role == "assistant" and text.startswith("[Tool results:") + + +async def test_function_loop_persists_inserted_summaries_across_iterations( + chat_client_base: SupportsChatGetResponse, +) -> None: + # Regression test for #4991: compaction inserts summary messages and excludes the + # originals. Across tool-loop iterations the exclusion flags persisted (shared Message + # objects) but the inserted summaries were dropped (they only lived on a throwaway copy), + # so older tool groups were silently lost with no summary representing them. + chat_client_base.function_invocation_configuration["enabled"] = True # type: ignore[attr-defined] + chat_client_base.function_invocation_configuration["max_iterations"] = 3 # type: ignore[attr-defined] + chat_client_base.compaction_strategy = ToolResultCompactionStrategy(keep_last_tool_call_groups=1) # type: ignore[attr-defined] + + @tool(name="lookup_weather", approval_mode="never_require") + def lookup_weather(location: str) -> str: + return f"Weather in {location}: sunny" + + chat_client_base.run_responses = [ # type: ignore[attr-defined] + _tool_call_response("call_1", "London"), + _tool_call_response("call_2", "Paris"), + _tool_call_response("call_3", "Tokyo"), + ] + + captured_inputs: list[list[Message]] = [] + original = chat_client_base._get_non_streaming_response # type: ignore[attr-defined] + + async def _capture( + *, + messages: list[Message], + options: dict[str, Any], + **kwargs: Any, + ) -> ChatResponse: + captured_inputs.append(list(messages)) + return await original(messages=messages, options=options, **kwargs) + + chat_client_base._get_non_streaming_response = _capture # type: ignore[attr-defined,method-assign] + + await chat_client_base.get_response( + [Message(role="user", contents=["What is the weather in London?"])], + options={"tools": [lookup_weather]}, # type: ignore[typeddict-unknown-key] + ) + + # The final model call should represent every compacted tool group with a summary. + # Two older tool groups get collapsed (London, Paris) while the last (Tokyo) is kept. + final_input = captured_inputs[-1] + summaries = [message for message in final_input if _is_tool_result_summary(message)] + summary_text = " ".join(message.text or "" for message in summaries) + + assert len(summaries) == 2, [message.text for message in final_input] + assert "London" in summary_text + assert "Paris" in summary_text + + +def _tool_call_update(call_id: str, location: str) -> list[ChatResponseUpdate]: + return [ + ChatResponseUpdate( + contents=[ + Content.from_function_call( + call_id=call_id, + name="lookup_weather", + arguments=f'{{"location": "{location}"}}', + ) + ], + role="assistant", + finish_reason="stop", + response_id=f"resp_{call_id}", + ) + ] + + +async def test_function_loop_persists_inserted_summaries_across_iterations_streaming( + chat_client_base: SupportsChatGetResponse, +) -> None: + # Streaming counterpart of the #4991 regression test: the summary persistence fix in + # ``_prepare_messages_for_model_call`` must cover the streaming tool loop too. + chat_client_base.function_invocation_configuration["enabled"] = True # type: ignore[attr-defined] + chat_client_base.function_invocation_configuration["max_iterations"] = 3 # type: ignore[attr-defined] + chat_client_base.compaction_strategy = ToolResultCompactionStrategy(keep_last_tool_call_groups=1) # type: ignore[attr-defined] + + @tool(name="lookup_weather", approval_mode="never_require") + def lookup_weather(location: str) -> str: + return f"Weather in {location}: sunny" + + chat_client_base.streaming_responses = [ # type: ignore[attr-defined] + _tool_call_update("call_1", "London"), + _tool_call_update("call_2", "Paris"), + _tool_call_update("call_3", "Tokyo"), + ] + + captured_inputs: list[list[Message]] = [] + original = chat_client_base._get_streaming_response # type: ignore[attr-defined] + + def _capture( + *, + messages: list[Message], + options: dict[str, Any], + **kwargs: Any, + ): + captured_inputs.append(list(messages)) + return original(messages=messages, options=options, **kwargs) + + chat_client_base._get_streaming_response = _capture # type: ignore[attr-defined,method-assign] + + stream = chat_client_base.get_response( + [Message(role="user", contents=["What is the weather in London?"])], + stream=True, + options={"tools": [lookup_weather]}, # type: ignore[typeddict-unknown-key] + ) + async for _ in stream: + pass + + final_input = captured_inputs[-1] + summaries = [message for message in final_input if _is_tool_result_summary(message)] + summary_text = " ".join(message.text or "" for message in summaries) + + assert len(summaries) == 2, [message.text for message in final_input] + assert "London" in summary_text + assert "Paris" in summary_text + + +async def test_function_loop_compaction_conversation_id_mode_does_not_resend_history( + chat_client_base: SupportsChatGetResponse, +) -> None: + # In conversation-id mode the server owns prior context, so the tool loop clears + # ``prepped_messages`` and only sends the latest message. Compaction must not fight that + # by re-inserting summaries or re-sending earlier turns. + chat_client_base.function_invocation_configuration["enabled"] = True # type: ignore[attr-defined] + chat_client_base.function_invocation_configuration["max_iterations"] = 3 # type: ignore[attr-defined] + chat_client_base.compaction_strategy = ToolResultCompactionStrategy(keep_last_tool_call_groups=1) # type: ignore[attr-defined] + + @tool(name="lookup_weather", approval_mode="never_require") + def lookup_weather(location: str) -> str: + return f"Weather in {location}: sunny" + + def _conversation_tool_call(call_id: str, location: str) -> ChatResponse: + response = _tool_call_response(call_id, location) + response.conversation_id = "conv_1" + return response + + chat_client_base.run_responses = [ # type: ignore[attr-defined] + _conversation_tool_call("call_1", "London"), + _conversation_tool_call("call_2", "Paris"), + _conversation_tool_call("call_3", "Tokyo"), + ] + + captured_inputs: list[list[Message]] = [] + original = chat_client_base._get_non_streaming_response # type: ignore[attr-defined] + + async def _capture( + *, + messages: list[Message], + options: dict[str, Any], + **kwargs: Any, + ) -> ChatResponse: + captured_inputs.append(list(messages)) + return await original(messages=messages, options=options, **kwargs) + + chat_client_base._get_non_streaming_response = _capture # type: ignore[attr-defined,method-assign] + + await chat_client_base.get_response( + [Message(role="user", contents=["What is the weather in London?"])], + options={"tools": [lookup_weather]}, # type: ignore[typeddict-unknown-key] + ) + + # After the conversation id is established the loop only forwards the latest message, + # so subsequent model calls never receive the full history or summary messages. + for sent in captured_inputs[1:]: + assert len(sent) <= 1, [message.text for message in sent] + assert not any(_is_tool_result_summary(message) for message in sent) + + def test_base_client_as_agent_does_not_copy_client_compaction_defaults( chat_client_base: SupportsChatGetResponse, ) -> None: diff --git a/python/packages/core/tests/core/test_compaction.py b/python/packages/core/tests/core/test_compaction.py index 9e9cd0b466..99a90c6c0d 100644 --- a/python/packages/core/tests/core/test_compaction.py +++ b/python/packages/core/tests/core/test_compaction.py @@ -196,6 +196,64 @@ def test_append_compaction_message_annotates_new_message() -> None: assert isinstance(_group_id(messages[1]), str) +def test_incremental_annotation_assigns_unique_message_ids() -> None: + # Regression test for #5237: ``_ensure_message_ids`` assigned ``msg_{index}`` + # using the position within the slice handed to ``group_messages``. Successive + # incremental annotations restart the index at 0, so distinct messages collided + # on the same ``message_id``. + messages: list[Message] = [] + for turn in range(4): + messages.append(Message(role="user", contents=[f"user {turn}"])) + annotate_message_groups(messages) + messages.append(Message(role="assistant", contents=[f"assistant {turn}"])) + annotate_message_groups(messages) + + message_ids = [message.message_id for message in messages] + assert all(message_ids), "every message should receive an id" + assert len(set(message_ids)) == len(message_ids), f"duplicate message ids: {message_ids}" + + +def test_ensure_message_ids_avoids_existing_id_collisions() -> None: + # An auto-generated ``msg_{index}`` must not collide with an id already present + # on another message (user-supplied or assigned by an earlier annotation pass). + messages = [ + Message(role="user", contents=["zero"]), + Message(role="assistant", contents=["one"], message_id="msg_2"), + Message(role="user", contents=["two"]), + ] + annotate_message_groups(messages) + + message_ids = [message.message_id for message in messages] + assert message_ids[1] == "msg_2" + assert len(set(message_ids)) == len(message_ids), f"duplicate message ids: {message_ids}" + + +def test_incremental_annotation_avoids_prefix_id_collision() -> None: + # Regression for the PR review on #5237: when only a suffix is re-annotated, + # an auto-assigned ``msg_{index}`` in the suffix must not collide with a + # preexisting id carried by a message in the *preserved prefix* (a group + # before the one re-annotation pulls back to). Otherwise ``_group_id_for`` + # derives the same group id and merges groups across the boundary. + messages = [ + # Out-of-position, user-supplied id that matches the ``msg_{index}`` the + # suffix pass would assign to the appended message below. This message is + # two groups back, so it stays outside the re-annotated slice. + Message(role="user", contents=["zero"], message_id="msg_2"), + Message(role="user", contents=["one"]), + ] + annotate_message_groups(messages) + assert messages[0].message_id == "msg_2" + assert messages[1].message_id == "msg_1" + + messages.append(Message(role="user", contents=["two"])) + annotate_message_groups(messages, from_index=2) + + message_ids = [message.message_id for message in messages] + assert all(message_ids), "every message should receive an id" + assert len(set(message_ids)) == len(message_ids), f"duplicate message ids: {message_ids}" + assert messages[0].message_id == "msg_2" + + async def test_truncation_strategy_keeps_system_anchor() -> None: messages = [ Message(role="system", contents=["you are helpful"]), @@ -484,6 +542,44 @@ async def test_tool_result_compaction_collapses_old_groups_into_summary() -> Non assert any(m.role == "tool" for m in projected) +async def test_tool_result_compaction_is_idempotent_after_summary_insertion() -> None: + """Re-running compaction after a mid-list summary insertion must not duplicate it. + + Mirrors a subsequent tool-loop iteration (issue #4991): the inserted summary and the + excluded originals now persist on the same list, so a second annotate + compaction pass + over the same groups should be a no-op rather than collapsing the group again. + """ + messages = [ + Message(role="user", contents=["u"]), + _assistant_function_call("call-1"), + _tool_result("call-1", "r1"), + _assistant_function_call("call-2"), + _tool_result("call-2", "r2"), + Message(role="assistant", contents=["done"]), + ] + strategy = ToolResultCompactionStrategy(keep_last_tool_call_groups=1) + annotate_message_groups(messages) + assert await strategy(messages) is True + + summaries_after_first = [m for m in messages if (m.text or "").startswith("[Tool results:")] + assert len(summaries_after_first) == 1 + summary = summaries_after_first[0] + summary_group_ids = _group_unknown_value(summary, SUMMARY_OF_GROUP_IDS_KEY) + + # Second pass over the same (now partially compacted) list. + annotate_message_groups(messages) + changed = await strategy(messages) + + assert changed is False + summaries_after_second = [m for m in messages if (m.text or "").startswith("[Tool results:")] + assert len(summaries_after_second) == 1 + assert _group_unknown_value(summaries_after_second[0], SUMMARY_OF_GROUP_IDS_KEY) == summary_group_ids + + # The kept tool-call group stays atomic and included. + projected = included_messages(messages) + assert any(m.role == "tool" for m in projected) + + async def test_tool_result_compaction_zero_collapses_all() -> None: """With keep=0, all tool-call groups are collapsed into summaries.""" messages = [ diff --git a/python/samples/02-agents/compaction/README.md b/python/samples/02-agents/compaction/README.md index ed5c3dab12..42806bd6de 100644 --- a/python/samples/02-agents/compaction/README.md +++ b/python/samples/02-agents/compaction/README.md @@ -5,7 +5,8 @@ This folder demonstrates context compaction patterns introduced by ADR-0019. ## Files - `basics.py` — builds a local message list and applies each built-in strategy one at a time. -- `advanced.py` — composes multiple strategies with `TokenBudgetComposedStrategy`. +- `summarization.py` — runs `SummarizationStrategy` directly with a real summarizing chat client. +- `advanced.py` — composes multiple strategies with `TokenBudgetComposedStrategy`, including a real summarizer and tool-call groups. - `agent_client_overrides.py` — shows client defaults, agent-level overrides, and per-run compaction overrides. - `custom.py` — defines a custom strategy implementing the `CompactionStrategy` protocol. - `tiktoken_tokenizer.py` — shows a `TokenizerProtocol` implementation backed by `tiktoken`. @@ -15,7 +16,8 @@ Run samples with: ```bash uv run samples/02-agents/compaction/basics.py -uv run samples/02-agents/compaction/advanced.py +uv run samples/02-agents/compaction/summarization.py # requires OPENAI_API_KEY +uv run samples/02-agents/compaction/advanced.py # requires OPENAI_API_KEY uv run samples/02-agents/compaction/agent_client_overrides.py uv run samples/02-agents/compaction/custom.py uv run samples/02-agents/compaction/tiktoken_tokenizer.py diff --git a/python/samples/02-agents/compaction/advanced.py b/python/samples/02-agents/compaction/advanced.py index 12482f131a..4e14ceb5c2 100644 --- a/python/samples/02-agents/compaction/advanced.py +++ b/python/samples/02-agents/compaction/advanced.py @@ -1,11 +1,14 @@ # Copyright (c) Microsoft. All rights reserved. import asyncio -from typing import Any +from typing import Any, cast from agent_framework import ( + GROUP_ANNOTATION_KEY, + GROUP_TOKEN_COUNT_KEY, + SUMMARY_OF_MESSAGE_IDS_KEY, CharacterEstimatorTokenizer, - ChatResponse, + Content, Message, SelectiveToolCallCompactionStrategy, SlidingWindowStrategy, @@ -15,36 +18,48 @@ from agent_framework import ( apply_compaction, included_token_count, ) +from agent_framework.openai import OpenAIChatClient +from dotenv import load_dotenv -"""This sample demonstrates composed in-run compaction with a token budget. +load_dotenv() + +"""This sample demonstrates composed in-run compaction under a token budget. + +A long, tool-using conversation is compacted with a single +``TokenBudgetComposedStrategy`` that runs three strategies in order until the +included-token count fits the budget: + +1. ``SelectiveToolCallCompactionStrategy`` — drop older tool-call groups + (assistant ``function_call`` + ``tool`` result messages) that are expensive + and rarely needed verbatim once acted upon. +2. ``SummarizationStrategy`` — use a *real* chat client to summarize the oldest + remaining turns into a single linked summary message. +3. ``SlidingWindowStrategy`` — as a final guard, keep only the most recent + groups if the budget is still exceeded. Key components: -- TokenBudgetComposedStrategy -- Sequential strategy composition -- Summarization with a SupportsChatGetResponse-compatible summarizer client +- TokenBudgetComposedStrategy with ordered, escalating strategies +- A real OpenAIChatClient used as the summarizer (not a stub) +- Tool-call groups in the history so tool-call compaction is meaningful +- Token accounting before/after via a TokenizerProtocol + +Run with: + uv run samples/02-agents/compaction/advanced.py # requires OPENAI_API_KEY """ -class BudgetSummaryClient: - async def get_response( - self, - messages: list[Message], - *, - stream: bool = False, - options: dict[str, Any] | None = None, - **kwargs: Any, - ) -> ChatResponse: - summary_text = f"Budget summary generated from {len(messages)} prompt messages." - return ChatResponse(messages=[Message(role="assistant", contents=[summary_text])]) - - def _build_long_history() -> list[Message]: - history = [Message(role="system", contents=["You are a migration copilot."])] - for i in range(1, 8): + """Build a long, tool-using migration conversation to create token pressure.""" + history: list[Message] = [ + Message(role="system", contents=["You are a migration copilot that plans and executes database migrations."]), + ] + + # A few verbose planning turns to build up token pressure. + for i in range(1, 5): history.append( Message( role="user", - contents=[f"Iteration {i}: capture migration requirements and edge cases."], + contents=[f"Iteration {i}: capture migration requirements, constraints, and edge cases in detail."], ) ) history.append( @@ -52,17 +67,62 @@ def _build_long_history() -> list[Message]: role="assistant", contents=[ ( - f"Iteration {i}: detailed plan with dependencies, rollback guidance, and testing details. " - "This sentence is intentionally long to create token pressure." + f"Iteration {i}: produced a detailed plan covering dependencies, rollback guidance, data " + "backfill, and a full testing matrix. This response is intentionally verbose to add pressure." ) ], ) ) + + # A tool-call group: the assistant inspects the schema via a tool. + history.append( + Message( + role="assistant", + contents=[Content.from_function_call(call_id="call_1", name="inspect_schema", arguments='{"db":"legacy"}')], + ) + ) + history.append( + Message( + role="tool", + contents=[Content.from_function_result(call_id="call_1", result="tables: users, orders, invoices, events")], + ) + ) + history.append(Message(role="assistant", contents=["Schema inspection found four core tables to migrate."])) + + # The most recent turn — this should survive compaction verbatim. + history.append(Message(role="user", contents=["What is the safest order to migrate these tables?"])) + history.append( + Message( + role="assistant", + contents=["Migrate reference tables (users) first, then orders, then invoices, and events last."], + ) + ) return history +def _annotation(message: Message) -> dict[str, Any] | None: + annotation = message.additional_properties.get(GROUP_ANNOTATION_KEY) + return cast("dict[str, Any]", annotation) if isinstance(annotation, dict) else None + + +def _token_count(message: Message) -> int | None: + annotation = _annotation(message) + return annotation.get(GROUP_TOKEN_COUNT_KEY) if annotation else None + + +def _relation(message: Message) -> str: + """Describe how a projected message relates to the original messages.""" + annotation = _annotation(message) + if annotation is None: + return "" + summarizes = annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY) + if summarizes: + return f" <- summary of {summarizes}" + return "" + + async def main() -> None: - # 1. Build synthetic history representing long-running in-run growth. + # 1. Build synthetic history representing long-running, tool-using growth. messages = _build_long_history() # 2. Configure tokenizer and measure token count before compaction. @@ -70,22 +130,35 @@ async def main() -> None: annotate_message_groups(messages, tokenizer=tokenizer) budget_before = included_token_count(messages) - # 3. Configure composed strategy stack. + print("Before compaction message set:") + for msg in messages: + text_preview = msg.text[:80] if msg.text else "" + print(f"- [{msg.role}] {text_preview} ({msg.message_id}, {_token_count(msg)} tokens)") + print() + + # 3. Create a real summarizer client. SummarizationStrategy only requires a + # SupportsChatGetResponse-compatible client. + summarizer = OpenAIChatClient(model="gpt-4o-mini") + + # 4. Configure the composed strategy stack. Strategies run in order and the + # composed strategy stops as soon as the included-token budget is met. + # The budget is set high enough that the generated summary fits within it: + # a tighter budget would trip the composed fallback, which excludes the + # oldest group first (the summary) once the included set exceeds the + # budget. SlidingWindowStrategy remains as a recency safety net for longer + # histories; for this sample summarization alone reaches budget, so the + # window does not need to fire. composed = TokenBudgetComposedStrategy( - token_budget=200, + token_budget=400, tokenizer=tokenizer, strategies=[ SelectiveToolCallCompactionStrategy(keep_last_tool_call_groups=0), - SummarizationStrategy( - client=BudgetSummaryClient(), - target_count=3, - threshold=3, - ), + SummarizationStrategy(client=summarizer, target_count=3, threshold=2), SlidingWindowStrategy(keep_last_groups=4), ], ) - # 4. Apply compaction and inspect the budget result. + # 5. Apply compaction and inspect the budget result. projected = await apply_compaction(messages, strategy=composed, tokenizer=tokenizer) budget_after = included_token_count(messages) @@ -95,23 +168,44 @@ async def main() -> None: print("Projected roles:", [m.role for m in projected]) print("Projected messages with token counts:") for msg in projected: - group = msg.additional_properties.get("_group") - token_count = group.get("token_count") if isinstance(group, dict) else None text_preview = msg.text[:80] if msg.text else "" - print(f"- [{msg.role}] {text_preview} ({token_count} tokens)") + print(f"- [{msg.role}] {text_preview} ({msg.message_id}, {_token_count(msg)} tokens){_relation(msg)}") + + # 6. Surface the model-generated summary, if summarization fired. + for msg in messages: + annotation = _annotation(msg) + if annotation and annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY): + print("\nGenerated summary:") + print(f" {msg.text}") + print(f" summarizes: {annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY)}") if __name__ == "__main__": asyncio.run(main()) """ -Sample output: -Projected messages after compaction: 3 -Included token count before compaction: 793 -Included token count after compaction: 144 -Projected roles: ['system', 'user', 'assistant'] +Sample output (summary text and token counts vary because the summary is generated by the model): + +Before compaction message set: +- [system] You are a migration copilot that plans and executes database migrations. (msg_0, 46 tokens) +- [user] Iteration 1: capture migration requirements, constraints, and edge cases in deta (msg_1, 48 tokens) +- [assistant] Iteration 1: produced a detailed plan covering dependencies, rollback guidance, (msg_2, 73 tokens) +... +- [user] What is the safest order to migrate these tables? (msg_12, 40 tokens) +- [assistant] Migrate reference tables (users) first, then orders, then invoices, and events l (msg_13, 50 tokens) + +Projected messages after compaction: 5 +Included token count before compaction: 757 +Included token count after compaction: 274 +Projected roles: ['system', 'assistant', 'assistant', 'user', 'assistant'] Projected messages with token counts: -- [system] You are a migration copilot. (35 tokens) -- [user] Iteration 7: capture migration requirements and edge cases. (43 tokens) -- [assistant] Iteration 7: detailed plan with dependencies, rollback guidance, and testing det (66 tokens) +- [system] You are a migration copilot that plans and executes database migrations. (msg_0, 46 tokens) +- [assistant] Across four planning turns the user and assistant... (summary_14, 96 tokens) <- summary of [msg_1..8] +- [assistant] Schema inspection found four core tables to migrate. (msg_11, 42 tokens) +- [user] What is the safest order to migrate these tables? (msg_12, 40 tokens) +- [assistant] Migrate reference tables (users) first, then orders, then invoices, and events l (msg_13, 50 tokens) + +Generated summary: + Across four planning turns the user and assistant defined the migration requirements... + summarizes: ['msg_1', 'msg_2', 'msg_3', 'msg_4', 'msg_5', 'msg_6', 'msg_7', 'msg_8'] """ diff --git a/python/samples/02-agents/compaction/summarization.py b/python/samples/02-agents/compaction/summarization.py new file mode 100644 index 0000000000..b05d2374e1 --- /dev/null +++ b/python/samples/02-agents/compaction/summarization.py @@ -0,0 +1,159 @@ +# Copyright (c) Microsoft. All rights reserved. + +import asyncio +from typing import Any, cast + +from agent_framework import ( + GROUP_ANNOTATION_KEY, + SUMMARIZED_BY_SUMMARY_ID_KEY, + SUMMARY_OF_MESSAGE_IDS_KEY, + Message, + SummarizationStrategy, + apply_compaction, +) +from agent_framework.openai import OpenAIChatClient +from dotenv import load_dotenv + +load_dotenv() + +"""This sample demonstrates the SummarizationStrategy directly. + +Unlike SlidingWindow/Truncation strategies that simply drop older groups, +``SummarizationStrategy`` calls a real chat client to *summarize* the oldest +message groups, replaces them with a single linked summary message, and keeps +the most recent turns verbatim. This preserves long-range context (decisions, +goals, unresolved items) while bounding the prompt size. + +Key components: +- SummarizationStrategy with a real OpenAIChatClient summarizer +- ``apply_compaction`` to run the strategy over a message list +- Bidirectional summary trace metadata (summary -> originals, original -> summary) + +Run with: + uv run samples/02-agents/compaction/summarization.py # requires OPENAI_API_KEY +""" + + +def _annotation(message: Message) -> dict[str, Any] | None: + annotation = message.additional_properties.get(GROUP_ANNOTATION_KEY) + return cast("dict[str, Any]", annotation) if isinstance(annotation, dict) else None + + +def _build_history() -> list[Message]: + """Build a multi-turn conversation long enough to trigger summarization.""" + return [ + Message(role="system", contents=["You are a project planning assistant."]), + Message(role="user", contents=["We are migrating a monolith to microservices. Where do we start?"]), + Message( + role="assistant", + contents=["Start by mapping bounded contexts and identifying the highest-churn modules to extract first."], + ), + Message(role="user", contents=["The billing module changes most often. What are the risks of extracting it?"]), + Message( + role="assistant", + contents=["Main risks: distributed transactions, invoices-table ownership, and latency on hot paths."], + ), + Message(role="user", contents=["How should we handle the shared invoices table?"]), + Message( + role="assistant", + contents=["Use the strangler-fig pattern: dual-write during transition, then make billing the owner."], + ), + Message(role="user", contents=["What is the most recent decision we made?"]), + Message(role="assistant", contents=["We decided to extract billing first using the strangler-fig pattern."]), + ] + + +def _print_messages(label: str, messages: list[Message]) -> None: + print(f"\n--- {label} ---") + print(f"Message count: {len(messages)}") + for index, message in enumerate(messages, start=1): + text = message.text or ", ".join(content.type for content in message.contents) + print(f"{index:02d}. [{message.role}] {text[:90]}") + + +async def main() -> None: + # 1. Create a real summarizing client. SummarizationStrategy only requires a + # SupportsChatGetResponse-compatible client, so any chat client works. + summarizer = OpenAIChatClient(model="gpt-4o-mini") + + # 2. Build a conversation and show it before compaction. + messages = _build_history() + _print_messages("Before compaction", messages) + + # 3. Configure the strategy. It triggers once the included non-system message + # count exceeds ``target_count + threshold`` (here 4 + 2 = 6), summarizing + # the oldest groups down toward ``target_count`` while keeping recent turns. + strategy = SummarizationStrategy( + client=summarizer, + target_count=4, + threshold=2, + ) + + # 4. Apply the strategy. The oldest groups are summarized into a single + # assistant message; the projected list is what the model would receive. + projected = await apply_compaction(messages, strategy=strategy) + _print_messages("After compaction (SummarizationStrategy)", projected) + + # 5. Inspect the generated summary and its bidirectional trace metadata. + print("\n--- Summary trace ---") + for message in messages: + annotation = _annotation(message) + if annotation is None: + continue + summarizes = annotation.get(SUMMARY_OF_MESSAGE_IDS_KEY) + if summarizes: + print(f"Generated summary ({message.message_id}):") + print(f" {message.text}") + print(f" summarizes original ids: {summarizes}") + summarized_by: dict[str | None, Any] = {} + for message in messages: + annotation = _annotation(message) + if annotation is None: + continue + summary_id = annotation.get(SUMMARIZED_BY_SUMMARY_ID_KEY) + if summary_id: + summarized_by[message.message_id] = summary_id + if summarized_by: + print("Originals replaced by the summary:") + for original_id, summary_id in summarized_by.items(): + print(f" {original_id} -> {summary_id}") + + +if __name__ == "__main__": + asyncio.run(main()) + +""" +Sample output (summary text varies because it is generated by the model): + +--- Before compaction --- +Message count: 9 +01. [system] You are a project planning assistant. +02. [user] We are migrating a monolith to microservices. Where do we start? +03. [assistant] Start by mapping bounded contexts and identifying the highest-churn modules to ex +04. [user] The billing module changes most often. What are the risks of extracting it? +05. [assistant] Main risks: distributed transactions, data ownership of the invoices table, and lat +06. [user] How should we handle the shared invoices table? +07. [assistant] Use the strangler-fig pattern: dual-write during transition, then make billing the +08. [user] What is the most recent decision we made? +09. [assistant] We decided to extract billing first using the strangler-fig pattern. + +--- After compaction (SummarizationStrategy) --- +Message count: 6 +01. [system] You are a project planning assistant. +02. [assistant] The user is migrating a monolith to microservices and decided to extract the billin +03. [user] How should we handle the shared invoices table? +04. [assistant] Use the strangler-fig pattern: dual-write during transition, then make billing the +05. [user] What is the most recent decision we made? +06. [assistant] We decided to extract billing first using the strangler-fig pattern. + +--- Summary trace --- +Generated summary (summary_9): + The user is migrating a monolith to microservices and decided to extract the billing module first... + summarizes original ids: ['msg_1', 'msg_2', 'msg_3', 'msg_4', 'msg_5'] +Originals replaced by the summary: + msg_1 -> summary_9 + msg_2 -> summary_9 + msg_3 -> summary_9 + msg_4 -> summary_9 + msg_5 -> summary_9 +"""