Files
agent-framework/python/samples/02-agents/harness/console/state_driver.py
westey bad05a2bdc Python: Harness console for python (#6312)
* Add initial harness console for python

* Add textual to project

* Add planning and approval flows with list selector

* Address PR comments

* Fix list selection bug

* Fix PR #6312 round 2 review comments

- Escape untrusted agent text with rich.markup.escape() in observers
  (text_output, planning_output, reasoning_display) to prevent markup injection
- Remove non-functional 'Always approve' choices from tool_approval.py
  (framework lacks CreateAlwaysApproveToolResponse support)
- Remove textual from root pyproject.toml dev deps (sample-specific)
- Add PEP 723 inline script metadata to harness_research.py
- Narrow except Exception to except NoMatches in list_selection.py

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix build error

* Fix build errors

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-09 05:48:35 +00:00

339 lines
11 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""State driver interface for UI updates.
This module defines the IUXStateDriver Protocol, which observers use to
update the UI during agent streaming. This is an interface-only definition;
the concrete implementation will be in a separate module.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Protocol
if TYPE_CHECKING:
from agent_framework import AgentSession
from .app_state import FollowUpAction
class IUXStateDriver(Protocol):
"""Protocol for UI state driver.
Observers call these methods to update the UI during agent streaming.
This is an interface-only definition - concrete implementation comes later.
The state driver acts as a controller between the agent framework (model)
and the Textual UI components (view), coordinating all UI updates.
"""
def append_info_line(self, text: str, color: str | None = None) -> None:
"""Append an informational line to the output.
Used for displaying tool calls, errors, warnings, and other
informational messages that aren't part of the agent's text response.
Args:
text: The text to display.
color: Optional Rich color string (e.g., "yellow", "red", "dim").
"""
...
def append_stream_footer(self, text: str) -> None:
"""Append a footer line after streaming ends.
Used for displaying final status messages like "(no text response)"
or other closing information.
Args:
text: The footer text to display.
"""
...
def begin_streaming(self) -> None:
"""Begin streaming mode.
Switches the bottom panel to streaming mode (shows "Streaming..." indicator),
starts the spinner animation, and prepares for streaming text updates.
"""
...
def update_streaming_text(self, accumulated_text: str) -> None:
"""Update the accumulated streaming text.
Called repeatedly during streaming to update the displayed text as
new chunks arrive from the agent. The text should accumulate across
multiple calls.
Args:
accumulated_text: The full accumulated text so far.
"""
...
def write_text(self, text: str, color: str | None = None) -> None:
"""Write a streaming text chunk incrementally.
Appends the text to the current streaming entry. If the streaming
entry is no longer the last output item (e.g., an info_line was
inserted), creates a new streaming entry.
Args:
text: The text chunk to append.
color: Optional Rich color string.
"""
...
def end_streaming(self) -> None:
"""End streaming mode.
Stops the spinner, switches the bottom panel back to text input mode,
and finalizes the streaming output.
"""
...
def enqueue_follow_up_action(self, action: FollowUpAction) -> None:
"""Add a follow-up action to the queue.
Follow-up actions can be questions to ask the user or messages to
inject into the next agent turn. The state driver queues these and
processes them after streaming completes.
Args:
action: The follow-up action to queue.
"""
...
def has_pending_questions(self) -> bool:
"""Check if there are pending follow-up questions awaiting user answers.
Returns:
True if there are unanswered questions in the queue.
"""
...
def take_follow_up_responses(self) -> list:
"""Take and clear all accumulated follow-up response messages.
Returns:
List of Message objects accumulated from follow-up actions.
"""
...
async def write_no_text_warning(self, has_follow_up_actions: bool) -> None:
"""Write a warning if the agent produced no text output.
Called after streaming completes. If no text was received and no
follow-up actions exist, writes a "(no text response)" footer.
Args:
has_follow_up_actions: Whether follow-up actions exist.
"""
...
def set_mode(self, mode: str | None, mode_color: str | None = None) -> None:
"""Set the current agent mode.
Updates the mode indicator in the UI (e.g., "[plan]", "[execute]")
with the specified color.
Args:
mode: The mode name (e.g., "plan", "execute"), or None to hide.
mode_color: Optional Rich color string for the mode label.
"""
...
def set_show_spinner(self, show: bool) -> None:
"""Show or hide the spinner animation.
The spinner provides visual feedback that the agent is processing.
Args:
show: True to show the spinner, False to hide it.
"""
...
def set_usage_text(self, usage_text: str | None) -> None:
"""Set the token usage text.
Displays token usage statistics (e.g., "1.2K in / 856 out") in
the status bar.
Args:
usage_text: The formatted usage text, or None to hide.
"""
...
@property
def current_mode(self) -> str | None:
"""Get the current agent mode.
Returns:
The current mode name, or None if no mode is set.
"""
...
def begin_streaming_output(self) -> None:
"""Reset per-turn streaming bookkeeping.
Called at the start of each agent turn to reset streaming state
(e.g., clear accumulated text, reset flags).
"""
...
def write_user_input_echo(self, text: str) -> None:
"""Echo user input to the output area.
Displays the user's submitted input in the conversation history,
typically with a "You: " prefix.
Args:
text: The user's input text.
"""
...
def request_shutdown(self) -> None:
"""Request the application to shut down.
Called by the /exit command handler to signal that the user
wants to quit the console.
"""
...
def replace_session(self, session: AgentSession) -> None:
"""Replace the current agent session.
Called by the /session-import command handler to swap the
active session with one loaded from a file.
Args:
session: The new session to use.
"""
...
class SimpleConsoleStateDriver:
"""Simple console-based state driver for testing.
This is a minimal implementation that logs all operations to the console.
Useful for testing the agent runner without a full UI.
"""
def __init__(self) -> None:
"""Initialize the simple state driver."""
self._streaming = False
self._spinner_visible = False
self._current_mode: str | None = None
print("[SimpleConsoleStateDriver initialized]")
def append_info_line(self, text: str, color: str | None = None) -> None:
"""Append an informational line to the output."""
color_prefix = f"[{color}]" if color else ""
print(f"{color_prefix} {text}")
def append_stream_footer(self, text: str) -> None:
"""Append a footer line after streaming ends."""
print(f"[Footer] {text}")
async def write_info_line(self, text: str, color: str | None = None) -> None:
"""Async version of append_info_line."""
self.append_info_line(text, color)
def write_user_input_echo(self, text: str) -> None:
"""Echo user input to the output."""
print(f"\n[User] {text}\n")
def begin_streaming(self) -> None:
"""Begin streaming mode."""
self._streaming = True
print("[▶ Streaming started]")
def begin_streaming_output(self) -> None:
"""Begin streaming output to the scroll panel."""
print("[▶ Streaming output started]")
def update_streaming_text(self, text: str) -> None:
"""Update the currently streaming text."""
# Truncate for readability
display_text = text[:80] + "..." if len(text) > 80 else text
print(f"[Assistant] {display_text}", end="", flush=True)
def write_text(self, text: str, color: str | None = None) -> None:
"""Write a streaming text chunk."""
print(text, end="", flush=True)
async def end_streaming_output(self) -> None:
"""End streaming output."""
print("\n[▪ Streaming output ended]")
def end_streaming(self) -> None:
"""End streaming mode."""
self._streaming = False
print("[▪ Streaming ended]")
def set_show_spinner(self, show: bool) -> None:
"""Show or hide the spinner."""
self._spinner_visible = show
status = "visible" if show else "hidden"
print(f"[Spinner: {status}]")
def set_mode(self, mode: str | None, mode_color: str | None = None) -> None:
"""Set the current mode text."""
self._current_mode = mode
color_str = f" ({mode_color})" if mode_color else ""
print(f"[Mode: {mode or 'default'}{color_str}]")
@property
def current_mode(self) -> str | None:
"""Get the current agent mode."""
return self._current_mode
def set_usage_text(self, usage_text: str | None) -> None:
"""Set the usage display text."""
if usage_text:
print(f"[Usage: {usage_text}]")
def enqueue_follow_up_action(self, action) -> None:
"""Enqueue a follow-up action.
Args:
action: The follow-up action to enqueue.
"""
action_type = type(action).__name__
print(f"[Follow-up queued: {action_type}]")
def has_pending_questions(self) -> bool:
"""Check if there are pending follow-up questions."""
return False
def take_follow_up_responses(self) -> list:
"""Take and clear all accumulated follow-up responses."""
return []
async def write_no_text_warning(self, has_follow_up_actions: bool) -> None:
"""Write a warning if no text was produced."""
if not has_follow_up_actions:
print("[▪ (no text response from agent)]")
def update_last_entry(self, entry_type, new_text: str) -> None:
"""Update the last output entry (placeholder for now).
Args:
entry_type: The type of entry to update.
new_text: The new text content.
"""
# Simplified: just print the update
display_text = new_text[:80] + "..." if len(new_text) > 80 else new_text
print(f"[Update last entry: {display_text}]", flush=True)
def request_shutdown(self) -> None:
"""Request application shutdown."""
print("[Shutdown requested]")
def replace_session(self, session) -> None:
"""Replace the active session.
Args:
session: The new session to use.
"""
print(f"[Session replaced: {getattr(session, 'id', 'unknown')}]")