mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
bad05a2bdc
* Add initial harness console for python * Add textual to project * Add planning and approval flows with list selector * Address PR comments * Fix list selection bug * Fix PR #6312 round 2 review comments - Escape untrusted agent text with rich.markup.escape() in observers (text_output, planning_output, reasoning_display) to prevent markup injection - Remove non-functional 'Always approve' choices from tool_approval.py (framework lacks CreateAlwaysApproveToolResponse support) - Remove textual from root pyproject.toml dev deps (sample-specific) - Add PEP 723 inline script metadata to harness_research.py - Narrow except Exception to except NoMatches in list_selection.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix build error * Fix build errors --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
339 lines
11 KiB
Python
339 lines
11 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""State driver interface for UI updates.
|
|
|
|
This module defines the IUXStateDriver Protocol, which observers use to
|
|
update the UI during agent streaming. This is an interface-only definition;
|
|
the concrete implementation will be in a separate module.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Protocol
|
|
|
|
if TYPE_CHECKING:
|
|
from agent_framework import AgentSession
|
|
|
|
from .app_state import FollowUpAction
|
|
|
|
|
|
class IUXStateDriver(Protocol):
|
|
"""Protocol for UI state driver.
|
|
|
|
Observers call these methods to update the UI during agent streaming.
|
|
This is an interface-only definition - concrete implementation comes later.
|
|
|
|
The state driver acts as a controller between the agent framework (model)
|
|
and the Textual UI components (view), coordinating all UI updates.
|
|
"""
|
|
|
|
def append_info_line(self, text: str, color: str | None = None) -> None:
|
|
"""Append an informational line to the output.
|
|
|
|
Used for displaying tool calls, errors, warnings, and other
|
|
informational messages that aren't part of the agent's text response.
|
|
|
|
Args:
|
|
text: The text to display.
|
|
color: Optional Rich color string (e.g., "yellow", "red", "dim").
|
|
"""
|
|
...
|
|
|
|
def append_stream_footer(self, text: str) -> None:
|
|
"""Append a footer line after streaming ends.
|
|
|
|
Used for displaying final status messages like "(no text response)"
|
|
or other closing information.
|
|
|
|
Args:
|
|
text: The footer text to display.
|
|
"""
|
|
...
|
|
|
|
def begin_streaming(self) -> None:
|
|
"""Begin streaming mode.
|
|
|
|
Switches the bottom panel to streaming mode (shows "Streaming..." indicator),
|
|
starts the spinner animation, and prepares for streaming text updates.
|
|
"""
|
|
...
|
|
|
|
def update_streaming_text(self, accumulated_text: str) -> None:
|
|
"""Update the accumulated streaming text.
|
|
|
|
Called repeatedly during streaming to update the displayed text as
|
|
new chunks arrive from the agent. The text should accumulate across
|
|
multiple calls.
|
|
|
|
Args:
|
|
accumulated_text: The full accumulated text so far.
|
|
"""
|
|
...
|
|
|
|
def write_text(self, text: str, color: str | None = None) -> None:
|
|
"""Write a streaming text chunk incrementally.
|
|
|
|
Appends the text to the current streaming entry. If the streaming
|
|
entry is no longer the last output item (e.g., an info_line was
|
|
inserted), creates a new streaming entry.
|
|
|
|
Args:
|
|
text: The text chunk to append.
|
|
color: Optional Rich color string.
|
|
"""
|
|
...
|
|
|
|
def end_streaming(self) -> None:
|
|
"""End streaming mode.
|
|
|
|
Stops the spinner, switches the bottom panel back to text input mode,
|
|
and finalizes the streaming output.
|
|
"""
|
|
...
|
|
|
|
def enqueue_follow_up_action(self, action: FollowUpAction) -> None:
|
|
"""Add a follow-up action to the queue.
|
|
|
|
Follow-up actions can be questions to ask the user or messages to
|
|
inject into the next agent turn. The state driver queues these and
|
|
processes them after streaming completes.
|
|
|
|
Args:
|
|
action: The follow-up action to queue.
|
|
"""
|
|
...
|
|
|
|
def has_pending_questions(self) -> bool:
|
|
"""Check if there are pending follow-up questions awaiting user answers.
|
|
|
|
Returns:
|
|
True if there are unanswered questions in the queue.
|
|
"""
|
|
...
|
|
|
|
def take_follow_up_responses(self) -> list:
|
|
"""Take and clear all accumulated follow-up response messages.
|
|
|
|
Returns:
|
|
List of Message objects accumulated from follow-up actions.
|
|
"""
|
|
...
|
|
|
|
async def write_no_text_warning(self, has_follow_up_actions: bool) -> None:
|
|
"""Write a warning if the agent produced no text output.
|
|
|
|
Called after streaming completes. If no text was received and no
|
|
follow-up actions exist, writes a "(no text response)" footer.
|
|
|
|
Args:
|
|
has_follow_up_actions: Whether follow-up actions exist.
|
|
"""
|
|
...
|
|
|
|
def set_mode(self, mode: str | None, mode_color: str | None = None) -> None:
|
|
"""Set the current agent mode.
|
|
|
|
Updates the mode indicator in the UI (e.g., "[plan]", "[execute]")
|
|
with the specified color.
|
|
|
|
Args:
|
|
mode: The mode name (e.g., "plan", "execute"), or None to hide.
|
|
mode_color: Optional Rich color string for the mode label.
|
|
"""
|
|
...
|
|
|
|
def set_show_spinner(self, show: bool) -> None:
|
|
"""Show or hide the spinner animation.
|
|
|
|
The spinner provides visual feedback that the agent is processing.
|
|
|
|
Args:
|
|
show: True to show the spinner, False to hide it.
|
|
"""
|
|
...
|
|
|
|
def set_usage_text(self, usage_text: str | None) -> None:
|
|
"""Set the token usage text.
|
|
|
|
Displays token usage statistics (e.g., "1.2K in / 856 out") in
|
|
the status bar.
|
|
|
|
Args:
|
|
usage_text: The formatted usage text, or None to hide.
|
|
"""
|
|
...
|
|
|
|
@property
|
|
def current_mode(self) -> str | None:
|
|
"""Get the current agent mode.
|
|
|
|
Returns:
|
|
The current mode name, or None if no mode is set.
|
|
"""
|
|
...
|
|
|
|
def begin_streaming_output(self) -> None:
|
|
"""Reset per-turn streaming bookkeeping.
|
|
|
|
Called at the start of each agent turn to reset streaming state
|
|
(e.g., clear accumulated text, reset flags).
|
|
"""
|
|
...
|
|
|
|
def write_user_input_echo(self, text: str) -> None:
|
|
"""Echo user input to the output area.
|
|
|
|
Displays the user's submitted input in the conversation history,
|
|
typically with a "You: " prefix.
|
|
|
|
Args:
|
|
text: The user's input text.
|
|
"""
|
|
...
|
|
|
|
def request_shutdown(self) -> None:
|
|
"""Request the application to shut down.
|
|
|
|
Called by the /exit command handler to signal that the user
|
|
wants to quit the console.
|
|
"""
|
|
...
|
|
|
|
def replace_session(self, session: AgentSession) -> None:
|
|
"""Replace the current agent session.
|
|
|
|
Called by the /session-import command handler to swap the
|
|
active session with one loaded from a file.
|
|
|
|
Args:
|
|
session: The new session to use.
|
|
"""
|
|
...
|
|
|
|
|
|
class SimpleConsoleStateDriver:
|
|
"""Simple console-based state driver for testing.
|
|
|
|
This is a minimal implementation that logs all operations to the console.
|
|
Useful for testing the agent runner without a full UI.
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
"""Initialize the simple state driver."""
|
|
self._streaming = False
|
|
self._spinner_visible = False
|
|
self._current_mode: str | None = None
|
|
print("[SimpleConsoleStateDriver initialized]")
|
|
|
|
def append_info_line(self, text: str, color: str | None = None) -> None:
|
|
"""Append an informational line to the output."""
|
|
color_prefix = f"[{color}]" if color else ""
|
|
print(f"{color_prefix} {text}")
|
|
|
|
def append_stream_footer(self, text: str) -> None:
|
|
"""Append a footer line after streaming ends."""
|
|
print(f"[Footer] {text}")
|
|
|
|
async def write_info_line(self, text: str, color: str | None = None) -> None:
|
|
"""Async version of append_info_line."""
|
|
self.append_info_line(text, color)
|
|
|
|
def write_user_input_echo(self, text: str) -> None:
|
|
"""Echo user input to the output."""
|
|
print(f"\n[User] {text}\n")
|
|
|
|
def begin_streaming(self) -> None:
|
|
"""Begin streaming mode."""
|
|
self._streaming = True
|
|
print("[▶ Streaming started]")
|
|
|
|
def begin_streaming_output(self) -> None:
|
|
"""Begin streaming output to the scroll panel."""
|
|
print("[▶ Streaming output started]")
|
|
|
|
def update_streaming_text(self, text: str) -> None:
|
|
"""Update the currently streaming text."""
|
|
# Truncate for readability
|
|
display_text = text[:80] + "..." if len(text) > 80 else text
|
|
print(f"[Assistant] {display_text}", end="", flush=True)
|
|
|
|
def write_text(self, text: str, color: str | None = None) -> None:
|
|
"""Write a streaming text chunk."""
|
|
print(text, end="", flush=True)
|
|
|
|
async def end_streaming_output(self) -> None:
|
|
"""End streaming output."""
|
|
print("\n[▪ Streaming output ended]")
|
|
|
|
def end_streaming(self) -> None:
|
|
"""End streaming mode."""
|
|
self._streaming = False
|
|
print("[▪ Streaming ended]")
|
|
|
|
def set_show_spinner(self, show: bool) -> None:
|
|
"""Show or hide the spinner."""
|
|
self._spinner_visible = show
|
|
status = "visible" if show else "hidden"
|
|
print(f"[Spinner: {status}]")
|
|
|
|
def set_mode(self, mode: str | None, mode_color: str | None = None) -> None:
|
|
"""Set the current mode text."""
|
|
self._current_mode = mode
|
|
color_str = f" ({mode_color})" if mode_color else ""
|
|
print(f"[Mode: {mode or 'default'}{color_str}]")
|
|
|
|
@property
|
|
def current_mode(self) -> str | None:
|
|
"""Get the current agent mode."""
|
|
return self._current_mode
|
|
|
|
def set_usage_text(self, usage_text: str | None) -> None:
|
|
"""Set the usage display text."""
|
|
if usage_text:
|
|
print(f"[Usage: {usage_text}]")
|
|
|
|
def enqueue_follow_up_action(self, action) -> None:
|
|
"""Enqueue a follow-up action.
|
|
|
|
Args:
|
|
action: The follow-up action to enqueue.
|
|
"""
|
|
action_type = type(action).__name__
|
|
print(f"[Follow-up queued: {action_type}]")
|
|
|
|
def has_pending_questions(self) -> bool:
|
|
"""Check if there are pending follow-up questions."""
|
|
return False
|
|
|
|
def take_follow_up_responses(self) -> list:
|
|
"""Take and clear all accumulated follow-up responses."""
|
|
return []
|
|
|
|
async def write_no_text_warning(self, has_follow_up_actions: bool) -> None:
|
|
"""Write a warning if no text was produced."""
|
|
if not has_follow_up_actions:
|
|
print("[▪ (no text response from agent)]")
|
|
|
|
def update_last_entry(self, entry_type, new_text: str) -> None:
|
|
"""Update the last output entry (placeholder for now).
|
|
|
|
Args:
|
|
entry_type: The type of entry to update.
|
|
new_text: The new text content.
|
|
"""
|
|
# Simplified: just print the update
|
|
display_text = new_text[:80] + "..." if len(new_text) > 80 else new_text
|
|
print(f"[Update last entry: {display_text}]", flush=True)
|
|
|
|
def request_shutdown(self) -> None:
|
|
"""Request application shutdown."""
|
|
print("[Shutdown requested]")
|
|
|
|
def replace_session(self, session) -> None:
|
|
"""Replace the active session.
|
|
|
|
Args:
|
|
session: The new session to use.
|
|
"""
|
|
print(f"[Session replaced: {getattr(session, 'id', 'unknown')}]")
|