mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
36ce0950e4
Remove linking, multicast, durable delivery, and host push machinery from the v1 hosting core. Keep those scenarios in a proposed follow-up ADR and update channel packages, samples, docs, tests, and workspace metadata around the smaller host/channel contract. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
431 lines
16 KiB
Python
431 lines
16 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Unit tests for :mod:`agent_framework_hosting_telegram`.
|
|
|
|
These tests exercise the internal parsing helpers and the webhook entry-point
|
|
without spinning up a real Telegram bot. The polling loop and HTTP-side
|
|
helpers are excluded from coverage because they require a live bot token.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import contextlib
|
|
from collections.abc import Awaitable, Mapping
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
from agent_framework_hosting import (
|
|
AgentFrameworkHost,
|
|
ChannelCommand,
|
|
ChannelCommandContext,
|
|
ChannelRequest,
|
|
HostedRunResult,
|
|
)
|
|
from starlette.testclient import TestClient
|
|
|
|
from agent_framework_hosting_telegram import TelegramChannel, telegram_isolation_key
|
|
from agent_framework_hosting_telegram._channel import (
|
|
_parse_telegram_message,
|
|
_telegram_media_file_id,
|
|
)
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Pure helpers #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
def test_telegram_isolation_key_format() -> None:
|
|
assert telegram_isolation_key(42) == "telegram:42"
|
|
assert telegram_isolation_key("abc") == "telegram:abc"
|
|
|
|
|
|
class TestMediaFileId:
|
|
def test_no_media(self) -> None:
|
|
assert _telegram_media_file_id({"text": "hi"}) is None
|
|
|
|
def test_photo_picks_largest(self) -> None:
|
|
assert _telegram_media_file_id({"photo": [{"file_id": "small"}, {"file_id": "large"}]}) == (
|
|
"large",
|
|
"image/jpeg",
|
|
)
|
|
|
|
def test_photo_empty_list(self) -> None:
|
|
assert _telegram_media_file_id({"photo": []}) is None
|
|
|
|
def test_document_uses_mime_type(self) -> None:
|
|
result = _telegram_media_file_id({"document": {"file_id": "f1", "mime_type": "application/pdf"}})
|
|
assert result == ("f1", "application/pdf")
|
|
|
|
def test_voice_default_mime(self) -> None:
|
|
result = _telegram_media_file_id({"voice": {"file_id": "v1"}})
|
|
assert result == ("v1", "audio/ogg")
|
|
|
|
|
|
class TestParseTelegramMessage:
|
|
async def test_text_only(self) -> None:
|
|
async def resolve(_: str) -> str | None:
|
|
return None
|
|
|
|
msg = await _parse_telegram_message({"text": "hello"}, resolve)
|
|
assert msg.role == "user"
|
|
assert msg.text == "hello"
|
|
|
|
async def test_text_and_photo(self) -> None:
|
|
async def resolve(file_id: str) -> str | None:
|
|
return f"https://files.telegram.org/{file_id}"
|
|
|
|
msg = await _parse_telegram_message({"caption": "look", "photo": [{"file_id": "p1"}]}, resolve)
|
|
assert msg.text == "look"
|
|
# Image content present.
|
|
assert any((getattr(c, "uri", None) or "").endswith("/p1") for c in msg.contents)
|
|
|
|
async def test_unresolvable_media_falls_back_to_text(self) -> None:
|
|
async def resolve(_: str) -> str | None:
|
|
return None
|
|
|
|
msg = await _parse_telegram_message({"text": "x", "voice": {"file_id": "v1"}}, resolve)
|
|
# Resolver returned None — the contents should still include the
|
|
# text without crashing.
|
|
assert msg.text == "x"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Webhook entry point #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
@dataclass
|
|
class _FakeAgentResponse:
|
|
text: str
|
|
|
|
|
|
class _FakeAgent:
|
|
def __init__(self, reply: str = "ok") -> None:
|
|
self._reply = reply
|
|
self.runs: list[Any] = []
|
|
|
|
def create_session(self, *, session_id: str | None = None) -> Any:
|
|
return {"session_id": session_id}
|
|
|
|
def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any:
|
|
self.runs.append({"messages": messages, "stream": stream, "kwargs": kwargs})
|
|
|
|
async def _coro() -> _FakeAgentResponse:
|
|
return _FakeAgentResponse(text=self._reply)
|
|
|
|
return _coro()
|
|
|
|
|
|
def _make_telegram(
|
|
stream_default: bool = False, *, path: str = "/telegram/webhook"
|
|
) -> tuple[TelegramChannel, _FakeAgent]:
|
|
agent = _FakeAgent("hi")
|
|
ch = TelegramChannel(
|
|
bot_token="123:abc",
|
|
path=path,
|
|
webhook_url="https://example.com/hook",
|
|
secret_token="s3cr3t",
|
|
stream=stream_default,
|
|
)
|
|
# Replace the internal HTTP client with an AsyncMock so the channel
|
|
# never tries to call the real Telegram API.
|
|
fake_http = MagicMock()
|
|
# post() returns a response object whose raise_for_status() is sync.
|
|
response_mock = MagicMock()
|
|
response_mock.json = MagicMock(return_value={"ok": True, "result": {}})
|
|
fake_http.post = AsyncMock(return_value=response_mock)
|
|
fake_http.get = AsyncMock(return_value=response_mock)
|
|
fake_http.aclose = AsyncMock()
|
|
ch._http = fake_http
|
|
return ch, agent
|
|
|
|
|
|
class TestTelegramWebhook:
|
|
def test_webhook_accepts_text_message_and_dispatches_to_agent(self) -> None:
|
|
ch, agent = _make_telegram()
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
# Skip lifespan so polling/setWebhook are not invoked.
|
|
with TestClient(host.app) as client:
|
|
r = client.post(
|
|
"/telegram/webhook",
|
|
json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hello"}},
|
|
headers={"x-telegram-bot-api-secret-token": "s3cr3t"},
|
|
)
|
|
assert r.status_code == 200
|
|
assert agent.runs, "expected the agent to be invoked"
|
|
|
|
def test_empty_path_mounts_at_app_root(self) -> None:
|
|
ch, agent = _make_telegram(path="")
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post(
|
|
"/",
|
|
json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hello"}},
|
|
headers={"x-telegram-bot-api-secret-token": "s3cr3t"},
|
|
)
|
|
assert r.status_code == 200
|
|
assert agent.runs, "expected the agent to be invoked"
|
|
|
|
def test_webhook_rejects_bad_secret(self) -> None:
|
|
ch, agent = _make_telegram()
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post(
|
|
"/telegram/webhook",
|
|
json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hi"}},
|
|
headers={"x-telegram-bot-api-secret-token": "WRONG"},
|
|
)
|
|
assert r.status_code == 401
|
|
assert not agent.runs
|
|
|
|
async def test_response_hook_can_rewrite_originating_reply(self) -> None:
|
|
seen_kwargs: list[dict[str, Any]] = []
|
|
|
|
def hook(result: HostedRunResult, **kwargs: Any) -> HostedRunResult:
|
|
seen_kwargs.append(dict(kwargs))
|
|
return HostedRunResult(_FakeAgentResponse(text=result.result.text.upper()), session=result.session)
|
|
|
|
ch, agent = _make_telegram()
|
|
ch.response_hook = hook
|
|
|
|
class _Ctx:
|
|
target: Any = agent
|
|
|
|
async def run(
|
|
self,
|
|
_request: ChannelRequest,
|
|
*,
|
|
run_hook: Any | None = None,
|
|
protocol_request: Any | None = None,
|
|
response_hook: Any | None = None,
|
|
channel_name: str | None = None,
|
|
) -> HostedRunResult:
|
|
result = HostedRunResult(_FakeAgentResponse(text="hi"))
|
|
if response_hook is None:
|
|
return result
|
|
shaped = response_hook(result, request=_request, channel_name=channel_name or _request.channel)
|
|
if isinstance(shaped, Awaitable):
|
|
return await shaped
|
|
return shaped
|
|
|
|
ch._ctx = _Ctx() # type: ignore[assignment] # pyright: ignore[reportPrivateUsage]
|
|
|
|
request = ChannelRequest(channel="telegram", operation="message.create", input="hi", stream=False)
|
|
await ch._dispatch(99, request) # pyright: ignore[reportPrivateUsage]
|
|
|
|
assert ch._http is not None
|
|
args, kwargs = ch._http.post.call_args # type: ignore[attr-defined]
|
|
assert args[0].endswith("/sendMessage")
|
|
assert kwargs["json"]["text"] == "HI"
|
|
assert seen_kwargs
|
|
assert seen_kwargs[0]["channel_name"] == "telegram"
|
|
|
|
|
|
class TestCommand:
|
|
async def test_command_handler_invoked(self) -> None:
|
|
captured: list[ChannelCommandContext] = []
|
|
|
|
async def handler(ctx: ChannelCommandContext) -> None:
|
|
captured.append(ctx)
|
|
await ctx.reply("pong")
|
|
|
|
ch = TelegramChannel(
|
|
bot_token="123:abc",
|
|
webhook_url="https://example.com/hook",
|
|
commands=[ChannelCommand(name="ping", description="ping", handle=handler)],
|
|
register_native_commands=False,
|
|
)
|
|
fake_http = MagicMock()
|
|
response_mock = MagicMock()
|
|
response_mock.json = MagicMock(return_value={"ok": True, "result": {}})
|
|
fake_http.post = AsyncMock(return_value=response_mock)
|
|
fake_http.get = AsyncMock(return_value=response_mock)
|
|
fake_http.aclose = AsyncMock()
|
|
ch._http = fake_http
|
|
host = AgentFrameworkHost(target=_FakeAgent(), channels=[ch])
|
|
|
|
with TestClient(host.app) as client:
|
|
r = client.post(
|
|
"/telegram/webhook",
|
|
json={"update_id": 2, "message": {"chat": {"id": 7}, "text": "/ping"}},
|
|
)
|
|
assert r.status_code == 200
|
|
assert captured and captured[0].request.operation == "command.invoke"
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Per-chat serial ordering #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestPerChatOrdering:
|
|
async def test_updates_for_same_chat_run_serially(self) -> None:
|
|
"""Two updates for the same chat must process in arrival order."""
|
|
ch, _ = _make_telegram()
|
|
order: list[int] = []
|
|
slow_event = asyncio.Event()
|
|
|
|
async def fake_process(update: Mapping[str, Any]) -> None:
|
|
uid = update.get("update_id")
|
|
assert isinstance(uid, int)
|
|
if uid == 1:
|
|
# Block the first update so the second is queued behind it.
|
|
await slow_event.wait()
|
|
order.append(uid)
|
|
|
|
ch._process_update = fake_process # type: ignore[method-assign]
|
|
|
|
ch._enqueue_update({"update_id": 1, "message": {"chat": {"id": 100}, "text": "first"}})
|
|
ch._enqueue_update({"update_id": 2, "message": {"chat": {"id": 100}, "text": "second"}})
|
|
|
|
# Let the worker start the first update.
|
|
await asyncio.sleep(0)
|
|
assert order == [] # blocked on slow_event
|
|
slow_event.set()
|
|
# Drain.
|
|
worker = ch._chat_workers[100]
|
|
# Wait for the queue to empty.
|
|
await ch._chat_queues[100].join()
|
|
# Cleanup
|
|
worker.cancel()
|
|
with contextlib.suppress(asyncio.CancelledError):
|
|
await worker
|
|
|
|
assert order == [1, 2]
|
|
|
|
async def test_updates_for_different_chats_run_in_parallel(self) -> None:
|
|
"""Different chats get separate workers and can interleave freely."""
|
|
ch, _ = _make_telegram()
|
|
started: list[int] = []
|
|
gate_a = asyncio.Event()
|
|
|
|
async def fake_process(update: Mapping[str, Any]) -> None:
|
|
uid = update.get("update_id")
|
|
assert isinstance(uid, int)
|
|
started.append(uid)
|
|
if uid == 1:
|
|
await gate_a.wait()
|
|
|
|
ch._process_update = fake_process # type: ignore[method-assign]
|
|
|
|
ch._enqueue_update({"update_id": 1, "message": {"chat": {"id": 1}, "text": "a"}})
|
|
ch._enqueue_update({"update_id": 2, "message": {"chat": {"id": 2}, "text": "b"}})
|
|
|
|
# Both should be admitted into their respective workers despite
|
|
# update 1 being blocked.
|
|
await asyncio.sleep(0)
|
|
# Update 2 finishes; update 1 still blocked.
|
|
assert 2 in started
|
|
gate_a.set()
|
|
for cid in (1, 2):
|
|
await ch._chat_queues[cid].join()
|
|
for w in ch._chat_workers.values():
|
|
w.cancel()
|
|
with contextlib.suppress(asyncio.CancelledError):
|
|
await w
|
|
|
|
|
|
# --------------------------------------------------------------------------- #
|
|
# Webhook ack-before-run + shutdown drains workers #
|
|
# --------------------------------------------------------------------------- #
|
|
|
|
|
|
class TestWebhookAckBeforeRun:
|
|
async def test_webhook_returns_200_before_agent_completes(self) -> None:
|
|
"""The webhook must ack before the agent runs, to dodge Telegram's 60s redelivery."""
|
|
ch, _ = _make_telegram()
|
|
from starlette.requests import Request
|
|
|
|
agent_started = asyncio.Event()
|
|
agent_release = asyncio.Event()
|
|
|
|
async def fake_process(update: Mapping[str, Any]) -> None:
|
|
agent_started.set()
|
|
await agent_release.wait()
|
|
|
|
ch._process_update = fake_process # type: ignore[method-assign]
|
|
|
|
async def receive() -> Any:
|
|
payload = b'{"update_id":1,"message":{"chat":{"id":42},"text":"hi"}}'
|
|
return {"type": "http.request", "body": payload, "more_body": False}
|
|
|
|
scope = {
|
|
"type": "http",
|
|
"method": "POST",
|
|
"path": "/telegram/webhook",
|
|
"headers": [(b"x-telegram-bot-api-secret-token", b"s3cr3t")],
|
|
"query_string": b"",
|
|
}
|
|
request = Request(scope, receive=receive)
|
|
|
|
# Drive the webhook handler. Even though the agent won't complete
|
|
# (gate_a still cleared) the webhook must still 200 promptly.
|
|
resp = await ch._handle(request)
|
|
assert resp.status_code == 200
|
|
# The agent task is in flight but not finished — proves ack came first.
|
|
await asyncio.wait_for(agent_started.wait(), timeout=1.0)
|
|
assert not agent_release.is_set()
|
|
|
|
# Cleanup: release the agent and drain.
|
|
agent_release.set()
|
|
await ch._chat_queues[42].join()
|
|
for w in list(ch._chat_workers.values()):
|
|
w.cancel()
|
|
with contextlib.suppress(asyncio.CancelledError):
|
|
await w
|
|
|
|
|
|
class TestShutdownDrainsWorkers:
|
|
async def test_shutdown_cancels_in_flight_chat_workers(self) -> None:
|
|
"""`_on_shutdown` must drain per-chat workers, not leak them."""
|
|
ch, _ = _make_telegram()
|
|
forever = asyncio.Event()
|
|
|
|
async def stuck(update: Mapping[str, Any]) -> None:
|
|
await forever.wait()
|
|
|
|
ch._process_update = stuck # type: ignore[method-assign]
|
|
ch._enqueue_update({"update_id": 9, "message": {"chat": {"id": 1}, "text": "a"}})
|
|
await asyncio.sleep(0)
|
|
assert ch._chat_workers and ch._update_tasks
|
|
|
|
await ch._on_shutdown()
|
|
assert not ch._chat_workers
|
|
assert not ch._update_tasks
|
|
|
|
|
|
def _deletewebhook_called(http_mock: MagicMock) -> bool:
|
|
return any(call.args and str(call.args[0]).endswith("/deleteWebhook") for call in http_mock.post.call_args_list)
|
|
|
|
|
|
class TestWebhookShutdownTeardown:
|
|
async def test_shutdown_keeps_webhook_by_default(self) -> None:
|
|
"""Default: shutdown must NOT delete the webhook (avoids redeploy races)."""
|
|
ch, _ = _make_telegram()
|
|
assert ch._transport == "webhook"
|
|
await ch._on_shutdown()
|
|
assert not _deletewebhook_called(ch._http) # type: ignore[arg-type]
|
|
ch._http.aclose.assert_awaited() # type: ignore[union-attr]
|
|
|
|
async def test_shutdown_deletes_webhook_when_opted_in(self) -> None:
|
|
"""Opt-in: ``delete_webhook_on_shutdown=True`` performs best-effort teardown."""
|
|
ch = TelegramChannel(
|
|
bot_token="123:abc",
|
|
webhook_url="https://example.com/hook",
|
|
secret_token="s3cr3t",
|
|
delete_webhook_on_shutdown=True,
|
|
stream=False,
|
|
)
|
|
fake_http = MagicMock()
|
|
response_mock = MagicMock()
|
|
response_mock.json = MagicMock(return_value={"ok": True, "result": {}})
|
|
fake_http.post = AsyncMock(return_value=response_mock)
|
|
fake_http.get = AsyncMock(return_value=response_mock)
|
|
fake_http.aclose = AsyncMock()
|
|
ch._http = fake_http
|
|
await ch._on_shutdown()
|
|
assert _deletewebhook_called(fake_http)
|
|
fake_http.aclose.assert_awaited()
|