Files
agent-framework/python/samples/02-agents/harness/console/agent_runner.py
westey bad05a2bdc Python: Harness console for python (#6312)
* Add initial harness console for python

* Add textual to project

* Add planning and approval flows with list selector

* Address PR comments

* Fix list selection bug

* Fix PR #6312 round 2 review comments

- Escape untrusted agent text with rich.markup.escape() in observers
  (text_output, planning_output, reasoning_display) to prevent markup injection
- Remove non-functional 'Always approve' choices from tool_approval.py
  (framework lacks CreateAlwaysApproveToolResponse support)
- Remove textual from root pyproject.toml dev deps (sample-specific)
- Add PEP 723 inline script metadata to harness_research.py
- Narrow except Exception to except NoMatches in list_selection.py

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix build error

* Fix build errors

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-09 05:48:35 +00:00

344 lines
12 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Agent runner orchestration for the harness console.
This module provides the HarnessAgentRunner class, which orchestrates agent
invocations with observer lifecycle management. It handles:
- User input dispatch
- Agent streaming with observer notifications
- Follow-up action collection
- Streaming state management
"""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from agent_framework import Agent, AgentSession
from .app_state import FollowUpAction
from .observers.base import ConsoleObserver
from .state_driver import IUXStateDriver
class HarnessAgentRunner:
"""Orchestrates agent invocations driven by user-input events from the UI.
The component invokes the runner's input handlers (run_turn) directly;
the runner mutates UI state through the supplied IUXStateDriver.
This is a minimal implementation focusing on the core agent loop without
command handling or complex message injection (those can be added later).
"""
def __init__(
self,
agent: Agent,
observers: list[ConsoleObserver],
state_driver: IUXStateDriver,
*,
max_context_window_tokens: int | None = None,
max_output_tokens: int | None = None,
) -> None:
"""Initialize the agent runner.
Args:
agent: The agent to orchestrate.
observers: List of console observers for lifecycle events.
state_driver: The UI state driver for observer updates.
max_context_window_tokens: Optional max context window size for usage display.
max_output_tokens: Optional max output tokens for usage display.
"""
self._agent = agent
self._observers = observers
self._ux = state_driver
self._max_context_window_tokens = max_context_window_tokens
self._max_output_tokens = max_output_tokens
self._input_gate = asyncio.Semaphore(1) # Single turn at a time
async def run_turn(
self,
user_input: str,
session: AgentSession | None = None,
) -> None:
"""Run a single agent turn with the given user input.
Echoes the input, then delegates to the agent loop.
Args:
user_input: The user's input text.
session: Optional agent session for conversation history.
"""
async with self._input_gate:
self._ux.write_user_input_echo(user_input)
from agent_framework import Message
messages = [Message(role="user", contents=[user_input])]
await self._run_agent_loop(messages, session)
async def start_agent_turn(
self,
messages: list,
session: AgentSession | None = None,
) -> None:
"""Resume the agent loop with pre-built messages (from follow-up responses).
Called by the app after the user finishes answering follow-up questions.
If messages is empty, just completes the turn.
Args:
messages: List of Message objects to send to the agent.
session: Optional agent session.
"""
async with self._input_gate:
if not messages:
self._complete_turn()
return
await self._run_agent_loop(messages, session)
async def _run_agent_loop(
self,
messages: list,
session: AgentSession | None,
) -> None:
"""Run the agent loop, re-invoking as needed for follow-up messages.
Loops while there are messages to send. After each stream:
- Collects follow-up actions from observers
- If questions exist → queue them and return (UI will collect answers)
- If only direct messages → loop with those messages
- If nothing → complete the turn
Args:
messages: Initial messages to send.
session: Optional agent session.
"""
next_messages = messages
while next_messages:
# Configure run options
options = self._configure_run_options(session)
# Begin streaming
self._ux.begin_streaming()
self._ux.begin_streaming_output()
self._ux.set_show_spinner(True)
try:
await self._stream_response_messages(next_messages, session, options)
except Exception as ex:
self._ux.append_info_line(
f"❌ Stream error: {ex.__class__.__name__}:\n{ex}",
color="red",
)
# Stop spinner and end streaming output
self._ux.set_show_spinner(False)
# Collect follow-up actions from observers
follow_up_actions = await self._collect_follow_up_actions(session)
# Separate direct messages from questions
has_follow_ups = len(follow_up_actions) > 0
# Write no-text warning if applicable
await self._ux.write_no_text_warning(has_follow_ups)
# Enqueue all follow-up actions
for action in follow_up_actions:
self._ux.enqueue_follow_up_action(action)
# Check if there are pending questions (UI needs user input)
if self._ux.has_pending_questions():
# Pause — the UI will collect answers and call start_agent_turn
return
# No questions — drain any accumulated direct messages and loop
drained = self._ux.take_follow_up_responses()
next_messages = drained if drained else None
self._complete_turn()
def _complete_turn(self) -> None:
"""Complete the current turn (end streaming)."""
self._ux.end_streaming()
def _configure_run_options(
self,
session: AgentSession | None,
) -> dict:
"""Configure run options via observers.
Each observer can modify the options dict to influence agent behavior.
Args:
session: Optional agent session.
Returns:
Options dict for agent.run().
"""
options = {}
for observer in self._observers:
observer.configure_run_options(options, self._agent, session)
return options
async def _stream_response(
self,
user_input: str,
session: AgentSession | None,
options: dict,
) -> None:
"""Stream agent response from a text input and dispatch to observers.
Args:
user_input: The user's input text.
session: Optional agent session.
options: Run options configured by observers.
"""
# Stream response using agent.run(stream=True)
stream = self._agent.run(
user_input,
stream=True,
session=session,
options=options,
)
# Process each update chunk
async for update in stream:
await self._dispatch_update(update, session)
# Extract usage from the final response
self._extract_usage(stream)
async def _stream_response_messages(
self,
messages: list,
session: AgentSession | None,
options: dict,
) -> None:
"""Stream agent response from Message objects and dispatch to observers.
Args:
messages: List of Message objects to send.
session: Optional agent session.
options: Run options configured by observers.
"""
stream = self._agent.run(
messages,
stream=True,
session=session,
options=options,
)
async for update in stream:
await self._dispatch_update(update, session)
self._extract_usage(stream)
def _extract_usage(self, stream) -> None:
"""Extract token usage from a completed stream."""
try:
get_final = getattr(stream, "get_final_response", None)
if not get_final:
return
import inspect
if inspect.iscoroutinefunction(get_final):
return
final_response = get_final()
if final_response is None:
return
usage = getattr(final_response, "usage_details", None)
if not isinstance(usage, dict):
return
input_tokens = usage.get("input_token_count", 0) or 0
output_tokens = usage.get("output_token_count", 0) or 0
if input_tokens or output_tokens:
self._ux.set_usage_text(self._format_usage(input_tokens, output_tokens))
except (AttributeError, TypeError):
pass
async def _dispatch_update(
self,
update, # AgentResponseUpdate
session: AgentSession | None,
) -> None:
"""Dispatch a single update to all observers.
Calls observer lifecycle methods in order:
1. on_response_update (once per update)
2. on_content (for each content item)
3. on_text (if text is present)
Args:
update: The agent response update.
session: Optional agent session.
"""
# on_response_update
for observer in self._observers:
await observer.on_response_update(self._ux, update, self._agent, session)
# on_content for each content item
if hasattr(update, "contents") and update.contents:
for content in update.contents:
for observer in self._observers:
await observer.on_content(self._ux, content, self._agent, session)
# on_text for text chunks
if hasattr(update, "text") and update.text:
for observer in self._observers:
await observer.on_text(self._ux, update.text, self._agent, session)
async def _collect_follow_up_actions(
self,
session: AgentSession | None,
) -> list[FollowUpAction]:
"""Collect follow-up actions from all observers.
Called after streaming completes to gather any follow-up questions
or messages from observers.
Args:
session: Optional agent session.
Returns:
List of follow-up actions from all observers.
"""
actions: list[FollowUpAction] = []
for observer in self._observers:
observer_actions = await observer.on_stream_complete(
self._ux, self._agent, session
)
if observer_actions:
actions.extend(observer_actions)
return actions
def _format_usage(self, input_tokens: int, output_tokens: int) -> str:
"""Format token counts matching C# harness style: 📊 Tokens — input: X | output: Y | total: Z."""
total_tokens = input_tokens + output_tokens
input_budget = None
if self._max_context_window_tokens and self._max_output_tokens:
input_budget = self._max_context_window_tokens - self._max_output_tokens
return (
f"📊 Tokens — input: {self._format_token_count(input_tokens, input_budget)}"
f" | output: {self._format_token_count(output_tokens, self._max_output_tokens)}"
f" | total: {self._format_token_count(total_tokens, self._max_context_window_tokens)}"
)
@staticmethod
def _format_token_count(count: int, budget: int | None) -> str:
"""Format a token count, optionally showing budget percentage."""
if budget and budget > 0:
pct = count / budget * 100
return f"{count:,}/{budget:,} ({pct:.1f}%)"
return f"{count:,}"