mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
bad05a2bdc
* Add initial harness console for python * Add textual to project * Add planning and approval flows with list selector * Address PR comments * Fix list selection bug * Fix PR #6312 round 2 review comments - Escape untrusted agent text with rich.markup.escape() in observers (text_output, planning_output, reasoning_display) to prevent markup injection - Remove non-functional 'Always approve' choices from tool_approval.py (framework lacks CreateAlwaysApproveToolResponse support) - Remove textual from root pyproject.toml dev deps (sample-specific) - Add PEP 723 inline script metadata to harness_research.py - Narrow except Exception to except NoMatches in list_selection.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix build error * Fix build errors --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
344 lines
12 KiB
Python
344 lines
12 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Agent runner orchestration for the harness console.
|
|
|
|
This module provides the HarnessAgentRunner class, which orchestrates agent
|
|
invocations with observer lifecycle management. It handles:
|
|
- User input dispatch
|
|
- Agent streaming with observer notifications
|
|
- Follow-up action collection
|
|
- Streaming state management
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from typing import TYPE_CHECKING
|
|
|
|
if TYPE_CHECKING:
|
|
from agent_framework import Agent, AgentSession
|
|
|
|
from .app_state import FollowUpAction
|
|
from .observers.base import ConsoleObserver
|
|
from .state_driver import IUXStateDriver
|
|
|
|
|
|
class HarnessAgentRunner:
    """Orchestrates agent invocations driven by user-input events from the UI.

    The component invokes the runner's input handlers (run_turn) directly;
    the runner mutates UI state through the supplied IUXStateDriver.

    This is a minimal implementation focusing on the core agent loop without
    command handling or complex message injection (those can be added later).
    """

    def __init__(
        self,
        agent: Agent,
        observers: list[ConsoleObserver],
        state_driver: IUXStateDriver,
        *,
        max_context_window_tokens: int | None = None,
        max_output_tokens: int | None = None,
    ) -> None:
        """Initialize the agent runner.

        Args:
            agent: The agent to orchestrate.
            observers: List of console observers for lifecycle events.
            state_driver: The UI state driver for observer updates.
            max_context_window_tokens: Optional max context window size for usage display.
            max_output_tokens: Optional max output tokens for usage display.
        """
        self._agent = agent
        self._observers = observers
        self._ux = state_driver
        self._max_context_window_tokens = max_context_window_tokens
        self._max_output_tokens = max_output_tokens
        # Serializes run_turn / start_agent_turn so only one turn runs at a time.
        self._input_gate = asyncio.Semaphore(1)

    async def run_turn(
        self,
        user_input: str,
        session: AgentSession | None = None,
    ) -> None:
        """Run a single agent turn with the given user input.

        Echoes the input through the state driver, wraps it in a single
        user-role ``Message`` and hands it to the agent loop.

        Args:
            user_input: The user's input text.
            session: Optional agent session for conversation history.
        """
        async with self._input_gate:
            self._ux.write_user_input_echo(user_input)

            # Imported lazily: at module level agent_framework is only
            # imported under TYPE_CHECKING.
            from agent_framework import Message

            messages = [Message(role="user", contents=[user_input])]
            await self._run_agent_loop(messages, session)

    async def start_agent_turn(
        self,
        messages: list,
        session: AgentSession | None = None,
    ) -> None:
        """Resume the agent loop with pre-built messages (from follow-up responses).

        Called by the app after the user finishes answering follow-up questions.
        If messages is empty, just completes the turn.

        Args:
            messages: List of Message objects to send to the agent.
            session: Optional agent session.
        """
        async with self._input_gate:
            if not messages:
                self._complete_turn()
                return
            await self._run_agent_loop(messages, session)

    async def _run_agent_loop(
        self,
        messages: list,
        session: AgentSession | None,
    ) -> None:
        """Run the agent loop, re-invoking as needed for follow-up messages.

        Loops while there are messages to send. After each stream:
        - Collects follow-up actions from observers
        - If questions exist → queue them and return (UI will collect answers)
        - If only direct messages → loop with those messages
        - If nothing → complete the turn

        An ``Exception`` raised while streaming is reported through
        ``append_info_line`` and the loop carries on with follow-up
        collection. Anything else (for example task cancellation) propagates,
        but the spinner is still switched off first.

        Args:
            messages: Initial messages to send.
            session: Optional agent session.
        """
        next_messages = messages

        while next_messages:
            options = self._configure_run_options(session)

            self._ux.begin_streaming()
            self._ux.begin_streaming_output()
            self._ux.set_show_spinner(True)

            try:
                await self._stream_response_messages(next_messages, session, options)
            except Exception as ex:
                self._ux.append_info_line(
                    f"❌ Stream error: {ex.__class__.__name__}:\n{ex}",
                    color="red",
                )
            finally:
                # Runs for cancellation and other non-Exception exits too, so
                # the spinner never stays on after the stream stops.
                self._ux.set_show_spinner(False)

            follow_up_actions = await self._collect_follow_up_actions(session)
            has_follow_ups = bool(follow_up_actions)

            await self._ux.write_no_text_warning(has_follow_ups)

            for action in follow_up_actions:
                self._ux.enqueue_follow_up_action(action)

            if self._ux.has_pending_questions():
                # Pause — the UI will collect answers and call start_agent_turn.
                # The turn is intentionally not completed here.
                return

            # No questions — drain accumulated direct messages; an empty
            # result ends the loop.
            next_messages = self._ux.take_follow_up_responses() or None

        self._complete_turn()

    def _complete_turn(self) -> None:
        """Complete the current turn (end streaming)."""
        self._ux.end_streaming()

    def _configure_run_options(
        self,
        session: AgentSession | None,
    ) -> dict:
        """Configure run options via observers.

        Starts from an empty dict and passes it to every observer's
        ``configure_run_options`` in observer order, so each observer can
        modify it in place.

        Args:
            session: Optional agent session.

        Returns:
            Options dict for agent.run().
        """
        options: dict = {}
        for observer in self._observers:
            observer.configure_run_options(options, self._agent, session)
        return options

    async def _stream_response(
        self,
        user_input: str,
        session: AgentSession | None,
        options: dict,
    ) -> None:
        """Stream agent response from a text input and dispatch to observers.

        Args:
            user_input: The user's input text.
            session: Optional agent session.
            options: Run options configured by observers.
        """
        await self._stream_and_dispatch(user_input, session, options)

    async def _stream_response_messages(
        self,
        messages: list,
        session: AgentSession | None,
        options: dict,
    ) -> None:
        """Stream agent response from Message objects and dispatch to observers.

        Args:
            messages: List of Message objects to send.
            session: Optional agent session.
            options: Run options configured by observers.
        """
        await self._stream_and_dispatch(messages, session, options)

    async def _stream_and_dispatch(
        self,
        payload,
        session: AgentSession | None,
        options: dict,
    ) -> None:
        """Run the agent in streaming mode and fan each update out to observers.

        Shared implementation behind ``_stream_response`` and
        ``_stream_response_messages``; the two differ only in what they pass
        as the first argument of ``agent.run``.

        Args:
            payload: First positional argument for ``agent.run`` (text or messages).
            session: Optional agent session.
            options: Run options configured by observers.
        """
        stream = self._agent.run(
            payload,
            stream=True,
            session=session,
            options=options,
        )

        async for update in stream:
            await self._dispatch_update(update, session)

        # Only reached when the stream was consumed without raising.
        await self._update_usage(stream)

    async def _update_usage(self, stream) -> None:
        """Fetch the stream's final response and publish its token usage.

        Unlike the synchronous ``_extract_usage``, this awaits the result of
        ``get_final_response`` when it is awaitable, so streams exposing an
        async ``get_final_response`` get a usage line too.
        NOTE(review): agent_framework's stream type appears to define
        ``get_final_response`` as a coroutine function — confirm.

        Streams without a callable ``get_final_response`` are ignored.

        Args:
            stream: The fully consumed stream returned by ``agent.run``.
        """
        import inspect

        get_final = getattr(stream, "get_final_response", None)
        if not callable(get_final):
            return

        final_response = get_final()
        if inspect.isawaitable(final_response):
            final_response = await final_response

        self._publish_usage(final_response)

    def _extract_usage(self, stream) -> None:
        """Extract token usage from a completed stream (synchronous variant).

        Kept for callers that cannot await. Because it cannot await, it does
        nothing when ``get_final_response`` is a coroutine function or
        returns an awaitable; the streaming path uses ``_update_usage``.

        Args:
            stream: The fully consumed stream returned by ``agent.run``.
        """
        import inspect

        get_final = getattr(stream, "get_final_response", None)
        if not callable(get_final) or inspect.iscoroutinefunction(get_final):
            return

        try:
            final_response = get_final()
        except (AttributeError, TypeError):
            # Usage display is best-effort; an incompatible getter is not fatal.
            return

        if inspect.isawaitable(final_response):
            return

        self._publish_usage(final_response)

    def _publish_usage(self, final_response) -> None:
        """Push a formatted usage line to the UI for a final response.

        Reads ``final_response.usage_details`` and only acts when it is a
        dict with a non-zero ``input_token_count`` or ``output_token_count``.

        Args:
            final_response: Object returned by ``get_final_response`` (may be None).
        """
        if final_response is None:
            return

        usage = getattr(final_response, "usage_details", None)
        if not isinstance(usage, dict):
            return

        # Missing or None counts are treated as zero.
        input_tokens = usage.get("input_token_count", 0) or 0
        output_tokens = usage.get("output_token_count", 0) or 0
        if not (input_tokens or output_tokens):
            return

        try:
            usage_text = self._format_usage(input_tokens, output_tokens)
        except (TypeError, ValueError):
            # Non-numeric counts: usage display is best-effort, so skip it.
            return

        self._ux.set_usage_text(usage_text)

    async def _dispatch_update(
        self,
        update,  # AgentResponseUpdate
        session: AgentSession | None,
    ) -> None:
        """Dispatch a single update to all observers.

        Calls observer lifecycle methods in order:
        1. on_response_update (once per update)
        2. on_content (for each content item)
        3. on_text (if text is present)

        Each stage is completed for every observer before the next starts.

        Args:
            update: The agent response update.
            session: Optional agent session.
        """
        for observer in self._observers:
            await observer.on_response_update(self._ux, update, self._agent, session)

        # Updates without a "contents" or "text" attribute skip that stage.
        contents = getattr(update, "contents", None)
        if contents:
            for content in contents:
                for observer in self._observers:
                    await observer.on_content(self._ux, content, self._agent, session)

        text = getattr(update, "text", None)
        if text:
            for observer in self._observers:
                await observer.on_text(self._ux, text, self._agent, session)

    async def _collect_follow_up_actions(
        self,
        session: AgentSession | None,
    ) -> list[FollowUpAction]:
        """Collect follow-up actions from all observers.

        Awaits ``on_stream_complete`` on every observer, in observer order,
        and concatenates whatever non-empty results they return.

        Args:
            session: Optional agent session.

        Returns:
            List of follow-up actions from all observers.
        """
        actions: list[FollowUpAction] = []
        for observer in self._observers:
            observer_actions = await observer.on_stream_complete(
                self._ux, self._agent, session
            )
            if observer_actions:
                actions.extend(observer_actions)
        return actions

    def _format_usage(self, input_tokens: int, output_tokens: int) -> str:
        """Format token counts matching C# harness style: 📊 Tokens — input: X | output: Y | total: Z.

        When both limits are configured, the input budget is the context
        window minus the output limit. Each figure shows a budget percentage
        only when its budget is known and positive.

        Args:
            input_tokens: Input token count for the response.
            output_tokens: Output token count for the response.

        Returns:
            The single-line usage summary.
        """
        total_tokens = input_tokens + output_tokens

        input_budget = None
        if self._max_context_window_tokens and self._max_output_tokens:
            input_budget = self._max_context_window_tokens - self._max_output_tokens

        return (
            f"📊 Tokens — input: {self._format_token_count(input_tokens, input_budget)}"
            f" | output: {self._format_token_count(output_tokens, self._max_output_tokens)}"
            f" | total: {self._format_token_count(total_tokens, self._max_context_window_tokens)}"
        )

    @staticmethod
    def _format_token_count(count: int, budget: int | None) -> str:
        """Format a token count, optionally showing budget percentage.

        Args:
            count: The token count to render.
            budget: Optional budget; ignored when None, zero or negative.

        Returns:
            ``"count/budget (pct%)"`` when a positive budget is given,
            otherwise just the count, with thousands separators.
        """
        if budget and budget > 0:
            pct = count / budget * 100
            return f"{count:,}/{budget:,} ({pct:.1f}%)"
        return f"{count:,}"
|