Python: Add a HarnessAgent with available features and sample (#6041)

* Add a HarnessAgent with available features and sample

* Fix formatting

* Address PR comments and fix mypy error

* Add web search support to HarnessAgent

* Fix build warning

* Apply suggestions from code review

Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>

* Address PR comments

* Address PR comments

* Address further PR comments.

* Fix markdown broken link

---------

Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>
This commit is contained in:
westey
2026-05-27 14:54:00 +01:00
committed by GitHub
Unverified
parent d5c07f2623
commit ef86fb51d5
11 changed files with 1262 additions and 5 deletions
+6 -2
View File
@@ -1,15 +1,19 @@
{
"name": "Python 3",
"image": "mcr.microsoft.com/devcontainers/python:3.13-bullseye",
"image": "mcr.microsoft.com/devcontainers/python:3.14-bookworm",
"features": {
"ghcr.io/va-h/devcontainers-features/uv:1": {},
"ghcr.io/devcontainers/features/azure-cli:1.2.8": {}
"ghcr.io/devcontainers/features/docker-in-docker:3": {},
"ghcr.io/devcontainers/features/azure-cli:1.2.9": {},
"ghcr.io/devcontainers/features/copilot-cli:1": {}
},
"postCreateCommand": "bash ./devsetup.sh",
"workspaceFolder": "/workspaces/agent-framework/python/",
"customizations": {
"vscode": {
"extensions": [
"GitHub.copilot",
"GitHub.vscode-github-actions",
"ms-python.python",
"ms-windows-ai-studio.windows-ai-studio",
"littlefoxteam.vscode-python-test-adapter"
@@ -7,7 +7,7 @@
## v1.0.0-preview.260219.1
- [BREAKING] Changed ChatHistory and AIContext Providers to have pipeline semantics ([#3806](https://github.com/microsoft/agent-framework/pull/3806))
- Marked all `RunAsync<T>` overloads as `new`, added missing ones, and added support for primitives and arrays ([#3803](https://github.com/microsoft/agent-framework/pull/3803))
- Marked all `RunAsync<T>` overloads as `new`, added missing ones, and added support for primitives and arrays #3803
- Improve session cast error message quality and consistency ([#3973](https://github.com/microsoft/agent-framework/pull/3973))
## v1.0.0-preview.260212.1
@@ -45,6 +45,7 @@ from ._compaction import (
CharacterEstimatorTokenizer,
CompactionProvider,
CompactionStrategy,
ContextWindowCompactionStrategy,
SelectiveToolCallCompactionStrategy,
SlidingWindowStrategy,
SummarizationStrategy,
@@ -79,6 +80,10 @@ from ._evaluation import (
tool_calls_present,
)
from ._feature_stage import ExperimentalFeature, ReleaseCandidateFeature
from ._harness._agent import (
DEFAULT_HARNESS_INSTRUCTIONS,
create_harness_agent,
)
from ._harness._background_agents import (
DEFAULT_BACKGROUND_AGENTS_SOURCE_ID,
BackgroundAgentsProvider,
@@ -304,6 +309,7 @@ __all__ = [
"APP_INFO",
"COMPACTION_STATE_KEY",
"DEFAULT_BACKGROUND_AGENTS_SOURCE_ID",
"DEFAULT_HARNESS_INSTRUCTIONS",
"DEFAULT_MAX_ITERATIONS",
"DEFAULT_MEMORY_SOURCE_ID",
"DEFAULT_MODE_SOURCE_ID",
@@ -362,6 +368,7 @@ __all__ = [
"CompactionStrategy",
"Content",
"ContextProvider",
"ContextWindowCompactionStrategy",
"ContinuationToken",
"ConversationSplit",
"ConversationSplitter",
@@ -509,6 +516,7 @@ __all__ = [
"apply_compaction",
"chat_middleware",
"create_edge_runner",
"create_harness_agent",
"detect_media_type_from_base64",
"evaluate_agent",
"evaluate_workflow",
@@ -1277,6 +1277,121 @@ class CompactionProvider(ContextProvider):
# whether excluded messages are loaded on the next turn.
class ContextWindowCompactionStrategy:
    """Compaction strategy whose token budgets follow from the model's context window.

    The usable input budget is ``max_context_window_tokens - max_output_tokens``.
    Every call runs two phases in order:

    1. **Tool result eviction** — once included tokens exceed
       ``tool_eviction_threshold`` of the input budget, older tool-call groups
       are collapsed into summaries.
    2. **Truncation** — once included tokens exceed ``truncation_threshold`` of
       the input budget, the oldest non-system groups are removed.

    Each phase is wrapped in its own :class:`TokenBudgetComposedStrategy`, so a
    phase only fires when its own threshold has been crossed.

    Examples:
        .. code-block:: python

            from agent_framework import ContextWindowCompactionStrategy, CompactionProvider

            strategy = ContextWindowCompactionStrategy(
                max_context_window_tokens=128_000,
                max_output_tokens=16_384,
            )
            provider = CompactionProvider(before_strategy=strategy)
    """

    DEFAULT_TOOL_EVICTION_THRESHOLD: float = 0.5
    """Share of the input budget above which tool result eviction starts (default)."""

    DEFAULT_TRUNCATION_THRESHOLD: float = 0.8
    """Share of the input budget above which truncation starts (default)."""

    def __init__(
        self,
        *,
        max_context_window_tokens: int,
        max_output_tokens: int,
        tokenizer: TokenizerProtocol | None = None,
        tool_eviction_threshold: float = DEFAULT_TOOL_EVICTION_THRESHOLD,
        truncation_threshold: float = DEFAULT_TRUNCATION_THRESHOLD,
        keep_last_tool_call_groups: int = 4,
    ) -> None:
        """Validate the limits and build the two compaction phases.

        Keyword Args:
            max_context_window_tokens: Size of the model's context window in
                tokens (e.g. 128,000). Must be positive.
            max_output_tokens: Tokens reserved for the model's response
                (e.g. 16,384). Must be >= 0 and smaller than the context window.
            tokenizer: Token counter shared by both phases. When falsy, a
                :class:`CharacterEstimatorTokenizer` is created.
            tool_eviction_threshold: Fraction of the input budget, in
                (0.0, 1.0], at which tool result eviction triggers.
                Defaults to 0.5.
            truncation_threshold: Fraction of the input budget, in (0.0, 1.0],
                at which truncation triggers. Must not be smaller than
                ``tool_eviction_threshold``. Defaults to 0.8.
            keep_last_tool_call_groups: How many of the most recent tool-call
                groups the eviction phase keeps verbatim. Defaults to 4.

        Raises:
            ValueError: If a token limit or threshold is out of range, or the
                two thresholds are inconsistent with each other.
        """
        # Checks run in a fixed order so the first offending argument is the one reported.
        if max_context_window_tokens <= 0:
            raise ValueError("max_context_window_tokens must be positive.")
        if max_output_tokens < 0 or max_output_tokens >= max_context_window_tokens:
            raise ValueError("max_output_tokens must be >= 0 and < max_context_window_tokens.")
        if not (0.0 < tool_eviction_threshold <= 1.0):
            raise ValueError("tool_eviction_threshold must be in (0.0, 1.0].")
        if not (0.0 < truncation_threshold <= 1.0):
            raise ValueError("truncation_threshold must be in (0.0, 1.0].")
        if truncation_threshold < tool_eviction_threshold:
            raise ValueError("truncation_threshold must be >= tool_eviction_threshold.")

        counter = tokenizer or CharacterEstimatorTokenizer()
        budget = max_context_window_tokens - max_output_tokens
        # Fractions of the budget are floored to whole tokens.
        eviction_limit = int(budget * tool_eviction_threshold)
        truncation_limit = int(budget * truncation_threshold)

        # Phase 1: collapse older tool-call groups once the eviction limit is exceeded.
        eviction_phase = ToolResultCompactionStrategy(keep_last_tool_call_groups=keep_last_tool_call_groups)
        self._tool_eviction = TokenBudgetComposedStrategy(
            token_budget=eviction_limit,
            tokenizer=counter,
            strategies=[eviction_phase],
        )

        # Phase 2: truncate once the truncation limit is exceeded, compacting
        # back down to the eviction limit.
        truncation_phase = TruncationStrategy(
            max_n=truncation_limit,
            compact_to=eviction_limit,
            tokenizer=counter,
        )
        self._truncation = TokenBudgetComposedStrategy(
            token_budget=truncation_limit,
            tokenizer=counter,
            strategies=[truncation_phase],
        )

        # Expose the resolved configuration for inspection.
        self.max_context_window_tokens = max_context_window_tokens
        self.max_output_tokens = max_output_tokens
        self.input_budget_tokens = budget
        self.tool_eviction_threshold = tool_eviction_threshold
        self.truncation_threshold = truncation_threshold

    async def __call__(self, messages: list[Message]) -> bool:
        """Run tool eviction, then truncation, over ``messages``.

        Returns:
            True if either phase changed message inclusion; otherwise False.
        """
        # Both phases always run; truncation sees the result of eviction.
        evicted = await self._tool_eviction(messages)
        truncated = await self._truncation(messages)
        return truncated or evicted
__all__ = [
"COMPACTION_STATE_KEY",
"EXCLUDED_KEY",
@@ -1293,6 +1408,7 @@ __all__ = [
"CharacterEstimatorTokenizer",
"CompactionProvider",
"CompactionStrategy",
"ContextWindowCompactionStrategy",
"GroupKind",
"SelectiveToolCallCompactionStrategy",
"SlidingWindowStrategy",
@@ -0,0 +1,349 @@
# Copyright (c) Microsoft. All rights reserved.
"""Harness agent factory: a pre-configured bundled agent with batteries included.
This module provides :func:`create_harness_agent`, a factory function that assembles
the full agent pipeline from a chat client, wiring up function invocation,
per-service-call history persistence, compaction, and a rich set of default
context providers (todo, mode, memory, skills).
"""
from __future__ import annotations
import logging
from collections.abc import Callable, Sequence
from typing import TYPE_CHECKING, Any
from .._agents import Agent
from .._clients import SupportsWebSearchTool
from .._compaction import CompactionProvider, ContextWindowCompactionStrategy, ToolResultCompactionStrategy
from .._feature_stage import ExperimentalFeature, experimental
from .._sessions import ContextProvider, HistoryProvider, InMemoryHistoryProvider
from .._skills import SkillsProvider
from ._memory import MemoryContextProvider, MemoryStore
from ._mode import AgentModeProvider
from ._todo import TodoProvider
if TYPE_CHECKING:
from collections.abc import Mapping
from .._clients import SupportsChatGetResponse
from .._compaction import CompactionStrategy, TokenizerProtocol
from .._middleware import MiddlewareTypes
from .._tools import ToolTypes
# Module-level logger; this module uses it to warn when the chat client cannot
# supply a web search tool.
logger = logging.getLogger(__name__)
# Default harness-level system prompt. ``_assemble_instructions`` falls back to
# this text whenever the caller passes ``harness_instructions=None``.
DEFAULT_HARNESS_INSTRUCTIONS = """\
You are a helpful AI assistant that uses tools to complete tasks.
## General guidelines
- Think through the task before acting. Break complex work into clear steps.
- Use the tools available to you to gather information, perform actions, and verify results.
- Explain your reasoning and thought process as you work through tasks.
- Explain what you learned and what you are going to do next between tool calls, \
so the user can follow along with your thought process.
- Avoid making more than 4 tool calls in a row without explaining what you are doing.
- If a tool call fails or returns unexpected results, adapt your approach rather than \
repeating the same call.
- When you have completed the task, present a clear and concise summary of what you did \
and what you found.
"""
def _assemble_instructions(
harness_instructions: str | None,
agent_instructions: str | None,
) -> str | None:
"""Assemble final instructions from harness + agent instructions."""
harness = harness_instructions if harness_instructions is not None else DEFAULT_HARNESS_INSTRUCTIONS
return f"{harness}\n\n{agent_instructions or ''}".strip() or None
def _assemble_compaction_provider(
    *,
    disable_compaction: bool,
    max_context_window_tokens: int,
    max_output_tokens: int,
    history_source_id: str,
    before_compaction_strategy: CompactionStrategy | None,
    after_compaction_strategy: CompactionStrategy | None,
    tokenizer: TokenizerProtocol | None,
) -> CompactionProvider | None:
    """Create the harness compaction provider, or ``None`` when compaction is off.

    Keyword Args:
        disable_compaction: When True, nothing is built and ``None`` is returned.
        max_context_window_tokens: Context window size passed to the default
            before-run strategy.
        max_output_tokens: Output token limit passed to the default before-run
            strategy.
        history_source_id: Source id of the history provider, forwarded to the
            :class:`CompactionProvider`.
        before_compaction_strategy: Before-run strategy; when falsy, a
            :class:`ContextWindowCompactionStrategy` is created instead.
        after_compaction_strategy: After-run strategy; when falsy, a
            :class:`ToolResultCompactionStrategy` keeping the last 2 tool-call
            groups is created instead.
        tokenizer: Tokenizer forwarded to the default before-run strategy and
            to the provider.

    Returns:
        The configured :class:`CompactionProvider`, or ``None``.
    """
    if disable_compaction:
        return None

    # Defaults are only constructed when the caller did not supply a strategy.
    before = before_compaction_strategy
    if not before:
        before = ContextWindowCompactionStrategy(
            max_context_window_tokens=max_context_window_tokens,
            max_output_tokens=max_output_tokens,
            tokenizer=tokenizer,
        )

    after = after_compaction_strategy
    if not after:
        after = ToolResultCompactionStrategy(keep_last_tool_call_groups=2)

    return CompactionProvider(
        before_strategy=before,
        after_strategy=after,
        tokenizer=tokenizer,
        history_source_id=history_source_id,
    )
def _assemble_context_providers(
    *,
    history_provider: HistoryProvider,
    compaction_provider: CompactionProvider | None,
    disable_todo: bool,
    todo_provider: TodoProvider | None,
    disable_mode: bool,
    mode_provider: AgentModeProvider | None,
    disable_memory: bool,
    memory_store: MemoryStore | None,
    skills_provider: SkillsProvider | None,
    skills_paths: Sequence[str] | None,
    extra_context_providers: Sequence[ContextProvider] | None,
) -> list[ContextProvider]:
    """Assemble the ordered list of context providers.

    The resulting order is: history, compaction, todo, mode, memory, skills
    (code-defined provider first, then path-based), and finally any extra
    providers supplied by the caller.

    Keyword Args:
        history_provider: History provider; always first in the list.
        compaction_provider: Compaction provider, or ``None`` to leave it out.
        disable_todo: When True, no todo provider is added.
        todo_provider: Todo provider to use; a default ``TodoProvider`` is
            created when this is falsy.
        disable_mode: When True, no mode provider is added.
        mode_provider: Mode provider to use; a default ``AgentModeProvider`` is
            created when this is falsy.
        disable_memory: When True, no memory provider is added.
        memory_store: Store backing the ``MemoryContextProvider``; the provider
            is only added when a store is given and memory is not disabled.
        skills_provider: Optional pre-built skills provider.
        skills_paths: Optional paths for a file-based skills provider. A single
            string is treated as one path.
        extra_context_providers: Additional providers appended at the end.

    Returns:
        A new list containing the selected providers in the order above.
    """
    # History first so other providers can access loaded messages.
    providers: list[ContextProvider] = [history_provider]

    # Compaction runs after history loads messages.
    if compaction_provider is not None:
        providers.append(compaction_provider)

    if not disable_todo:
        providers.append(todo_provider or TodoProvider())
    if not disable_mode:
        providers.append(mode_provider or AgentModeProvider())
    if not disable_memory and memory_store is not None:
        providers.append(MemoryContextProvider(store=memory_store))

    # Skills are opt-in: only added when skills_provider or skills_paths is provided.
    if skills_provider:
        providers.append(skills_provider)
    if skills_paths:
        # A bare ``str`` is itself a ``Sequence[str]``; unpacking it would pass
        # ``from_paths`` one "path" per character, so treat it as a single path.
        paths = [skills_paths] if isinstance(skills_paths, str) else skills_paths
        providers.append(SkillsProvider.from_paths(*paths))

    # Append any user-supplied additional providers.
    if extra_context_providers:
        providers.extend(extra_context_providers)
    return providers
# Telemetry provider name that ``create_harness_agent`` assigns to
# ``agent.otel_provider_name`` when the caller does not pass ``otel_provider_name``.
HARNESS_AGENT_PROVIDER_NAME = "microsoft.agent_framework.harness"
@experimental(feature_id=ExperimentalFeature.HARNESS)
def create_harness_agent(
    client: SupportsChatGetResponse[Any],
    *,
    id: str | None = None,
    name: str | None = None,
    description: str | None = None,
    harness_instructions: str | None = None,
    agent_instructions: str | None = None,
    tools: ToolTypes | Callable[..., Any] | Sequence[ToolTypes | Callable[..., Any]] | None = None,
    max_context_window_tokens: int,
    max_output_tokens: int,
    history_provider: HistoryProvider | None = None,
    disable_compaction: bool = False,
    before_compaction_strategy: CompactionStrategy | None = None,
    after_compaction_strategy: CompactionStrategy | None = None,
    tokenizer: TokenizerProtocol | None = None,
    disable_todo: bool = False,
    todo_provider: TodoProvider | None = None,
    disable_mode: bool = False,
    mode_provider: AgentModeProvider | None = None,
    disable_memory: bool = False,
    memory_store: MemoryStore | None = None,
    skills_provider: SkillsProvider | None = None,
    skills_paths: Sequence[str] | None = None,
    disable_web_search: bool = False,
    otel_provider_name: str | None = None,
    context_providers: Sequence[ContextProvider] | None = None,
    middleware: Sequence[MiddlewareTypes] | None = None,
    default_options: Mapping[str, Any] | None = None,
) -> Agent[Any]:
    """Build an :class:`~agent_framework.Agent` with the harness defaults wired in.

    Starting from a chat client, the factory puts together:

    - **Function invocation** — automatic tool calling loop
    - **Per-service-call history persistence** — history is persisted after every model call
    - **Compaction** — context-window compaction before/after each run
    - **TodoProvider** — todo list management
    - **AgentModeProvider** — plan/execute mode tracking
    - **MemoryContextProvider** — file-based durable memory (only when ``memory_store`` is given)
    - **SkillsProvider** — skill discovery and progressive loading (opt-in)
    - **OpenTelemetry** — observability via ``AgentTelemetryLayer``

    Every feature can be switched off or replaced through keyword arguments.

    Examples:
        Minimal setup:

        .. code-block:: python

            from agent_framework import create_harness_agent
            from agent_framework.openai import OpenAIChatClient

            agent = create_harness_agent(
                OpenAIChatClient(model="gpt-4o"),
                max_context_window_tokens=128_000,
                max_output_tokens=16_384,
            )
            session = agent.create_session()
            response = await agent.run("Plan a weekend trip to Seattle", session=session)

        Customized setup:

        .. code-block:: python

            agent = create_harness_agent(
                client=client,
                max_context_window_tokens=200_000,
                max_output_tokens=32_000,
                name="research-agent",
                agent_instructions="Focus on academic sources.",
                disable_todo=True,
                skills_paths=["./skills", "./custom-skills"],
            )

    Args:
        client: Chat client that gives access to the underlying AI model.

    Keyword Args:
        id: Optional agent ID (auto-generated UUID if omitted).
        name: Optional agent name.
        description: Optional agent description.
        harness_instructions: Replacement for the default harness-level system
            instructions (general operating guidelines such as how to use tools
            and report progress). ``None`` selects ``DEFAULT_HARNESS_INSTRUCTIONS``;
            an empty string ``""`` omits harness instructions entirely.
        agent_instructions: Domain- or task-specific instructions appended after
            the harness instructions, e.g. the agent's purpose or persona
            ("You are a research assistant focused on academic sources.").
        tools: Additional tools for the agent; either a single tool or a sequence.
        max_context_window_tokens: Maximum tokens the model's context window supports.
        max_output_tokens: Maximum output tokens per response. Also used as the
            ``max_tokens`` default option unless ``default_options`` sets one.
        history_provider: Custom history provider. When falsy, an
            ``InMemoryHistoryProvider`` is created.
        disable_compaction: When True, no compaction provider is set up.
        before_compaction_strategy: Custom before-run compaction strategy.
            Defaults to ``ContextWindowCompactionStrategy`` (token-budget aware).
        after_compaction_strategy: Custom after-run compaction strategy.
            Defaults to ``ToolResultCompactionStrategy``.
        tokenizer: Custom tokenizer for compaction.
        disable_todo: When True, the TodoProvider is left out.
        todo_provider: Custom TodoProvider instance. Ignored when ``disable_todo`` is True.
        disable_mode: When True, the AgentModeProvider is left out.
        mode_provider: Custom AgentModeProvider instance. Ignored when ``disable_mode`` is True.
        disable_memory: When True, the MemoryContextProvider is left out.
        memory_store: Memory store instance. When given (and ``disable_memory`` is
            False), a MemoryContextProvider backed by it is added.
        skills_provider: Custom SkillsProvider instance for code-defined skills.
            May be combined with ``skills_paths``.
        skills_paths: Paths for file-based skill discovery (looks for SKILL.md
            files). May be combined with ``skills_provider``. When neither is
            given, no SkillsProvider is added.
        disable_web_search: When True, the web search tool is never added
            automatically. When False (default), it is added if the client
            implements ``SupportsWebSearchTool``; otherwise a warning is logged.
        otel_provider_name: Custom OpenTelemetry provider/source name for telemetry.
        context_providers: Extra context providers placed after the built-in ones.
        middleware: Additional middleware to include.
        default_options: Provider-specific chat options (temperature, max_tokens, etc.).

    Returns:
        A fully configured :class:`~agent_framework.Agent` instance.

    Raises:
        ValueError: If ``max_context_window_tokens <= 0``, ``max_output_tokens < 0``,
            or ``max_output_tokens >= max_context_window_tokens``.
    """
    # Reject inconsistent token limits before anything is constructed.
    if max_context_window_tokens <= 0:
        raise ValueError("max_context_window_tokens must be positive.")
    if max_output_tokens < 0:
        raise ValueError("max_output_tokens must be non-negative.")
    if max_output_tokens >= max_context_window_tokens:
        raise ValueError("max_output_tokens must be less than max_context_window_tokens.")

    history = history_provider or InMemoryHistoryProvider()

    # Compaction is tied to the history provider through its source id.
    compaction = _assemble_compaction_provider(
        disable_compaction=disable_compaction,
        max_context_window_tokens=max_context_window_tokens,
        max_output_tokens=max_output_tokens,
        history_source_id=history.source_id,
        before_compaction_strategy=before_compaction_strategy,
        after_compaction_strategy=after_compaction_strategy,
        tokenizer=tokenizer,
    )

    providers = _assemble_context_providers(
        history_provider=history,
        compaction_provider=compaction,
        disable_todo=disable_todo,
        todo_provider=todo_provider,
        disable_mode=disable_mode,
        mode_provider=mode_provider,
        disable_memory=disable_memory,
        memory_store=memory_store,
        skills_provider=skills_provider,
        skills_paths=skills_paths,
        extra_context_providers=context_providers,
    )

    instructions = _assemble_instructions(harness_instructions, agent_instructions)

    # Tool list: the client's web search tool (when wanted and supported)
    # comes first, followed by the caller's own tools.
    toolset: list[ToolTypes | Callable[..., Any]] = []
    web_search_wanted = not disable_web_search
    if web_search_wanted and isinstance(client, SupportsWebSearchTool):
        toolset.append(client.get_web_search_tool())
    elif web_search_wanted:
        logger.warning(
            "Web search tool not available: client %r does not implement SupportsWebSearchTool. "
            "Set disable_web_search=True to suppress this warning.",
            type(client).__name__,
        )

    if isinstance(tools, Sequence):
        toolset.extend(tools)  # pyright: ignore[reportUnknownArgumentType]
    elif tools is not None:
        toolset.append(tools)
    final_tools: list[ToolTypes | Callable[..., Any]] | None = toolset or None

    # Copy the caller's options so the mapping they passed is never mutated;
    # an explicit "max_tokens" from the caller takes precedence.
    options: dict[str, Any] = {}
    if default_options:
        options.update(default_options)
    if "max_tokens" not in options:
        options["max_tokens"] = max_output_tokens

    agent = Agent(
        client,
        instructions,
        id=id,
        name=name,
        description=description,
        tools=final_tools,
        default_options=options,  # type: ignore[arg-type]
        context_providers=providers,
        middleware=list(middleware) if middleware else None,
        require_per_service_call_history_persistence=True,
    )

    # The telemetry provider name is assigned after construction.
    agent.otel_provider_name = otel_provider_name or HARNESS_AGENT_PROVIDER_NAME
    return agent
@@ -19,6 +19,7 @@ from agent_framework import (
ChatResponse,
CompactionProvider,
Content,
ContextWindowCompactionStrategy,
Message,
SelectiveToolCallCompactionStrategy,
SlidingWindowStrategy,
@@ -952,3 +953,159 @@ async def test_in_memory_history_provider_default_loads_all() -> None:
loaded = await provider.get_messages(session_id="test", state=state)
assert len(loaded) == 3
# --- ContextWindowCompactionStrategy tests ---
async def test_context_window_strategy_noop_under_threshold() -> None:
    """A short conversation far below the eviction threshold is left untouched."""
    # Input budget is 1000 - 200 = 800 tokens, so eviction starts above 400 tokens.
    # With the default 4-chars-per-token estimate these messages are nowhere near that.
    conversation = [
        Message(role="system", contents=["sys"]),
        Message(role="user", contents=["hello"]),
        Message(role="assistant", contents=["hi"]),
    ]
    compactor = ContextWindowCompactionStrategy(
        max_context_window_tokens=1000,
        max_output_tokens=200,
    )

    assert await compactor(conversation) is False
    assert len(included_messages(conversation)) == 3
async def test_context_window_strategy_tool_eviction_triggers_at_threshold() -> None:
    """Between the 50% and 80% marks only tool eviction runs, not truncation."""
    # Input budget: 20000 - 200 = 19800 tokens.
    #   eviction above 50% -> 9900 tokens, truncation above 80% -> 15840 tokens.
    # Each tool result is 8000 chars, i.e. about 2000 tokens at 4 chars/token,
    # so five tool-call groups land just over 10000 tokens: past the eviction
    # mark but short of the truncation mark.
    messages = [Message(role="system", contents=["system prompt"])]
    for turn in range(1, 6):
        messages += [
            Message(role="user", contents=[f"u{turn}"]),
            _assistant_function_call(f"c{turn}"),
            _tool_result(f"c{turn}", "x" * 8000),
        ]

    compactor = ContextWindowCompactionStrategy(
        max_context_window_tokens=20000,
        max_output_tokens=200,
        keep_last_tool_call_groups=2,
    )
    assert await compactor(messages) is True

    # Eviction must have produced at least one tool-result summary message.
    visible = included_messages(messages)
    assert any(msg.text and "[Tool results:" in msg.text for msg in visible)

    # Truncation must not have fired: nothing is excluded with the "truncation" reason.
    from agent_framework._compaction import EXCLUDE_REASON_KEY

    assert not any(msg.additional_properties.get(EXCLUDE_REASON_KEY) == "truncation" for msg in messages)
async def test_context_window_strategy_truncation_triggers_above_80_pct() -> None:
    """Plain chat history beyond 80% of the input budget gets truncated."""
    # Input budget: 1000 - 100 = 900 tokens -> eviction above 450, truncation above 720.
    # No tool calls are present, so eviction has nothing to collapse; the four
    # ~1200-char messages (~300 tokens each) push the total past 720 tokens.
    filler_turns = [
        ("user", "u1 "),
        ("assistant", "a1 "),
        ("user", "u2 "),
        ("assistant", "a2 "),
    ]
    messages = [Message(role="system", contents=["sys"])]
    messages.extend(Message(role=role, contents=[chunk * 400]) for role, chunk in filler_turns)

    compactor = ContextWindowCompactionStrategy(
        max_context_window_tokens=1000,
        max_output_tokens=100,
    )
    assert await compactor(messages) is True

    visible = included_messages(messages)
    # The system message survives truncation.
    assert visible[0].role == "system"
    # At least one of the five messages was dropped.
    assert len(visible) < 5
async def test_context_window_strategy_keep_last_tool_call_groups_respected() -> None:
    """The newest tool-call group stays visible when ``keep_last_tool_call_groups=1``."""
    # Input budget: 1000 - 100 = 900 tokens, so eviction starts above 450 tokens;
    # three 600-char tool results are enough to cross it.
    messages = [Message(role="system", contents=["sys"])]
    for turn in range(1, 4):
        messages += [
            Message(role="user", contents=[f"u{turn}"]),
            _assistant_function_call(f"c{turn}"),
            _tool_result(f"c{turn}", f"r{turn} " * 200),
        ]

    # Only the last group (c3) is to be kept verbatim.
    compactor = ContextWindowCompactionStrategy(
        max_context_window_tokens=1000,
        max_output_tokens=100,
        keep_last_tool_call_groups=1,
    )
    assert await compactor(messages) is True

    # Look for c3 among the function call/result contents that are still included.
    tool_kinds = ("function_call", "function_result")
    assert any(
        item.call_id == "c3"
        for msg in included_messages(messages)
        for item in msg.contents
        if item.type in tool_kinds
    )
def test_context_window_strategy_validates_thresholds() -> None:
    """Each invalid constructor argument combination is rejected with ``ValueError``."""
    import pytest

    # (expected message pattern, constructor keyword arguments)
    rejected_cases = [
        (
            "max_context_window_tokens must be positive",
            {"max_context_window_tokens": 0, "max_output_tokens": 0},
        ),
        (
            "max_output_tokens must be >= 0",
            {"max_context_window_tokens": 1000, "max_output_tokens": 1000},
        ),
        (
            "tool_eviction_threshold must be in",
            {"max_context_window_tokens": 1000, "max_output_tokens": 100, "tool_eviction_threshold": 0.0},
        ),
        (
            "truncation_threshold must be >= tool_eviction_threshold",
            {
                "max_context_window_tokens": 1000,
                "max_output_tokens": 100,
                "tool_eviction_threshold": 0.8,
                "truncation_threshold": 0.5,
            },
        ),
    ]
    for pattern, kwargs in rejected_cases:
        with pytest.raises(ValueError, match=pattern):
            ContextWindowCompactionStrategy(**kwargs)
@@ -0,0 +1,396 @@
# Copyright (c) Microsoft. All rights reserved.
from __future__ import annotations
from collections.abc import AsyncIterator, Mapping
from typing import Any
import pytest
from agent_framework import (
AgentSession,
ChatResponse,
CompactionProvider,
InMemoryHistoryProvider,
Message,
SkillsProvider,
TodoProvider,
create_harness_agent,
)
from agent_framework._harness._agent import DEFAULT_HARNESS_INSTRUCTIONS, _assemble_instructions
from agent_framework._harness._mode import AgentModeProvider
from agent_framework._sessions import ContextProvider
class _FakeChatClient:
    """Bare-bones chat client stand-in used to exercise harness assembly."""

    model = "test-model"

    async def get_response(
        self,
        *,
        messages: list[Message],
        options: Mapping[str, Any] | None = None,
        **kwargs: Any,
    ) -> ChatResponse:
        """Return a canned single-message assistant response; the inputs are ignored."""
        reply = Message(role="assistant", contents=["Hello"])
        return ChatResponse(messages=[reply])

    async def get_streaming_response(
        self,
        *,
        messages: list[Message],
        options: Mapping[str, Any] | None = None,
        **kwargs: Any,
    ) -> AsyncIterator[Any]:
        """Yield a single canned assistant message; the inputs are ignored."""
        yield Message(role="assistant", contents=["Hello"])  # pragma: no cover
# --- Assembly Tests ---
def test_create_harness_agent_with_defaults() -> None:
    """With only the required arguments the factory yields an agent that has an id."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
    )
    assert built.id is not None
def test_create_harness_agent_includes_all_default_providers() -> None:
    """The default agent carries history, compaction, todo and mode providers, but no skills."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
    )
    present = {type(provider) for provider in built.context_providers}

    for expected in (InMemoryHistoryProvider, CompactionProvider, TodoProvider, AgentModeProvider):
        assert expected in present
    # Skills are opt-in and therefore absent by default.
    assert SkillsProvider not in present
def test_create_harness_agent_disable_todo() -> None:
    """No TodoProvider is attached when ``disable_todo=True``."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
        disable_todo=True,
    )
    present = {type(provider) for provider in built.context_providers}
    assert TodoProvider not in present
def test_create_harness_agent_disable_mode() -> None:
    """No AgentModeProvider is attached when ``disable_mode=True``."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
        disable_mode=True,
    )
    present = {type(provider) for provider in built.context_providers}
    assert AgentModeProvider not in present
def test_create_harness_agent_disable_memory() -> None:
    """``disable_memory=True`` wins over a supplied ``memory_store``."""
    from agent_framework import MemoryContextProvider
    from agent_framework._harness._memory import MemoryStore

    class _FakeMemoryStore(MemoryStore):
        """Inert store: every operation is a no-op or returns an empty value."""

        def list_topics(self, session, *, source_id):
            return []

        def get_topic(self, session, *, source_id, topic):
            raise NotImplementedError

        def write_topic(self, session, record, *, source_id):
            pass

        def delete_topic(self, session, *, source_id, topic):
            pass

        def get_index_text(self, session, *, source_id):
            return ""

        def get_transcripts_directory(self, session, *, source_id):
            return ""

        def read_state(self, session, *, source_id):
            return {}

        def rebuild_index(self, session, *, source_id):
            pass

        def search_transcripts(self, session, *, source_id, query):
            return []

        def write_state(self, session, state, *, source_id):
            pass

    # Store supplied, memory left enabled -> the provider is attached.
    enabled_agent = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
        memory_store=_FakeMemoryStore(),
    )
    assert MemoryContextProvider in {type(provider) for provider in enabled_agent.context_providers}

    # Store supplied, memory disabled -> the provider is left out.
    disabled_agent = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
        memory_store=_FakeMemoryStore(),
        disable_memory=True,
    )
    assert MemoryContextProvider not in {type(provider) for provider in disabled_agent.context_providers}
def test_create_harness_agent_skills_paths_adds_provider() -> None:
    """Passing ``skills_paths`` opts in to a SkillsProvider."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
        skills_paths=["./test-skills"],
    )
    present = {type(provider) for provider in built.context_providers}
    assert SkillsProvider in present
def test_create_harness_agent_disable_compaction() -> None:
    """With ``disable_compaction=True`` no CompactionProvider may be registered."""
    harness = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        disable_compaction=True,
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    # Exact type comparison, mirroring a membership test over the providers' types.
    assert all(type(provider) is not CompactionProvider for provider in harness.context_providers)
def test_create_harness_agent_returns_full_agent() -> None:
    """The factory's return value must be an instance of ``agent_framework._agents.Agent``."""
    from agent_framework._agents import Agent as FullAgent

    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    assert isinstance(built, FullAgent)
# --- Validation Tests ---
def test_create_harness_agent_rejects_invalid_context_tokens() -> None:
    """A zero ``max_context_window_tokens`` must be rejected with ValueError."""
    expected_message = "max_context_window_tokens must be positive"

    with pytest.raises(ValueError, match=expected_message):
        create_harness_agent(
            client=_FakeChatClient(),  # type: ignore[arg-type]
            max_output_tokens=100,
            max_context_window_tokens=0,
        )
def test_create_harness_agent_rejects_negative_output_tokens() -> None:
    """A negative ``max_output_tokens`` must be rejected with ValueError."""
    expected_message = "max_output_tokens must be non-negative"

    with pytest.raises(ValueError, match=expected_message):
        create_harness_agent(
            client=_FakeChatClient(),  # type: ignore[arg-type]
            max_output_tokens=-1,
            max_context_window_tokens=1000,
        )
def test_create_harness_agent_rejects_output_gte_context() -> None:
    """An output budget equal to the context window must be rejected with ValueError."""
    same_budget = 1000
    expected_message = "max_output_tokens must be less than"

    with pytest.raises(ValueError, match=expected_message):
        create_harness_agent(
            client=_FakeChatClient(),  # type: ignore[arg-type]
            max_output_tokens=same_budget,
            max_context_window_tokens=same_budget,
        )
# --- Instructions Tests ---
def test_default_instructions() -> None:
    """With both arguments None, the stripped default harness instructions are returned."""
    expected = DEFAULT_HARNESS_INSTRUCTIONS.strip()

    assert _assemble_instructions(None, None) == expected
def test_custom_agent_instructions_appended() -> None:
    """The assembled text must contain both the default harness text and the agent text."""
    agent_text = "Focus on code review."
    combined = _assemble_instructions(None, agent_text)

    for fragment in (DEFAULT_HARNESS_INSTRUCTIONS, agent_text):
        assert fragment in combined  # type: ignore[operator]
def test_empty_harness_instructions_uses_agent_only() -> None:
    """An empty harness string must yield exactly the agent instructions."""
    agent_text = "Custom only."

    assert _assemble_instructions("", agent_text) == agent_text
# --- Identity Tests ---
def test_create_harness_agent_custom_identity() -> None:
    """The ``id``, ``name`` and ``description`` arguments must be readable back from the agent."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        description="A test agent",
        name="my-agent",
        id="my-agent-id",
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    observed = (built.id, built.name, built.description)
    assert observed == ("my-agent-id", "my-agent", "A test agent")
# --- Session Tests ---
def test_create_harness_agent_create_session() -> None:
    """``create_session()`` with no arguments must produce an AgentSession."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    assert isinstance(built.create_session(), AgentSession)
def test_create_harness_agent_create_session_with_id() -> None:
    """A ``session_id`` passed to ``create_session`` must be exposed on the session."""
    wanted_id = "custom-id"
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    created = built.create_session(session_id=wanted_id)

    assert created.session_id == wanted_id
async def test_create_harness_agent_run_returns_response() -> None:
    """Running the agent must yield messages, the last of which has the assistant role."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    reply = await built.run("hello", session=built.create_session())

    assert reply.messages
    final_message = reply.messages[-1]
    assert final_message.role == "assistant"
# --- Protocol Tests ---
def test_create_harness_agent_satisfies_protocol() -> None:
    """The built agent must pass an isinstance check against SupportsAgentRun."""
    from agent_framework import SupportsAgentRun

    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    assert isinstance(built, SupportsAgentRun)
# --- Additional providers ---
def test_create_harness_agent_extra_context_providers() -> None:
    """A caller-supplied context provider must appear among the agent's providers."""

    class _CustomProvider(ContextProvider):
        """Bare ContextProvider subclass used only as a marker in this test."""

    extra_provider = _CustomProvider("custom")

    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        context_providers=[extra_provider],
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    assert extra_provider in built.context_providers
# --- Web Search Tool Tests ---
class _FakeWebSearchClient(_FakeChatClient):
    """Chat client test double that additionally exposes ``get_web_search_tool``."""

    def get_web_search_tool(self, **kwargs: Any) -> str:
        """Return a fixed sentinel string; any keyword arguments are ignored."""
        return "web_search_tool_instance"
def test_create_harness_agent_auto_adds_web_search_tool() -> None:
    """A client exposing ``get_web_search_tool`` must have its tool in the default options."""
    built = create_harness_agent(
        client=_FakeWebSearchClient(),  # type: ignore[arg-type]
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    assert "web_search_tool_instance" in built.default_options.get("tools", [])
def test_create_harness_agent_disable_web_search() -> None:
    """With ``disable_web_search=True`` the client's web search tool must not be registered."""
    built = create_harness_agent(
        client=_FakeWebSearchClient(),  # type: ignore[arg-type]
        disable_web_search=True,
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    assert "web_search_tool_instance" not in built.default_options.get("tools", [])
def test_create_harness_agent_no_web_search_when_unsupported() -> None:
    """A client without ``get_web_search_tool`` must not end up with the web search tool."""
    built = create_harness_agent(
        client=_FakeChatClient(),  # type: ignore[arg-type]
        max_output_tokens=16_384,
        max_context_window_tokens=128_000,
    )

    assert "web_search_tool_instance" not in built.default_options.get("tools", [])
def test_create_harness_agent_logs_warning_when_no_web_search(caplog: pytest.LogCaptureFixture) -> None:
    """Building with a client lacking web search must log a message naming SupportsWebSearchTool."""
    import logging

    harness_logger = "agent_framework._harness._agent"
    with caplog.at_level(logging.WARNING, logger=harness_logger):
        create_harness_agent(
            client=_FakeChatClient(),  # type: ignore[arg-type]
            max_output_tokens=16_384,
            max_context_window_tokens=128_000,
        )

    matching = [message for message in caplog.messages if "SupportsWebSearchTool" in message]
    assert matching
@@ -10,10 +10,9 @@ import os
import tempfile
import threading
from collections.abc import AsyncIterable, AsyncIterator, Generator, Sequence
from contextlib import suppress
from contextlib import AbstractAsyncContextManager, AsyncExitStack, suppress
from dataclasses import asdict, is_dataclass
from pathlib import Path
from contextlib import AbstractAsyncContextManager, AsyncExitStack, suppress
from typing import Protocol, cast
from agent_framework import (
@@ -2923,6 +2923,8 @@ class TestCheckpointContextPathValidation:
f"before={before} after={after}"
)
assert list(root.iterdir()) == [], f"Checkpoint directory created inside root for {context_field}={bad_id!r}"
# region Agent lifecycle (lazy entry & OAuth consent surfacing)
@@ -0,0 +1,83 @@
# Harness Agent Samples
This folder demonstrates `create_harness_agent` — a factory function that builds a
pre-configured, batteries-included agent by assembling the full agent pipeline
from a chat client.
## What is `create_harness_agent`?
`create_harness_agent` bundles the following features into a single `Agent` instance:
| Feature | Description |
|---------|-------------|
| Function invocation | Automatic tool calling loop |
| Per-service-call persistence | History persisted after every model call |
| Compaction | Context-window management (sliding window + tool result compaction) |
| TodoProvider | Todo list management for planning and tracking |
| AgentModeProvider | Plan/execute mode tracking |
| MemoryContextProvider | File-based durable memory (when `memory_store` provided) |
| SkillsProvider | File-based skill discovery and progressive loading |
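| OpenTelemetry | Built-in observability |
| Web search | Web search tool added automatically when the chat client supports it (skip with `disable_web_search=True`) |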
Each feature can be disabled or customized via keyword arguments.
## Samples
| File | Description |
|------|-------------|
| `harness_research.py` | Interactive research assistant with web search and planning workflow |
## Running
```bash
# Set your Foundry environment variables
export FOUNDRY_PROJECT_ENDPOINT="https://your-project.services.ai.azure.com/api/projects/your-project-name"
export FOUNDRY_MODEL="your-model-deployment-name"
# Authenticate with Azure (required for AzureCliCredential)
az login
# Run the research sample
python samples/02-agents/harness/harness_research.py
```
## Key Concepts
### Minimal Setup
`create_harness_agent` requires only a chat client and token budget parameters:
```python
from agent_framework import create_harness_agent
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
agent = create_harness_agent(
client=FoundryChatClient(credential=AzureCliCredential()),
max_context_window_tokens=128_000,
max_output_tokens=16_384,
)
```
### Customization
Disable or customize any feature:
```python
agent = create_harness_agent(
client=client,
max_context_window_tokens=128_000,
max_output_tokens=16_384,
name="my-agent",
agent_instructions="Custom instructions here.",
disable_todo=True, # Skip todo management
disable_mode=True, # Skip plan/execute modes
disable_compaction=True, # Skip compaction
)
```
### Plan/Execute Workflow
The `AgentModeProvider` enables a two-phase workflow:
1. **Plan mode** — Interactive: the agent asks questions, creates todos, gets approval
2. **Execute mode** — Autonomous: the agent works through todos independently
@@ -0,0 +1,143 @@
# Copyright (c) Microsoft. All rights reserved.
"""Harness Research Assistant.
Demonstrates ``create_harness_agent`` — a factory function that builds a
pre-configured agent with batteries included, automatically wiring up function
invocation, per-service-call history persistence, compaction, and a rich set of
context providers:
- **TodoProvider** — the agent can create, track, and complete work items
- **AgentModeProvider** — plan/execute mode tracking (interactive vs. autonomous)
- **SkillsProvider** — file-based skill discovery and progressive loading
- **CompactionProvider** — automatic context-window management
- **InMemoryHistoryProvider** — session history with per-service-call persistence
- **OpenTelemetry** — built-in observability via AgentTelemetryLayer
- **Web Search** — real-time web search via ``get_web_search_tool()``
The sample creates a research-focused agent with web search capability and runs
a simple interactive chat loop. The agent will plan research tasks using todos,
switch between plan and execute modes, search the web for current information,
and track its progress.
Special commands:
/exit — End the session.
Environment variables:
FOUNDRY_PROJECT_ENDPOINT — Azure AI Foundry project endpoint URL
FOUNDRY_MODEL — Model deployment name
Authentication:
Run ``az login`` before running this sample.
"""
import asyncio
from agent_framework import create_harness_agent
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
from dotenv import load_dotenv
# Agent-specific prompt text passed to create_harness_agent() as
# ``agent_instructions`` in main(). The text is sent to the model verbatim, so
# edits here change agent behavior.
# NOTE(review): the last bullet asks the agent to save the report to "file
# memory", but main() does not pass a ``memory_store`` — confirm that file
# memory is actually available to the agent in this configuration.
RESEARCH_INSTRUCTIONS = """\
## Research Assistant Instructions
You are a research assistant. When given a research topic, research it thoroughly using web search and web browsing.
Use your knowledge to form good search queries and hypotheses, but always verify claims with the tools available to you rather than relying on memory alone.
### Research quality
Consult multiple sources when possible and cross-reference key claims.
When sources disagree, note the discrepancy and explain which source you consider more reliable and why.
If a web page fails to load or a search returns irrelevant results, try alternative search queries or sources before moving on.
Track your sources — you will need them when presenting results.
### Presenting results
When presenting your final findings:
- Use Markdown formatting for clarity.
- Use clear sections with headings for each major topic or sub-question.
- Cite your sources inline (e.g., "According to [source name](URL), ...").
- End with a brief summary of key takeaways.
- In addition to returning the results to the user, save the final research report to file memory so it survives compaction and can be referenced later.
"""
def _describe_web_search(content) -> str:
    """Return a short status line for a web search content item.

    Args:
        content: A streamed content item. Only ``search_tool_call`` items (action
            read from ``content.arguments``) and ``search_tool_result`` items
            (action read from ``content.result["action"]``) are described.

    Returns:
        The text to show the user, or an empty string when the item carries no
        usable action payload.
    """
    if content.type == "search_tool_result":
        result = content.result
        action = result.get("action") if isinstance(result, dict) else None
    elif content.type == "search_tool_call":
        action = content.arguments
    else:
        return ""

    # Ignore missing, empty or non-dict payloads instead of failing on ``.get``.
    if not action or not isinstance(action, dict):
        return ""

    action_type = action.get("type", "search")
    if action_type == "search":
        queries = action.get("queries") or []
        if queries:
            query_text = ", ".join(f'"{q}"' for q in queries)
        else:
            query_text = action.get("query", "")
        return f"Web search: {query_text}"
    if action_type == "open_page":
        url = action.get("url", "(unknown)")
        return f"Opening: {url}"
    if action_type == "find_in_page":
        pattern = action.get("pattern", "")
        return f'Find in page: "{pattern}"'
    return f"Web search: {action_type}"


async def main() -> None:
    """Run an interactive research chat loop against a harness agent.

    Loads environment variables, builds a ``FoundryChatClient``-backed harness
    agent with the research instructions, then reads prompts from stdin and
    streams each response to stdout until the user types ``/exit`` or stdin is
    closed.
    """
    load_dotenv()

    # Create the chat client.
    # For authentication, run `az login` in terminal or replace AzureCliCredential
    # with your preferred authentication option.
    client = FoundryChatClient(credential=AzureCliCredential())

    # Create a harness agent with research-specific instructions.
    # All other features (todo, mode, compaction, skills, telemetry, web search) are
    # automatically configured with sensible defaults.
    agent = create_harness_agent(
        client=client,
        max_context_window_tokens=128_000,
        max_output_tokens=16_384,
        name="ResearchAgent",
        description="A research assistant that plans and executes research tasks.",
        agent_instructions=RESEARCH_INSTRUCTIONS,
    )

    # Create a session to maintain conversation state across turns.
    session = agent.create_session()

    print("Research Assistant (powered by create_harness_agent)")
    print("=" * 50)
    print("Enter a research topic to get started.")
    print("Type /exit to end the session.\n")

    # Simple interactive chat loop.
    while True:
        try:
            user_input = input("You: ").strip()
        except EOFError:
            # stdin was closed (Ctrl-D or exhausted piped input): end the
            # session the same way /exit does instead of raising a traceback.
            print("\nGoodbye!")
            break

        if not user_input:
            continue
        if user_input.lower() == "/exit":
            print("\nGoodbye!")
            break

        # Run the agent with streaming and print the response as it arrives.
        print("\nAssistant: ", end="", flush=True)
        async for update in agent.run(user_input, session=session, stream=True):
            for content in update.contents or ():
                if content.type == "function_call":
                    # Print a brief message for each tool call in the stream.
                    print(f"\n [calling tool: {content.name}]", flush=True)
                    print(" ", end="", flush=True)
                elif getattr(content, "tool_name", None) == "web_search":
                    # Show web search activity when action details are available.
                    status = _describe_web_search(content)
                    if status:
                        print(f"\n 🌐 {status}", flush=True)
                        print(" ", end="", flush=True)

            # Print text content as it streams in.
            if update.text:
                print(update.text, end="", flush=True)
        print("\n")
# Script entry point: start the interactive loop only when this file is executed directly.
if __name__ == "__main__":
    asyncio.run(main())