mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
bad05a2bdc
* Add initial harness console for python * Add textual to project * Add planning and approval flows with list selector * Address PR comments * Fix list selection bug * Fix PR #6312 round 2 review comments - Escape untrusted agent text with rich.markup.escape() in observers (text_output, planning_output, reasoning_display) to prevent markup injection - Remove non-functional 'Always approve' choices from tool_approval.py (framework lacks CreateAlwaysApproveToolResponse support) - Remove textual from root pyproject.toml dev deps (sample-specific) - Add PEP 723 inline script metadata to harness_research.py - Narrow except Exception to except NoMatches in list_selection.py Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Fix build error * Fix build errors --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
60 lines
1.4 KiB
Python
60 lines
1.4 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Text output observer for streaming agent text."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from rich.markup import escape
|
|
|
|
from .base import ConsoleObserver
|
|
|
|
if TYPE_CHECKING:
|
|
from agent_framework import Agent
|
|
|
|
from ..state_driver import IUXStateDriver
|
|
|
|
|
|
class TextOutputObserver(ConsoleObserver):
|
|
"""Displays streaming text output from the agent.
|
|
|
|
Writes text chunks incrementally to the UX state driver as they arrive,
|
|
allowing real-time display during streaming.
|
|
"""
|
|
|
|
async def on_text(
|
|
self,
|
|
ux: IUXStateDriver,
|
|
text: str,
|
|
agent: Agent,
|
|
session: Any,
|
|
) -> None:
|
|
"""Write each text chunk directly to the UX driver.
|
|
|
|
Args:
|
|
ux: The UX state driver for UI updates.
|
|
text: The text chunk to display.
|
|
agent: The AI agent.
|
|
session: The agent session.
|
|
"""
|
|
ux.write_text(escape(text))
|
|
|
|
async def on_stream_complete(
|
|
self,
|
|
ux: IUXStateDriver,
|
|
agent: Agent,
|
|
session: Any,
|
|
) -> list | None:
|
|
"""No-op on stream complete (state managed by UX driver).
|
|
|
|
Args:
|
|
ux: The UX state driver for UI updates.
|
|
agent: The AI agent.
|
|
session: The agent session.
|
|
|
|
Returns:
|
|
None (no follow-up actions).
|
|
"""
|
|
return None
|