diff --git a/python/pyrightconfig.samples.json b/python/pyrightconfig.samples.json index 4e77569f94..c2d1274e05 100644 --- a/python/pyrightconfig.samples.json +++ b/python/pyrightconfig.samples.json @@ -7,6 +7,7 @@ "**/demos/**", "**/_to_delete/**", "**/05-end-to-end/**", + "**/harness/**", "**/agent_with_foundry_tracing.py", "**/azure_responses_client_with_foundry.py" ], diff --git a/python/pyrightconfig.samples.py310.json b/python/pyrightconfig.samples.py310.json index 694abcb914..581f856c36 100644 --- a/python/pyrightconfig.samples.py310.json +++ b/python/pyrightconfig.samples.py310.json @@ -7,6 +7,7 @@ "**/demos/**", "**/_to_delete/**", "**/05-end-to-end/**", + "**/harness/**", "**/agent_with_foundry_tracing.py", "**/azure_responses_client_with_foundry.py", "**/github_copilot/**" diff --git a/python/samples/02-agents/harness/console/README.md b/python/samples/02-agents/harness/console/README.md new file mode 100644 index 0000000000..c7d3bd256b --- /dev/null +++ b/python/samples/02-agents/harness/console/README.md @@ -0,0 +1,94 @@ +# Harness Console + +A Textual-based terminal UI for running and observing AI agents built with the Agent Framework. + +## Quick Start + +```python +from console import run_agent_async, build_default_observers + +await run_agent_async( + agent=my_agent, + session=my_session, + observers=build_default_observers(), +) +``` + +See [`harness_research.py`](../harness_research.py) for a complete example. + +## Package Structure + +``` +console/ +├── __init__.py # Public API exports +├── harness_console.py # run_agent_async() entry point +├── app.py # HarnessApp (Textual application) +├── app_state.py # HarnessAppState, enums, data types +├── agent_runner.py # HarnessAgentRunner (streaming orchestration) +├── state_driver.py # IUXStateDriver protocol +├── textual_state_driver.py # Textual implementation of IUXStateDriver +├── formatters.py # Tool call formatters +├── observers/ # Lifecycle observers +│ ├── base.py # ConsoleObserver abstract base +│ ├── text_output.py # Streaming text display +│ ├── tool_call_display.py # Tool call formatting +│ ├── tool_approval.py # User approval for tool calls +│ ├── error_display.py # Error messages +│ ├── usage_display.py # Token usage tracking +│ └── reasoning_display.py # Reasoning/thinking blocks +├── components/ # Textual UI widgets +│ ├── scroll_panel.py # Conversation history +│ ├── text_input.py # User text input +│ ├── list_selection.py # Multiple choice selector +│ ├── agent_status.py # Spinner + usage display +│ └── agent_mode_help.py # Mode indicator + help text +└── commands/ # Slash command handlers + ├── base.py # CommandHandler abstract base + ├── exit_handler.py # /exit + ├── mode_handler.py # /mode [plan|execute] + ├── todo_handler.py # /todos + └── session_handler.py # /session-export, /session-import +``` + +## Public API + +| Export | Description | +|--------|-------------| +| `run_agent_async` | Main entry point — runs the Textual app with an agent | +| `build_default_observers` | Factory for the standard observer set | +| `build_default_command_handlers` | Factory for slash command handlers | +| `ConsoleObserver` | Base class for custom observers | +| `ToolCallFormatter` | Base class for custom tool formatters | +| `CommandHandler` | Base class for custom slash commands | + +## Architecture + +The console follows a unidirectional data flow: + +``` +AgentRunner → Observers → StateDriver → AppState → Textual UI + ↑ + User Input (app.py) +``` + +- **AgentRunner** streams responses from the agent and dispatches events to observers. +- **Observers** process events (text chunks, tool calls, errors) and update the state driver. +- **StateDriver** (`IUXStateDriver`) mutates `HarnessAppState` and notifies the UI. +- **Textual App** reads state and syncs widgets on each notification. + +### Key Design Choices + +| Concern | Approach | +|---------|----------| +| Rendering | Textual widgets + Rich markup (no manual ANSI) | +| State | Single `HarnessAppState` dataclass, mutated by driver | +| Streaming text | Truncate-and-rewrite on RichLog for flicker-free updates | +| Extensibility | Custom observers, formatters, and commands via base classes | +| Follow-up questions | Observer returns `FollowUpQuestion` → UI shows prompt/choices | + +## Dependencies + +- `textual` — TUI framework +- `rich` — Text formatting +- `agent-framework` — Core agent framework + diff --git a/python/samples/02-agents/harness/console/__init__.py b/python/samples/02-agents/harness/console/__init__.py new file mode 100644 index 0000000000..d65f00cfda --- /dev/null +++ b/python/samples/02-agents/harness/console/__init__.py @@ -0,0 +1,27 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Harness Console - A Textual-based TUI for AI agent interactions. + +This package provides a rich terminal interface for running and observing +AI agents, with streaming output, tool call display, follow-up questions, +and token usage tracking. +""" + +from .commands import CommandHandler, build_default_command_handlers +from .formatters import ToolCallFormatter +from .harness_console import run_agent_async +from .observers import ( + ConsoleObserver, + build_default_observers, + build_observers_with_planning, +) + +__all__ = [ + "CommandHandler", + "ConsoleObserver", + "ToolCallFormatter", + "build_default_command_handlers", + "build_default_observers", + "build_observers_with_planning", + "run_agent_async", +] diff --git a/python/samples/02-agents/harness/console/agent_runner.py b/python/samples/02-agents/harness/console/agent_runner.py new file mode 100644 index 0000000000..3b7c685dbd --- /dev/null +++ b/python/samples/02-agents/harness/console/agent_runner.py @@ -0,0 +1,343 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Agent runner orchestration for the harness console. + +This module provides the HarnessAgentRunner class, which orchestrates agent +invocations with observer lifecycle management. It handles: +- User input dispatch +- Agent streaming with observer notifications +- Follow-up action collection +- Streaming state management +""" + +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from agent_framework import Agent, AgentSession + + from .app_state import FollowUpAction + from .observers.base import ConsoleObserver + from .state_driver import IUXStateDriver + + +class HarnessAgentRunner: + """Orchestrates agent invocations driven by user-input events from the UI. + + The component invokes the runner's input handlers (run_turn) directly; + the runner mutates UI state through the supplied IUXStateDriver. + + This is a minimal implementation focusing on the core agent loop without + command handling or complex message injection (those can be added later). + """ + + def __init__( + self, + agent: Agent, + observers: list[ConsoleObserver], + state_driver: IUXStateDriver, + *, + max_context_window_tokens: int | None = None, + max_output_tokens: int | None = None, + ) -> None: + """Initialize the agent runner. + + Args: + agent: The agent to orchestrate. + observers: List of console observers for lifecycle events. + state_driver: The UI state driver for observer updates. + max_context_window_tokens: Optional max context window size for usage display. + max_output_tokens: Optional max output tokens for usage display. + """ + self._agent = agent + self._observers = observers + self._ux = state_driver + self._max_context_window_tokens = max_context_window_tokens + self._max_output_tokens = max_output_tokens + self._input_gate = asyncio.Semaphore(1) # Single turn at a time + + async def run_turn( + self, + user_input: str, + session: AgentSession | None = None, + ) -> None: + """Run a single agent turn with the given user input. + + Echoes the input, then delegates to the agent loop. + + Args: + user_input: The user's input text. + session: Optional agent session for conversation history. + """ + async with self._input_gate: + self._ux.write_user_input_echo(user_input) + + from agent_framework import Message + + messages = [Message(role="user", contents=[user_input])] + await self._run_agent_loop(messages, session) + + async def start_agent_turn( + self, + messages: list, + session: AgentSession | None = None, + ) -> None: + """Resume the agent loop with pre-built messages (from follow-up responses). + + Called by the app after the user finishes answering follow-up questions. + If messages is empty, just completes the turn. + + Args: + messages: List of Message objects to send to the agent. + session: Optional agent session. + """ + async with self._input_gate: + if not messages: + self._complete_turn() + return + await self._run_agent_loop(messages, session) + + async def _run_agent_loop( + self, + messages: list, + session: AgentSession | None, + ) -> None: + """Run the agent loop, re-invoking as needed for follow-up messages. + + Loops while there are messages to send. After each stream: + - Collects follow-up actions from observers + - If questions exist → queue them and return (UI will collect answers) + - If only direct messages → loop with those messages + - If nothing → complete the turn + + Args: + messages: Initial messages to send. + session: Optional agent session. + """ + next_messages = messages + + while next_messages: + # Configure run options + options = self._configure_run_options(session) + + # Begin streaming + self._ux.begin_streaming() + self._ux.begin_streaming_output() + self._ux.set_show_spinner(True) + + try: + await self._stream_response_messages(next_messages, session, options) + except Exception as ex: + self._ux.append_info_line( + f"❌ Stream error: {ex.__class__.__name__}:\n{ex}", + color="red", + ) + + # Stop spinner and end streaming output + self._ux.set_show_spinner(False) + + # Collect follow-up actions from observers + follow_up_actions = await self._collect_follow_up_actions(session) + + # Separate direct messages from questions + has_follow_ups = len(follow_up_actions) > 0 + + # Write no-text warning if applicable + await self._ux.write_no_text_warning(has_follow_ups) + + # Enqueue all follow-up actions + for action in follow_up_actions: + self._ux.enqueue_follow_up_action(action) + + # Check if there are pending questions (UI needs user input) + if self._ux.has_pending_questions(): + # Pause — the UI will collect answers and call start_agent_turn + return + + # No questions — drain any accumulated direct messages and loop + drained = self._ux.take_follow_up_responses() + next_messages = drained if drained else None + + self._complete_turn() + + def _complete_turn(self) -> None: + """Complete the current turn (end streaming).""" + self._ux.end_streaming() + + def _configure_run_options( + self, + session: AgentSession | None, + ) -> dict: + """Configure run options via observers. + + Each observer can modify the options dict to influence agent behavior. + + Args: + session: Optional agent session. + + Returns: + Options dict for agent.run(). + """ + options = {} + for observer in self._observers: + observer.configure_run_options(options, self._agent, session) + return options + + async def _stream_response( + self, + user_input: str, + session: AgentSession | None, + options: dict, + ) -> None: + """Stream agent response from a text input and dispatch to observers. + + Args: + user_input: The user's input text. + session: Optional agent session. + options: Run options configured by observers. + """ + # Stream response using agent.run(stream=True) + stream = self._agent.run( + user_input, + stream=True, + session=session, + options=options, + ) + + # Process each update chunk + async for update in stream: + await self._dispatch_update(update, session) + + # Extract usage from the final response + self._extract_usage(stream) + + async def _stream_response_messages( + self, + messages: list, + session: AgentSession | None, + options: dict, + ) -> None: + """Stream agent response from Message objects and dispatch to observers. + + Args: + messages: List of Message objects to send. + session: Optional agent session. + options: Run options configured by observers. + """ + stream = self._agent.run( + messages, + stream=True, + session=session, + options=options, + ) + + async for update in stream: + await self._dispatch_update(update, session) + + self._extract_usage(stream) + + def _extract_usage(self, stream) -> None: + """Extract token usage from a completed stream.""" + try: + get_final = getattr(stream, "get_final_response", None) + if not get_final: + return + + import inspect + + if inspect.iscoroutinefunction(get_final): + return + + final_response = get_final() + if final_response is None: + return + + usage = getattr(final_response, "usage_details", None) + if not isinstance(usage, dict): + return + + input_tokens = usage.get("input_token_count", 0) or 0 + output_tokens = usage.get("output_token_count", 0) or 0 + if input_tokens or output_tokens: + self._ux.set_usage_text(self._format_usage(input_tokens, output_tokens)) + except (AttributeError, TypeError): + pass + + async def _dispatch_update( + self, + update, # AgentResponseUpdate + session: AgentSession | None, + ) -> None: + """Dispatch a single update to all observers. + + Calls observer lifecycle methods in order: + 1. on_response_update (once per update) + 2. on_content (for each content item) + 3. on_text (if text is present) + + Args: + update: The agent response update. + session: Optional agent session. + """ + # on_response_update + for observer in self._observers: + await observer.on_response_update(self._ux, update, self._agent, session) + + # on_content for each content item + if hasattr(update, "contents") and update.contents: + for content in update.contents: + for observer in self._observers: + await observer.on_content(self._ux, content, self._agent, session) + + # on_text for text chunks + if hasattr(update, "text") and update.text: + for observer in self._observers: + await observer.on_text(self._ux, update.text, self._agent, session) + + async def _collect_follow_up_actions( + self, + session: AgentSession | None, + ) -> list[FollowUpAction]: + """Collect follow-up actions from all observers. + + Called after streaming completes to gather any follow-up questions + or messages from observers. + + Args: + session: Optional agent session. + + Returns: + List of follow-up actions from all observers. + """ + actions: list[FollowUpAction] = [] + for observer in self._observers: + observer_actions = await observer.on_stream_complete( + self._ux, self._agent, session + ) + if observer_actions: + actions.extend(observer_actions) + return actions + + def _format_usage(self, input_tokens: int, output_tokens: int) -> str: + """Format token counts matching C# harness style: 📊 Tokens — input: X | output: Y | total: Z.""" + total_tokens = input_tokens + output_tokens + + input_budget = None + if self._max_context_window_tokens and self._max_output_tokens: + input_budget = self._max_context_window_tokens - self._max_output_tokens + + return ( + f"📊 Tokens — input: {self._format_token_count(input_tokens, input_budget)}" + f" | output: {self._format_token_count(output_tokens, self._max_output_tokens)}" + f" | total: {self._format_token_count(total_tokens, self._max_context_window_tokens)}" + ) + + @staticmethod + def _format_token_count(count: int, budget: int | None) -> str: + """Format a token count, optionally showing budget percentage.""" + if budget and budget > 0: + pct = count / budget * 100 + return f"{count:,}/{budget:,} ({pct:.1f}%)" + return f"{count:,}" diff --git a/python/samples/02-agents/harness/console/app.py b/python/samples/02-agents/harness/console/app.py new file mode 100644 index 0000000000..c56360c661 --- /dev/null +++ b/python/samples/02-agents/harness/console/app.py @@ -0,0 +1,541 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Main Textual application for the harness console. + +This module provides the HarnessApp - the main Textual application that +composes all UI components and integrates with the agent runner. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from textual import on, work +from textual.app import App, ComposeResult +from textual.binding import Binding +from textual.containers import Container, Vertical +from textual.css.query import NoMatches +from textual.widgets import Input, Static + +from .app_state import ( + BottomPanelMode, + HarnessAppState, + OutputEntryType, +) +from .components import ( + AgentModeAndHelp, + AgentStatus, + HarnessListSelection, + HarnessScrollPanel, + HarnessTextInput, + PromptRule, +) +from .textual_state_driver import HarnessConsoleUXStateDriver + +if TYPE_CHECKING: + from agent_framework import Agent, AgentSession + + from .agent_runner import HarnessAgentRunner + from .commands import CommandHandler + from .observers.base import ConsoleObserver + + +class HarnessApp(App[None]): + """Main Textual application for the harness console. + + Composes the scroll panel (conversation history), status bar (spinner, usage), + mode/help display, and bottom panel (text input, list selection, or streaming + indicator). Routes user input to the agent runner. + """ + + CSS = """ + Screen { + background: $background; + } + + #scroll-panel { + height: 1fr; + padding: 0 1; + background: transparent; + } + + #bottom-panel { + height: auto; + } + + #text-input-container { + height: 1; + display: block; + } + + #list-selection-container { + height: auto; + max-height: 12; + display: none; + } + + #streaming-indicator { + height: 1; + display: none; + } + + #status-bar { + height: 1; + } + + #mode-help { + height: 1; + } + + #top-rule { + height: 1; + } + + #bottom-rule { + height: 1; + } + + #separator-rule { + height: 1; + } + + #text-input { + height: 1; + } + + .hidden { + display: none; + } + + .visible { + display: block; + } + + .input-field { + border: none; + padding: 0; + min-height: 1; + height: 1; + background: transparent; + } + + .input-field:focus { + border: none; + background: transparent; + } + + .prompt-container { + height: 1; + } + + .prompt-label { + width: 2; + min-width: 2; + height: 1; + } + """ + + BINDINGS = [ + Binding("ctrl+c", "quit", "Quit", show=False), + Binding("ctrl+q", "quit", "Quit", show=False), + ] + + def __init__( + self, + agent: Agent, + observers: list[ConsoleObserver], + session: AgentSession | None = None, + mode_colors: dict[str, str] | None = None, + initial_mode: str | None = None, + placeholder: str = "Type a message and press Enter...", + title: str = "Harness Console", + max_context_window_tokens: int | None = None, + max_output_tokens: int | None = None, + command_handlers: list[CommandHandler] | None = None, + ) -> None: + """Initialize the harness console application. + + Args: + agent: The agent to run. + observers: List of console observers. + session: Optional agent session. + mode_colors: Optional mode color mapping. + initial_mode: Initial agent mode. + placeholder: Input placeholder text. + title: Application title. + max_context_window_tokens: Optional max context window tokens for usage display. + max_output_tokens: Optional max output tokens for usage display. + command_handlers: Optional list of command handlers. If None, auto-detected. + """ + super().__init__() + self.title = title + self._agent = agent + self._observers = observers + self._session = session + self._mode_colors = mode_colors + self._initial_mode = initial_mode + self._placeholder = placeholder + self._max_context_window_tokens = max_context_window_tokens + self._max_output_tokens = max_output_tokens + + # Build command handlers + if command_handlers is None: + from .commands import build_default_command_handlers + + self._command_handlers = build_default_command_handlers( + agent, mode_colors=mode_colors + ) + else: + self._command_handlers = command_handlers + + # Compute help text from command handlers + help_parts = [ + h.get_help_text() + for h in self._command_handlers + if h.get_help_text() is not None + ] + help_text = ", ".join(help_parts) if help_parts else None + + # State and driver + self._app_state = HarnessAppState( + placeholder=placeholder, + mode_text=initial_mode, + help_text=help_text, + ) + self._ux_driver = HarnessConsoleUXStateDriver( + app_state=self._app_state, + on_state_changed=self._on_state_changed, + mode_colors=mode_colors, + ) + + # Agent runner (created after init) + self._runner: HarnessAgentRunner | None = None + + @property + def ux_driver(self) -> HarnessConsoleUXStateDriver: + """Get the UX state driver.""" + return self._ux_driver + + @property + def runner(self) -> HarnessAgentRunner | None: + """Get the agent runner.""" + return self._runner + + def compose(self) -> ComposeResult: + """Compose the application layout.""" + with Vertical(): + # Main scroll panel for conversation history + yield HarnessScrollPanel(id="scroll-panel") + + # Blank line separating scroll content from status area + yield Static(" ", id="separator-rule") + + # Status bar (spinner + usage) + yield AgentStatus(id="status-bar") + + # Top rule (mode-colored) + yield PromptRule(id="top-rule") + + # Bottom panel - switches between text input, list selection, streaming + with Container(id="bottom-panel"): + # Text input (default) + with Container(id="text-input-container"): + text_input = HarnessTextInput(id="text-input") + text_input.placeholder = self._placeholder + yield text_input + + # List selection (for follow-up questions) + with Container(id="list-selection-container"): + yield HarnessListSelection(id="list-selection") + + # Bottom rule (mode-colored) + yield PromptRule(id="bottom-rule") + + # Mode and help + yield AgentModeAndHelp(id="mode-help") + + def on_mount(self) -> None: + """Initialize after mount.""" + # Create agent runner now that everything is set up + from .agent_runner import HarnessAgentRunner + + self._runner = HarnessAgentRunner( + agent=self._agent, + observers=self._observers, + state_driver=self._ux_driver, + max_context_window_tokens=self._max_context_window_tokens, + max_output_tokens=self._max_output_tokens, + ) + + # Set initial mode + if self._initial_mode: + self._ux_driver.current_mode = self._initial_mode + + # Focus the text input + try: + text_input = self.query_one("#text-input", HarnessTextInput) + text_input.focus_input() + except NoMatches: + pass + + # Set initial rule colors and mode display + self._sync_mode_help() + + # --- Event handlers --- + + @on(HarnessTextInput.Submitted) + def on_text_submitted(self, event: HarnessTextInput.Submitted) -> None: + """Handle text input submission.""" + text = event.value.strip() + if not text: + return + + if self._app_state.pending_questions: + # Answer the current follow-up question + self._handle_follow_up_answer(text) + elif self._app_state.mode == BottomPanelMode.STREAMING: + # Input during streaming (message injection placeholder) + pass + elif text.startswith("/"): + # Try command handlers + self._try_command_handlers(text) + else: + # Normal user input - run agent turn + self._run_agent_turn(text) + + @work(exclusive=True, thread=False) + async def _try_command_handlers(self, text: str) -> None: + """Try each command handler; fall through to agent if none match.""" + session = self._session + if session is None: + # No session — fall through to agent turn + self._run_agent_turn(text) + return + + for handler in self._command_handlers: + if await handler.try_handle(text, session, self._ux_driver): + # Command handled — check for shutdown/session swap signals + self._process_command_signals() + return + + # No handler matched — treat as normal agent input + self._run_agent_turn(text) + + def _process_command_signals(self) -> None: + """Check and process signals set by command handlers.""" + if self._app_state.shutdown_requested: + self.exit() + return + + if self._app_state.replaced_session is not None: + self._session = self._app_state.replaced_session # type: ignore[assignment] + self._app_state.replaced_session = None + self._ux_driver.append_info_line("Session replaced.") + + self._sync_ui_from_state() + + @on(HarnessListSelection.Selected) + def on_list_selected(self, event: HarnessListSelection.Selected) -> None: + """Handle list selection.""" + self._handle_follow_up_answer(event.value) + + # --- Agent turn --- + + @work(exclusive=True, thread=False) + async def _run_agent_turn(self, text: str) -> None: + """Run an agent turn in a background worker.""" + if self._runner is None: + return + + await self._runner.run_turn(text, session=self._session) + + # After turn completes, check for follow-up questions + self._sync_ui_from_state() + + # --- Follow-up question handling --- + + @work(exclusive=True, thread=False) + async def _handle_follow_up_answer(self, answer: str) -> None: + """Handle a user's answer to a follow-up question.""" + if not self._app_state.pending_questions: + return + + question = self._app_state.pending_questions[0] + + # Call the continuation + result_message = await question.continuation(answer, self._ux_driver) + + # Add result to accumulated responses + if result_message is not None: + self._ux_driver.add_follow_up_response(result_message) + + # Advance to next question + self._ux_driver.advance_follow_up_question() + + # If no more questions, resume the agent with accumulated responses + if not self._app_state.pending_questions: + responses = self._ux_driver.take_follow_up_responses() + if responses and self._runner: + await self._runner.start_agent_turn(responses, session=self._session) + + self._sync_ui_from_state() + + # --- State synchronization --- + + def _on_state_changed(self) -> None: + """Called by state driver when state changes - schedule UI sync. + + Since the agent runner uses @work(thread=False), state changes happen + on the main event loop. We use call_later to batch updates. + """ + self.call_later(self._sync_ui_from_state) + + def _sync_ui_from_state(self) -> None: + """Synchronize UI components with current application state.""" + state = self._app_state + + # Update scroll panel with new entries + self._sync_scroll_panel() + + # Update bottom panel mode + self._sync_bottom_panel(state.mode) + + # Hide status bar and mode/help during list selection (matching C#) + is_list_mode = state.mode == BottomPanelMode.LIST_SELECTION + self._sync_chrome_visibility(not is_list_mode) + + # Update status bar + self._sync_status_bar() + + # Update mode/help display + self._sync_mode_help() + + def _sync_scroll_panel(self) -> None: + """Sync the scroll panel with output entries.""" + try: + panel = self.query_one("#scroll-panel", HarnessScrollPanel) + except NoMatches: + return + + entries = self._app_state.output_entries + rendered_count = getattr(self, "_rendered_entry_count", 0) + + if rendered_count < len(entries): + # There are new entries to render + for entry in entries[rendered_count:]: + if entry.type == OutputEntryType.STREAMING_TEXT: + panel.set_streaming_entry(entry) + else: + # End any active streaming before appending other entry types + panel.end_streaming() + panel.append_entry(entry) + self._rendered_entry_count = len(entries) + elif rendered_count == len(entries) and entries: + # Same count — check if the last entry is a streaming entry that was mutated + last_entry = entries[-1] + if last_entry.type == OutputEntryType.STREAMING_TEXT: + panel.set_streaming_entry(last_entry) + + def _sync_bottom_panel(self, mode: BottomPanelMode) -> None: + """Switch the bottom panel between text input, list, and streaming.""" + try: + text_container = self.query_one("#text-input-container") + list_container = self.query_one("#list-selection-container") + except NoMatches: + return + + if mode == BottomPanelMode.TEXT_INPUT: + text_container.display = True + list_container.display = False + # Restore focus to text input + try: + text_input = self.query_one("#text-input", HarnessTextInput) + text_input.focus_input() + except NoMatches: + pass + elif mode == BottomPanelMode.LIST_SELECTION: + text_container.display = False + list_container.display = True + self._sync_list_selection() + elif mode == BottomPanelMode.STREAMING: + text_container.display = True + list_container.display = False + + def _sync_list_selection(self) -> None: + """Sync the list selection widget with state.""" + try: + list_widget = self.query_one("#list-selection", HarnessListSelection) + except NoMatches: + return + + state = self._app_state + list_widget.title = state.list_selection_title or "" + list_widget.options = list(state.list_selection_options) + list_widget.allow_custom_text = state.list_selection_custom_text_placeholder is not None + + if state.list_selection_custom_text_placeholder: + try: + custom_input = list_widget.query_one("#custom-input", Input) + custom_input.placeholder = state.list_selection_custom_text_placeholder + except Exception: + pass + + # Focus the option list so keyboard navigation works immediately + list_widget.focus_list() + + def _sync_status_bar(self) -> None: + """Sync the status bar with state.""" + try: + status = self.query_one("#status-bar", AgentStatus) + except NoMatches: + return + + state = self._app_state + status.show_spinner = state.show_spinner + status.usage_text = state.usage_text or "" + + def _sync_mode_help(self) -> None: + """Sync the mode/help display and rule colors with state.""" + try: + mode_help = self.query_one("#mode-help", AgentModeAndHelp) + except NoMatches: + return + + state = self._app_state + mode_help.mode = state.mode_text or "" + mode_help.mode_color = state.mode_color or "blue" + mode_help.help_text = state.help_text or "" + + # Sync rule colors to match mode + color = state.mode_color or "cyan" + try: + top_rule = self.query_one("#top-rule", PromptRule) + top_rule.rule_color = color + except NoMatches: + pass + + try: + bottom_rule = self.query_one("#bottom-rule", PromptRule) + bottom_rule.rule_color = color + except NoMatches: + pass + + def _sync_chrome_visibility(self, visible: bool) -> None: + """Show or hide chrome elements (status bar, mode/help). + + During list selection mode, these are hidden to give more vertical + space to the scroll panel and list picker. + + Args: + visible: Whether chrome elements should be visible. + """ + import contextlib + + with contextlib.suppress(NoMatches): + self.query_one("#status-bar", AgentStatus).display = visible + with contextlib.suppress(NoMatches): + self.query_one("#mode-help", AgentModeAndHelp).display = visible + + # --- Rendering count tracking --- + + _rendered_entry_count: int = 0 diff --git a/python/samples/02-agents/harness/console/app_state.py b/python/samples/02-agents/harness/console/app_state.py new file mode 100644 index 0000000000..4d0e0efb36 --- /dev/null +++ b/python/samples/02-agents/harness/console/app_state.py @@ -0,0 +1,260 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Application state and core data types for the harness console. + +This module defines enums, dataclasses, follow-up action types, and the +HarnessAppState dataclass which holds all UI state that may change during +application execution. The state driver mutates this state to coordinate +between the agent runner and the Textual UI components. +""" + +from __future__ import annotations + +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from enum import Enum +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from agent_framework import Message + + from .state_driver import IUXStateDriver + + +# region Enums + + +class OutputEntryType(Enum): + """Type of output entry in the console conversation.""" + + USER_INPUT = "user_input" + """User input echo (e.g., 'You: hello').""" + + STREAMING_TEXT = "streaming_text" + """In-progress streaming text from the agent (accumulated chunk by chunk).""" + + INFO_LINE = "info_line" + """Informational line (tool calls, errors, usage, approval requests, etc.).""" + + STREAM_FOOTER = "stream_footer" + """Stream footer (e.g., '(no text response from agent)').""" + + PENDING_MESSAGE = "pending_message" + """Pending injected message notification.""" + + +class BottomPanelMode(Enum): + """Mode of the bottom panel UI.""" + + TEXT_INPUT = "text_input" + """Show text input for user messages.""" + + LIST_SELECTION = "list_selection" + """Show choice list for user selection.""" + + STREAMING = "streaming" + """Show 'streaming...' indicator while agent is generating.""" + + +# endregion + +# region Output Entry + + +@dataclass +class OutputEntry: + """A single output entry in the console conversation history. + + Used internally by the state driver to track conversation output, + including streaming text, tool calls, errors, and user input echoes. + + Args: + type: The type of output entry. + text: The text content of the entry. + color: Optional Rich color string (e.g., "cyan", "red", "dim"). + """ + + type: OutputEntryType + text: str + color: str | None = None + + +# endregion + +# region Follow-Up Actions + + +class FollowUpAction: + """Base class for follow-up actions returned by observers. + + Follow-up actions describe either a question to ask the user + (via FollowUpQuestion subclasses) or a message to add directly + to the next agent input (FollowUpMessage). + """ + + pass + + +@dataclass +class FollowUpQuestion(FollowUpAction): + """A question to ask the user with a continuation. + + The continuation delegate is invoked with the user's answer and the + UX state driver, and returns an optional Message to add to the next + agent invocation. + + Args: + prompt: The question text shown to the user. + continuation: Async function invoked with the user's answer and state driver. + Returns an optional Message to add to the next agent input. + """ + + prompt: str + continuation: Callable[[str, IUXStateDriver], Awaitable[Message | None]] + + +@dataclass +class TextFollowUpQuestion(FollowUpQuestion): + """A free-form text question. + + The user may type any response. This is the base FollowUpQuestion type + with no additional constraints. + """ + + pass + + +@dataclass +class ChoiceFollowUpQuestion(FollowUpQuestion): + """A multiple choice question. + + The user picks from the provided choices, with an optional ability to + enter custom text when allow_custom_text is True. + + Args: + prompt: The question text shown to the user. + choices: List of pre-defined choices. + allow_custom_text: If True, the user may type a custom response in + addition to the listed choices. + continuation: Async function invoked with the user's choice/text and + state driver. Returns an optional Message to add to the next agent input. + """ + + choices: list[str] + allow_custom_text: bool = False + + +@dataclass +class FollowUpMessage(FollowUpAction): + """A message to add directly to the next agent invocation without prompting. + + Used when an observer wants to inject a message into the conversation + without user interaction (e.g., automatic tool results, system messages). + + Args: + message: The Message to add to the conversation. + """ + + message: Message + + +# endregion + +# region Application State + + +@dataclass +class HarnessAppState: + """All UI state for the harness console application. + + This state is mutated by the UX state driver and read by the Textual + app to update the UI. + """ + + # --- Bottom panel mode --- + + mode: BottomPanelMode = BottomPanelMode.TEXT_INPUT + """Which component is shown in the bottom panel.""" + + # --- Follow-up question queue --- + + pending_questions: list[FollowUpQuestion] = field(default_factory=list) + """Queue of follow-up questions waiting for user answers. + + The head ([0]) is the question currently being displayed; subsequent items + are dispatched in order as each is answered. + """ + + accumulated_follow_up_responses: list[Message] = field(default_factory=list) + """Accumulated follow-up response messages collected during the current agent turn. + + Both direct FollowUpMessages emitted by observers and continuation results + from answered questions. Consumed by the runner via take_follow_up_responses(). + """ + + # --- Text input (active in TextInput / Streaming modes) --- + + prompt: str = "> " + """The prompt string for text input mode.""" + + placeholder: str = "" + """Placeholder text shown when the input is empty.""" + + input_text: str = "" + """The current input text being typed.""" + + input_enabled: bool = True + """Whether input is enabled (disabled during streaming without injection).""" + + streaming_prompt: str = "(agent is running...)" + """The prompt to show during streaming when input is disabled.""" + + # --- List selection (active in ListSelection mode) --- + + list_selection_title: str | None = None + """Title text displayed above the list selection.""" + + list_selection_options: list[str] = field(default_factory=list) + """The list selection options.""" + + list_selection_index: int = 0 + """The highlighted option index in list selection mode.""" + + list_selection_custom_text_placeholder: str | None = None + """Placeholder text for the custom text input option in the list.""" + + list_selection_custom_input_text: str = "" + """Current text being typed into the list's custom text option.""" + + # --- Scroll / output area --- + + output_entries: list[OutputEntry] = field(default_factory=list) + """Output entries in the scroll area conversation history.""" + + queued_items: list[str] = field(default_factory=list) + """Queued input items to display (pending injected messages).""" + + # --- Agent mode + status display --- + + mode_color: str | None = None + """Rich color string for the rule borders and mode label.""" + + mode_text: str | None = None + """Current mode name displayed (e.g., 'plan', 'execute').""" + + help_text: str | None = None + """Help text displayed below the bottom rule (available commands).""" + + show_spinner: bool = False + """Whether the agent status spinner is visible.""" + + usage_text: str | None = None + """Formatted token usage text to display in the status bar.""" + + # --- Command handler signals --- + + shutdown_requested: bool = False + """Set to True when /exit is invoked; the app should exit.""" + + replaced_session: object | None = None + """When set, the app should swap its session to this AgentSession.""" diff --git a/python/samples/02-agents/harness/console/commands/__init__.py b/python/samples/02-agents/harness/console/commands/__init__.py new file mode 100644 index 0000000000..b9823cd4b4 --- /dev/null +++ b/python/samples/02-agents/harness/console/commands/__init__.py @@ -0,0 +1,65 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Command handler package for the harness console. + +Provides slash-command handling (e.g., /exit, /mode, /todos, /session-export) +that intercepts user input before it reaches the agent. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .base import CommandHandler +from .exit_handler import ExitCommandHandler +from .mode_handler import ModeCommandHandler +from .session_handler import SessionCommandHandler +from .todo_handler import TodoCommandHandler + +if TYPE_CHECKING: + from agent_framework import Agent + +__all__ = [ + "CommandHandler", + "ExitCommandHandler", + "ModeCommandHandler", + "SessionCommandHandler", + "TodoCommandHandler", + "build_default_command_handlers", +] + + +def build_default_command_handlers( + agent: Agent, + *, + mode_colors: dict[str, str] | None = None, +) -> list[CommandHandler]: + """Build the default set of command handlers by inspecting the agent. + + Auto-detects TodoProvider and AgentModeProvider from the agent's + context_providers list. + + Args: + agent: The agent to inspect for providers. + mode_colors: Optional mapping of mode names to Rich color strings. + + Returns: + List of command handlers in evaluation order. + """ + from agent_framework import AgentModeProvider, TodoProvider + + todo_provider: TodoProvider | None = None + mode_provider: AgentModeProvider | None = None + + for provider in getattr(agent, "context_providers", []): + if isinstance(provider, TodoProvider) and todo_provider is None: + todo_provider = provider + elif isinstance(provider, AgentModeProvider) and mode_provider is None: + mode_provider = provider + + return [ + ExitCommandHandler(), + TodoCommandHandler(todo_provider), + ModeCommandHandler(mode_provider, mode_colors), + SessionCommandHandler(), + ] diff --git a/python/samples/02-agents/harness/console/commands/base.py b/python/samples/02-agents/harness/console/commands/base.py new file mode 100644 index 0000000000..bedbc2fcc6 --- /dev/null +++ b/python/samples/02-agents/harness/console/commands/base.py @@ -0,0 +1,58 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Abstract base class for console command handlers. + +Command handlers intercept user input starting with '/' and execute +local commands before input reaches the agent. They are checked in order; +the first handler that accepts the input prevents further handlers from +being checked. +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from agent_framework import AgentSession + + from ..state_driver import IUXStateDriver + + +class CommandHandler(ABC): + """Base class for console command handlers. + + Subclasses implement get_help_text() for the mode bar and + try_handle() to intercept matching commands. + """ + + @abstractmethod + def get_help_text(self) -> str | None: + """Get the help text for this command. + + Displayed in the mode-and-help bar. Return None if the + command is not currently available. + + Returns: + Help text like '/todos (show todo list)', or None. + """ + ... + + @abstractmethod + async def try_handle( + self, + user_input: str, + session: AgentSession, + ux: IUXStateDriver, + ) -> bool: + """Attempt to handle the given user input. + + Args: + user_input: The raw user input string. + session: The current agent session. + ux: The UX state driver for rendering output. + + Returns: + True if this handler handled the input; False otherwise. + """ + ... diff --git a/python/samples/02-agents/harness/console/commands/exit_handler.py b/python/samples/02-agents/harness/console/commands/exit_handler.py new file mode 100644 index 0000000000..2bc46e180d --- /dev/null +++ b/python/samples/02-agents/harness/console/commands/exit_handler.py @@ -0,0 +1,35 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Exit command handler — /exit to quit the console.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .base import CommandHandler + +if TYPE_CHECKING: + from agent_framework import AgentSession + + from ..state_driver import IUXStateDriver + + +class ExitCommandHandler(CommandHandler): + """Handle the /exit command to shut down the console application.""" + + def get_help_text(self) -> str | None: + """Return help text for the exit command.""" + return "/exit (quit)" + + async def try_handle( + self, + user_input: str, + session: AgentSession, + ux: IUXStateDriver, + ) -> bool: + """Handle /exit by requesting shutdown.""" + if user_input.strip().lower() != "/exit": + return False + + ux.request_shutdown() + return True diff --git a/python/samples/02-agents/harness/console/commands/mode_handler.py b/python/samples/02-agents/harness/console/commands/mode_handler.py new file mode 100644 index 0000000000..b4abc836bb --- /dev/null +++ b/python/samples/02-agents/harness/console/commands/mode_handler.py @@ -0,0 +1,81 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Mode command handler — /mode to show or switch agent mode.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .base import CommandHandler + +if TYPE_CHECKING: + from agent_framework import AgentModeProvider, AgentSession + + from ..state_driver import IUXStateDriver + + +class ModeCommandHandler(CommandHandler): + """Handle the /mode command to display or switch the current agent mode.""" + + def __init__( + self, + mode_provider: AgentModeProvider | None, + mode_colors: dict[str, str] | None = None, + ) -> None: + """Initialize with mode provider and color mapping. + + Args: + mode_provider: The mode provider, or None if not available. + mode_colors: Optional mapping of mode names to Rich color strings. + """ + self._mode_provider = mode_provider + self._mode_colors = mode_colors or {} + + def get_help_text(self) -> str | None: + """Return help text, or None if mode provider is unavailable.""" + if self._mode_provider is None: + return None + return "/mode [plan|execute] (show or switch mode)" + + async def try_handle( + self, + user_input: str, + session: AgentSession, + ux: IUXStateDriver, + ) -> bool: + """Handle /mode [name] command.""" + stripped = user_input.strip() + lower = stripped.lower() + + if not (lower == "/mode" or lower.startswith("/mode ")): + return False + + if self._mode_provider is None: + ux.append_info_line("AgentModeProvider is not available.") + return True + + parts = stripped.split(None, 1) + if len(parts) < 2: + # Show current mode + from agent_framework import get_agent_mode + + current = get_agent_mode(session) + ux.append_info_line(f"Current mode: {current}") + return True + + # Switch mode + new_mode = parts[1].strip() + try: + from agent_framework import set_agent_mode + + normalized = set_agent_mode(session, new_mode) + color = self._mode_colors.get(normalized) + ux.set_mode(normalized, color) + ux.append_info_line( + f"Switched to {normalized} mode.", + color=color, + ) + except ValueError as ex: + ux.append_info_line(str(ex), color="red") + + return True diff --git a/python/samples/02-agents/harness/console/commands/session_handler.py b/python/samples/02-agents/harness/console/commands/session_handler.py new file mode 100644 index 0000000000..5d9ca45a74 --- /dev/null +++ b/python/samples/02-agents/harness/console/commands/session_handler.py @@ -0,0 +1,107 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Session command handler — /session-export and /session-import.""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING + +from .base import CommandHandler + +if TYPE_CHECKING: + from agent_framework import AgentSession + + from ..state_driver import IUXStateDriver + + +class SessionCommandHandler(CommandHandler): + """Handle /session-export and /session-import commands.""" + + def get_help_text(self) -> str | None: + """Return help text for session commands.""" + return "/session-export | /session-import " + + async def try_handle( + self, + user_input: str, + session: AgentSession, + ux: IUXStateDriver, + ) -> bool: + """Handle session export/import commands.""" + stripped = user_input.strip() + command = stripped.split(None, 1)[0].lower() if stripped else "" + + if command == "/session-export": + await self._handle_export(stripped, session, ux) + return True + + if command == "/session-import": + await self._handle_import(stripped, ux) + return True + + return False + + async def _handle_export( + self, + user_input: str, + session: AgentSession, + ux: IUXStateDriver, + ) -> None: + """Export the current session to a JSON file.""" + parts = user_input.split(None, 1) + if len(parts) < 2: + ux.append_info_line("Usage: /session-export ") + return + + filename = parts[1].strip() + try: + serialized = session.to_dict() + json_str = json.dumps(serialized, indent=2) + self._write_file(filename, json_str) + ux.append_info_line(f"Session exported to {filename}") + except Exception as ex: + ux.append_info_line( + f"Failed to export session to {filename}: {ex}", + color="red", + ) + + async def _handle_import( + self, + user_input: str, + ux: IUXStateDriver, + ) -> None: + """Import a session from a JSON file.""" + parts = user_input.split(None, 1) + if len(parts) < 2: + ux.append_info_line("Usage: /session-import ") + return + + filename = parts[1].strip() + try: + from agent_framework import AgentSession + + json_str = self._read_file(filename) + data = json.loads(json_str) + new_session = AgentSession.from_dict(data) + ux.replace_session(new_session) + ux.append_info_line(f"Session imported from {filename}") + except FileNotFoundError: + ux.append_info_line(f"File not found: {filename}", color="red") + except Exception as ex: + ux.append_info_line( + f"Failed to import session from {filename}: {ex}", + color="red", + ) + + @staticmethod + def _write_file(filename: str, content: str) -> None: + """Write content to a file (sync helper to satisfy ASYNC230).""" + with open(filename, "w", encoding="utf-8") as f: # noqa: ASYNC230 + f.write(content) + + @staticmethod + def _read_file(filename: str) -> str: + """Read content from a file (sync helper to satisfy ASYNC230).""" + with open(filename, encoding="utf-8") as f: # noqa: ASYNC230 + return f.read() diff --git a/python/samples/02-agents/harness/console/commands/todo_handler.py b/python/samples/02-agents/harness/console/commands/todo_handler.py new file mode 100644 index 0000000000..73703e6db3 --- /dev/null +++ b/python/samples/02-agents/harness/console/commands/todo_handler.py @@ -0,0 +1,66 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Todo command handler — /todos to display the todo list.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .base import CommandHandler + +if TYPE_CHECKING: + from agent_framework import AgentSession, TodoProvider + + from ..state_driver import IUXStateDriver + + +class TodoCommandHandler(CommandHandler): + """Handle the /todos command to display the current todo list.""" + + def __init__(self, todo_provider: TodoProvider | None) -> None: + """Initialize with the todo provider. + + Args: + todo_provider: The todo provider, or None if not available. + """ + self._todo_provider = todo_provider + + def get_help_text(self) -> str | None: + """Return help text, or None if todo provider is unavailable.""" + if self._todo_provider is None: + return None + return "/todos (show todo list)" + + async def try_handle( + self, + user_input: str, + session: AgentSession, + ux: IUXStateDriver, + ) -> bool: + """Handle /todos by displaying the todo list.""" + if user_input.strip().lower() != "/todos": + return False + + if self._todo_provider is None: + ux.append_info_line("TodoProvider is not available.") + return True + + todos = await self._todo_provider.store.load_items( + session, source_id=self._todo_provider.source_id + ) + + if not todos: + ux.append_info_line("No todos yet.") + return True + + ux.append_info_line("── Todo List ──") + for item in todos: + status = "✓" if item.is_complete else "○" + color = "dim" if item.is_complete else None + description = f" — {item.description}" if item.description else "" + ux.append_info_line( + f"[{status}] #{item.id} {item.title}{description}", + color=color, + ) + + return True diff --git a/python/samples/02-agents/harness/console/components/__init__.py b/python/samples/02-agents/harness/console/components/__init__.py new file mode 100644 index 0000000000..e7fd63e77d --- /dev/null +++ b/python/samples/02-agents/harness/console/components/__init__.py @@ -0,0 +1,23 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""UI components for the harness console. + +This module provides Textual widgets for building the harness console UI, +including status displays, input fields, choice selectors, and scrolling panels. +""" + +from .agent_status import AgentStatus +from .list_selection import HarnessListSelection +from .mode_help import AgentModeAndHelp +from .prompt_rule import PromptRule +from .scroll_panel import HarnessScrollPanel +from .text_input import HarnessTextInput + +__all__ = [ + "AgentStatus", + "AgentModeAndHelp", + "HarnessListSelection", + "PromptRule", + "HarnessScrollPanel", + "HarnessTextInput", +] diff --git a/python/samples/02-agents/harness/console/components/agent_status.py b/python/samples/02-agents/harness/console/components/agent_status.py new file mode 100644 index 0000000000..34ac521a52 --- /dev/null +++ b/python/samples/02-agents/harness/console/components/agent_status.py @@ -0,0 +1,66 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Agent status widget with spinner animation and usage statistics.""" + +from __future__ import annotations + +from textual.reactive import reactive +from textual.widgets import Static + + +class AgentStatus(Static): + """Agent status bar with animated spinner and token usage display. + + Displays an animated braille pattern spinner when the agent is active, + along with token usage statistics. The component automatically updates + the spinner animation at ~10fps for smooth visual feedback. + + Attributes: + show_spinner: Whether to display the animated spinner. + usage_text: Token usage text to display (e.g., "1.2K in / 856 out"). + """ + + # Braille pattern spinner frames for smooth animation + SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] + + show_spinner: reactive[bool] = reactive(False) + usage_text: reactive[str] = reactive("") + + def __init__(self, **kwargs) -> None: + """Initialize the agent status widget.""" + super().__init__(**kwargs) + self._spinner_index = 0 + + def on_mount(self) -> None: + """Start the spinner animation timer when the widget is mounted.""" + # Update spinner at ~10fps (every 0.1 seconds) + self.set_interval(0.1, self._advance_spinner) + + def _advance_spinner(self) -> None: + """Advance the spinner to the next frame.""" + if self.show_spinner: + self._spinner_index = (self._spinner_index + 1) % len(self.SPINNER_FRAMES) + self.refresh() + + def render(self) -> str: + """Render the status bar with spinner and usage text. + + Returns: + Formatted string with Rich markup for spinner and usage display. + """ + if not self.show_spinner and not self.usage_text: + return "" + + parts = [] + + if self.show_spinner: + frame = self.SPINNER_FRAMES[self._spinner_index] + parts.append(f"[cyan]{frame}[/cyan]") + else: + # Keep consistent spacing when spinner is off + parts.append(" ") + + if self.usage_text: + parts.append(f"[dim]{self.usage_text}[/dim]") + + return " ".join(parts) diff --git a/python/samples/02-agents/harness/console/components/list_selection.py b/python/samples/02-agents/harness/console/components/list_selection.py new file mode 100644 index 0000000000..47c4975002 --- /dev/null +++ b/python/samples/02-agents/harness/console/components/list_selection.py @@ -0,0 +1,269 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""List selection widget with optional custom text input.""" + +from __future__ import annotations + +from textual import on +from textual.app import ComposeResult +from textual.binding import Binding +from textual.containers import Container +from textual.css.query import NoMatches +from textual.events import Key +from textual.message import Message +from textual.reactive import reactive +from textual.widget import Widget +from textual.widgets import Input, Label, OptionList +from textual.widgets.option_list import Option + + +class HarnessListSelection(Widget): + """List selection widget with numbered choices and optional custom text input. + + Displays a title, a list of numbered choices that can be selected via + keyboard navigation or number keys (1-9), and an optional custom text + input field at the bottom. + + All child nodes (title label, option list, custom input) are always + present in the DOM; visibility is toggled via reactive watchers. + + Navigation: + - Down arrow on last list item moves focus to the custom text input + - Up arrow on the custom text input moves focus back to the option list + - When custom input has focus, the option list highlight is cleared + + Attributes: + title: The title text displayed above the options. + options: List of option strings to display. + allow_custom_text: Whether to show a custom text input field. + """ + + DEFAULT_CSS = """ + HarnessListSelection { + height: auto; + max-height: 12; + } + + HarnessListSelection .list-selection-container { + height: auto; + } + + HarnessListSelection #selection-title { + height: auto; + color: $text; + text-style: bold; + padding: 0 0 0 0; + } + + HarnessListSelection #option-list { + height: auto; + max-height: 8; + border: none; + padding: 0; + } + + HarnessListSelection #custom-input { + height: auto; + min-height: 1; + margin-top: 0; + border: tall transparent; + } + + HarnessListSelection #custom-input:focus { + border: tall $accent; + } + """ + + BINDINGS = [ + Binding("1", "select_option(0)", "Select option 1", show=False), + Binding("2", "select_option(1)", "Select option 2", show=False), + Binding("3", "select_option(2)", "Select option 3", show=False), + Binding("4", "select_option(3)", "Select option 4", show=False), + Binding("5", "select_option(4)", "Select option 5", show=False), + Binding("6", "select_option(5)", "Select option 6", show=False), + Binding("7", "select_option(6)", "Select option 7", show=False), + Binding("8", "select_option(7)", "Select option 8", show=False), + Binding("9", "select_option(8)", "Select option 9", show=False), + ] + + title: reactive[str] = reactive("") + options: reactive[list[str]] = reactive(list, always_update=True) + allow_custom_text: reactive[bool] = reactive(False) + + class Selected(Message): + """Message sent when an option is selected. + + Attributes: + value: The selected option text or custom text. + """ + + def __init__(self, value: str) -> None: + """Initialize the Selected message. + + Args: + value: The selected option text or custom text. + """ + self.value = value + super().__init__() + + def compose(self) -> ComposeResult: + """Compose the widget — all nodes are always present. + + Yields: + Title label (hidden if empty), option list, custom input (hidden by default). + """ + with Container(classes="list-selection-container"): + yield Label("", id="selection-title") + yield OptionList(id="option-list") + yield Input( + placeholder="Or type a custom response...", + id="custom-input", + ) + + def on_mount(self) -> None: + """Configure initial visibility after mount.""" + title_label = self.query_one("#selection-title", Label) + title_label.display = bool(self.title) + + custom_input = self.query_one("#custom-input", Input) + custom_input.display = self.allow_custom_text + + self._update_options() + + def on_key(self, event: Key) -> None: + """Handle key navigation between option list and custom input. + + Args: + event: The key event. + """ + if not self.allow_custom_text: + return + + option_list = self.query_one("#option-list", OptionList) + custom_input = self.query_one("#custom-input", Input) + + # Down arrow on last item → move to custom input + if event.key == "down" and option_list.has_focus: + last_index = option_list.option_count - 1 + if last_index >= 0 and option_list.highlighted == last_index: + option_list.highlighted = None # type: ignore[assignment] + custom_input.focus() + event.prevent_default() + event.stop() + + # Up arrow on custom input → move back to option list (last item) + elif event.key == "up" and custom_input.has_focus: + last_index = option_list.option_count - 1 + if last_index >= 0: + option_list.highlighted = last_index + option_list.focus() + event.prevent_default() + event.stop() + + @on(Input.Changed, "#custom-input") + def on_custom_input_focused_or_changed(self, event: Input.Changed) -> None: + """Clear option list highlight when user is typing in custom input. + + Args: + event: The input changed event. + """ + option_list = self.query_one("#option-list", OptionList) + option_list.highlighted = None # type: ignore[assignment] + + def watch_title(self, new_title: str) -> None: + """Update the title label when the title changes. + + Args: + new_title: The new title text. + """ + try: + label = self.query_one("#selection-title", Label) + label.update(new_title) + label.display = bool(new_title) + except NoMatches: + pass + + def watch_options(self, new_options: list[str]) -> None: + """Update the option list when options change. + + Args: + new_options: The new list of options. + """ + import contextlib + + with contextlib.suppress(NoMatches): + self._update_options() + + def watch_allow_custom_text(self, allow: bool) -> None: + """Show/hide the custom input field. + + Args: + allow: Whether to show the custom text input. + """ + try: + custom_input = self.query_one("#custom-input", Input) + custom_input.display = allow + except NoMatches: + pass + + def _update_options(self) -> None: + """Update the OptionList with numbered options.""" + try: + option_list = self.query_one("#option-list", OptionList) + option_list.clear_options() + + for i, option_text in enumerate(self.options): + display_text = f"{i + 1}. {option_text}" if i < 9 else f" {option_text}" + option_list.add_option(Option(display_text, id=str(i))) + except NoMatches: + pass + + @on(OptionList.OptionSelected) + def on_option_selected(self, event: OptionList.OptionSelected) -> None: + """Handle option selection from the list. + + Args: + event: The OptionList.OptionSelected event. + """ + option_index = int(event.option.id or "0") + if 0 <= option_index < len(self.options): + selected_value = self.options[option_index] + self.post_message(self.Selected(selected_value)) + + @on(Input.Submitted) + def on_input_submitted(self, event: Input.Submitted) -> None: + """Handle custom text input submission. + + Args: + event: The Input.Submitted event. + """ + if self.allow_custom_text and event.value: + self.post_message(self.Selected(event.value)) + event.input.clear() + + def action_select_option(self, index: int) -> None: + """Select an option by index (0-based). + + Args: + index: The option index to select. + """ + if 0 <= index < len(self.options): + selected_value = self.options[index] + self.post_message(self.Selected(selected_value)) + + def focus_list(self) -> None: + """Focus the option list.""" + try: + option_list = self.query_one("#option-list", OptionList) + option_list.focus() + except NoMatches: + pass + + def focus_custom_input(self) -> None: + """Focus the custom text input field.""" + if self.allow_custom_text: + try: + custom_input = self.query_one("#custom-input", Input) + custom_input.focus() + except NoMatches: + pass diff --git a/python/samples/02-agents/harness/console/components/mode_help.py b/python/samples/02-agents/harness/console/components/mode_help.py new file mode 100644 index 0000000000..162c671710 --- /dev/null +++ b/python/samples/02-agents/harness/console/components/mode_help.py @@ -0,0 +1,48 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Agent mode and help text display widget.""" + +from __future__ import annotations + +from rich.text import Text +from textual.reactive import reactive +from textual.widgets import Static + + +class AgentModeAndHelp(Static): + """Widget displaying the current agent mode and help text. + + Shows the current agent mode (e.g., "plan", "execute") in a colored label, + followed by available commands and help text in a dimmed style. Used in + the fixed bottom area of the console. + + Attributes: + mode: Current mode name (e.g., "plan", "execute"), or None if no mode. + mode_color: Rich color string for the mode label (e.g., "yellow", "green"). + help_text: Help text to display (e.g., "/exit to quit, /mode to switch"). + """ + + mode: reactive[str | None] = reactive(None) + mode_color: reactive[str] = reactive("yellow") + help_text: reactive[str] = reactive("") + + def render(self) -> Text: + """Render the mode indicator and help text. + + Returns: + Rich Text object with styled mode and help display. + """ + result = Text() + + if self.mode: + result.append(f"[{self.mode}]", style=self.mode_color) + + if self.help_text: + if self.mode: + result.append(" ") + result.append(self.help_text, style="dim") + + if not result.plain: + result.append(" ") + + return result diff --git a/python/samples/02-agents/harness/console/components/prompt_rule.py b/python/samples/02-agents/harness/console/components/prompt_rule.py new file mode 100644 index 0000000000..7b7d151ce1 --- /dev/null +++ b/python/samples/02-agents/harness/console/components/prompt_rule.py @@ -0,0 +1,31 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Mode-colored horizontal rule.""" + +from __future__ import annotations + +from textual.reactive import reactive +from textual.widgets import Static + + +class PromptRule(Static): + """A full-width horizontal rule colored by the current agent mode. + + Renders a line of '─' characters across the terminal width, + colored to match the current mode (e.g., cyan for plan, green for execute). + + Attributes: + rule_color: Rich color string for the rule (e.g., "cyan", "green"). + """ + + rule_color: reactive[str] = reactive("cyan") + + def render(self) -> str: + """Render the horizontal rule. + + Returns: + Formatted string with Rich markup. + """ + color = self.rule_color + width = self.size.width or 80 + return f"[{color}]{'─' * width}[/{color}]" diff --git a/python/samples/02-agents/harness/console/components/scroll_panel.py b/python/samples/02-agents/harness/console/components/scroll_panel.py new file mode 100644 index 0000000000..a9cf15a774 --- /dev/null +++ b/python/samples/02-agents/harness/console/components/scroll_panel.py @@ -0,0 +1,127 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Scrolling panel for conversation history display.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from textual.widgets import RichLog + +if TYPE_CHECKING: + from ..app_state import OutputEntry + + +class HarnessScrollPanel(RichLog): + """Scrolling panel for displaying conversation history. + + Uses Textual's RichLog widget for efficient append-only rendering with + Rich text formatting support. Automatically scrolls to the bottom when + new entries are added. + + For streaming text, the panel uses a truncate-and-rewrite strategy: it + tracks where streaming began in the RichLog lines list, and on each update + truncates back to that point and rewrites the full accumulated text as a + single write. This ensures consistent rendering without line-break artifacts + between streamed chunks. + """ + + def __init__(self, **kwargs) -> None: + """Initialize the scroll panel. + + Args: + **kwargs: Additional arguments passed to RichLog. + """ + super().__init__( + **kwargs, + auto_scroll=True, # Automatically scroll to bottom + wrap=True, # Wrap long lines instead of horizontal scroll + markup=True, # Enable Rich markup + highlight=True, # Enable syntax highlighting + ) + self._entries: list[OutputEntry] = [] + self._is_streaming = False + self._streaming_line_start: int = 0 + + def append_entry(self, entry: OutputEntry) -> None: + """Append a new output entry to the conversation history. + + Args: + entry: The output entry to append. + """ + self._entries.append(entry) + text = self._format_entry(entry) + self.write(text) + + def set_streaming_entry(self, entry: OutputEntry) -> None: + """Set or update the current streaming entry. + + On each update, truncates the RichLog back to where streaming + started, then rewrites the full streaming text as a single block. + This ensures no spurious line breaks between chunks while avoiding + a full rewrite of all entries. + + Args: + entry: The streaming entry (will be mutated externally). + """ + if not self._is_streaming: + # First streaming chunk — record where streaming lines begin + self._is_streaming = True + self._entries.append(entry) + self._streaming_line_start = len(self.lines) + + # Truncate lines back to where streaming started + if len(self.lines) > self._streaming_line_start: + del self.lines[self._streaming_line_start:] + from textual.geometry import Size + + self.virtual_size = Size(self._widest_line_width, len(self.lines)) + + # Write full streaming text as a single renderable + formatted = self._format_text(entry.text, entry.color) + self.write(formatted) + + def end_streaming(self) -> None: + """End the current streaming mode.""" + if self._is_streaming: + self._is_streaming = False + self._streaming_line_start = 0 + + def _rewrite_all(self) -> None: + """Clear and rewrite all entries from scratch.""" + self.clear() + for entry in self._entries: + self.write(self._format_entry(entry)) + + def _format_entry(self, entry: OutputEntry) -> str: + """Format an output entry with Rich markup. + + Args: + entry: The entry to format. + + Returns: + Formatted string with Rich markup for color and styling. + """ + return self._format_text(entry.text, entry.color) + + @staticmethod + def _format_text(text: str, color: str | None) -> str: + """Format text with optional Rich color markup. + + Args: + text: The text to format. + color: Optional Rich color name. + + Returns: + Formatted string. + """ + if color: + return f"[{color}]{text}[/{color}]" + return text + + def clear_history(self) -> None: + """Clear all conversation history from the panel.""" + self._entries.clear() + self._is_streaming = False + self._streaming_line_start = 0 + self.clear() diff --git a/python/samples/02-agents/harness/console/components/text_input.py b/python/samples/02-agents/harness/console/components/text_input.py new file mode 100644 index 0000000000..b7c415d1fb --- /dev/null +++ b/python/samples/02-agents/harness/console/components/text_input.py @@ -0,0 +1,102 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Text input widget with inline prompt for the harness console.""" + +from __future__ import annotations + +from textual import on +from textual.app import ComposeResult +from textual.containers import Horizontal +from textual.message import Message +from textual.reactive import reactive +from textual.widget import Widget +from textual.widgets import Input, Label + + +class HarnessTextInput(Widget): + """Text input widget with a prompt label on the left. + + Displays a prompt (e.g., "> ") followed by a borderless input field. + Sits between the two mode-colored horizontal rules. + + Attributes: + prompt: The prompt text displayed on the left (e.g., "> "). + placeholder: Placeholder text shown when the input is empty. + """ + + prompt: reactive[str] = reactive("> ") + placeholder: reactive[str] = reactive("") + + class Submitted(Message): + """Message sent when the input is submitted. + + Attributes: + value: The submitted text value. + """ + + def __init__(self, value: str) -> None: + """Initialize the Submitted message. + + Args: + value: The submitted text value. + """ + self.value = value + super().__init__() + + def compose(self) -> ComposeResult: + """Compose the prompt label and input field. + + Yields: + A horizontal container with the prompt and input field. + """ + with Horizontal(classes="prompt-container"): + yield Label(self.prompt, classes="prompt-label", id="prompt-label") + yield Input(placeholder=self.placeholder, classes="input-field", id="input-field") + + def watch_prompt(self, new_prompt: str) -> None: + """Update the prompt label when the prompt attribute changes. + + Args: + new_prompt: The new prompt text. + """ + try: + label = self.query_one("#prompt-label", Label) + label.update(new_prompt) + except Exception: + pass + + def watch_placeholder(self, new_placeholder: str) -> None: + """Update the input placeholder when the placeholder attribute changes. + + Args: + new_placeholder: The new placeholder text. + """ + try: + input_field = self.query_one("#input-field", Input) + input_field.placeholder = new_placeholder + except Exception: + # Input doesn't exist yet (before compose), ignore + pass + + @on(Input.Submitted) + def on_input_submitted(self, event: Input.Submitted) -> None: + """Handle input submission. + + Clears the input field and posts a Submitted message with the value. + + Args: + event: The Input.Submitted event. + """ + value = event.value + event.input.clear() + self.post_message(self.Submitted(value)) + + def focus_input(self) -> None: + """Focus the input field.""" + input_field = self.query_one(".input-field", Input) + input_field.focus() + + def clear_input(self) -> None: + """Clear the input field.""" + input_field = self.query_one(".input-field", Input) + input_field.clear() diff --git a/python/samples/02-agents/harness/console/formatters.py b/python/samples/02-agents/harness/console/formatters.py new file mode 100644 index 0000000000..47327c6637 --- /dev/null +++ b/python/samples/02-agents/harness/console/formatters.py @@ -0,0 +1,503 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tool call formatters for displaying function calls in the harness console. + +This module provides formatters that convert raw function call content into +human-readable display strings. Each formatter handles specific tool patterns +(e.g., web_search, todos_*, etc.) and the FallbackToolFormatter provides +generic formatting for any unmatched tools. + +Usage: + from harness.console.formatters import build_default_formatters, format_tool_call + from agent_framework import Content + + call = Content.from_function_call( + call_id="call_1", + name="web_search", + arguments={"query": "Python async"} + ) + formatters = build_default_formatters() + result = format_tool_call(formatters, call) # "web_search (Python async)" +""" + +from __future__ import annotations + +import contextlib +import json +from abc import ABC, abstractmethod +from typing import Any + +from agent_framework import Content + +# region Helper Functions + + +def get_argument_value(call: Content, param_name: str) -> Any: + """Extract an argument value from a function call. + + Handles both dict and JSON string arguments. + + Args: + call: The function call content. + param_name: The parameter name to extract. + + Returns: + The argument value, or None if not found. + """ + if call.arguments is None: + return None + + if isinstance(call.arguments, str): + # arguments is a JSON string, parse it + try: + args_dict = json.loads(call.arguments) + except (json.JSONDecodeError, TypeError): + return None + if not isinstance(args_dict, dict): + return None + elif isinstance(call.arguments, dict): + args_dict = call.arguments + else: + return None + + return args_dict.get(param_name) + + +def as_int_list(value: Any) -> list[int] | None: + """Convert a value to a list of integers, or None if not possible. + + Args: + value: The value to convert (should be a list). + + Returns: + A list of integers, or None if conversion fails. + """ + if not isinstance(value, list): + return None + + result: list[int] = [] + for item in value: + if isinstance(item, int): + result.append(item) + else: + with contextlib.suppress(ValueError, TypeError): + result.append(int(item)) + + return result if result else None + + +def as_dict_list(value: Any) -> list[dict[str, Any]] | None: + """Convert a value to a list of dicts, or None if not possible. + + Args: + value: The value to convert (should be a list). + + Returns: + A list of dicts, or None if value is not a list of dicts. + """ + if not isinstance(value, list): + return None + + result: list[dict[str, Any]] = [] + for item in value: + if isinstance(item, dict): + result.append(item) + + return result if result else None + + +def truncate(text: str, max_length: int) -> str: + """Truncate a string to the specified maximum length, appending an ellipsis if truncated. + + Args: + text: The text to truncate. + max_length: The maximum length. + + Returns: + The truncated string. + """ + return text if len(text) <= max_length else text[:max_length] + "…" + + +# endregion + +# region Base Class + + +class ToolCallFormatter(ABC): + """Base class for tool call formatters that produce human-readable display strings + for function call content items shown in the console. + """ + + @abstractmethod + def can_format(self, call: Content) -> bool: + """Return True if this formatter can handle the given function call. + + Args: + call: The function call content to check. + + Returns: + True if this formatter should be used; otherwise False. + """ + ... + + @abstractmethod + def format_detail(self, call: Content) -> str | None: + """Return the detail portion of the formatted output for the given tool call, + or None if only the tool name should be displayed. + + Args: + call: The function call content to format. + + Returns: + A detail string to append after the tool name, or None. + """ + ... + + +# endregion + +# region Concrete Formatters + + +class FallbackToolFormatter(ToolCallFormatter): + """Catch-all formatter that handles any tool not matched by a more specific formatter. + + Displays a generic summary of the tool's arguments. This formatter should always be + placed last in the formatter list. + """ + + def can_format(self, call: Content) -> bool: + """Always returns True - this formatter matches everything.""" + return True + + def format_detail(self, call: Content) -> str | None: + """Format arguments as generic (key: value, ...) pairs.""" + if call.arguments is None: + return None + + # Parse arguments + if isinstance(call.arguments, str): + try: + args_dict = json.loads(call.arguments) + except (json.JSONDecodeError, TypeError): + return None + elif isinstance(call.arguments, dict): + args_dict = call.arguments + else: + return None + + if not args_dict: + return None + + # Build argument list + parts: list[str] = [] + for key, value in args_dict.items(): + if value is None: + continue + + # Convert value to string + if isinstance(value, bool): + str_value = "true" if value else "false" + elif isinstance(value, (int, float)): + str_value = str(value) + elif isinstance(value, str): + str_value = value + else: + # Complex types - skip for now + continue + + parts.append(f"{key}: {truncate(str_value, 40)}") + + return f"({', '.join(parts)})" if parts else None + + +class WebSearchToolFormatter(ToolCallFormatter): + """Formats web_search tool calls, showing the search query.""" + + def can_format(self, call: Content) -> bool: + """Match web_search tool calls.""" + return call.name == "web_search" + + def format_detail(self, call: Content) -> str | None: + """Extract and format the query parameter.""" + value = get_argument_value(call, "query") + return f"({value})" if value else None + + +class TodoToolFormatter(ToolCallFormatter): + """Formats todos_* tool calls with tree-view output for added items + and structured output for complete/remove operations. + """ + + def can_format(self, call: Content) -> bool: + """Match todos_* tool calls.""" + return call.name is not None and call.name.startswith("todos_") + + def format_detail(self, call: Content) -> str | None: + """Format based on the specific todos operation.""" + if call.name == "todos_add": + return self._format_add_todos(call) + if call.name == "todos_complete": + return self._format_complete_todos(call) + if call.name == "todos_remove": + return self._format_id_list(call, "ids", "Remove") + return None + + def _format_add_todos(self, call: Content) -> str | None: + """Format todos_add with tree view of titles.""" + todos = as_dict_list(get_argument_value(call, "todos")) + if not todos: + return None + + titles: list[str] = [] + for todo in todos: + title = todo.get("title") + if title and isinstance(title, str): + titles.append(title) + + if not titles: + return None + + # Build tree view + count = len(titles) + plural = "s" if count != 1 else "" + lines = [f"({count} item{plural})"] + for i, title in enumerate(titles): + connector = "├─" if i < count - 1 else "└─" + lines.append(f"\n {connector} {title}") + + return "".join(lines) + + def _format_complete_todos(self, call: Content) -> str | None: + """Format todos_complete with tree view of IDs and reasons.""" + items = as_dict_list(get_argument_value(call, "items")) + if not items: + return None + + entries: list[tuple[int, str | None]] = [] + for item in items: + todo_id = item.get("id") + if not isinstance(todo_id, int): + continue + + reason = item.get("reason") + reason_str = str(reason) if reason is not None and not isinstance(reason, str) else reason + entries.append((todo_id, reason_str)) + + if not entries: + return None + + # Build tree view + lines: list[str] = [] + for i, (todo_id, reason) in enumerate(entries): + connector = "├─" if i < len(entries) - 1 else "└─" + line = f"\n {connector} Complete #{todo_id}" + if reason: + line += f" — {truncate(reason, 80)}" + lines.append(line) + + return "".join(lines) + + def _format_id_list(self, call: Content, param_name: str, verb: str) -> str | None: + """Format a list of IDs with a verb (e.g., Remove #1, Remove #2).""" + ids = as_int_list(get_argument_value(call, param_name)) + if not ids: + return None + + lines: list[str] = [] + for i, todo_id in enumerate(ids): + connector = "├─" if i < len(ids) - 1 else "└─" + lines.append(f"\n {connector} {verb} #{todo_id}") + + return "".join(lines) + + +class ModeToolFormatter(ToolCallFormatter): + """Formats AgentMode_* tool calls, showing the target mode for Set operations.""" + + def can_format(self, call: Content) -> bool: + """Match AgentMode_* tool calls.""" + return call.name is not None and call.name.startswith("AgentMode_") + + def format_detail(self, call: Content) -> str | None: + """Format based on the specific AgentMode operation.""" + if call.name == "AgentMode_Set": + value = get_argument_value(call, "mode") + return f"({value})" if value else None + return None + + +class BackgroundAgentToolFormatter(ToolCallFormatter): + """Formats BackgroundAgents_* tool calls with human-readable details + for task start, continue, wait, and result retrieval operations. + """ + + def can_format(self, call: Content) -> bool: + """Match BackgroundAgents_* tool calls.""" + return call.name is not None and call.name.startswith("BackgroundAgents_") + + def format_detail(self, call: Content) -> str | None: + """Format based on the specific BackgroundAgents operation.""" + if call.name == "BackgroundAgents_StartTask": + return self._format_start_background_task(call) + if call.name == "BackgroundAgents_WaitForFirstCompletion": + return self._format_id_list(call, "taskIds", "Wait for") + if call.name == "BackgroundAgents_GetTaskResults": + return self._format_single_id(call, "taskId") + if call.name == "BackgroundAgents_ContinueTask": + return self._format_continue_task(call) + if call.name == "BackgroundAgents_ClearCompletedTask": + return self._format_single_id(call, "taskId") + return None + + def _format_start_background_task(self, call: Content) -> str | None: + """Format StartTask with agent name and description.""" + agent_name = get_argument_value(call, "agentName") + description = get_argument_value(call, "description") + + if agent_name is None and description is None: + return None + + lines: list[str] = [] + + if agent_name is not None and description is not None: + lines.append(f"\n ├─ Agent: {agent_name}") + lines.append(f'\n └─ "{truncate(description, 80)}"') + elif agent_name is not None: + lines.append(f"\n └─ Agent: {agent_name}") + else: + lines.append(f'\n └─ "{truncate(description, 80)}"') # type: ignore[arg-type] + + return "".join(lines) + + def _format_id_list(self, call: Content, param_name: str, verb: str) -> str | None: + """Format a list of task IDs with a verb.""" + ids = as_int_list(get_argument_value(call, param_name)) + if not ids: + return None + + lines: list[str] = [] + for i, task_id in enumerate(ids): + connector = "├─" if i < len(ids) - 1 else "└─" + lines.append(f"\n {connector} {verb} #{task_id}") + + return "".join(lines) + + def _format_single_id(self, call: Content, param_name: str) -> str | None: + """Format a single task ID in parentheses.""" + task_id = get_argument_value(call, param_name) + if isinstance(task_id, int): + return f"(task #{task_id})" + return None + + def _format_continue_task(self, call: Content) -> str | None: + """Format ContinueTask with task ID and optional text.""" + task_id = get_argument_value(call, "taskId") + text = get_argument_value(call, "text") + + if not isinstance(task_id, int): + return None + + if text: + lines = [ + f"\n ├─ Task #{task_id}", + f'\n └─ "{truncate(text, 80)}"', + ] + return "".join(lines) + + return f"\n └─ Task #{task_id}" + + +class FileMemoryToolFormatter(ToolCallFormatter): + """Formats FileMemory_* tool calls, showing file names and search patterns + with tree-view corners for save operations. + """ + + def can_format(self, call: Content) -> bool: + """Match FileMemory_* tool calls.""" + return call.name is not None and call.name.startswith("FileMemory_") + + def format_detail(self, call: Content) -> str | None: + """Format based on the specific FileMemory operation.""" + if call.name == "FileMemory_SaveFile": + return self._format_save_file(call) + if call.name in ("FileMemory_ReadFile", "FileMemory_DeleteFile"): + value = get_argument_value(call, "fileName") + return f"({value})" if value else None + if call.name == "FileMemory_SearchFiles": + return self._format_search_files(call) + return None + + def _format_save_file(self, call: Content) -> str | None: + """Format SaveFile with file name and description indicator.""" + file_name = get_argument_value(call, "fileName") + description = get_argument_value(call, "description") + + if not file_name: + return None + + if description: + return f"\n └─ {file_name} (with description)" + return f"\n └─ {file_name}" + + def _format_search_files(self, call: Content) -> str | None: + """Format SearchFiles with regex pattern and optional file pattern.""" + pattern = get_argument_value(call, "regexPattern") + file_pattern = get_argument_value(call, "filePattern") + + if not pattern: + return None + + if file_pattern: + return f"(/{pattern}/ in {file_pattern})" + return f"(/{pattern}/)" + + +# endregion + +# region Public API Functions + + +def format_tool_call(formatters: list[ToolCallFormatter], call: Content) -> str: + """Format a tool call using the first matching formatter from the provided list. + + Returns "{toolName} {detail}" when a formatter produces detail, + or just "{toolName}" otherwise. + + Args: + formatters: List of formatters to try in order. + call: The function call content to format. + + Returns: + Formatted string representation of the tool call. + """ + for formatter in formatters: + if formatter.can_format(call): + detail = formatter.format_detail(call) + tool_name = call.name or "Unknown" + return f"{tool_name} {detail}" if detail is not None else tool_name + + return call.name or "Unknown" + + +def build_default_formatters() -> list[ToolCallFormatter]: + """Create the default list of tool call formatters. + + The FallbackToolFormatter is always last. Users can call this function + and combine the result with their own formatters. + + Returns: + A list of all built-in tool call formatters. + """ + return [ + TodoToolFormatter(), + ModeToolFormatter(), + BackgroundAgentToolFormatter(), + FileMemoryToolFormatter(), + WebSearchToolFormatter(), + FallbackToolFormatter(), + ] + + +# endregion diff --git a/python/samples/02-agents/harness/console/harness_console.py b/python/samples/02-agents/harness/console/harness_console.py new file mode 100644 index 0000000000..07b49c2151 --- /dev/null +++ b/python/samples/02-agents/harness/console/harness_console.py @@ -0,0 +1,87 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Main entry point for the harness console. + +Provides the top-level run_agent_async() function that creates and runs +the Textual-based harness console application. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .app import HarnessApp +from .observers import build_default_observers + +if TYPE_CHECKING: + from agent_framework import Agent, AgentSession + + from .commands import CommandHandler + from .observers.base import ConsoleObserver + + +async def run_agent_async( + agent: Agent, + *, + session: AgentSession | None = None, + observers: list[ConsoleObserver] | None = None, + command_handlers: list[CommandHandler] | None = None, + mode_colors: dict[str, str] | None = None, + initial_mode: str | None = None, + placeholder: str = "Type a message and press Enter...", + title: str = "Harness Console", + max_context_window_tokens: int | None = None, + max_output_tokens: int | None = None, +) -> None: + """Run the harness console with the given agent. + + This is the main entry point for the harness console. Creates a Textual + application with the configured observers and runs it until the user exits. + + Args: + agent: The agent to run conversations with. + session: Optional agent session for conversation history. + observers: List of console observers. If None, uses defaults. + command_handlers: List of command handlers. If None, auto-detected from agent. + mode_colors: Mapping of mode names to Rich color strings. + initial_mode: Initial agent mode text. + placeholder: Input placeholder text. + title: Application title. + max_context_window_tokens: Optional max context window size for usage display. + max_output_tokens: Optional max output tokens for usage display. + + Example: + .. code-block:: python + + from agent_framework import Agent + from agent_framework.openai import OpenAIChatClient + from console import run_agent_async + + agent = Agent( + client=OpenAIChatClient(), + instructions="You are helpful.", + ) + + await run_agent_async(agent) + """ + resolved_observers = observers or build_default_observers() + resolved_mode_colors = mode_colors or { + "plan": "cyan", + "execute": "green", + } + resolved_session = session or agent.create_session() + + app = HarnessApp( + agent=agent, + observers=resolved_observers, + session=resolved_session, + mode_colors=resolved_mode_colors, + initial_mode=initial_mode, + placeholder=placeholder, + title=title, + max_context_window_tokens=max_context_window_tokens, + max_output_tokens=max_output_tokens, + command_handlers=command_handlers, + ) + + await app.run_async() diff --git a/python/samples/02-agents/harness/console/observers/__init__.py b/python/samples/02-agents/harness/console/observers/__init__.py new file mode 100644 index 0000000000..7200939d74 --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/__init__.py @@ -0,0 +1,122 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Console observers for agent streaming lifecycle. + +This module provides observers that display events during agent streaming +and collect follow-up actions. All observers use the IUXStateDriver interface +to update the UI. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .base import ConsoleObserver +from .error_display import ErrorDisplayObserver +from .planning_output import PlanningOutputObserver +from .reasoning_display import ReasoningDisplayObserver +from .text_output import TextOutputObserver +from .tool_approval import ToolApprovalObserver +from .tool_call_display import ToolCallDisplayObserver +from .usage_display import UsageDisplayObserver + +if TYPE_CHECKING: + from agent_framework import Agent + + +def build_default_observers() -> list[ConsoleObserver]: + """Build the default set of observers for the harness console. + + Returns a standard observer list covering: + - Text output (streaming text display) + - Tool call display (formatted tool invocations) + - Error display (error messages) + - Usage display (token counts) + - Reasoning display (reasoning/thinking blocks) + - Tool approval (user approval for tool calls) + + Note: PlanningOutputObserver is NOT included here because it requires + a mode_provider. Use build_observers_with_planning() for agents that + have an AgentModeProvider (i.e. agents created with create_harness_agent). + + Returns: + List of default console observers. + """ + return [ + TextOutputObserver(), + ToolCallDisplayObserver(), + ErrorDisplayObserver(), + UsageDisplayObserver(), + ReasoningDisplayObserver(), + ToolApprovalObserver(), + ] + + +def build_observers_with_planning( + agent: Agent, + plan_mode_name: str = "plan", + execution_mode_name: str = "execute", + *, + mode_colors: dict[str, str] | None = None, +) -> list[ConsoleObserver]: + """Build observers with planning support (structured output in plan mode). + + Replaces TextOutputObserver with PlanningOutputObserver, which configures + structured JSON output via response_format when in plan mode. This enables + the list picker UI for clarification and approval questions. + + Requires that the agent has an AgentModeProvider in its context_providers + (automatically added by create_harness_agent). + + Args: + agent: The agent to resolve the AgentModeProvider from. + plan_mode_name: The mode name that represents planning mode. + execution_mode_name: The mode name to switch to on approval. + mode_colors: Optional mapping of mode names to Rich color strings. + + Returns: + List of observers with planning support. + + Raises: + ValueError: If the agent has no AgentModeProvider. + """ + from agent_framework import AgentModeProvider + + mode_provider = next( + (p for p in agent.context_providers if isinstance(p, AgentModeProvider)), + None, + ) + if mode_provider is None: + msg = ( + "Planning observers require an AgentModeProvider on the agent. " + "Use create_harness_agent() or add AgentModeProvider to context_providers." + ) + raise ValueError(msg) + + return [ + ToolCallDisplayObserver(), + ToolApprovalObserver(), + ErrorDisplayObserver(), + ReasoningDisplayObserver(), + UsageDisplayObserver(), + PlanningOutputObserver( + mode_provider, + plan_mode_name, + execution_mode_name, + mode_colors=mode_colors, + ), + ] + + +__all__ = [ + "ConsoleObserver", + "ErrorDisplayObserver", + "PlanningOutputObserver", + "ReasoningDisplayObserver", + "TextOutputObserver", + "ToolApprovalObserver", + "ToolCallDisplayObserver", + "UsageDisplayObserver", + "build_default_observers", + "build_observers_with_planning", +] diff --git a/python/samples/02-agents/harness/console/observers/base.py b/python/samples/02-agents/harness/console/observers/base.py new file mode 100644 index 0000000000..40169ed4ae --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/base.py @@ -0,0 +1,125 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Base class for console observers. + +Observers participate in the agent streaming lifecycle, displaying events +and optionally returning follow-up actions. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from agent_framework import Agent, Content, Message + + from ..app_state import FollowUpAction + from ..state_driver import IUXStateDriver + + +class ConsoleObserver: + """Base class for console observers. + + Observers participate in the agent streaming lifecycle, displaying + events (tool calls, errors, reasoning, etc.) and optionally returning + follow-up actions (questions, approval requests). + + All methods have default no-op implementations, so subclasses only + override the methods they need. + """ + + def configure_run_options( + self, + options: dict[str, Any], + agent: Agent, + session: Any, + ) -> None: + """Configure run options before agent invocation. + + Override to set options such as response_format, max_tokens, etc. + + Args: + options: Dictionary of chat options to modify. + agent: The AI agent. + session: The agent session. + """ + pass + + async def on_response_update( + self, + ux: IUXStateDriver, + update: Message, + agent: Agent, + session: Any, + ) -> None: + """Called for each response update chunk. + + Override to inspect update-level metadata or handle provider-specific + events in the raw representation. + + Args: + ux: The UX state driver for UI updates. + update: The message update chunk. + agent: The AI agent. + session: The agent session. + """ + pass + + async def on_content( + self, + ux: IUXStateDriver, + content: Content, + agent: Agent, + session: Any, + ) -> None: + """Called for each content item in the response. + + Override to handle specific content types (function calls, errors, etc.). + + Args: + ux: The UX state driver for UI updates. + content: The content item from the response. + agent: The AI agent. + session: The agent session. + """ + pass + + async def on_text( + self, + ux: IUXStateDriver, + text: str, + agent: Agent, + session: Any, + ) -> None: + """Called for each text chunk in the response. + + Override to accumulate and display streaming text. + + Args: + ux: The UX state driver for UI updates. + text: The text chunk. + agent: The AI agent. + session: The agent session. + """ + pass + + async def on_stream_complete( + self, + ux: IUXStateDriver, + agent: Agent, + session: Any, + ) -> list[FollowUpAction] | None: + """Called when streaming completes. + + Override to return follow-up actions (questions to ask the user, + messages to inject into the next turn, etc.). + + Args: + ux: The UX state driver for UI updates. + agent: The AI agent. + session: The agent session. + + Returns: + Optional list of follow-up actions to queue, or None. + """ + return None diff --git a/python/samples/02-agents/harness/console/observers/error_display.py b/python/samples/02-agents/harness/console/observers/error_display.py new file mode 100644 index 0000000000..aa8bfc747c --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/error_display.py @@ -0,0 +1,72 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Error display observer for showing errors.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from .base import ConsoleObserver + +if TYPE_CHECKING: + from agent_framework import Agent, Content + + from ..state_driver import IUXStateDriver + + +class ErrorDisplayObserver(ConsoleObserver): + """Displays error content from the agent response. + + Shows errors with an ❌ prefix in red to make them easily visible. + """ + + async def on_content( + self, + ux: IUXStateDriver, + content: Content, + agent: Agent, + session: Any, + ) -> None: + """Display error content. + + Args: + ux: The UX state driver for UI updates. + content: The content item to check for errors. + agent: The AI agent. + session: The agent session. + """ + # Check if this is an error content type + # The exact content type check depends on the agent framework's Content class + if hasattr(content, "type") and content.type == "error": + error_text = self._format_error(content) + ux.append_info_line(error_text, "red") + elif getattr(content, "error", None): + error_text = f"❌ Error: {content.error}" # type: ignore[reportAttributeAccessIssue] + ux.append_info_line(error_text, "red") + + def _format_error(self, content: Content) -> str: + """Format error content for display. + + Args: + content: The error content. + + Returns: + Formatted error string. + """ + error_text = "❌ Error" + + # Try to extract error message + if hasattr(content, "message"): + error_text += f": {content.message}" + elif hasattr(content, "text"): + error_text += f": {content.text}" + + # Try to add error code if available + if hasattr(content, "error_code") and content.error_code: + error_text += f" (code: {content.error_code})" + + # Try to add details if available + if hasattr(content, "details") and getattr(content, "details", None): + error_text += f" — {content.details}" # type: ignore[reportAttributeAccessIssue] + + return error_text diff --git a/python/samples/02-agents/harness/console/observers/planning_models.py b/python/samples/02-agents/harness/console/observers/planning_models.py new file mode 100644 index 0000000000..9b4a92e575 --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/planning_models.py @@ -0,0 +1,71 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Pydantic models for structured planning output. + +These models define the JSON schema that the agent produces when in planning +mode via `response_format`. The schema enables consistent rendering of +clarification questions and approval requests in the console UI. +""" + +from __future__ import annotations + +from enum import Enum + +from pydantic import BaseModel, Field + + +class PlanningResponseType(str, Enum): + """Type of planning response from the agent.""" + + CLARIFICATION = "clarification" + """The agent needs clarification and presents options for the user to choose from.""" + + APPROVAL = "approval" + """The agent is seeking approval to proceed with execution.""" + + +class PlanningQuestion(BaseModel): + """A single question or item within a PlanningResponse. + + For clarification: contains the question text and optional choices. + For approval: contains the plan summary for the user to approve. + """ + + message: str = Field( + description=( + "For clarifications, this has the question that needs to be clarified " + "with the user. For approvals, this would contain a summary of the " + "execution plan that the user needs to approve." + ), + ) + choices: list[str] | None = Field( + default=None, + description=( + "For clarifications, this has a list of options that the user can " + "choose from. null for approvals." + ), + ) + + +class PlanningResponse(BaseModel): + """Structured response from the agent while in planning mode. + + Used with structured output (`response_format`) to enable consistent + rendering of clarification questions and approval requests. + """ + + type: PlanningResponseType = Field( + description=( + "Use 'clarification' when you need clarification around the user " + "request and you want to present the user with options to choose from. " + "Use 'approval' when you are ready to start execution, but need " + "approval to start executing." + ), + ) + questions: list[PlanningQuestion] = Field( + description=( + "For clarifications, this has one or more questions to ask the user " + "(each with choices). For approvals, this has exactly one item " + "containing the plan summary for the user to approve." + ), + ) diff --git a/python/samples/02-agents/harness/console/observers/planning_output.py b/python/samples/02-agents/harness/console/observers/planning_output.py new file mode 100644 index 0000000000..f47bafcb15 --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/planning_output.py @@ -0,0 +1,242 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Planning output observer for structured agent responses in plan mode. + +In planning mode, this observer configures structured JSON output via +response_format, collects streamed text silently, then deserializes the +result as a PlanningResponse to present clarification/approval questions. + +In execution mode, text is streamed through directly. +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +from rich.markup import escape + +from ..app_state import ( + ChoiceFollowUpQuestion, + FollowUpAction, + TextFollowUpQuestion, +) +from .base import ConsoleObserver +from .planning_models import PlanningResponse, PlanningResponseType + +if TYPE_CHECKING: + from agent_framework import Agent, AgentModeProvider, Message + + from ..state_driver import IUXStateDriver + + +class PlanningOutputObserver(ConsoleObserver): + """Mode-aware observer that uses structured output in plan mode. + + In planning mode: + - Configures response_format to PlanningResponse schema + - Collects streamed text silently + - Deserializes JSON into PlanningResponse + - Builds follow-up questions (clarification or approval) + + In execution mode: + - Streams text directly to the UX driver + + If JSON parsing fails, falls back to rendering the raw text as regular + output so the user always sees what the agent produced. + """ + + def __init__( + self, + mode_provider: AgentModeProvider, + plan_mode_name: str, + execution_mode_name: str, + *, + mode_colors: dict[str, str] | None = None, + ) -> None: + """Initialize the planning output observer. + + Args: + mode_provider: The mode provider for reading/switching modes. + plan_mode_name: The mode name that represents planning mode. + execution_mode_name: The mode name to switch to on approval. + mode_colors: Optional mapping of mode names to Rich color strings. + """ + self._mode_provider = mode_provider + self._plan_mode_name = plan_mode_name + self._execution_mode_name = execution_mode_name + self._mode_colors = mode_colors or {} + self._text_collector: list[str] = [] + + def configure_run_options( + self, + options: dict[str, Any], + agent: Agent, + session: Any, + ) -> None: + """Set response_format to PlanningResponse when in plan mode.""" + if self._is_planning_mode(session): + options["response_format"] = PlanningResponse + + async def on_text( + self, + ux: IUXStateDriver, + text: str, + agent: Agent, + session: Any, + ) -> None: + """Collect text in plan mode; stream through in execute mode.""" + if self._is_planning_mode_from_ux(ux): + self._text_collector.append(text) + else: + ux.write_text(escape(text)) + + async def on_stream_complete( + self, + ux: IUXStateDriver, + agent: Agent, + session: Any, + ) -> list[FollowUpAction] | None: + """Parse collected text as PlanningResponse and build follow-up actions.""" + if not self._is_planning_mode_from_ux(ux): + self._text_collector.clear() + return None + + collected_text = "".join(self._text_collector) + self._text_collector.clear() + + if not collected_text.strip(): + return None + + # Attempt to deserialize structured response + try: + planning_response = PlanningResponse.model_validate_json(collected_text) + except (json.JSONDecodeError, ValueError): + # JSON parsing failed — fall back to rendering as regular text + ux.write_text(escape(collected_text)) + return None + + if planning_response.type == PlanningResponseType.CLARIFICATION: + return self._build_clarification_actions(planning_response) + + if planning_response.type == PlanningResponseType.APPROVAL: + if not planning_response.questions: + ux.append_info_line("(approval response had no content)", "yellow") + return None + question = planning_response.questions[0] + return [self._build_approval_action(question, session)] + + # Unexpected type — fall back to rendering as regular text + ux.write_text(escape(collected_text)) + return None + + def _is_planning_mode(self, session: Any) -> bool: + """Check if session is in planning mode.""" + from agent_framework import get_agent_mode + + try: + current_mode = get_agent_mode(session) + except (AttributeError, TypeError): + return True # No mode provider → treat as planning + return current_mode.lower() == self._plan_mode_name.lower() + + def _is_planning_mode_from_ux(self, ux: IUXStateDriver) -> bool: + """Check if UX is in planning mode.""" + current = ux.current_mode + if current is None: + return True + return current.lower() == self._plan_mode_name.lower() + + def _build_clarification_actions( + self, + response: PlanningResponse, + ) -> list[FollowUpAction]: + """Build follow-up questions for clarification.""" + actions: list[FollowUpAction] = [] + + for question in response.questions: + prompt = question.message + cont = self._make_clarification_continuation(prompt) + + if question.choices and len(question.choices) > 0: + actions.append( + ChoiceFollowUpQuestion( + prompt=prompt, + choices=question.choices, + allow_custom_text=True, + continuation=cont, + ) + ) + else: + actions.append( + TextFollowUpQuestion( + prompt=prompt, + continuation=cont, + ) + ) + + return actions + + @staticmethod + def _make_clarification_continuation(prompt: str): + """Create a clarification continuation closure capturing the prompt.""" + + async def continuation( + answer: str, + ux: IUXStateDriver, + ) -> Message | None: + if not answer.strip(): + ux.append_info_line(f"🔹 {prompt}\n └─ (no answer)", "dim") + return None + + ux.append_info_line(f"🔹 {prompt}\n └─ [green]{answer}[/green]", "dim") + + from agent_framework import Message + + return Message(role="user", contents=[f"Q: {prompt}\nA: {answer}"]) + + return continuation + + def _build_approval_action( + self, + question: Any, + session: Any, + ) -> ChoiceFollowUpQuestion: + """Build the approval follow-up question.""" + approve_option = "Approve and switch to execute mode" + prompt = question.message + + async def continuation( + selection: str, + ux: IUXStateDriver, + ) -> Message | None: + ux.append_info_line( + f"🔹 {prompt}\n └─ [green]{selection}[/green]", + "dim", + ) + + if selection == approve_option: + from agent_framework import set_agent_mode + + set_agent_mode(session, self._execution_mode_name) + exec_color = self._mode_colors.get(self._execution_mode_name) + ux.set_mode(self._execution_mode_name, exec_color) + ux.append_info_line( + f"✅ Switched to {self._execution_mode_name} mode.", + exec_color, + ) + from agent_framework import Message + + return Message(role="user", contents=["Approved"]) + + # Custom freeform input — treat as suggested changes + from agent_framework import Message + + return Message(role="user", contents=[selection]) + + return ChoiceFollowUpQuestion( + prompt=prompt, + choices=[approve_option], + allow_custom_text=True, + continuation=continuation, + ) diff --git a/python/samples/02-agents/harness/console/observers/reasoning_display.py b/python/samples/02-agents/harness/console/observers/reasoning_display.py new file mode 100644 index 0000000000..dff6cb55d5 --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/reasoning_display.py @@ -0,0 +1,80 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Reasoning display observer for showing thinking content.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from rich.markup import escape + +from .base import ConsoleObserver + +if TYPE_CHECKING: + from agent_framework import Agent, Content + + from ..state_driver import IUXStateDriver + + +class ReasoningDisplayObserver(ConsoleObserver): + """Displays reasoning/thinking content from the agent. + + Some models (like o1) provide reasoning steps that show their + internal thought process. This observer displays them with a 💭 prefix + in a dimmed style. + """ + + async def on_content( + self, + ux: IUXStateDriver, + content: Content, + agent: Agent, + session: Any, + ) -> None: + """Display reasoning content. + + Args: + ux: The UX state driver for UI updates. + content: The content item to check for reasoning. + agent: The AI agent. + session: The agent session. + """ + reasoning_text = self._extract_reasoning(content) + if reasoning_text: + # Display reasoning in dim style to differentiate from main output + ux.append_info_line(f"💭 {escape(reasoning_text)}", "dim") + + def _extract_reasoning(self, content: Content) -> str | None: + """Extract reasoning text from content. + + Args: + content: The content item to extract reasoning from. + + Returns: + The reasoning text, or None if no reasoning is present. + """ + # Check for reasoning content type + if hasattr(content, "type") and content.type in {"text_reasoning", "reasoning"}: + if hasattr(content, "text"): + return content.text + content_attr = getattr(content, "content", None) + if content_attr: + return str(content_attr) + + # Check for reasoning attribute + reasoning = getattr(content, "reasoning", None) + if reasoning is not None: + if isinstance(reasoning, str): + return reasoning + if hasattr(reasoning, "text"): + return reasoning.text + + # Check for thinking attribute (alternative name) + thinking = getattr(content, "thinking", None) + if thinking is not None: + if isinstance(thinking, str): + return thinking + if hasattr(thinking, "text"): + return thinking.text + + return None diff --git a/python/samples/02-agents/harness/console/observers/text_output.py b/python/samples/02-agents/harness/console/observers/text_output.py new file mode 100644 index 0000000000..9603becd66 --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/text_output.py @@ -0,0 +1,59 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Text output observer for streaming agent text.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from rich.markup import escape + +from .base import ConsoleObserver + +if TYPE_CHECKING: + from agent_framework import Agent + + from ..state_driver import IUXStateDriver + + +class TextOutputObserver(ConsoleObserver): + """Displays streaming text output from the agent. + + Writes text chunks incrementally to the UX state driver as they arrive, + allowing real-time display during streaming. + """ + + async def on_text( + self, + ux: IUXStateDriver, + text: str, + agent: Agent, + session: Any, + ) -> None: + """Write each text chunk directly to the UX driver. + + Args: + ux: The UX state driver for UI updates. + text: The text chunk to display. + agent: The AI agent. + session: The agent session. + """ + ux.write_text(escape(text)) + + async def on_stream_complete( + self, + ux: IUXStateDriver, + agent: Agent, + session: Any, + ) -> list | None: + """No-op on stream complete (state managed by UX driver). + + Args: + ux: The UX state driver for UI updates. + agent: The AI agent. + session: The agent session. + + Returns: + None (no follow-up actions). + """ + return None diff --git a/python/samples/02-agents/harness/console/observers/tool_approval.py b/python/samples/02-agents/harness/console/observers/tool_approval.py new file mode 100644 index 0000000000..1fc9533a0d --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/tool_approval.py @@ -0,0 +1,139 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tool approval observer for user confirmation of tool calls. + +Detects function_approval_request content items during streaming, displays +approval notifications, and after the stream completes presents one +ChoiceFollowUpQuestion per pending approval request. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from ..app_state import ChoiceFollowUpQuestion, FollowUpAction +from .base import ConsoleObserver + +if TYPE_CHECKING: + from agent_framework import Agent, Content, Message + + from ..state_driver import IUXStateDriver + + +class ToolApprovalObserver(ConsoleObserver): + """Asks user to approve tool calls before execution. + + Collects `function_approval_request` content during streaming and presents + a multi-choice approval question for each after the stream completes. + The continuation builds a `function_approval_response` Content to inject + into the next agent turn. + """ + + def __init__(self) -> None: + """Initialize the tool approval observer.""" + self._approval_requests: list[Content] = [] + + async def on_content( + self, + ux: IUXStateDriver, + content: Content, + agent: Agent, + session: Any, + ) -> None: + """Collect function_approval_request content for approval. + + Args: + ux: The UX state driver for UI updates. + content: The content item to check. + agent: The AI agent. + session: The agent session. + """ + if content.type == "function_approval_request": + self._approval_requests.append(content) + tool_name = self._format_tool_name(content) + ux.append_info_line(f"⚠️ Approval needed: {tool_name}", "yellow") + + async def on_stream_complete( + self, + ux: IUXStateDriver, + agent: Agent, + session: Any, + ) -> list[FollowUpAction] | None: + """Build approval questions for collected requests. + + Args: + ux: The UX state driver for UI updates. + agent: The AI agent. + session: The agent session. + + Returns: + List of ChoiceFollowUpQuestions, one per approval request. + """ + if not self._approval_requests: + return None + + actions: list[FollowUpAction] = [] + for request in self._approval_requests: + actions.append(self._build_approval_question(request)) + + self._approval_requests.clear() + return actions + + def _build_approval_question(self, request: Content) -> ChoiceFollowUpQuestion: + """Build a multi-choice approval question for a single request.""" + tool_name = self._format_tool_name(request) + prompt = f"🔐 Tool approval: {tool_name}" + + # TODO(westey-m): Add "Always approve" options when the framework supports + # CreateAlwaysApproveToolResponse / CreateAlwaysApproveToolWithArgumentsResponse. + choices = [ + "Approve this call", + "Deny", + ] + + async def continuation( + selection: str, + ux: IUXStateDriver, + ) -> Message | None: + from agent_framework import Message + + if selection == "Deny": + response_content = request.to_function_approval_response(approved=False) + action_label = "❌ Denied" + color = "red" + else: + response_content = request.to_function_approval_response(approved=True) + action_label = "✅ Approved" + color = "green" + + ux.append_info_line( + f"🔹 {prompt}\n └─ [{color}]{action_label}[/{color}]", + "dim", + ) + + return Message(role="user", contents=[response_content]) + + return ChoiceFollowUpQuestion( + prompt=prompt, + choices=choices, + allow_custom_text=False, + continuation=continuation, + ) + + @staticmethod + def _format_tool_name(content: Content) -> str: + """Extract a readable tool name from approval request content.""" + # The function_call is stored on the approval request content + function_call = getattr(content, "function_call", None) + if function_call is not None: + from ..formatters import build_default_formatters, format_tool_call + + try: + return format_tool_call(build_default_formatters(), function_call) + except (AttributeError, TypeError): + pass + # Fall back to name attribute + name = getattr(function_call, "name", None) + if name: + return str(name) + return "unknown tool" diff --git a/python/samples/02-agents/harness/console/observers/tool_call_display.py b/python/samples/02-agents/harness/console/observers/tool_call_display.py new file mode 100644 index 0000000000..e9999a9db6 --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/tool_call_display.py @@ -0,0 +1,53 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Tool call display observer using formatters.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from ..formatters import build_default_formatters, format_tool_call +from .base import ConsoleObserver + +if TYPE_CHECKING: + from agent_framework import Agent, Content + + from ..formatters import ToolCallFormatter + from ..state_driver import IUXStateDriver + + +class ToolCallDisplayObserver(ConsoleObserver): + """Displays tool call notifications using formatters. + + Shows tool calls with a 🔧 prefix and uses the formatter system to + display them in a user-friendly format. + """ + + def __init__(self, formatters: list[ToolCallFormatter] | None = None) -> None: + """Initialize the tool call display observer. + + Args: + formatters: Optional list of tool formatters. If None, uses + default formatters from build_default_formatters(). + """ + self._formatters = formatters or build_default_formatters() + + async def on_content( + self, + ux: IUXStateDriver, + content: Content, + agent: Agent, + session: Any, + ) -> None: + """Display function call content. + + Args: + ux: The UX state driver for UI updates. + content: The content item to check for function calls. + agent: The AI agent. + session: The agent session. + """ + # Check if this is a function call content type + if content.type == "function_call": + formatted = format_tool_call(self._formatters, content) + ux.append_info_line(f"🔧 {formatted}", "yellow") diff --git a/python/samples/02-agents/harness/console/observers/usage_display.py b/python/samples/02-agents/harness/console/observers/usage_display.py new file mode 100644 index 0000000000..468dc4bd5e --- /dev/null +++ b/python/samples/02-agents/harness/console/observers/usage_display.py @@ -0,0 +1,56 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Usage display observer for token usage statistics.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from .base import ConsoleObserver + +if TYPE_CHECKING: + from agent_framework import Agent + + from ..state_driver import IUXStateDriver + + +class UsageDisplayObserver(ConsoleObserver): + """Displays token usage as a proportion of the context window. + + Shows current token usage as reported by the API immediately when + usage information becomes available (via Content items or the final response). + The display shows input/output/total relative to configured budgets. + """ + + async def on_content( + self, + ux: IUXStateDriver, + content: Any, + agent: Agent, + session: Any, + ) -> None: + """Update usage display immediately when usage content arrives. + + Args: + ux: The UX state driver for UI updates. + content: A content item from the response. + agent: The AI agent. + session: The agent session. + """ + if getattr(content, "type", None) == "usage": + usage_details = getattr(content, "usage_details", None) + if isinstance(usage_details, dict): + # Pass through to state driver — the runner handles formatting + ux.set_usage_text(self._format_from_details(usage_details)) + + @staticmethod + def _format_from_details(usage: dict) -> str: + """Format usage details dict into display text. + + This is a fallback formatter for when usage arrives as Content + before the runner's final response processing. + """ + input_tokens = usage.get("input_token_count", 0) or 0 + output_tokens = usage.get("output_token_count", 0) or 0 + total_tokens = usage.get("total_token_count", 0) or input_tokens + output_tokens + return f"📊 Tokens — input: {input_tokens:,} | output: {output_tokens:,} | total: {total_tokens:,}" diff --git a/python/samples/02-agents/harness/console/state_driver.py b/python/samples/02-agents/harness/console/state_driver.py new file mode 100644 index 0000000000..959c8757ba --- /dev/null +++ b/python/samples/02-agents/harness/console/state_driver.py @@ -0,0 +1,338 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""State driver interface for UI updates. + +This module defines the IUXStateDriver Protocol, which observers use to +update the UI during agent streaming. This is an interface-only definition; +the concrete implementation will be in a separate module. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol + +if TYPE_CHECKING: + from agent_framework import AgentSession + + from .app_state import FollowUpAction + + +class IUXStateDriver(Protocol): + """Protocol for UI state driver. + + Observers call these methods to update the UI during agent streaming. + This is an interface-only definition - concrete implementation comes later. + + The state driver acts as a controller between the agent framework (model) + and the Textual UI components (view), coordinating all UI updates. + """ + + def append_info_line(self, text: str, color: str | None = None) -> None: + """Append an informational line to the output. + + Used for displaying tool calls, errors, warnings, and other + informational messages that aren't part of the agent's text response. + + Args: + text: The text to display. + color: Optional Rich color string (e.g., "yellow", "red", "dim"). + """ + ... + + def append_stream_footer(self, text: str) -> None: + """Append a footer line after streaming ends. + + Used for displaying final status messages like "(no text response)" + or other closing information. + + Args: + text: The footer text to display. + """ + ... + + def begin_streaming(self) -> None: + """Begin streaming mode. + + Switches the bottom panel to streaming mode (shows "Streaming..." indicator), + starts the spinner animation, and prepares for streaming text updates. + """ + ... + + def update_streaming_text(self, accumulated_text: str) -> None: + """Update the accumulated streaming text. + + Called repeatedly during streaming to update the displayed text as + new chunks arrive from the agent. The text should accumulate across + multiple calls. + + Args: + accumulated_text: The full accumulated text so far. + """ + ... + + def write_text(self, text: str, color: str | None = None) -> None: + """Write a streaming text chunk incrementally. + + Appends the text to the current streaming entry. If the streaming + entry is no longer the last output item (e.g., an info_line was + inserted), creates a new streaming entry. + + Args: + text: The text chunk to append. + color: Optional Rich color string. + """ + ... + + def end_streaming(self) -> None: + """End streaming mode. + + Stops the spinner, switches the bottom panel back to text input mode, + and finalizes the streaming output. + """ + ... + + def enqueue_follow_up_action(self, action: FollowUpAction) -> None: + """Add a follow-up action to the queue. + + Follow-up actions can be questions to ask the user or messages to + inject into the next agent turn. The state driver queues these and + processes them after streaming completes. + + Args: + action: The follow-up action to queue. + """ + ... + + def has_pending_questions(self) -> bool: + """Check if there are pending follow-up questions awaiting user answers. + + Returns: + True if there are unanswered questions in the queue. + """ + ... + + def take_follow_up_responses(self) -> list: + """Take and clear all accumulated follow-up response messages. + + Returns: + List of Message objects accumulated from follow-up actions. + """ + ... + + async def write_no_text_warning(self, has_follow_up_actions: bool) -> None: + """Write a warning if the agent produced no text output. + + Called after streaming completes. If no text was received and no + follow-up actions exist, writes a "(no text response)" footer. + + Args: + has_follow_up_actions: Whether follow-up actions exist. + """ + ... + + def set_mode(self, mode: str | None, mode_color: str | None = None) -> None: + """Set the current agent mode. + + Updates the mode indicator in the UI (e.g., "[plan]", "[execute]") + with the specified color. + + Args: + mode: The mode name (e.g., "plan", "execute"), or None to hide. + mode_color: Optional Rich color string for the mode label. + """ + ... + + def set_show_spinner(self, show: bool) -> None: + """Show or hide the spinner animation. + + The spinner provides visual feedback that the agent is processing. + + Args: + show: True to show the spinner, False to hide it. + """ + ... + + def set_usage_text(self, usage_text: str | None) -> None: + """Set the token usage text. + + Displays token usage statistics (e.g., "1.2K in / 856 out") in + the status bar. + + Args: + usage_text: The formatted usage text, or None to hide. + """ + ... + + @property + def current_mode(self) -> str | None: + """Get the current agent mode. + + Returns: + The current mode name, or None if no mode is set. + """ + ... + + def begin_streaming_output(self) -> None: + """Reset per-turn streaming bookkeeping. + + Called at the start of each agent turn to reset streaming state + (e.g., clear accumulated text, reset flags). + """ + ... + + def write_user_input_echo(self, text: str) -> None: + """Echo user input to the output area. + + Displays the user's submitted input in the conversation history, + typically with a "You: " prefix. + + Args: + text: The user's input text. + """ + ... + + def request_shutdown(self) -> None: + """Request the application to shut down. + + Called by the /exit command handler to signal that the user + wants to quit the console. + """ + ... + + def replace_session(self, session: AgentSession) -> None: + """Replace the current agent session. + + Called by the /session-import command handler to swap the + active session with one loaded from a file. + + Args: + session: The new session to use. + """ + ... + + +class SimpleConsoleStateDriver: + """Simple console-based state driver for testing. + + This is a minimal implementation that logs all operations to the console. + Useful for testing the agent runner without a full UI. + """ + + def __init__(self) -> None: + """Initialize the simple state driver.""" + self._streaming = False + self._spinner_visible = False + self._current_mode: str | None = None + print("[SimpleConsoleStateDriver initialized]") + + def append_info_line(self, text: str, color: str | None = None) -> None: + """Append an informational line to the output.""" + color_prefix = f"[{color}]" if color else "" + print(f"{color_prefix} {text}") + + def append_stream_footer(self, text: str) -> None: + """Append a footer line after streaming ends.""" + print(f"[Footer] {text}") + + async def write_info_line(self, text: str, color: str | None = None) -> None: + """Async version of append_info_line.""" + self.append_info_line(text, color) + + def write_user_input_echo(self, text: str) -> None: + """Echo user input to the output.""" + print(f"\n[User] {text}\n") + + def begin_streaming(self) -> None: + """Begin streaming mode.""" + self._streaming = True + print("[▶ Streaming started]") + + def begin_streaming_output(self) -> None: + """Begin streaming output to the scroll panel.""" + print("[▶ Streaming output started]") + + def update_streaming_text(self, text: str) -> None: + """Update the currently streaming text.""" + # Truncate for readability + display_text = text[:80] + "..." if len(text) > 80 else text + print(f"[Assistant] {display_text}", end="", flush=True) + + def write_text(self, text: str, color: str | None = None) -> None: + """Write a streaming text chunk.""" + print(text, end="", flush=True) + + async def end_streaming_output(self) -> None: + """End streaming output.""" + print("\n[▪ Streaming output ended]") + + def end_streaming(self) -> None: + """End streaming mode.""" + self._streaming = False + print("[▪ Streaming ended]") + + def set_show_spinner(self, show: bool) -> None: + """Show or hide the spinner.""" + self._spinner_visible = show + status = "visible" if show else "hidden" + print(f"[Spinner: {status}]") + + def set_mode(self, mode: str | None, mode_color: str | None = None) -> None: + """Set the current mode text.""" + self._current_mode = mode + color_str = f" ({mode_color})" if mode_color else "" + print(f"[Mode: {mode or 'default'}{color_str}]") + + @property + def current_mode(self) -> str | None: + """Get the current agent mode.""" + return self._current_mode + + def set_usage_text(self, usage_text: str | None) -> None: + """Set the usage display text.""" + if usage_text: + print(f"[Usage: {usage_text}]") + + def enqueue_follow_up_action(self, action) -> None: + """Enqueue a follow-up action. + + Args: + action: The follow-up action to enqueue. + """ + action_type = type(action).__name__ + print(f"[Follow-up queued: {action_type}]") + + def has_pending_questions(self) -> bool: + """Check if there are pending follow-up questions.""" + return False + + def take_follow_up_responses(self) -> list: + """Take and clear all accumulated follow-up responses.""" + return [] + + async def write_no_text_warning(self, has_follow_up_actions: bool) -> None: + """Write a warning if no text was produced.""" + if not has_follow_up_actions: + print("[▪ (no text response from agent)]") + + def update_last_entry(self, entry_type, new_text: str) -> None: + """Update the last output entry (placeholder for now). + + Args: + entry_type: The type of entry to update. + new_text: The new text content. + """ + # Simplified: just print the update + display_text = new_text[:80] + "..." if len(new_text) > 80 else new_text + print(f"[Update last entry: {display_text}]", flush=True) + + def request_shutdown(self) -> None: + """Request application shutdown.""" + print("[Shutdown requested]") + + def replace_session(self, session) -> None: + """Replace the active session. + + Args: + session: The new session to use. + """ + print(f"[Session replaced: {getattr(session, 'id', 'unknown')}]") diff --git a/python/samples/02-agents/harness/console/textual_state_driver.py b/python/samples/02-agents/harness/console/textual_state_driver.py new file mode 100644 index 0000000000..0f50dbbdf3 --- /dev/null +++ b/python/samples/02-agents/harness/console/textual_state_driver.py @@ -0,0 +1,400 @@ +# Copyright (c) Microsoft. All rights reserved. + +"""Textual-based UX state driver implementation. + +This module provides the full HarnessConsoleUXStateDriver that connects +the agent runner and observers to the Textual UI components. It mutates +the application state and triggers UI updates through the Textual app. +""" + +from __future__ import annotations + +from collections.abc import Callable +from typing import TYPE_CHECKING + +from .app_state import ( + BottomPanelMode, + ChoiceFollowUpQuestion, + FollowUpAction, + FollowUpMessage, + FollowUpQuestion, + HarnessAppState, + OutputEntry, + OutputEntryType, +) + +if TYPE_CHECKING: + from agent_framework import Message + + +# Default mode colors (mode name -> Rich color string) +DEFAULT_MODE_COLORS: dict[str, str] = { + "plan": "cyan", + "execute": "green", + "review": "yellow", + "default": "blue", +} + + +def get_mode_color(mode: str | None, mode_colors: dict[str, str] | None = None) -> str: + """Get the color for a mode name. + + Args: + mode: The mode name. + mode_colors: Optional custom mode color mapping. + + Returns: + A Rich color string for the mode. + """ + colors = mode_colors or DEFAULT_MODE_COLORS + if mode is None: + return colors.get("default", "blue") + return colors.get(mode, colors.get("default", "blue")) + + +class HarnessConsoleUXStateDriver: + """Full Textual-based UX state driver. + + Implements the IUXStateDriver protocol by mutating application state + and calling back into the Textual app to trigger UI updates. + + The driver owns the output entry list and streaming state, and produces + state snapshots that the app uses to render the UI. + """ + + def __init__( + self, + app_state: HarnessAppState, + on_state_changed: Callable[[], None], + mode_colors: dict[str, str] | None = None, + ) -> None: + """Initialize the state driver. + + Args: + app_state: The application state object to mutate. + on_state_changed: Callback invoked after state changes to trigger UI refresh. + mode_colors: Optional mapping of mode names to Rich color strings. + """ + self._state = app_state + self._on_state_changed = on_state_changed + self._mode_colors = mode_colors + + # Streaming bookkeeping + self._has_received_any_text = False + self._current_streaming_entry: OutputEntry | None = None + self._current_streaming_entry_index: int = -1 + self._last_entry_type: OutputEntryType | None = None + + @property + def state(self) -> HarnessAppState: + """Get the current application state.""" + return self._state + + @property + def current_mode(self) -> str | None: + """Get the current agent mode.""" + return self._state.mode_text + + @current_mode.setter + def current_mode(self, value: str | None) -> None: + """Set the current agent mode.""" + self._state.mode_text = value + self._state.mode_color = get_mode_color(value, self._mode_colors) + self._notify() + + # --- Streaming lifecycle --- + + def begin_streaming(self) -> None: + """Begin streaming mode - switch bottom panel and show spinner.""" + self._state.mode = BottomPanelMode.STREAMING + self._state.show_spinner = True + self._state.input_enabled = False + self._notify() + + def begin_streaming_output(self) -> None: + """Reset per-turn streaming bookkeeping.""" + self._has_received_any_text = False + self._current_streaming_entry = None + self._current_streaming_entry_index = -1 + + def end_streaming(self) -> None: + """End streaming mode - return to text input.""" + self._state.mode = BottomPanelMode.TEXT_INPUT + self._state.show_spinner = False + self._state.input_enabled = True + self._notify() + + async def end_streaming_output(self) -> None: + """Finalize streaming output - add trailing newline if text was received.""" + if self._has_received_any_text: + self._current_streaming_entry = None + self._last_entry_type = OutputEntryType.STREAM_FOOTER + self._notify() + + def set_show_spinner(self, show: bool) -> None: + """Show or hide the spinner.""" + self._state.show_spinner = show + self._notify() + + # --- Text output --- + + def write_user_input_echo(self, text: str) -> None: + """Echo user input to the output area.""" + entry = OutputEntry( + type=OutputEntryType.USER_INPUT, + text=f"You: {text}", + color="green", + ) + self._append_entry(entry) + self._last_entry_type = OutputEntryType.USER_INPUT + self._notify() + + def append_info_line(self, text: str, color: str | None = None) -> None: + """Append an informational line to the output.""" + effective_color = color or get_mode_color(self._state.mode_text, self._mode_colors) + + # Add separator when transitioning from streaming text + prefix = "" + if self._last_entry_type in (OutputEntryType.STREAMING_TEXT, OutputEntryType.STREAM_FOOTER): + prefix = "" # Textual handles spacing via widget layout + + entry = OutputEntry( + type=OutputEntryType.INFO_LINE, + text=prefix + text, + color=effective_color, + ) + self._append_entry(entry) + self._last_entry_type = OutputEntryType.INFO_LINE + self._notify() + + def append_stream_footer(self, text: str) -> None: + """Append a footer line after streaming ends.""" + entry = OutputEntry( + type=OutputEntryType.STREAM_FOOTER, + text=text, + color="dim", + ) + self._append_entry(entry) + self._last_entry_type = OutputEntryType.STREAM_FOOTER + self._notify() + + async def write_info_line(self, text: str, color: str | None = None) -> None: + """Async version of append_info_line.""" + self.append_info_line(text, color) + + def write_text(self, text: str, color: str | None = None) -> None: + """Write streaming text from the agent. + + Accumulates text into the current streaming entry. If the streaming + entry is still the last output item, appends to it in place. Otherwise + starts a new streaming entry. + + Args: + text: The text chunk to append. + color: Optional Rich color. + """ + self._last_entry_type = OutputEntryType.STREAMING_TEXT + self._has_received_any_text = True + + effective_color = color or get_mode_color(self._state.mode_text, self._mode_colors) + + if ( + self._current_streaming_entry is not None + and self._current_streaming_entry_index == len(self._state.output_entries) - 1 + ): + # Append to existing streaming entry in place + self._current_streaming_entry.text += text + # Update the entry in the list (same object, but trigger notify) + else: + # Start a fresh streaming entry + self._current_streaming_entry = OutputEntry( + type=OutputEntryType.STREAMING_TEXT, + text=text, + color=effective_color, + ) + self._state.output_entries.append(self._current_streaming_entry) + self._current_streaming_entry_index = len(self._state.output_entries) - 1 + + self._notify() + + def update_streaming_text(self, accumulated_text: str) -> None: + """Update the accumulated streaming text (full replacement). + + Alternative to write_text() - replaces the entire streaming entry text. + If an info_line was appended after the streaming entry (e.g., a tool + call), creates a new streaming entry at the end of the list so the + UI can render it. + + Args: + accumulated_text: The full accumulated text so far. + """ + effective_color = get_mode_color(self._state.mode_text, self._mode_colors) + + if ( + self._current_streaming_entry is not None + and self._current_streaming_entry_index == len(self._state.output_entries) - 1 + ): + # Streaming entry is still the last entry — update in place + self._current_streaming_entry.text = accumulated_text + else: + # Either no current entry, or it's no longer at the end (an + # info_line was appended after it). Create a new streaming entry + # so the panel can render the continued text. + self._current_streaming_entry = OutputEntry( + type=OutputEntryType.STREAMING_TEXT, + text=accumulated_text, + color=effective_color, + ) + self._state.output_entries.append(self._current_streaming_entry) + self._current_streaming_entry_index = len(self._state.output_entries) - 1 + + self._last_entry_type = OutputEntryType.STREAMING_TEXT + self._has_received_any_text = True + self._notify() + + async def write_no_text_warning(self, has_follow_up_actions: bool) -> None: + """Write '(no text response)' warning if no text was received.""" + if not self._has_received_any_text and not has_follow_up_actions: + self.append_stream_footer("(no text response from agent)") + + # --- Usage and mode --- + + def set_usage_text(self, usage_text: str | None) -> None: + """Set the token usage text.""" + self._state.usage_text = usage_text + self._notify() + + def set_mode(self, mode: str | None, mode_color: str | None = None) -> None: + """Set the current mode.""" + self._state.mode_text = mode + self._state.mode_color = mode_color or get_mode_color(mode, self._mode_colors) + self._notify() + + # --- Follow-up actions --- + + def enqueue_follow_up_action(self, action: FollowUpAction) -> None: + """Enqueue a follow-up action.""" + if isinstance(action, FollowUpMessage): + self._state.accumulated_follow_up_responses.append(action.message) + elif isinstance(action, FollowUpQuestion): + self.queue_follow_up_questions([action]) + + def queue_follow_up_questions(self, questions: list[FollowUpQuestion]) -> None: + """Queue follow-up questions for user interaction. + + Args: + questions: List of questions to queue. + """ + if not questions: + return + + was_empty = len(self._state.pending_questions) == 0 + self._state.pending_questions.extend(questions) + + if was_empty: + self._configure_for_head_question(self._state.pending_questions[0]) + + self._notify() + + def add_follow_up_response(self, response: Message) -> None: + """Add a follow-up response message.""" + self._state.accumulated_follow_up_responses.append(response) + + def advance_follow_up_question(self) -> None: + """Advance to the next follow-up question. + + Removes the head question from the queue. If more questions remain, + configures the UI for the next one. Otherwise returns to text input. + """ + if not self._state.pending_questions: + return + + self._state.pending_questions.pop(0) + + if self._state.pending_questions: + self._configure_for_head_question(self._state.pending_questions[0]) + else: + # No more questions - return to text input + self._state.mode = BottomPanelMode.TEXT_INPUT + self._state.list_selection_options = [] + self._state.list_selection_title = None + self._state.list_selection_custom_text_placeholder = None + self._state.list_selection_index = 0 + self._state.list_selection_custom_input_text = "" + + self._notify() + + def take_follow_up_responses(self) -> list[Message]: + """Take and clear all accumulated follow-up responses. + + Returns: + List of accumulated response messages. + """ + responses = list(self._state.accumulated_follow_up_responses) + self._state.accumulated_follow_up_responses.clear() + return responses + + def has_pending_questions(self) -> bool: + """Check if there are pending follow-up questions. + + Returns: + True if unanswered questions exist in the queue. + """ + return len(self._state.pending_questions) > 0 + + # --- Queued messages (message injection) --- + + def set_queued_messages(self, pending: list[str]) -> None: + """Set the queued message display. + + Args: + pending: List of pending message texts. + """ + self._state.queued_items = [f"💬 {text}" for text in pending] + self._notify() + + # --- Internal helpers --- + + def _append_entry(self, entry: OutputEntry) -> None: + """Append an output entry to the state.""" + self._state.output_entries.append(entry) + + def _configure_for_head_question(self, question: FollowUpQuestion) -> None: + """Configure the UI for the current head question. + + Args: + question: The question to display. + """ + if isinstance(question, ChoiceFollowUpQuestion): + self._state.mode = BottomPanelMode.LIST_SELECTION + self._state.list_selection_options = list(question.choices) + self._state.list_selection_title = question.prompt + self._state.list_selection_custom_text_placeholder = ( + "✏️ Type a custom response..." if question.allow_custom_text else None + ) + self._state.list_selection_index = 0 + self._state.list_selection_custom_input_text = "" + else: + # Text question - show as info line and switch to text input + self.append_info_line(question.prompt) + self._state.mode = BottomPanelMode.TEXT_INPUT + self._state.list_selection_options = [] + self._state.list_selection_title = None + + def _notify(self) -> None: + """Notify the app that state has changed.""" + self._on_state_changed() + + def request_shutdown(self) -> None: + """Request the application to shut down.""" + self._state.shutdown_requested = True + self._notify() + + def replace_session(self, session) -> None: + """Replace the current agent session. + + Args: + session: The new AgentSession to use. + """ + self._state.replaced_session = session + self._notify() diff --git a/python/samples/02-agents/harness/harness_research.py b/python/samples/02-agents/harness/harness_research.py index 977c26f049..48e7fd7e0c 100644 --- a/python/samples/02-agents/harness/harness_research.py +++ b/python/samples/02-agents/harness/harness_research.py @@ -1,6 +1,19 @@ +# /// script +# requires-python = ">=3.10" +# dependencies = [ +# "agent-framework", +# "textual>=6.2.1", +# "rich>=13.7.1", +# "azure-identity", +# "python-dotenv", +# ] +# /// +# Run with any PEP 723 compatible runner, e.g.: +# uv run samples/02-agents/harness/harness_research.py + # Copyright (c) Microsoft. All rights reserved. -"""Harness Research Assistant. +"""Harness Research Assistant with Console UI. Demonstrates ``create_harness_agent`` — a factory function that builds a pre-configured agent with batteries included, automatically wiring up function @@ -16,12 +29,9 @@ context providers: - **Web Search** — real-time web search via ``get_web_search_tool()`` The sample creates a research-focused agent with web search capability and runs -a simple interactive chat loop. The agent will plan research tasks using todos, -switch between plan and execute modes, search the web for current information, -and track its progress. - -Special commands: - /exit — End the session. +it inside the Textual-based harness console. The agent will plan research tasks +using todos, switch between plan and execute modes, search the web for current +information, and track its progress. Environment variables: FOUNDRY_PROJECT_ENDPOINT — Azure AI Foundry project endpoint URL @@ -36,19 +46,24 @@ import asyncio from agent_framework import create_harness_agent from agent_framework.foundry import FoundryChatClient from azure.identity import AzureCliCredential +from console import build_observers_with_planning, run_agent_async from dotenv import load_dotenv RESEARCH_INSTRUCTIONS = """\ ## Research Assistant Instructions -You are a research assistant. When given a research topic, research it thoroughly using web search and web browsing. -Use your knowledge to form good search queries and hypotheses, but always verify claims with the tools available to you rather than relying on memory alone. +You are a research assistant. When given a research topic, research it +thoroughly using web search and web browsing. Use your knowledge to form good +search queries and hypotheses, but always verify claims with the tools +available to you rather than relying on memory alone. ### Research quality Consult multiple sources when possible and cross-reference key claims. -When sources disagree, note the discrepancy and explain which source you consider more reliable and why. -If a web page fails to load or a search returns irrelevant results, try alternative search queries or sources before moving on. +When sources disagree, note the discrepancy and explain which source you +consider more reliable and why. +If a web page fails to load or a search returns irrelevant results, try +alternative search queries or sources before moving on. Track your sources — you will need them when presenting results. ### Presenting results @@ -58,7 +73,8 @@ When presenting your final findings: - Use clear sections with headings for each major topic or sub-question. - Cite your sources inline (e.g., "According to [source name](URL), ..."). - End with a brief summary of key takeaways. -- In addition to returning the results to the user, save the final research report to file memory so it survives compaction and can be referenced later. +- In addition to returning the results to the user, save the final research + report to file memory so it survives compaction and can be referenced later. """ @@ -82,64 +98,17 @@ async def main() -> None: agent_instructions=RESEARCH_INSTRUCTIONS, ) - # Create a session to maintain conversation state across turns. - session = agent.create_session() - - print("Research Assistant (powered by create_harness_agent)") - print("=" * 50) - print("Enter a research topic to get started.") - print("Type /exit to end the session.\n") - - # Simple interactive chat loop. - while True: - user_input = input("You: ").strip() - if not user_input: - continue - if user_input.lower() == "/exit": - print("\nGoodbye!") - break - - # Run the agent with streaming and print the response as it arrives. - print("\nAssistant: ", end="", flush=True) - async for update in agent.run(user_input, session=session, stream=True): - if update.contents: - for content in update.contents: - # Print a brief message for each tool call in the stream. - if content.type == "function_call": - print(f"\n [calling tool: {content.name}]", flush=True) - print(" ", end="", flush=True) - # Show web search activity when the result arrives with action details. - elif ( - content.type in ("search_tool_call", "search_tool_result") - and getattr(content, "tool_name", None) == "web_search" - ): - action = None - if content.type == "search_tool_result" and isinstance(content.result, dict): - action = content.result.get("action", {}) - elif content.type == "search_tool_call": - action = content.arguments if isinstance(content.arguments, dict) else None - if action: - action_type = action.get("type", "search") - if action_type == "search": - queries = action.get("queries") or [] - query_str = ", ".join(f'"{q}"' for q in queries) if queries else action.get("query", "") - print(f"\n 🌐 Web search: {query_str}", flush=True) - print(" ", end="", flush=True) - elif action_type == "open_page": - url = action.get("url", "(unknown)") - print(f"\n 🌐 Opening: {url}", flush=True) - print(" ", end="", flush=True) - elif action_type == "find_in_page": - pattern = action.get("pattern", "") - print(f'\n 🌐 Find in page: "{pattern}"', flush=True) - print(" ", end="", flush=True) - else: - print(f"\n 🌐 Web search: {action_type}", flush=True) - print(" ", end="", flush=True) - # Print text content as it streams in. - if update.text: - print(update.text, end="", flush=True) - print("\n") + # Run the harness console with the research agent. + await run_agent_async( + agent, + session=agent.create_session(), + observers=build_observers_with_planning(agent), + initial_mode="plan", + title="🔬 Research Assistant", + placeholder="Enter a research topic...", + max_context_window_tokens=128_000, + max_output_tokens=16_384, + ) if __name__ == "__main__": diff --git a/python/uv.lock b/python/uv.lock index 5a420aafa2..de1ff516ef 100644 --- a/python/uv.lock +++ b/python/uv.lock @@ -543,7 +543,7 @@ requires-dist = [ { name = "agent-framework-core", editable = "packages/core" }, { name = "agent-framework-openai", editable = "packages/openai" }, { name = "azure-ai-inference", specifier = ">=1.0.0b9,<1.0.0b10" }, - { name = "azure-ai-projects", specifier = ">=2.1.0,<3.0" }, + { name = "azure-ai-projects", specifier = ">=2.2.0,<3.0" }, ] [[package]] @@ -1242,7 +1242,7 @@ wheels = [ [[package]] name = "azure-ai-projects" -version = "2.1.0" +version = "2.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "azure-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, @@ -1252,9 +1252,9 @@ dependencies = [ { name = "openai", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, { name = "typing-extensions", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/72/76/3fdede8eddfe5927a571898a15f0288ba30fee78e5ba099f88df3ded70af/azure_ai_projects-2.1.0.tar.gz", hash = "sha256:f0749fa9a174255aa1a5550fb6078208521518472907a4c6dd552767d9b39caa", size = 543343, upload-time = "2026-04-20T17:06:48.751Z" } +sdist = { url = "https://files.pythonhosted.org/packages/b9/d8/24342aea74fe75b0a8378b6eff665b9c1cb63f855c1a96f70a0095e474a2/azure_ai_projects-2.2.0.tar.gz", hash = "sha256:58ee31bb031cfb004051145c545294bb0d32de679c670c312ef384845bd72cef", size = 668496, upload-time = "2026-05-30T00:20:59.099Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/f7/f6/4984e7772a97c7a9e6505a3de8e55a5070fa2b02cd7e980da91e0d9b9b97/azure_ai_projects-2.1.0-py3-none-any.whl", hash = "sha256:6f259d8eb9167d2dfd372006d0221a8118faeaeb05829fa898b595bc6f19c699", size = 274309, upload-time = "2026-04-20T17:06:50.542Z" }, + { url = "https://files.pythonhosted.org/packages/40/cf/90f27a2b48c9b748f84194b07e565f900e7f0ce0500da9b9f067dca599d3/azure_ai_projects-2.2.0-py3-none-any.whl", hash = "sha256:8f89bdaca4df1bd479d3bd2bd0f19a0905d60be6d17b84a69e8fabd82eac5906", size = 344307, upload-time = "2026-05-30T00:21:00.672Z" }, ] [[package]]