Python: add agent-framework-hosting-telegram channel (#5643)

* feat(hosting-telegram): add Telegram channel package

New ``agent-framework-hosting-telegram`` package implementing the
Telegram Bot API channel for the Hosting framework. Mounts a webhook
endpoint (``POST /telegram/webhook``) and an in-process polling loop
onto an ``AgentFrameworkHost`` and translates Telegram ``Update``
payloads to/from the channel-neutral ``ChannelRequest`` /
``HostedRunResult`` plumbing.

Surface (re-exported from ``agent_framework_hosting_telegram``):

- ``TelegramChannel`` -- concrete ``Channel`` implementation. Owns the
  webhook route + an optional ``getUpdates`` long-polling lifespan,
  parses Telegram ``Update``s into ``ChannelRequest`` (text, photo,
  document, voice, callback_query, …), runs the optional
  ``ChannelRunHook``, calls back into the ``ChannelContext`` to invoke
  the agent target, and posts the response back via
  ``sendMessage`` / ``sendChatAction`` / ``answerCallbackQuery`` on the
  Telegram Bot API. Honours ``DeliveryReport.include_originating`` so
  cross-channel pushes can target the originating Telegram chat
  without double-acking.
- Native fields the channel doesn't lift onto ``ChannelRequest`` (e.g.
  ``chat.type``, ``message.message_id``, ``callback_query.data``) are
  attached to ``ChannelRequest.attributes`` so a ``ChannelRunHook``
  can pick them up via the standard ``protocol_request=`` kwarg.
- 13 unit tests covering route wiring, ``Update`` parsing across the
  common content shapes, hook composition, and originating vs
  non-originating delivery branches.

Registers the package in ``python/pyproject.toml``
``[tool.uv.sources]`` and adds the matching pyright
``executionEnvironments`` entry. Stacks on PR-2 (Hosting core);
independent of PR-3 / PR-4.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(hosting-telegram): preserve in-chat ordering, ack-before-run, drain shutdown

- Replace per-update task fan-out with per-chat asyncio.Queue + worker.
  Telegram only guarantees update ordering up to getUpdates; the
  previous code spawned one task per update, which broke ordering for
  adjacent updates in the same chat. Updates are now serialised per
  chat_id (so /start then "what's the weather" can't race) while
  different chats still process in parallel.

- Webhook handler now acks (200) immediately and runs the agent in
  the per-chat worker. Telegram redelivers any update the webhook
  doesn't 200 within ~60 seconds, so a streamed agent reply that runs
  longer than that previously triggered a retry storm and duplicate
  replies.

- _on_shutdown now drains everything: poll task → per-chat workers →
  webhook-spawned dispatcher tasks (the new ack-before-run path), then
  deletes the webhook + closes the HTTP client. Previously webhook
  tasks were not tracked at all, so an in-flight agent invocation
  could leak past app shutdown.

- _enqueue_update extracts chat_id from message / edited_message /
  callback_query; updates with no resolvable chat fall back to a
  one-shot dispatcher task that's still tracked in _update_tasks for
  shutdown.

- Webhook handler now also returns 400 on malformed JSON / non-object
  payloads instead of crashing the request.

4 new tests cover per-chat serial ordering, parallel-across-chats
isolation, ack-before-run latency, and shutdown drain.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test(hosting): drop redundant @pytest.mark.asyncio decorators

asyncio_mode = "auto" is configured in pyproject.toml across the
hosting packages, so individual @pytest.mark.asyncio decorators are
unnecessary.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(hosting-telegram): adapt push tests to hosted run result wrapper

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(hosting-telegram): add response hooks

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-05-28 14:28:30 +02:00
committed by GitHub
Unverified
parent cb1d4a6ee5
commit f0b9ab6733
9 changed files with 1440 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) Microsoft Corporation.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
@@ -0,0 +1,29 @@
# agent-framework-hosting-telegram
Telegram channel for [agent-framework-hosting](../hosting). Supports both
**polling** (default — no public URL required, perfect for local dev) and
**webhook** transports, multi-content messages (text + media), command
registration, and end-to-end SSE-style streaming via Telegram message edits.
## Usage
```python
from agent_framework_hosting import AgentFrameworkHost
from agent_framework_hosting_telegram import TelegramChannel
host = AgentFrameworkHost(
target=my_agent,
channels=[TelegramChannel(bot_token="...")],
)
host.serve()
```
For production, configure `webhook_url="https://…"` and the channel will
register the webhook on startup and receive updates over HTTPS.
## Identity & sessions
Each Telegram chat is mapped to an opaque isolation key
(`telegram:<chat_id>`) so other channels can opt into the same per-chat
session by reusing the same key. The helper `telegram_isolation_key(chat_id)`
is exported for that purpose.
@@ -0,0 +1,7 @@
# Copyright (c) Microsoft. All rights reserved.
"""Telegram channel for :mod:`agent_framework_hosting`."""
from ._channel import TelegramChannel, telegram_isolation_key
__all__ = ["TelegramChannel", "telegram_isolation_key"]
@@ -0,0 +1,873 @@
# Copyright (c) Microsoft. All rights reserved.
"""Built-in channel: Telegram (polling + webhook transports).
Inspired by PR #5393's Telegram sample. Two transports are supported:
- ``polling`` (default when no ``webhook_url`` is set): the channel runs a
background ``getUpdates`` long-poll loop. No public URL required —
perfect for local development. This is what ``python-telegram-bot``
uses by default.
- ``webhook``: when ``webhook_url`` is set, the channel registers it via
``setWebhook`` on startup and receives updates over HTTPS POSTs to the
mounted ``/webhook`` route. This is the production-recommended mode.
"""
from __future__ import annotations
import asyncio
import contextlib
import time
from collections.abc import Awaitable, Callable, Mapping, Sequence
from typing import Any, Literal
import httpx
from agent_framework import (
AgentResponse,
AgentResponseUpdate,
Content,
Message,
ResponseStream,
)
from agent_framework_hosting import (
ChannelCommand,
ChannelCommandContext,
ChannelContext,
ChannelContribution,
ChannelIdentity,
ChannelRequest,
ChannelResponseContext,
ChannelResponseHook,
ChannelRunHook,
ChannelSession,
ChannelStreamTransformHook,
HostedRunResult,
apply_response_hook,
apply_run_hook,
logger,
)
from starlette.requests import Request
from starlette.responses import JSONResponse, Response
from starlette.routing import BaseRoute, Route
# Telegram update parsing ------------------------------------------------------
#
# A Telegram message can carry text, a caption, and one of several media kinds
# (photo, document, voice, audio, video). For media we resolve the file_id
# into a public bot-file URL via ``getFile`` and emit a ``Content.from_uri``;
# the agent then receives a multi-content Message with text + media side by
# side, the same as it would over the Responses API.
_TELEGRAM_MEDIA_DEFAULT_MIMETYPE = {
"photo": "image/jpeg",
"document": "application/octet-stream",
"voice": "audio/ogg",
"audio": "audio/mpeg",
"video": "video/mp4",
}
# Telegram's hard limit on a single message body. Past this, sendMessage /
# editMessageText return 400. We truncate interim and final edits at this
# boundary; if the agent emits more, callers can split into a follow-up
# sendMessage in their run hook.
_TELEGRAM_MAX_TEXT_LEN = 4096
def telegram_isolation_key(chat_id: Any) -> str:
"""Build the namespaced isolation key the Telegram channel writes under.
Exposed at module scope so other channels' ``run_hook`` callbacks can opt
into the same per-chat session (e.g. a Responses caller resuming a
Telegram conversation by passing the chat id).
"""
return f"telegram:{chat_id}"
def _text_result(text: str) -> HostedRunResult[AgentResponse]:
"""Build a host delivery payload from text accumulated by this channel."""
return HostedRunResult(AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text(text=text)])]))
def _telegram_media_file_id(message: Mapping[str, Any]) -> tuple[str, str] | None:
"""Return ``(file_id, fallback_media_type)`` for any media on the message."""
photo = message.get("photo")
if isinstance(photo, list) and photo:
# Telegram delivers photos as an array of progressively-larger sizes.
largest = photo[-1]
if isinstance(largest, Mapping) and (fid := largest.get("file_id")):
return str(fid), _TELEGRAM_MEDIA_DEFAULT_MIMETYPE["photo"]
for kind in ("document", "voice", "audio", "video"):
media = message.get(kind)
if media and isinstance(media, Mapping) and (fid := media.get("file_id")):
return str(fid), str(media.get("mime_type") or _TELEGRAM_MEDIA_DEFAULT_MIMETYPE[kind])
return None
async def _parse_telegram_message(
message: Mapping[str, Any],
resolve_file_url: Callable[[str], Awaitable[str | None]],
) -> Message:
"""Translate one Telegram ``message`` object into an Agent Framework Message."""
parts: list[Content] = []
if (text := message.get("text") or message.get("caption")) and isinstance(text, str):
parts.append(Content.from_text(text=text))
if (media := _telegram_media_file_id(message)) is not None:
file_id, media_type = media
if (uri := await resolve_file_url(file_id)) is not None:
parts.append(Content.from_uri(uri=uri, media_type=media_type))
if not parts:
# Edge case: no recognizable content — emit an empty placeholder so the
# agent contract still receives a Message and can react gracefully.
parts.append(Content.from_text(text=""))
return Message("user", parts)
class TelegramChannel:
"""Telegram channel with both polling and webhook transports.
Update kinds handled (both transports):
- ``message`` / ``edited_message`` — text, captions, and media
(photo/document/voice/audio/video).
- ``callback_query`` — inline-button presses; the ``data`` payload is
treated as the user's next utterance and the click is acknowledged.
Streaming
---------
The channel defaults to ``stream=True`` on every ``ChannelRequest``: it
drives ``ChannelContext.run_stream`` and progressively edits a single
Telegram message as ``AgentResponseUpdate`` chunks arrive (Telegram has
no native streaming primitive). Pass ``stream=False`` on the constructor
to opt out for all messages, or override per-request inside the
``run_hook`` (set ``ChannelRequest.stream = False``). A ``stream_transform_hook``
can rewrite or drop individual updates before they hit the wire — useful
for redaction, formatting, or merging tool-call deltas.
"""
name = "telegram"
def __init__(
self,
*,
bot_token: str,
path: str = "/telegram",
commands: Sequence[ChannelCommand] = (),
register_native_commands: bool = True,
run_hook: ChannelRunHook | None = None,
response_hook: ChannelResponseHook | None = None,
api_base: str = "https://api.telegram.org",
webhook_url: str | None = None,
secret_token: str | None = None,
parse_mode: str | None = None,
send_typing_action: bool = True,
transport: Literal["auto", "polling", "webhook"] = "auto",
polling_timeout: int = 30,
stream: bool = True,
stream_transform_hook: ChannelStreamTransformHook | None = None,
stream_edit_min_interval: float = 0.4,
) -> None:
self.path = path
self._token = bot_token
self._commands = list(commands)
self._register = register_native_commands
self._hook = run_hook
self.response_hook = response_hook
self._stream_default = stream
self._stream_transform_hook = stream_transform_hook
self._stream_edit_min_interval = stream_edit_min_interval
self._api = f"{api_base}/bot{bot_token}"
self._webhook_url = webhook_url
self._secret_token = secret_token
self._parse_mode = parse_mode
self._send_typing_action = send_typing_action
if transport == "auto":
transport = "webhook" if webhook_url else "polling"
if transport == "webhook" and not webhook_url:
raise ValueError("transport='webhook' requires webhook_url")
self._transport: Literal["polling", "webhook"] = transport
self._polling_timeout = polling_timeout
self._ctx: ChannelContext | None = None
self._http: httpx.AsyncClient | None = None
self._poll_task: asyncio.Task[None] | None = None
# Tracks all in-flight tasks (per-chat workers + webhook-spawned
# dispatcher tasks). Drained on shutdown.
self._update_tasks: set[asyncio.Task[None]] = set()
# Per-chat serial workers preserve in-chat ordering: each
# chat_id has its own asyncio.Queue + worker task. Updates for
# different chats run in parallel; updates for the same chat
# run strictly in arrival order.
self._chat_queues: dict[int, asyncio.Queue[Mapping[str, Any]]] = {}
self._chat_workers: dict[int, asyncio.Task[None]] = {}
def contribute(self, context: ChannelContext) -> ChannelContribution:
"""Register the webhook route (only in ``webhook`` transport) plus lifecycle hooks.
Polling-mode hosts intentionally expose no HTTP route — adding one
would just confuse readers who expect inbound HTTP traffic to do
something.
"""
self._ctx = context
routes: list[BaseRoute] = []
if self._transport == "webhook":
routes.append(Route("/webhook", self._handle, methods=["POST"]))
return ChannelContribution(
routes=routes,
commands=self._commands,
on_startup=[self._on_startup],
on_shutdown=[self._on_shutdown],
)
# -- lifecycle --------------------------------------------------------- #
async def _on_startup(self) -> None:
"""Open the HTTP client, optionally register slash commands, and start the transport.
- Polling: clears any previously-set webhook (Telegram refuses
``getUpdates`` while one is registered) and launches the
long-poll task.
- Webhook: ``setWebhook`` to the configured URL, including the
optional secret token used to authenticate inbound calls.
"""
# ``getUpdates`` blocks for up to ``polling_timeout`` seconds, so the
# client timeout has to comfortably exceed it. Skip when a client has
# been pre-injected (e.g. by tests).
if self._http is None:
self._http = httpx.AsyncClient(timeout=self._polling_timeout + 15)
if self._register and self._commands:
cmd_payload: dict[str, Any] = {
"commands": [{"command": c.name, "description": c.description} for c in self._commands]
}
await self._http.post(f"{self._api}/setMyCommands", json=cmd_payload)
logger.info("Registered %d Telegram commands", len(self._commands))
if self._transport == "webhook":
payload: dict[str, Any] = {
"url": self._webhook_url,
"allowed_updates": ["message", "edited_message", "callback_query"],
}
if self._secret_token:
payload["secret_token"] = self._secret_token
response = await self._http.post(f"{self._api}/setWebhook", json=payload)
response.raise_for_status()
logger.info("Telegram webhook registered: %s", self._webhook_url)
else:
# Telegram refuses getUpdates while a webhook is set, so clear it.
await self._http.post(f"{self._api}/deleteWebhook", json={"drop_pending_updates": False})
self._poll_task = asyncio.create_task(self._poll_loop(), name="telegram-poll")
logger.info("Telegram polling started (long-poll timeout=%ss)", self._polling_timeout)
async def _on_shutdown(self) -> None:
"""Stop the polling task, drain in-flight workers, drop the webhook, close HTTP.
Drain order:
1. Cancel the poll task so no new updates are admitted.
2. Cancel + await per-chat worker tasks so any currently-running
agent invocations can finish before we yank the HTTP client
out from under them.
3. Cancel + await any webhook-dispatched tasks tracked in
``_update_tasks`` (the webhook handler returns 200 immediately
and runs the agent in a background task, which the previous
shutdown ignored entirely).
4. Best-effort `deleteWebhook` and HTTP client close.
Webhook teardown is best-effort — failures (e.g. revoked token at
shutdown) are logged but never raised so app shutdown can complete.
"""
if self._poll_task is not None:
self._poll_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await self._poll_task
self._poll_task = None
# Cancel per-chat workers; their queues are no longer being fed.
for worker in list(self._chat_workers.values()):
worker.cancel()
for worker in list(self._chat_workers.values()):
with contextlib.suppress(asyncio.CancelledError, Exception):
await worker
self._chat_workers.clear()
self._chat_queues.clear()
# Webhook-spawned dispatcher tasks (the ack-before-run path) live
# in _update_tasks alongside any leftover poll-spawned tasks.
for task in list(self._update_tasks):
task.cancel()
for task in list(self._update_tasks):
with contextlib.suppress(asyncio.CancelledError, Exception):
await task
self._update_tasks.clear()
if self._http is not None:
if self._transport == "webhook":
try:
await self._http.post(f"{self._api}/deleteWebhook")
except Exception: # pragma: no cover - best-effort cleanup
logger.exception("deleteWebhook failed")
await self._http.aclose()
# -- polling loop ------------------------------------------------------ #
async def _poll_loop(self) -> None:
"""Long-poll ``getUpdates`` until cancelled.
Each batch advances the ``offset`` by the highest seen
``update_id`` so processed updates aren't redelivered. Updates
are routed to per-chat serial workers via :meth:`_enqueue_update`
— this preserves in-chat ordering (Telegram only guarantees
ordering up to ``getUpdates``; the previous fan-out into one
task per update broke that guarantee for adjacent updates).
Different chats still process in parallel because each has its
own worker. Transient errors back off for 2 seconds before
retrying.
"""
if self._http is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
offset: int | None = None
while True:
try:
params: dict[str, Any] = {
"timeout": self._polling_timeout,
"allowed_updates": '["message","edited_message","callback_query"]',
}
if offset is not None:
params["offset"] = offset
response = await self._http.get(f"{self._api}/getUpdates", params=params)
response.raise_for_status()
payload = response.json()
if not payload.get("ok"):
logger.warning("Telegram getUpdates returned error: %s", payload)
await asyncio.sleep(1.0)
continue
for update in payload.get("result", []) or []:
update_id = update.get("update_id")
if isinstance(update_id, int):
offset = update_id + 1
self._enqueue_update(update)
except asyncio.CancelledError:
raise
except Exception:
logger.exception("Telegram polling iteration failed; retrying in 2s")
await asyncio.sleep(2.0)
def _chat_id_for_update(self, update: Mapping[str, Any]) -> int | None:
"""Best-effort extraction of the chat id from any supported update shape."""
message = update.get("message") or update.get("edited_message")
if isinstance(message, Mapping):
chat = message.get("chat")
if isinstance(chat, Mapping):
cid = chat.get("id")
if isinstance(cid, int):
return cid
callback = update.get("callback_query")
if isinstance(callback, Mapping):
inner = callback.get("message")
if isinstance(inner, Mapping):
chat = inner.get("chat")
if isinstance(chat, Mapping):
cid = chat.get("id")
if isinstance(cid, int):
return cid
return None
def _enqueue_update(self, update: Mapping[str, Any]) -> None:
"""Route an update to its per-chat serial worker.
Updates with no resolvable chat_id (malformed payloads, unknown
update types) fall back to a one-shot dispatcher task so they
can't deadlock the main loop.
"""
chat_id = self._chat_id_for_update(update)
if chat_id is None:
# No chat to serialise on — fire and forget, but still track
# so shutdown can drain.
task = asyncio.create_task(self._safe_process_update(update))
self._update_tasks.add(task)
task.add_done_callback(self._update_tasks.discard)
return
queue = self._chat_queues.get(chat_id)
if queue is None:
queue = asyncio.Queue()
self._chat_queues[chat_id] = queue
worker = asyncio.create_task(
self._chat_worker(chat_id, queue),
name=f"telegram-chat-worker-{chat_id}",
)
self._chat_workers[chat_id] = worker
# Ensure shutdown can drain this worker too.
self._update_tasks.add(worker)
worker.add_done_callback(self._update_tasks.discard)
queue.put_nowait(update)
async def _chat_worker(self, chat_id: int, queue: asyncio.Queue[Mapping[str, Any]]) -> None:
"""Drain a single chat's queue serially.
Per-chat ordering is preserved by processing one update at a
time. Exceptions in :meth:`_safe_process_update` are already
swallowed, so the worker keeps running. The worker is cancelled
on channel shutdown.
"""
try:
while True:
update = await queue.get()
try:
await self._safe_process_update(update)
finally:
queue.task_done()
except asyncio.CancelledError:
raise
async def _safe_process_update(self, update: Mapping[str, Any]) -> None:
"""Wrap :meth:`_process_update` so a failure on one update never escapes a task."""
try:
await self._process_update(update)
except Exception:
logger.exception("Telegram update processing failed: %s", update.get("update_id"))
# -- request handling -------------------------------------------------- #
async def _handle(self, request: Request) -> Response:
"""Webhook endpoint — verifies the secret token then queues the update.
Telegram includes the configured secret in the
``X-Telegram-Bot-Api-Secret-Token`` header on every webhook delivery;
we reject mismatches so leaked URLs alone aren't enough to inject
traffic.
**Acks before running the agent.** Telegram redelivers any update
the webhook doesn't 200 within ~60 seconds, so a streamed agent
reply that runs longer than that would otherwise trigger a
retry storm and duplicate replies. We enqueue onto the
per-chat serial worker (preserving ordering with polling-mode)
and immediately return 200; the actual processing happens in
the worker task tracked by ``_update_tasks`` and drained on
shutdown.
"""
if self._secret_token is not None:
received = request.headers.get("x-telegram-bot-api-secret-token")
if received != self._secret_token:
logger.warning("Telegram webhook secret token mismatch — rejecting update")
return JSONResponse({"ok": False, "error": "invalid secret"}, status_code=401)
try:
update = await request.json()
except Exception:
logger.warning("Telegram webhook received malformed JSON; returning 400")
return JSONResponse({"ok": False, "error": "invalid json"}, status_code=400)
if not isinstance(update, Mapping):
logger.warning("Telegram webhook received non-object payload; returning 400")
return JSONResponse({"ok": False, "error": "invalid payload"}, status_code=400)
# Ack immediately, route through per-chat worker so ordering with
# polling-mode is identical and shutdown drains all in-flight work.
self._enqueue_update(update)
return JSONResponse({"ok": True})
async def _process_update(self, update: Mapping[str, Any]) -> None:
"""Convert one Telegram update into a :class:`ChannelRequest` and dispatch.
Branches:
- ``callback_query`` — inline-button click; handled separately so we
can ack the click and treat the button payload as the next user
utterance.
- ``message`` / ``edited_message`` — the common text-and-attachment
case; runs slash commands when present, otherwise builds a
message and dispatches to the agent.
"""
if self._ctx is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
# Inline-button presses: ack the click, treat the payload as input.
if (callback := update.get("callback_query")) is not None:
await self._handle_callback_query(callback)
return
# message and edited_message share the same shape.
message = update.get("message") or update.get("edited_message") or {}
chat_id = (message.get("chat") or {}).get("id")
text = message.get("text") or message.get("caption")
has_media = any(k in message for k in ("photo", "document", "voice", "audio", "video"))
if chat_id is None or (not isinstance(text, str) and not has_media):
return # Nothing actionable.
# Native command dispatch — bypasses the agent.
if isinstance(text, str) and text.startswith("/"):
command_name = text[1:].split()[0].split("@", 1)[0]
handler = next((c for c in self._commands if c.name == command_name), None)
if handler is not None:
channel_request = ChannelRequest(
channel=self.name,
operation="command.invoke",
input=text,
session=ChannelSession(isolation_key=telegram_isolation_key(chat_id)),
attributes={"chat_id": chat_id},
identity=ChannelIdentity(channel=self.name, native_id=str(chat_id)),
)
ctx = ChannelCommandContext(
request=channel_request,
reply=lambda body, cid=chat_id: self._send(cid, body),
)
await handler.handle(ctx)
return
# Plain message → agent run. Build a multi-content Message with the
# text/caption alongside any attached media (photo, document, ...).
parsed = await _parse_telegram_message(message, self._resolve_file_url)
channel_request = ChannelRequest(
channel=self.name,
operation="message.create",
input=[parsed],
session=ChannelSession(isolation_key=telegram_isolation_key(chat_id)),
attributes={"chat_id": chat_id},
stream=self._stream_default,
identity=ChannelIdentity(channel=self.name, native_id=str(chat_id)),
)
if self._hook is not None:
channel_request = await apply_run_hook(
self._hook,
channel_request,
target=self._ctx.target,
protocol_request=update,
)
await self._dispatch(chat_id, channel_request)
async def _handle_callback_query(self, callback: Mapping[str, Any]) -> None:
"""Handle an inline-button click.
Always answers the callback query to clear the spinner on the user's
client, then treats the button's ``data`` payload as the user's
next utterance and dispatches it as if they had typed it.
Callbacks without a chat or string ``data`` are silently dropped.
"""
if self._ctx is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
if self._http is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
callback_id = callback.get("id")
data = callback.get("data")
message = callback.get("message") or {}
chat_id = (message.get("chat") or {}).get("id")
if callback_id is not None:
# Always answer to remove the loading spinner on the user's client.
try:
await self._http.post(f"{self._api}/answerCallbackQuery", json={"callback_query_id": callback_id})
except Exception: # pragma: no cover - defensive
logger.exception("answerCallbackQuery failed")
if chat_id is None or not isinstance(data, str):
return
channel_request = ChannelRequest(
channel=self.name,
operation="message.create",
input=data,
session=ChannelSession(isolation_key=telegram_isolation_key(chat_id)),
attributes={"chat_id": chat_id, "callback_query_id": callback_id},
stream=self._stream_default,
identity=ChannelIdentity(channel=self.name, native_id=str(chat_id)),
)
if self._hook is not None:
channel_request = await apply_run_hook(
self._hook,
channel_request,
target=self._ctx.target,
protocol_request=callback,
)
await self._dispatch(chat_id, channel_request)
async def _resolve_file_url(self, file_id: str) -> str | None:
"""Resolve a Telegram file_id into an HTTPS URL via getFile."""
if self._http is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
try:
response = await self._http.get(f"{self._api}/getFile", params={"file_id": file_id})
response.raise_for_status()
file_path = response.json().get("result", {}).get("file_path")
except Exception: # pragma: no cover - defensive: bad token, network, etc.
logger.exception("getFile failed for %s", file_id)
return None
return f"{self._api.replace('/bot', '/file/bot')}/{file_path}" if file_path else None
# -- outbound helpers -------------------------------------------------- #
async def _dispatch(self, chat_id: int, request: ChannelRequest) -> None:
"""Run the request and forward results to ``chat_id``."""
if self._ctx is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
if not request.stream:
if self._send_typing_action:
await self._send_chat_action(chat_id, "typing")
result = await self._ctx.run(request)
include_originating = await self._ctx.deliver_response(request, result)
if include_originating:
result = await self._apply_response_hook(result, request)
await self._reply_with_result(chat_id, result.result)
return
stream = self._ctx.run_stream(request)
await self._stream_to_chat(chat_id, request, stream)
async def _stream_to_chat(
self,
chat_id: int,
request: ChannelRequest,
stream: ResponseStream[AgentResponseUpdate, AgentResponse],
) -> None:
"""Iterate the agent's ResponseStream and progressively edit a Telegram message.
Smoothness recipe:
1. Send the placeholder message up front so the user sees instant
activity (a "" bubble) instead of waiting for the first edit.
2. Token consumption never awaits the network — a background
``edit_worker`` watches an asyncio.Event, coalesces accumulated
text, rate-limits itself to ``stream_edit_min_interval`` (default
0.4s — well under Telegram's per-chat edit limits), and only sends
when the text actually changed.
3. Interim edits are sent as **plain text** even if a ``parse_mode``
is configured. Partial Markdown/HTML mid-stream is invalid and
Telegram rejects it with 400 ``can't parse entities``. The final
edit re-applies the configured ``parse_mode`` so the user ends up
with formatted output.
4. ``sendChatAction("typing")`` is re-issued every 4s while the
stream is live so the typing bubble doesn't disappear on long
responses (Telegram clears it after ~5s).
"""
if self._http is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
# Pin to a local so mypy narrows inside the nested closures below.
http = self._http
accumulated = ""
last_sent = ""
last_edit_at = 0.0
message_id: int | None = None
worker_done = asyncio.Event()
wake = asyncio.Event()
async def send_initial_placeholder() -> None:
nonlocal message_id, last_edit_at
try:
response = await http.post(
f"{self._api}/sendMessage",
json={"chat_id": chat_id, "text": ""},
)
response.raise_for_status()
message_id = response.json().get("result", {}).get("message_id")
last_edit_at = time.monotonic()
except Exception: # pragma: no cover - placeholder is best-effort
logger.exception("Telegram placeholder send failed")
async def edit_worker() -> None:
nonlocal last_sent, last_edit_at
while not (worker_done.is_set() and accumulated == last_sent):
await wake.wait()
wake.clear()
if message_id is None or accumulated == last_sent:
continue
elapsed = time.monotonic() - last_edit_at
if elapsed < self._stream_edit_min_interval:
try:
await asyncio.wait_for(wake.wait(), timeout=self._stream_edit_min_interval - elapsed)
wake.clear()
except asyncio.TimeoutError:
pass
snapshot = accumulated[:_TELEGRAM_MAX_TEXT_LEN]
if snapshot == last_sent:
continue
# Interim edits go out as plain text — partial Markdown/HTML
# is invalid mid-stream and Telegram returns 400.
try:
await http.post(
f"{self._api}/editMessageText",
json={"chat_id": chat_id, "message_id": message_id, "text": snapshot},
)
except Exception: # pragma: no cover - keep streaming on error
logger.exception("Telegram interim edit failed")
last_sent = snapshot
last_edit_at = time.monotonic()
async def typing_worker() -> None:
while not worker_done.is_set():
await self._send_chat_action(chat_id, "typing")
try:
await asyncio.wait_for(worker_done.wait(), timeout=4.0)
except asyncio.TimeoutError:
continue
await send_initial_placeholder()
edit_task = asyncio.create_task(edit_worker(), name="telegram-edit-worker")
typing_task = asyncio.create_task(typing_worker(), name="telegram-typing-worker")
try:
async for update in stream:
if self._stream_transform_hook is not None:
transformed = self._stream_transform_hook(update)
if isinstance(transformed, Awaitable):
transformed = await transformed
if transformed is None:
continue
update = transformed
chunk = getattr(update, "text", None)
if chunk:
accumulated += chunk
wake.set()
except Exception:
logger.exception("Telegram streaming consumption failed")
finally:
worker_done.set()
wake.set()
try:
await edit_task
except Exception: # pragma: no cover
logger.exception("Telegram edit worker crashed")
typing_task.cancel()
with contextlib.suppress(asyncio.CancelledError, Exception):
await typing_task
# Always finalize so context providers / history hooks run.
try:
final = await stream.get_final_response()
except Exception: # pragma: no cover - finalize is best-effort
logger.exception("Stream finalize failed")
final = None
# Final edit applies parse_mode (if configured) to the full text.
final_text = (accumulated or last_sent)[:_TELEGRAM_MAX_TEXT_LEN]
result = _text_result(final_text) if final_text else None
include_originating = True
if result is not None and self._ctx is not None:
include_originating = await self._ctx.deliver_response(request, result)
if include_originating:
result = await self._apply_response_hook(result, request)
final_text = result.result.text[:_TELEGRAM_MAX_TEXT_LEN]
if message_id is not None and final_text and final_text != last_sent:
payload: dict[str, Any] = {
"chat_id": chat_id,
"message_id": message_id,
"text": final_text,
}
if self._parse_mode:
payload["parse_mode"] = self._parse_mode
try:
response = await self._http.post(f"{self._api}/editMessageText", json=payload)
# If parse_mode rejected the final edit, retry as plain text
# so the user still sees the answer.
if response.status_code == 400 and self._parse_mode:
payload.pop("parse_mode", None)
await self._http.post(f"{self._api}/editMessageText", json=payload)
except Exception: # pragma: no cover
logger.exception("Telegram final edit failed")
# If nothing ever streamed (no text chunks at all), fall back to the
# full result so images / tool outputs still reach the user.
if not accumulated and include_originating:
if final is not None:
wrapped_final = await self._apply_response_hook(HostedRunResult(final), request)
final = wrapped_final.result
await self._reply_with_result(chat_id, final)
async def _apply_response_hook(
self,
result: HostedRunResult[Any],
request: ChannelRequest,
) -> HostedRunResult[Any]:
"""Apply the channel-level response hook for an originating reply."""
if self.response_hook is None:
return result
context = ChannelResponseContext(
request=request,
channel_name=self.name,
destination_identity=None,
originating=True,
is_echo=False,
)
return await apply_response_hook(self.response_hook, result, context=context)
async def _reply_with_result(self, chat_id: int, result: Any) -> None:
"""Forward an AgentRunResponse back to Telegram.
Sends any image attachments on the last assistant message as photos,
then the text body via ``sendMessage``. Falls back to a ``"(no
response)"`` placeholder if neither text nor images are present so
the user is never left hanging.
"""
sent_photo = False
last_message = None
messages = getattr(result, "messages", None) or []
for msg in reversed(messages):
if getattr(msg, "role", None) == "assistant":
last_message = msg
break
if last_message is not None:
for content in getattr(last_message, "contents", []) or []:
uri = getattr(content, "uri", None)
media_type = getattr(content, "media_type", "") or ""
if uri and isinstance(media_type, str) and media_type.startswith("image/"):
await self._send_photo(chat_id, uri)
sent_photo = True
text = getattr(result, "text", None)
if text:
await self._send(chat_id, text)
elif not sent_photo:
await self._send(chat_id, "(no response)")
async def _send(self, chat_id: int, text: str, **extra: Any) -> None:
"""POST a ``sendMessage`` to Telegram, applying the configured ``parse_mode`` by default.
Extra kwargs are merged into the payload after ``parse_mode`` so
callers can override any field per-call (e.g. drop ``parse_mode``
for a known-unsafe interim text).
"""
if self._http is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
payload: dict[str, Any] = {"chat_id": chat_id, "text": text}
if self._parse_mode and "parse_mode" not in extra:
payload["parse_mode"] = self._parse_mode
payload.update(extra)
await self._http.post(f"{self._api}/sendMessage", json=payload)
# -- ChannelPush -------------------------------------------------------- #
async def push(self, identity: ChannelIdentity, payload: HostedRunResult[AgentResponse]) -> None:
"""Proactive delivery to a Telegram chat.
Implements :class:`host.ChannelPush` so other channels' callers can
target Telegram via ``ChannelRequest.response_target``
(e.g. ``ResponseTarget.channels(["telegram:8741188429"])`` from a
``/responses`` request). ``identity.native_id`` is the Telegram
chat id.
"""
try:
chat_id = int(identity.native_id)
except ValueError as exc:
raise ValueError(f"Telegram push requires an int chat_id, got {identity.native_id!r}") from exc
if self._http is None:
raise RuntimeError("TelegramChannel.push called before startup")
await self._send(chat_id, payload.result.text)
async def _send_photo(self, chat_id: int, photo_url: str, caption: str | None = None) -> None:
"""POST a ``sendPhoto`` to Telegram with an optional caption."""
if self._http is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
payload: dict[str, Any] = {"chat_id": chat_id, "photo": photo_url}
if caption:
payload["caption"] = caption
await self._http.post(f"{self._api}/sendPhoto", json=payload)
async def _send_chat_action(self, chat_id: int, action: str) -> None:
"""Fire a ``sendChatAction`` (typing, upload_photo, …); errors are logged and swallowed.
Chat actions are pure UX hints — Telegram clears them after ~5s
— so failures should never propagate to the caller.
"""
if self._http is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("telegram channel not started")
try:
await self._http.post(f"{self._api}/sendChatAction", json={"chat_id": chat_id, "action": action})
except Exception: # pragma: no cover - non-critical UX
logger.exception("sendChatAction failed")
__all__ = ["TelegramChannel", "telegram_isolation_key"]
@@ -0,0 +1,107 @@
[project]
name = "agent-framework-hosting-telegram"
description = "Telegram channel for agent-framework-hosting."
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "1.0.0a260424"
license-files = ["LICENSE"]
urls.homepage = "https://aka.ms/agent-framework"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true"
urls.issues = "https://github.com/microsoft/agent-framework/issues"
classifiers = [
"License :: OSI Approved :: MIT License",
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Typing :: Typed",
]
dependencies = [
"agent-framework-core>=1.2.0,<2",
"agent-framework-hosting==1.0.0a260424",
"httpx>=0.27,<1",
]
[tool.uv]
prerelease = "if-necessary-or-explicit"
environments = [
"sys_platform == 'darwin'",
"sys_platform == 'linux'",
"sys_platform == 'win32'"
]
[tool.uv-dynamic-versioning]
fallback-version = "0.0.0"
[tool.pytest.ini_options]
testpaths = 'tests'
addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = []
timeout = 120
markers = [
"integration: marks tests as integration tests that require external services",
]
[tool.ruff]
extend = "../../pyproject.toml"
[tool.coverage.run]
omit = [
"**/__init__.py"
]
[tool.pyright]
extends = "../../pyproject.toml"
include = ["agent_framework_hosting_telegram"]
exclude = ['tests']
# Telegram's API delivers loosely-typed JSON-ish maps (chat, message, photo,
# media, callback_query). Strict ``Unknown`` reporting on every ``.get(...)``
# adds noise without catching real bugs — narrowing happens via runtime
# isinstance checks instead. Other type checks remain strict.
reportUnknownArgumentType = "none"
reportUnknownMemberType = "none"
reportUnknownVariableType = "none"
reportUnknownLambdaType = "none"
reportOptionalMemberAccess = "none"
[tool.mypy]
plugins = ['pydantic.mypy']
strict = true
python_version = "3.10"
ignore_missing_imports = true
disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
[tool.bandit]
targets = ["agent_framework_hosting_telegram"]
exclude_dirs = ["tests"]
[tool.poe]
executor.type = "uv"
include = "../../shared_tasks.toml"
[tool.poe.tasks.mypy]
help = "Run MyPy for this package."
cmd = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_hosting_telegram"
[tool.poe.tasks.test]
help = "Run the default unit test suite for this package."
cmd = 'pytest -m "not integration" --cov=agent_framework_hosting_telegram --cov-report=term-missing:skip-covered tests'
[build-system]
requires = ["flit-core >= 3.11,<4.0"]
build-backend = "flit_core.buildapi"
@@ -0,0 +1,389 @@
# Copyright (c) Microsoft. All rights reserved.
"""Unit tests for :mod:`agent_framework_hosting_telegram`.
These tests exercise the internal parsing helpers and the webhook entry-point
without spinning up a real Telegram bot. The polling loop and HTTP-side
helpers are excluded from coverage because they require a live bot token.
"""
from __future__ import annotations
import asyncio
import contextlib
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any
from unittest.mock import AsyncMock, MagicMock
from agent_framework import AgentResponse, Content, Message
from agent_framework_hosting import (
AgentFrameworkHost,
ChannelCommand,
ChannelCommandContext,
ChannelRequest,
HostedRunResult,
)
from starlette.testclient import TestClient
from agent_framework_hosting_telegram import TelegramChannel, telegram_isolation_key
from agent_framework_hosting_telegram._channel import (
_parse_telegram_message,
_telegram_media_file_id,
)
# --------------------------------------------------------------------------- #
# Pure helpers #
# --------------------------------------------------------------------------- #
def test_telegram_isolation_key_format() -> None:
assert telegram_isolation_key(42) == "telegram:42"
assert telegram_isolation_key("abc") == "telegram:abc"
class TestMediaFileId:
def test_no_media(self) -> None:
assert _telegram_media_file_id({"text": "hi"}) is None
def test_photo_picks_largest(self) -> None:
assert _telegram_media_file_id({"photo": [{"file_id": "small"}, {"file_id": "large"}]}) == (
"large",
"image/jpeg",
)
def test_photo_empty_list(self) -> None:
assert _telegram_media_file_id({"photo": []}) is None
def test_document_uses_mime_type(self) -> None:
result = _telegram_media_file_id({"document": {"file_id": "f1", "mime_type": "application/pdf"}})
assert result == ("f1", "application/pdf")
def test_voice_default_mime(self) -> None:
result = _telegram_media_file_id({"voice": {"file_id": "v1"}})
assert result == ("v1", "audio/ogg")
class TestParseTelegramMessage:
async def test_text_only(self) -> None:
async def resolve(_: str) -> str | None:
return None
msg = await _parse_telegram_message({"text": "hello"}, resolve)
assert msg.role == "user"
assert msg.text == "hello"
async def test_text_and_photo(self) -> None:
async def resolve(file_id: str) -> str | None:
return f"https://files.telegram.org/{file_id}"
msg = await _parse_telegram_message({"caption": "look", "photo": [{"file_id": "p1"}]}, resolve)
assert msg.text == "look"
# Image content present.
assert any((getattr(c, "uri", None) or "").endswith("/p1") for c in msg.contents)
async def test_unresolvable_media_falls_back_to_text(self) -> None:
async def resolve(_: str) -> str | None:
return None
msg = await _parse_telegram_message({"text": "x", "voice": {"file_id": "v1"}}, resolve)
# Resolver returned None — the contents should still include the
# text without crashing.
assert msg.text == "x"
# --------------------------------------------------------------------------- #
# Webhook entry point #
# --------------------------------------------------------------------------- #
@dataclass
class _FakeAgentResponse:
text: str
class _FakeAgent:
def __init__(self, reply: str = "ok") -> None:
self._reply = reply
self.runs: list[Any] = []
def create_session(self, *, session_id: str | None = None) -> Any:
return {"session_id": session_id}
def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any:
self.runs.append({"messages": messages, "stream": stream, "kwargs": kwargs})
async def _coro() -> _FakeAgentResponse:
return _FakeAgentResponse(text=self._reply)
return _coro()
def _run_result(text: str) -> HostedRunResult[AgentResponse]:
return HostedRunResult(AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text(text=text)])]))
def _make_telegram(stream_default: bool = False) -> tuple[TelegramChannel, _FakeAgent]:
agent = _FakeAgent("hi")
ch = TelegramChannel(
bot_token="123:abc",
webhook_url="https://example.com/hook",
secret_token="s3cr3t",
stream=stream_default,
)
# Replace the internal HTTP client with an AsyncMock so the channel
# never tries to call the real Telegram API.
fake_http = MagicMock()
# post() returns a response object whose raise_for_status() is sync.
response_mock = MagicMock()
response_mock.json = MagicMock(return_value={"ok": True, "result": {}})
fake_http.post = AsyncMock(return_value=response_mock)
fake_http.get = AsyncMock(return_value=response_mock)
fake_http.aclose = AsyncMock()
ch._http = fake_http
return ch, agent
class TestTelegramWebhook:
def test_webhook_accepts_text_message_and_dispatches_to_agent(self) -> None:
ch, agent = _make_telegram()
host = AgentFrameworkHost(target=agent, channels=[ch])
# Skip lifespan so polling/setWebhook are not invoked.
with TestClient(host.app) as client:
r = client.post(
"/telegram/webhook",
json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hello"}},
headers={"x-telegram-bot-api-secret-token": "s3cr3t"},
)
assert r.status_code == 200
assert agent.runs, "expected the agent to be invoked"
def test_webhook_rejects_bad_secret(self) -> None:
ch, agent = _make_telegram()
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post(
"/telegram/webhook",
json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hi"}},
headers={"x-telegram-bot-api-secret-token": "WRONG"},
)
assert r.status_code == 401
assert not agent.runs
async def test_response_hook_can_rewrite_originating_reply(self) -> None:
contexts: list[Any] = []
def hook(result: HostedRunResult, **kwargs: Any) -> HostedRunResult:
contexts.append(kwargs["context"])
return HostedRunResult(_FakeAgentResponse(text=result.result.text.upper()), session=result.session)
ch, agent = _make_telegram()
ch.response_hook = hook
class _Ctx:
target: Any = agent
async def run(self, _request: ChannelRequest) -> HostedRunResult:
return HostedRunResult(_FakeAgentResponse(text="hi"))
async def deliver_response(self, _request: ChannelRequest, _payload: HostedRunResult) -> bool:
return True
ch._ctx = _Ctx() # type: ignore[assignment] # pyright: ignore[reportPrivateUsage]
request = ChannelRequest(channel="telegram", operation="message.create", input="hi", stream=False)
await ch._dispatch(99, request) # pyright: ignore[reportPrivateUsage]
assert ch._http is not None
args, kwargs = ch._http.post.call_args # type: ignore[attr-defined]
assert args[0].endswith("/sendMessage")
assert kwargs["json"]["text"] == "HI"
assert contexts
assert contexts[0].channel_name == "telegram"
assert contexts[0].originating is True
assert contexts[0].destination_identity is None
class TestPushAndCommand:
async def test_push_calls_send(self) -> None:
ch, _agent = _make_telegram()
from agent_framework_hosting import ChannelIdentity
await ch.push(ChannelIdentity(channel="telegram", native_id="42"), _run_result("hi"))
assert ch._http is not None
ch._http.post.assert_called() # type: ignore[attr-defined]
args, kwargs = ch._http.post.call_args # type: ignore[attr-defined]
assert args[0].endswith("/sendMessage")
assert kwargs["json"]["chat_id"] in ("42", 42)
assert kwargs["json"]["text"] == "hi"
async def test_command_handler_invoked(self) -> None:
captured: list[ChannelCommandContext] = []
async def handler(ctx: ChannelCommandContext) -> None:
captured.append(ctx)
await ctx.reply("pong")
ch = TelegramChannel(
bot_token="123:abc",
webhook_url="https://example.com/hook",
commands=[ChannelCommand(name="ping", description="ping", handle=handler)],
register_native_commands=False,
)
fake_http = MagicMock()
response_mock = MagicMock()
response_mock.json = MagicMock(return_value={"ok": True, "result": {}})
fake_http.post = AsyncMock(return_value=response_mock)
fake_http.get = AsyncMock(return_value=response_mock)
fake_http.aclose = AsyncMock()
ch._http = fake_http
host = AgentFrameworkHost(target=_FakeAgent(), channels=[ch])
with TestClient(host.app) as client:
r = client.post(
"/telegram/webhook",
json={"update_id": 2, "message": {"chat": {"id": 7}, "text": "/ping"}},
)
assert r.status_code == 200
assert captured and captured[0].request.operation == "command.invoke"
# --------------------------------------------------------------------------- #
# Per-chat serial ordering #
# --------------------------------------------------------------------------- #
class TestPerChatOrdering:
async def test_updates_for_same_chat_run_serially(self) -> None:
"""Two updates for the same chat must process in arrival order."""
ch, _ = _make_telegram()
order: list[int] = []
slow_event = asyncio.Event()
async def fake_process(update: Mapping[str, Any]) -> None:
uid = update.get("update_id")
assert isinstance(uid, int)
if uid == 1:
# Block the first update so the second is queued behind it.
await slow_event.wait()
order.append(uid)
ch._process_update = fake_process # type: ignore[method-assign]
ch._enqueue_update({"update_id": 1, "message": {"chat": {"id": 100}, "text": "first"}})
ch._enqueue_update({"update_id": 2, "message": {"chat": {"id": 100}, "text": "second"}})
# Let the worker start the first update.
await asyncio.sleep(0)
assert order == [] # blocked on slow_event
slow_event.set()
# Drain.
worker = ch._chat_workers[100]
# Wait for the queue to empty.
await ch._chat_queues[100].join()
# Cleanup
worker.cancel()
with contextlib.suppress(asyncio.CancelledError):
await worker
assert order == [1, 2]
async def test_updates_for_different_chats_run_in_parallel(self) -> None:
"""Different chats get separate workers and can interleave freely."""
ch, _ = _make_telegram()
started: list[int] = []
gate_a = asyncio.Event()
async def fake_process(update: Mapping[str, Any]) -> None:
uid = update.get("update_id")
assert isinstance(uid, int)
started.append(uid)
if uid == 1:
await gate_a.wait()
ch._process_update = fake_process # type: ignore[method-assign]
ch._enqueue_update({"update_id": 1, "message": {"chat": {"id": 1}, "text": "a"}})
ch._enqueue_update({"update_id": 2, "message": {"chat": {"id": 2}, "text": "b"}})
# Both should be admitted into their respective workers despite
# update 1 being blocked.
await asyncio.sleep(0)
# Update 2 finishes; update 1 still blocked.
assert 2 in started
gate_a.set()
for cid in (1, 2):
await ch._chat_queues[cid].join()
for w in ch._chat_workers.values():
w.cancel()
with contextlib.suppress(asyncio.CancelledError):
await w
# --------------------------------------------------------------------------- #
# Webhook ack-before-run + shutdown drains workers #
# --------------------------------------------------------------------------- #
class TestWebhookAckBeforeRun:
async def test_webhook_returns_200_before_agent_completes(self) -> None:
"""The webhook must ack before the agent runs, to dodge Telegram's 60s redelivery."""
ch, _ = _make_telegram()
from starlette.requests import Request
agent_started = asyncio.Event()
agent_release = asyncio.Event()
async def fake_process(update: Mapping[str, Any]) -> None:
agent_started.set()
await agent_release.wait()
ch._process_update = fake_process # type: ignore[method-assign]
async def receive() -> Any:
payload = b'{"update_id":1,"message":{"chat":{"id":42},"text":"hi"}}'
return {"type": "http.request", "body": payload, "more_body": False}
scope = {
"type": "http",
"method": "POST",
"path": "/telegram/webhook",
"headers": [(b"x-telegram-bot-api-secret-token", b"s3cr3t")],
"query_string": b"",
}
request = Request(scope, receive=receive)
# Drive the webhook handler. Even though the agent won't complete
# (gate_a still cleared) the webhook must still 200 promptly.
resp = await ch._handle(request)
assert resp.status_code == 200
# The agent task is in flight but not finished — proves ack came first.
await asyncio.wait_for(agent_started.wait(), timeout=1.0)
assert not agent_release.is_set()
# Cleanup: release the agent and drain.
agent_release.set()
await ch._chat_queues[42].join()
for w in list(ch._chat_workers.values()):
w.cancel()
with contextlib.suppress(asyncio.CancelledError):
await w
class TestShutdownDrainsWorkers:
async def test_shutdown_cancels_in_flight_chat_workers(self) -> None:
"""`_on_shutdown` must drain per-chat workers, not leak them."""
ch, _ = _make_telegram()
forever = asyncio.Event()
async def stuck(update: Mapping[str, Any]) -> None:
await forever.wait()
ch._process_update = stuck # type: ignore[method-assign]
ch._enqueue_update({"update_id": 9, "message": {"chat": {"id": 1}, "text": "a"}})
await asyncio.sleep(0)
assert ch._chat_workers and ch._update_tasks
await ch._on_shutdown()
assert not ch._chat_workers
assert not ch._update_tasks
+2
View File
@@ -87,6 +87,7 @@ agent-framework-gemini = { workspace = true }
agent-framework-github-copilot = { workspace = true }
agent-framework-hosting = { workspace = true }
agent-framework-hosting-invocations = { workspace = true }
agent-framework-hosting-telegram = { workspace = true }
agent-framework-hyperlight = { workspace = true }
agent-framework-lab = { workspace = true }
agent-framework-mem0 = { workspace = true }
@@ -211,6 +212,7 @@ executionEnvironments = [
{ root = "packages/github_copilot/tests", reportPrivateUsage = "none" },
{ root = "packages/hosting/tests", reportPrivateUsage = "none" },
{ root = "packages/hosting-invocations/tests", reportPrivateUsage = "none" },
{ root = "packages/hosting-telegram/tests", reportPrivateUsage = "none" },
{ root = "packages/lab/gaia/tests", reportPrivateUsage = "none" },
{ root = "packages/lab/lightning/tests", reportPrivateUsage = "none" },
{ root = "packages/lab/tau2/tests", reportPrivateUsage = "none" },
+12
View File
@@ -50,6 +50,7 @@ members = [
"agent-framework-hosting",
"agent-framework-hosting-responses",
"agent-framework-hosting-invocations",
"agent-framework-hosting-telegram",
"agent-framework-hyperlight",
"agent-framework-lab",
"agent-framework-mem0",
@@ -644,6 +645,16 @@ provides-extras = ["serve", "disk"]
[package.metadata.requires-dev]
dev = [{ name = "httpx", specifier = ">=0.28.1" }]
[[package]]
name = "agent-framework-hosting-telegram"
version = "1.0.0a260424"
source = { editable = "packages/hosting-telegram" }
dependencies = [
{ name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-hosting", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "httpx", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
]
[[package]]
name = "agent-framework-hosting-responses"
version = "1.0.0a260424"
@@ -667,6 +678,7 @@ source = { editable = "packages/hosting-invocations" }
dependencies = [
{ name = "agent-framework-core", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "agent-framework-hosting", marker = "sys_platform == 'darwin' or sys_platform == 'linux' or sys_platform == 'win32'" },
{ name = "httpx", specifier = ">=0.27,<1" },
]
[[package]]