Python: feat(python): cross-channel hosting improvements (endpoint paths, Activity push, Telegram/Teams fixes) (#6307)

* Update hosting channel endpoint paths

Treat channel paths as concrete endpoint paths so built-in channels can be mounted at their defaults or at the app root without sample-specific subclasses. Update docs, tests, and the Foundry Telegram Invocations sample accordingly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Add push support to ActivityProtocolChannel

Implement the ChannelPush protocol so the Activity Protocol channel can
receive cross-channel fan-out (ResponseTarget.all_linked) and echo_input
replay as a non-originating destination:

- Add push() that reconstructs a proactive Bot Framework activity (bot/user
  swap) from the stored conversation reference and POSTs it to
  /v3/conversations/{id}/activities.
- Record a ChannelIdentity (service_url, conversation, bot, user, channel_id,
  locale) on ChannelRequest.identity so the host registers the channel under
  its isolation key for fan-out resolution.
- Route the streaming path through deliver_response so Activity-originated
  turns broadcast like Telegram/Discord.
- Add tests for push delivery, service_url validation, ChannelPush instance
  check, and inbound identity recording.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Don't delete Telegram webhook on shutdown by default

The TelegramChannel deleted its webhook on shutdown in webhook mode. During
a rolling redeploy the new revision registers the webhook on startup, then
the old revision's shutdown deletes it, silently breaking inbound delivery
until the next boot. setWebhook is overwriting/idempotent, so startup
re-asserts the webhook every boot and no teardown is needed.

Add a delete_webhook_on_shutdown flag (default False) so teardown is opt-in
for ephemeral deployments, and leave the webhook in place otherwise.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix Activity channel streaming on non-Teams channels (405 on updateActivity)

The Activity Protocol channel streamed replies the Teams way: POST a
placeholder, then PUT-edit it as tokens arrive. Only Teams supports the
updateActivity REST op; Web Chat, Direct Line and the Emulator return
405 Method Not Allowed on the PUT, so the user saw only the placeholder.

Gate the placeholder+edit flow on edit-capable channels (msteams). Other
channels now buffer the stream and POST a single final message, mirroring
the non-streaming path's fan-out and response-hook semantics. Also add a
defensive 405 fallback inside the Teams edit loop so an unexpected 405
can never strand the user on the placeholder.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(hosting-activity-protocol): don't parse Teams inline attachment content as a URI

Teams message activities include a text/html attachment whose inline
`content` is raw HTML (not a URL). _parse_activity fell back to
`attachment["content"]` and passed it to Content.from_uri, raising
ContentError ("URI must contain a scheme") and failing the whole turn,
so Teams users got no response.

Only treat `contentUrl` as a URI, require an absolute scheme, and skip
unparseable attachments defensively instead of failing the message.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(hosting-activity-protocol): native slash-command dispatch for Teams/Activity

Add a commands= parameter to ActivityProtocolChannel that intercepts a
leading /command (after stripping the bot's own @mention) and dispatches
to ChannelCommand handlers, mirroring the Telegram channel. Unknown
commands fall through to the agent. The channel run_hook is applied to
command requests so handlers observe the same resolved isolation key as
ordinary messages, and handler errors are swallowed (200, no Bot Service
retry of non-idempotent commands).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* feat(hosting): silent attributed Telegram echoes + Teams markdown rendering

- hosting-telegram: send cross-channel input echoes with disable_notification
  (silent) and detect echo payloads so they aren't re-broadcast.
- hosting-activity-protocol: render outbound + push activities as textFormat
  'markdown' so Teams shows formatted replies (enables per-channel variants).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(hosting-activity-protocol): address PR #6307 review feedback

Consult the host delivery pipeline even for empty streamed replies so
ResponseTarget.none is honoured and non-originating fan-out is consulted
instead of always emitting an originating "(no response)" message. Applies
to both the progressive-edit (Teams) and buffered (Web Chat/Direct Line)
streaming paths.

Re-validate service_url against the allow-list in push(): the identity is
read from a persisted store and push runs out-of-band, so the captured
service_url must be re-checked before a bearer token is sent.

Adds tests for empty-stream host consultation/suppression on both streaming
paths and for push rejecting a disallowed service_url.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Eduard van Valkenburg
2026-06-03 16:37:03 +02:00
committed by GitHub
Unverified
parent e8c22caaeb
commit e5a6e35843
33 changed files with 1449 additions and 133 deletions
+2 -2
View File
@@ -118,7 +118,7 @@ We will introduce a new hosting core distribution package per language. The full
- **Intent-only delivery envelope + pluggable `DurableTaskRunner`** — assistant messages stored by the host carry an `intended_targets[]` array on `Message.additional_properties["hosting"]` capturing the resolved destination set (after `ResponseTarget` + `LinkPolicy` filtering). The write is **immutable** — a single record of intent, never mutated post-push. Per-destination operational state (attempts, retries, last error, success timestamp, channel-issued id) lives in a pluggable `DurableTaskRunner` (`register` / `schedule` / `get`) that the host uses to fan out non-originating pushes. Built-in `InProcessTaskRunner` (asyncio + bounded retry) is the default for `long_running` deployments; adapter packages (`agent-framework-hosting-durabletask`, future Foundry adapter) plug in for `ephemeral` deployments. Replay is a property of the configured runner — native for durable adapters, not supported for in-process. This eliminates the earlier `pending`/`delivered`/`failed`/`skipped` state machine, the `SupportsDeliveryTracking` provider capability, and the Foundry `update_item` service ask.
- **Caller-supplied vs. host-tracked session carriage** — channels split into two families based on whether the upstream protocol carries a per-conversation key on every request. *Caller-supplied* channels (Responses' `previous_response_id`, Invocations, A2A, MCP) parse it into `ChannelSession.key` and let the caller branch threads by sending fresh ids. *Host-tracked* channels (Telegram, Activity Protocol via Azure Bot Service — Teams/Web Chat/Slack/…— WhatsApp) carry only a stable identity and rely on the host's per-`isolation_key` session alias plus a `host.reset_session(...)` `/new`-style command. The split is invisible to the agent target and explains why `reset_session` and aliasing exist at all (host-tracked channels have no other way to start a fresh thread). Anonymous vs. identified is an orthogonal axis; identity is supplied by the channel, the resolver, or both.
- **Multi-user surfaces are first-class.** Telegram groups, supergroups, forum topics, and Activity Protocol multi-user `conversationType`s (`groupChat`, `channel`) are designed-in from v1 — not retrofitted. The contract enforces a clean separation of **user identity** (`ChannelIdentity.native_id` = `from.id` / `from.aadObjectId`) and **conversation locator** (`ChannelRequest.conversation_id` = `chat.id` (+ optional `message_thread_id` / `replyToId`)). Channel implementations expose a `conversation_scope` option (`per_user`, `per_user_per_conversation` (default in groups), `per_conversation`) and an `accept_in_group` addressing rule (`mention_only` (default), `command_only`, `mention_or_command`, `all`) so the bot does not respond to every message in a group and so a single user's group context does not leak into their DM by default. Linker challenge messages (OAuth URL / one-time code) MUST redirect to the user's DM in group contexts.
- **Built-in channels** — own their protocol-defined relative routes under default mount roots (`/responses/v1`, `/invocations/invoke`, `/telegram/webhook`) without the app author spelling those out.
- **Built-in channels** — own their protocol-defined endpoint paths (`/responses`, `/invocations`, `/telegram/webhook`) without the app author spelling those out.
Channel implementations live in **separate distribution packages**, one per channel, with public surfaces kept stable per language.
@@ -317,7 +317,7 @@ These terms are language-neutral and shared between Python and .NET implementati
The decision is validated when, in each implementing language:
1. a one-channel Responses sample and a two-channel Responses + Invocations sample can be expressed with one host, default route layouts under `/responses/v1` and `/invocations/invoke`, and no handwritten protocol routing,
1. a one-channel Responses sample and a two-channel Responses + Invocations sample can be expressed with one host, default route layouts under `/responses` and `/invocations`, and no handwritten protocol routing,
2. a Responses channel by default forwards official request parameters like `temperature` into agent options and maps `store=False` into disabled session use,
3. app authors can override that default per request with an run hook that validates or rewrites the final `ChannelRequest` (for example requiring `temperature`, ignoring `store`, or adapting for `A2AAgent`),
4. a Telegram-style message channel can express command metadata, command registration, and either webhook or polling lifecycle behavior through the new channel contract,
+21 -21
View File
@@ -109,10 +109,10 @@ The hosting core is deliberately **not** a replacement for the existing protocol
After we deliver `agent-framework-hosting` and its first channel packages, users will be able to:
1. **Compose one host with one or more channels** — instantiate `AgentFrameworkHost(target=..., channels=[...])` where `target` is either a `SupportsAgentRun`-compatible agent or a `Workflow`, and get one Starlette application with all channels mounted.
2. **Expose the Responses API** — add `ResponsesChannel()` and serve `/responses/v1` (and conversation routes) without writing protocol handlers.
3. **Expose the Invocations API** — add `InvocationsChannel()` and serve `/invocations/invoke` without writing protocol handlers.
2. **Expose the Responses API** — add `ResponsesChannel()` and serve `/responses` without writing protocol handlers.
3. **Expose the Invocations API** — add `InvocationsChannel()` and serve `/invocations` without writing protocol handlers.
4. **Expose a Telegram bot** — add `TelegramChannel(bot_token=...)` with either `polling` or `webhook` transport, and register native commands declaratively with `ChannelCommand`.
5. **Override mount roots without breaking protocol paths** — pass `path="/public/responses"` and the channel still owns the protocol-relative suffix (`/v1`, `/invoke`, `/webhook`).
5. **Override endpoint paths** — pass `path="/public/responses"` to move a channel endpoint, or `path=""` when an external platform must call the app root.
6. **Customize per-request invocation behavior** — pass a `run_hook` to any built-in channel. The hook receives the channel-produced `ChannelRequest` (the host-neutral envelope each channel builds from its own protocol parsing — see [Key Types](#key-types)) and returns a possibly-modified `ChannelRequest`. Use it to validate, rewrite, or strip channel-derived options (e.g. enforce or drop `temperature`, override `session_mode`) before the host calls the target's execution seam. It is also the **adapter** that reshapes the channel's default `ChannelRequest.input` into the typed inputs a workflow target requires.
7. **Control session use per request** — built-in channels set `ChannelRequest.session_mode` to `auto`, `required`, or `disabled`; the host honors that when resolving `AgentSession`.
8. **Partition sessions by isolation key** — channels populate `ChannelSession.isolation_key` (user, tenant, chat, …) using hosted-agent terminology.
@@ -121,7 +121,7 @@ After we deliver `agent-framework-hosting` and its first channel packages, users
11. **Link a new channel to an existing identity through a well-known ceremony** — the host accepts a host-level `identity_linker` (e.g. `OAuthIdentityLinker(...)`, `OneTimeCodeIdentityLinker(...)`) which contributes its own routes/lifecycle and exposes a `begin(channel_identity) -> LinkChallenge` / `complete(challenge_id, proof) -> isolation_key` flow. Channels surface a `link`/`connect` `ChannelCommand` that delegates to the linker; on success the resolver subsequently maps the new channel-native identity to the existing `isolation_key`. Mechanism (OAuth provider, signed one-time code, future linker types) is pluggable; the contract is fixed.
12. **Route the response to a chosen channel**`ChannelRequest.response_target` accepts `ResponseTarget.originating` (default — synchronous response on the originating channel), `ResponseTarget.active` (the channel most recently observed for the resolved `isolation_key`), `ResponseTarget.channel("activity")` (specific channel id, recipient resolved from the link store), `ResponseTarget.channels([...])` (a list), `ResponseTarget.identities([ChannelIdentity(...)])` (one or more **explicit channel-native identities** — bypasses the link store, used when the caller already knows the recipient's channel-native id), `ResponseTarget.all_linked` (every channel where this `isolation_key` is known), or `ResponseTarget.none` (background-only — caller must poll the `ContinuationToken`). When the target is not the originating channel, the host delivers via the destination channel's `ChannelPush` capability.
13. **Push proactively from a channel** — channels that can deliver outbound messages without a prior request (Telegram bot proactive message, Activity Protocol proactive message via Azure Bot Service, webhook callbacks, SSE broadcasts) implement an optional `ChannelPush` capability on top of the base `Channel` protocol. Channels without push can only be the `originating` target.
14. **Submit background runs as a first-class operation**`host.run_in_background(request) -> ContinuationToken` returns immediately with an opaque, URL-safe `token` and a status (`queued` | `running` | `completed` | `failed`). The host invokes the target asynchronously and, when complete, both delivers the result via the configured `ResponseTarget` push **and** records it against the token so callers can poll `host.get_continuation(token)`. Built-in channels expose poll routes (`/responses/v1/{continuation_token}`, `/invocations/{continuation_token}`) that surface this without app code. Continuation tokens are persisted via a `HostStateStore` (file-based by default — see [Host state storage](#host-state-storage)) so background runs survive host restarts.
14. **Submit background runs as a first-class operation**`host.run_in_background(request) -> ContinuationToken` returns immediately with an opaque, URL-safe `token` and a status (`queued` | `running` | `completed` | `failed`). The host invokes the target asynchronously and, when complete, both delivers the result via the configured `ResponseTarget` push **and** records it against the token so callers can poll `host.get_continuation(token)`. Built-in channels expose poll routes (`/responses/{continuation_token}`, `/invocations/{continuation_token}`) that surface this without app code. Continuation tokens are persisted via a `HostStateStore` (file-based by default — see [Host state storage](#host-state-storage)) so background runs survive host restarts.
15. **Track the active channel per `isolation_key`** — the host records `(isolation_key, last_seen_channel, last_seen_at)` on every successfully resolved request so `ResponseTarget.active` resolves correctly. Apps can override in the `run_hook` (e.g. force `active` to a specific channel for a particular request).
16. **Add Starlette middleware at the host level** — pass `middleware=[Middleware(CORSMiddleware, ...)]` to `AgentFrameworkHost`.
17. **Serve with one call** — call `host.serve(host="localhost", port=8000)` without manually importing `uvicorn`, while `host.app` remains the canonical ASGI surface for any other server (Hypercorn, Daphne, Granian, Gunicorn+uvicorn workers).
@@ -253,20 +253,20 @@ The split is between distribution packages. The **public import path stays stabl
### Built-in routes
For built-in channels, `path` is the configurable mount root, not the full final endpoint. The channel package owns the fixed protocol-relative suffix.
For built-in channels, `path` is the configurable endpoint root. Use `path=""` when an external platform requires that channel at the app root.
| Channel | Default `path` | Default exposed route(s) |
| --- | --- | --- |
| `ResponsesChannel` | `/responses` | `/responses/v1` and nested responses/conversation routes below it |
| `InvocationsChannel` | `/invocations` | `/invocations/invoke` |
| `TelegramChannel` | `/telegram` | webhook mode: `/telegram/webhook`; polling mode: no required HTTP route |
| `ResponsesChannel` | `/responses` | `/responses` |
| `InvocationsChannel` | `/invocations` | `/invocations` |
| `TelegramChannel` | `/telegram/webhook` | webhook mode: `/telegram/webhook`; polling mode: no required HTTP route |
Overrides only replace the outer mount root:
Overrides replace the endpoint path:
```python
ResponsesChannel(path="/public/responses") # -> /public/responses/v1
InvocationsChannel(path="/internal/invocations") # -> /internal/invocations/invoke
TelegramChannel(path="/bots/telegram", bot_token=token) # -> /bots/telegram/webhook
ResponsesChannel(path="/public/responses") # -> /public/responses
InvocationsChannel(path="/internal/invocations") # -> /internal/invocations
TelegramChannel(path="/bots/telegram/webhook", bot_token=token) # -> /bots/telegram/webhook
```
### Key Types
@@ -695,7 +695,7 @@ When `response_target` is anything other than `originating`, the originating cha
| `error` | `str?` | Populated on `failed`. |
| `response_target` | `ResponseTarget` | The configured delivery target (recorded for diagnostics). |
The host stores `ContinuationToken`s through a `HostStateStore` (see [Host state storage](#host-state-storage)). The v1 default is **`FileHostStateStore`** — one JSON file per token under a configurable directory (default `./.af-hosting/continuations/`), written atomically (`.tmp` + `os.replace`) so a host crash mid-write doesn't corrupt the record. This means background runs **survive host restarts**: a caller that polls `/responses/v1/{continuation_token}` after the process recycles still gets a valid status (and the result if the run had completed before the crash). Completed/failed entries are evicted by a configurable TTL (default 24h). `InMemoryHostStateStore` is available for tests / ephemeral hosts. Built-in channels expose poll routes that surface the token in their native shape (`/responses/v1/{continuation_token}` returns a Responses-shaped object; `/invocations/{continuation_token}` returns the Invocations status envelope).
The host stores `ContinuationToken`s through a `HostStateStore` (see [Host state storage](#host-state-storage)). The v1 default is **`FileHostStateStore`** — one JSON file per token under a configurable directory (default `./.af-hosting/continuations/`), written atomically (`.tmp` + `os.replace`) so a host crash mid-write doesn't corrupt the record. This means background runs **survive host restarts**: a caller that polls `/responses/{continuation_token}` after the process recycles still gets a valid status (and the result if the run had completed before the crash). Completed/failed entries are evicted by a configurable TTL (default 24h). `InMemoryHostStateStore` is available for tests / ephemeral hosts. Built-in channels expose poll routes that surface the token in their native shape (`/responses/{continuation_token}` returns a Responses-shaped object; `/invocations/{continuation_token}` returns the Invocations status envelope).
#### Host state storage
@@ -1058,7 +1058,7 @@ if __name__ == "__main__":
host.serve(host="localhost", port=8000)
```
This exposes the Responses routes under `/responses/v1`. No manual `uvicorn` import, no protocol handlers written by the user.
This exposes the Responses route under `/responses`. No manual `uvicorn` import, no protocol handlers written by the user.
### Scenario 2: Expose Responses + Invocations on one host with shared Starlette middleware
@@ -1090,8 +1090,8 @@ agent = Agent(
host = AgentFrameworkHost(
target=agent,
channels=[
ResponsesChannel(), # -> /responses/v1
InvocationsChannel(), # -> /invocations/invoke
ResponsesChannel(), # -> /responses
InvocationsChannel(), # -> /invocations
],
middleware=[
Middleware(
@@ -1211,8 +1211,8 @@ Same agent, three channels, one Starlette app, one process.
host = AgentFrameworkHost(
target=agent,
channels=[
ResponsesChannel(), # -> /responses/v1
InvocationsChannel(), # -> /invocations/invoke
ResponsesChannel(), # -> /responses
InvocationsChannel(), # -> /invocations
TelegramChannel(
bot_token=os.environ["TELEGRAM_BOT_TOKEN"],
transport="webhook", # -> /telegram/webhook
@@ -1417,7 +1417,7 @@ async def telegram_promote_app_user(request: ChannelRequest, **_) -> ChannelRequ
)
# The application backend POSTs to /responses/v1/responses with
# The application backend POSTs to /responses with
#
# {
# "model": "...",
@@ -1862,7 +1862,7 @@ channel /link command
| Concern | Owned by | Notes |
|---|---|---|
| HTTP / WebSocket route shape | Channel package | e.g. `/responses/v1`, `/responses/ws`, `/invocations/invoke`, `/telegram/webhook` — channels may contribute either or both |
| HTTP / WebSocket route shape | Channel package | e.g. `/responses`, `/responses/ws`, `/invocations`, `/telegram/webhook` — channels may contribute either or both |
| Protocol request model | Channel package | e.g. Responses items (HTTP body or WS frames), Invocations body, Telegram webhook payload |
| Signature/auth validation | Channel package or host middleware | channel-specific unless generic Starlette middleware |
| Request-to-agent invocation mapping | Channel package + optional `run_hook` | forwards caller parameters into `ChannelRequest.options`, chooses `session_mode`, can enforce extra app policy |
@@ -1882,7 +1882,7 @@ channel /link command
| Proactive outbound delivery | Channel package via optional `ChannelPush` capability | channels that can push (Telegram, Activity Protocol via Bot Service, webhook, SSE) implement `push(identity, result)`; channels that can't are only valid as `originating` targets |
| Per-delivery audit + replay state | Host core writes intent-only — the resolved destination set onto the assistant `Message.additional_properties["hosting"]["intended_targets"]` (immutable, single write). Operational state (attempts, retries, last error, success timestamp) lives in the `DurableTaskRunner` and is observed via the runner's own backend. | Replay across host restarts is a property of the configured runner (native for durable adapters; not supported for `InProcessTaskRunner`). See [Intended targets + durable delivery](#intended-targets--durable-delivery) and [Durable task runner](#durable-task-runner). |
| Background-run lifecycle | Host core | owns `ContinuationToken` issuance, async execution, completion notification; persists via `HostStateStore` (file-based default — survives restarts) |
| Run poll routes | Channel package | each channel exposes its own protocol-shaped poll route (`/responses/v1/{continuation_token}`, `/invocations/{continuation_token}`) backed by `host.get_continuation(token)` |
| Run poll routes | Channel package | each channel exposes its own protocol-shaped poll route (`/responses/{continuation_token}`, `/invocations/{continuation_token}`) backed by `host.get_continuation(token)` |
| Conversation history (all channels — Responses, Invocations, Telegram, Activity Protocol, …) | Agent's core `HistoryProvider` (`agent_framework._sessions.HistoryProvider`) | Channels project their wire id (`previous_response_id`, `conversation_id`, request body `session_id`, host-tracked alias, …) into `ChannelSession.key`; the host resolves an `AgentSession` and the agent's `HistoryProvider` does the load / append. No channel-specific history seam. Multi-provider composition (with a single `load_messages=True`) is the standard AF convention; see [Conversation history for the Responses channel](#conversation-history-for-the-responses-channel) for the Foundry-backed variant. |
| Channel-owned non-message per-thread state (e.g. AG-UI `client_state`) | Channel-shipped `ContextProvider` subclass written into the same per-source state slot | Reuses the existing `ContextProvider` seam — *not* a new storage protocol. Channel reads `ChannelRequest.client_state` in `before_run`, lets the agent observe/mutate the slot, then reads the post-run value in `after_run` to emit channel-specific events (e.g. AG-UI `StateSnapshotEvent` / `StateDeltaEvent`). Composition rules unchanged (one `HistoryProvider` carries `load_messages=True`; additional `ContextProvider`s attach alongside). See [Channel-owned per-thread state](#channel-owned-per-thread-state). |
| Agent invocation | Host core | always through the target's execution seam — `SupportsAgentRun.run(...)` for agent targets, `Workflow.run(...)` for workflow targets |
+15
View File
@@ -21,6 +21,21 @@ When making changes to a package, check if the following need updates:
- The package's `AGENTS.md` file (adding/removing/renaming public APIs, architecture changes, import path changes)
- The agent skills in `.github/skills/` if conventions, commands, or workflows change
At the end of every run, re-read `AGENTS.md` and the relevant skill files and
update any guidance that the conversation revealed to be out of date,
incomplete, or misleading (renamed files, changed commands, new conventions
the user confirmed, etc.). **Before adding a new principle or rule, ask the
user whether they want it captured as a durable principle** — do not invent
team norms from a single conversation without explicit confirmation.
## Terminology
- **Avoid "GA" for Agent Framework code.** Reserve *GA* for hosted services
(e.g. "the Foundry service is GA"). For Agent Framework packages, features,
and APIs use **"released"** or **"stable"** depending on context — these
match the feature-lifecycle stages documented in the
`python-feature-lifecycle` skill.
## Pull Request Description Guidance
When preparing a PR description:
@@ -23,7 +23,10 @@ This channel handles:
- inbound ``message`` activities — text and attachments resolved to URIs,
- outbound replies via ``POST /v3/conversations/{id}/activities``,
- streaming via ``PUT /v3/conversations/{id}/activities/{id}`` mid-stream
edits (Teams supports updateActivity in personal chats and groups),
edits on channels that support ``updateActivity`` (Teams personal chats
and groups); every other channel — Web Chat, Direct Line, the Emulator —
rejects the PUT with ``405``, so those buffer the stream and POST a
single final message instead,
- typing indicators while the agent works,
- per-conversation isolation key ``activity:<conversation_id>`` so a Responses
caller can resume a Teams conversation by passing the conversation id,
@@ -67,7 +70,7 @@ from __future__ import annotations
import asyncio
import time
from collections.abc import Awaitable, Callable, Mapping
from collections.abc import Awaitable, Callable, Mapping, Sequence
from typing import Any
from urllib.parse import urlparse
@@ -79,9 +82,13 @@ from agent_framework import (
Message,
ResponseStream,
)
from agent_framework.exceptions import ContentError
from agent_framework_hosting import (
ChannelCommand,
ChannelCommandContext,
ChannelContext,
ChannelContribution,
ChannelIdentity,
ChannelRequest,
ChannelResponseContext,
ChannelResponseHook,
@@ -116,6 +123,16 @@ _DEFAULT_SERVICE_URL_HOSTS = (
"smba.trafficmanager.net",
)
# Bot Framework channels that support editing an Activity in place via
# ``PUT /v3/conversations/{id}/activities/{id}`` (the ``updateActivity``
# REST operation). Progressive-edit streaming (POST a placeholder, then
# repeatedly PUT it) only works on these. Every other channel — Web Chat,
# Direct Line, the Emulator, etc. — returns ``405 Method Not Allowed`` on
# the PUT, so those channels buffer the stream and POST a single final
# message instead. Teams is the canonical (and effectively only) public
# channel that supports the edit operation.
_EDIT_CAPABLE_CHANNELS = frozenset({"msteams"})
InboundAuthValidator = Callable[[Request], Awaitable[bool]]
@@ -134,13 +151,20 @@ class _OutboundError(RuntimeError):
"""Marker for transient outbound failures that should produce 502/retry."""
def _text_result(text: str) -> HostedRunResult[AgentResponse]:
"""Wrap plain text in a ``HostedRunResult`` for streaming fan-out delivery."""
return HostedRunResult(AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text(text=text)])]))
def _parse_activity(activity: Mapping[str, Any]) -> Message:
"""Translate one Bot Framework ``message`` Activity into an Agent Framework Message.
Pulls the activity's ``text`` plus any image/file attachments with a
``contentType`` and resolvable URL into ``Content`` parts. If the
activity has no usable parts an empty text part is emitted so the
caller never sees a content-less message.
Pulls the activity's ``text`` plus any image/file attachments that expose a
resolvable ``contentUrl`` into ``Content`` parts. Bot Framework's inline
``content`` field (e.g. the ``text/html`` rendering Teams attaches alongside
``text``, or an Adaptive Card payload) is *not* a URI, so it is ignored here
to avoid mis-parsing it as a URL. If the activity has no usable parts an
empty text part is emitted so the caller never sees a content-less message.
"""
parts: list[Content] = []
if (text := activity.get("text")) and isinstance(text, str):
@@ -149,16 +173,56 @@ def _parse_activity(activity: Mapping[str, Any]) -> Message:
for attachment in activity.get("attachments") or []:
if not isinstance(attachment, Mapping):
continue
url = attachment.get("contentUrl") or attachment.get("content")
url = attachment.get("contentUrl")
content_type = attachment.get("contentType")
if isinstance(url, str) and isinstance(content_type, str) and "/" in content_type:
if not (isinstance(url, str) and isinstance(content_type, str) and "/" in content_type):
continue
# contentUrl is occasionally a relative reference or otherwise lacks a
# scheme; skip those so one odd attachment can't fail the whole turn.
if not urlparse(url).scheme:
logger.debug("Skipping attachment with non-absolute contentUrl: %r", url)
continue
try:
parts.append(Content.from_uri(uri=url, media_type=content_type))
except ContentError:
logger.debug("Skipping attachment with unparseable contentUrl: %r", url)
continue
if not parts:
parts.append(Content.from_text(text=""))
return Message("user", parts)
def _command_text(activity: Mapping[str, Any]) -> str:
"""Return the activity text with the bot's own @mention stripped.
Channels that require an @mention to address the bot (Teams team and
group-chat scopes) prefix the message ``text`` with a mention whose literal
rendering is carried in the matching ``entities[].text`` (e.g.
``"<at>Personal Assistant</at> /todos"``). Personal 1:1 chats carry no
mention. We remove only the bot's own mention substring(s) — never other
users' mentions — so a leading ``/command`` can be detected in every scope.
"""
text = activity.get("text")
if not isinstance(text, str):
return ""
bot_id = (activity.get("recipient") or {}).get("id")
for entity in activity.get("entities") or []:
if not isinstance(entity, Mapping) or entity.get("type") != "mention":
continue
mentioned = entity.get("mentioned")
mentioned_id = mentioned.get("id") if isinstance(mentioned, Mapping) else None
# Only strip the bot's own mention; leave mentions of other users intact.
# When the recipient id is unknown we cannot disambiguate, so fall back
# to stripping every mention to keep command detection working.
if bot_id is not None and mentioned_id != bot_id:
continue
mention_text = entity.get("text")
if isinstance(mention_text, str) and mention_text:
text = text.replace(mention_text, "")
return text.strip()
class ActivityProtocolChannel:
"""Microsoft Teams channel via Bot Framework v4 webhook.
@@ -176,7 +240,7 @@ class ActivityProtocolChannel:
def __init__(
self,
*,
path: str = "/activity",
path: str = "/activity/messages",
app_id: str | None = None,
app_password: str | None = None,
certificate_path: str | None = None,
@@ -184,6 +248,7 @@ class ActivityProtocolChannel:
tenant_id: str = _BOTFRAMEWORK_TENANT,
token_scope: str = _BOTFRAMEWORK_SCOPE,
credential: AsyncTokenCredential | None = None,
commands: Sequence[ChannelCommand] = (),
run_hook: ChannelRunHook | None = None,
response_hook: ChannelResponseHook | None = None,
send_typing_action: bool = True,
@@ -196,7 +261,8 @@ class ActivityProtocolChannel:
"""Configure the Teams channel.
Keyword Args:
path: Mount path. The webhook lives at ``{path}/messages``.
path: Messages endpoint path on the host. Use ``""`` to expose the
webhook at the app root.
app_id: Bot Framework / Entra application (client) id. Required
whenever any credential is supplied.
app_password: Application secret for OAuth2 client credentials.
@@ -214,6 +280,16 @@ class ActivityProtocolChannel:
credential: Bring your own ``AsyncTokenCredential`` (e.g. a
``DefaultAzureCredential`` configured elsewhere). Overrides
``app_password`` / ``certificate_path``.
commands: Discoverable ``/command`` handlers. An inbound message
whose text (after stripping the bot's own @mention) begins with
``/`` and matches a command ``name`` (case-insensitive) is
dispatched to that handler instead of the agent, mirroring the
Telegram channel. The matching ``run_hook`` is applied to the
command request first, so command handlers observe the same
resolved ``session.isolation_key`` as ordinary messages.
Unknown ``/foo`` text falls through to the agent. Handlers reply
via ``ChannelCommandContext.reply``; surface them to users with
a Teams manifest ``commandLists`` entry.
run_hook: Optional rewrite of ``ChannelRequest`` before invocation.
response_hook: Optional rewrite of the
:class:`HostedRunResult` before the originating Activity
@@ -260,6 +336,7 @@ class ActivityProtocolChannel:
self._app_id = app_id
self._token_scope = token_scope
self._tenant_id = tenant_id
self._commands = list(commands)
self._hook = run_hook
self.response_hook = response_hook
self._send_typing_action = send_typing_action
@@ -292,10 +369,11 @@ class ActivityProtocolChannel:
self._credential = None # dev mode
def contribute(self, context: ChannelContext) -> ChannelContribution:
"""Capture the host context and register the ``POST /messages`` webhook."""
"""Capture the host context and register the messages webhook."""
self._ctx = context
return ChannelContribution(
routes=[Route("/messages", self._handle, methods=["POST"])],
routes=[Route("/", self._handle, methods=["POST"])],
commands=self._commands,
on_startup=[self._on_startup],
on_shutdown=[self._on_shutdown],
)
@@ -326,14 +404,14 @@ class ActivityProtocolChannel:
else:
cred_kind = type(self._credential).__name__
logger.info(
"ActivityProtocolChannel listening on %s/messages (auth=%s, tenant=%s)",
"ActivityProtocolChannel listening on %s (auth=%s, tenant=%s)",
self.path,
cred_kind,
self._tenant_id,
)
if self._inbound_auth_validator is None:
logger.warning(
"ActivityProtocolChannel %s/messages has no inbound_auth_validator — "
"ActivityProtocolChannel %s has no inbound_auth_validator — "
"the webhook will accept ANY caller. Plug an inbound_auth_validator "
"or terminate auth in front of the channel before exposing this "
"endpoint to a public network.",
@@ -464,12 +542,46 @@ class ActivityProtocolChannel:
logger.warning("Teams activity missing conversation.id or serviceUrl — dropping")
return
# Native command dispatch — a leading ``/command`` (after stripping the
# bot's own @mention) bypasses the agent, mirroring the Telegram channel.
# Unknown commands fall through to the agent as a normal message.
if self._commands:
command_text = _command_text(activity)
if command_text.startswith("/"):
tokens = command_text[1:].split()
if tokens:
command_name = tokens[0].split("@", 1)[0].lower()
handler = next((c for c in self._commands if c.name.lower() == command_name), None)
if handler is not None:
await self._invoke_command(activity, conversation_id, service_url, handler, command_text)
return
parsed = _parse_activity(activity)
# Store a Bot Framework conversation reference on the identity so the
# host can proactively ``push`` to this conversation later (fan-out
# from another channel). Recording the identity also registers this
# channel under the isolation key so ``ResponseTarget.all_linked`` /
# ``.active`` can resolve it.
identity = ChannelIdentity(
channel=self.name,
native_id=conversation_id,
attributes={
"service_url": service_url,
"conversation": dict(conversation),
# Inbound recipient is the bot → outbound ``from``; inbound
# ``from`` is the user → outbound ``recipient``.
"bot": dict(activity.get("recipient") or {}),
"user": dict(activity.get("from") or {}),
"channel_id": activity.get("channelId"),
"locale": activity.get("locale"),
},
)
channel_request = ChannelRequest(
channel=self.name,
operation="message.create",
input=[parsed],
session=ChannelSession(isolation_key=activity_protocol_isolation_key(conversation_id)),
identity=identity,
attributes={
"conversation_id": conversation_id,
"service_url": service_url,
@@ -489,6 +601,69 @@ class ActivityProtocolChannel:
await self._dispatch(activity, channel_request)
async def _invoke_command(
self,
activity: Mapping[str, Any],
conversation_id: str,
service_url: str,
handler: ChannelCommand,
command_text: str,
) -> None:
"""Run a matched ``/command`` handler and reply into the conversation.
The command request mirrors the message-path request (same isolation
key, identity and attributes) and is run through the channel ``run_hook``
first, so handlers observe the same resolved ``session.isolation_key`` as
ordinary messages. Handler/reply failures are logged but never raised:
commands are best-effort, and surfacing a 502 would make Bot Service
retry the inbound activity and re-run a non-idempotent command.
"""
if self._ctx is None: # pragma: no cover - guarded by lifecycle
raise RuntimeError("activity channel not started")
identity = ChannelIdentity(
channel=self.name,
native_id=conversation_id,
attributes={
"service_url": service_url,
"conversation": dict(activity.get("conversation") or {}),
"bot": dict(activity.get("recipient") or {}),
"user": dict(activity.get("from") or {}),
"channel_id": activity.get("channelId"),
"locale": activity.get("locale"),
},
)
request = ChannelRequest(
channel=self.name,
operation="command.invoke",
input=command_text,
session=ChannelSession(isolation_key=activity_protocol_isolation_key(conversation_id)),
identity=identity,
attributes={
"conversation_id": conversation_id,
"service_url": service_url,
"from_id": (activity.get("from") or {}).get("id"),
"channel_id": activity.get("channelId"),
"aad_object_id": (activity.get("from") or {}).get("aadObjectId"),
},
metadata={"reply_to_id": activity.get("id"), "recipient": activity.get("recipient")},
)
if self._hook is not None:
request = await apply_run_hook(
self._hook,
request,
target=self._ctx.target,
protocol_request=activity,
)
async def _reply(body: str) -> None:
await self._send_message(activity, body)
ctx = ChannelCommandContext(request=request, reply=_reply)
try:
await handler.handle(ctx)
except Exception:
logger.exception("ActivityProtocolChannel command %r failed", command_text)
# -- outbound helpers -------------------------------------------------- #
async def _dispatch(self, inbound: Mapping[str, Any], request: ChannelRequest) -> None:
@@ -513,7 +688,7 @@ class ActivityProtocolChannel:
return
stream = self._ctx.run_stream(request)
await self._stream_to_conversation(inbound, stream)
await self._stream_to_conversation(inbound, request, stream)
async def _apply_response_hook(
self,
@@ -535,22 +710,30 @@ class ActivityProtocolChannel:
async def _stream_to_conversation(
self,
inbound: Mapping[str, Any],
request: ChannelRequest,
stream: ResponseStream[AgentResponseUpdate, AgentResponse],
) -> None:
"""Iterate the stream and progressively edit a single Teams activity.
"""Stream the reply back into the originating conversation.
If the initial placeholder POST fails we fall back to buffering
the whole stream and POSTing a single final message at the end.
Without that fallback the edit-loop's exit condition
``accumulated == last_sent`` is unreachable while ``activity_id``
is ``None`` (no PUT possible), and the worker would deadlock
forever on ``wake.wait()`` after ``worker_done`` is set.
Channels that support the ``updateActivity`` REST operation (see
``_EDIT_CAPABLE_CHANNELS`` — effectively only Teams) get the
progressive-edit experience: a ``…`` placeholder is POSTed, then
repeatedly PUT-edited as text accumulates. Every other channel —
Web Chat, Direct Line, the Emulator, etc. — returns ``405 Method
Not Allowed`` on the PUT, so those buffer the whole stream and POST
a single final message (``_buffer_and_send``); attempting the
edit path there would leave the user staring at a stray ``…``.
"""
if str(inbound.get("channelId") or "").lower() not in _EDIT_CAPABLE_CHANNELS:
await self._buffer_and_send(inbound, request, stream)
return
accumulated = ""
last_sent = ""
last_edit_at = 0.0
activity_id: str | None = None
placeholder_ok = False
edit_unsupported = False
worker_done = asyncio.Event()
wake = asyncio.Event()
@@ -567,7 +750,7 @@ class ActivityProtocolChannel:
placeholder_ok = False
async def edit_worker() -> None:
nonlocal last_sent, last_edit_at
nonlocal last_sent, last_edit_at, edit_unsupported
# When the placeholder failed we have no activity_id to PUT
# into; the loop's only useful work is exiting cleanly. Skip
# straight to that — the final flush below will POST the
@@ -591,8 +774,23 @@ class ActivityProtocolChannel:
continue
try:
await self._update_activity(inbound, activity_id or "", snapshot)
except httpx.HTTPStatusError as exc:
# Some channels advertised as edit-capable may still
# reject the PUT (405). Stop editing and let the final
# flush POST the accumulated text as a new message;
# don't advance ``last_sent`` so that flush still fires.
if exc.response.status_code == 405:
edit_unsupported = True
logger.warning(
"Activity edit not supported by channel %r — sending a single final message instead",
inbound.get("channelId"),
)
return
logger.exception("Activity interim edit failed")
continue
except Exception: # pragma: no cover
logger.exception("Activity interim edit failed")
continue
last_sent = snapshot
last_edit_at = time.monotonic()
@@ -627,10 +825,24 @@ class ActivityProtocolChannel:
except Exception: # pragma: no cover
logger.exception("Stream finalize failed")
# Fan the final reply out to any non-originating linked destinations
# (e.g. ``ResponseTarget.all_linked``) and learn whether this channel
# should still render on its own wire. For the default
# ``ResponseTarget.originating`` this is a no-op that returns True.
# Always consult the host even when nothing streamed so that
# ``ResponseTarget.none`` is honoured and non-originating targets are
# still fanned out for empty replies.
include_originating = True
if self._ctx is not None:
include_originating = await self._ctx.deliver_response(request, _text_result(accumulated))
if not include_originating:
return
# Final flush — make sure the user sees everything that arrived after
# the worker's last edit. If the placeholder failed we POST a fresh
# activity here with whatever accumulated.
if not placeholder_ok:
# the worker's last edit. If the placeholder failed, or the channel
# turned out not to support edits (405), POST a fresh activity here
# with whatever accumulated rather than PUT-editing the placeholder.
if not placeholder_ok or edit_unsupported:
text = accumulated or "(no response)"
try:
await self._send_message(inbound, text)
@@ -649,6 +861,62 @@ class ActivityProtocolChannel:
except Exception: # pragma: no cover
logger.exception("Activity placeholder replace failed")
async def _buffer_and_send(
self,
inbound: Mapping[str, Any],
request: ChannelRequest,
stream: ResponseStream[AgentResponseUpdate, AgentResponse],
) -> None:
"""Consume the whole stream and POST a single final message.
Used for Bot Framework channels that do not support editing an
activity in place (everything except Teams — see
``_EDIT_CAPABLE_CHANNELS``). Those channels return ``405`` to
``PUT /v3/conversations/{id}/activities/{id}``, so the progressive
in-place edit cannot be used; we buffer the stream and ``POST`` a
single message at the end. Mirrors the non-streaming path's
fan-out + response-hook semantics so behaviour is consistent
regardless of whether the target streamed.
"""
accumulated = ""
try:
async for update in stream:
if self._stream_transform_hook is not None:
transformed = self._stream_transform_hook(update)
if isinstance(transformed, Awaitable):
transformed = await transformed
if transformed is None:
continue
update = transformed
chunk = getattr(update, "text", None)
if chunk:
accumulated += chunk
except Exception:
logger.exception("Activity streaming consumption failed")
try:
await stream.get_final_response()
except Exception: # pragma: no cover
logger.exception("Stream finalize failed")
# Fan the final reply out to any non-originating linked destinations
# and learn whether this channel should still render on its own wire.
# Always consult the host even when nothing streamed so that
# ``ResponseTarget.none`` is honoured and non-originating targets are
# still fanned out for empty replies.
include_originating = True
if self._ctx is not None:
include_originating = await self._ctx.deliver_response(request, _text_result(accumulated))
if not include_originating:
return
result = await self._apply_response_hook(_text_result(accumulated), request)
text = getattr(result.result, "text", None) or "(no response)"
try:
await self._send_message(inbound, text)
except Exception: # pragma: no cover
logger.exception("Activity buffered final send failed")
# -- Bot Framework REST helpers --------------------------------------- #
def _activity_payload(self, inbound: Mapping[str, Any], text: str) -> dict[str, Any]:
@@ -664,7 +932,7 @@ class ActivityProtocolChannel:
"channelId": inbound.get("channelId"),
"serviceUrl": inbound.get("serviceUrl"),
"text": text,
"textFormat": "plain",
"textFormat": "markdown",
}
async def _send_message(self, inbound: Mapping[str, Any], text: str) -> str | None:
@@ -730,5 +998,56 @@ class ActivityProtocolChannel:
except Exception: # pragma: no cover - non-critical UX
logger.exception("Teams typing send failed")
# -- ChannelPush -------------------------------------------------------- #
async def push(self, identity: ChannelIdentity, payload: HostedRunResult[Any]) -> None:
"""Proactively deliver an out-of-band message into a Bot Framework conversation.
Implements :class:`host.ChannelPush` so this channel can be a
non-originating destination for ``ChannelRequest.response_target``
(e.g. ``ResponseTarget.all_linked`` fan-out from Telegram/Discord, or
``echo_input`` replay). The conversation reference is reconstructed
from ``identity.attributes`` captured on the inbound activity:
``service_url``, ``conversation``, ``bot`` (outbound ``from``),
``user`` (outbound ``recipient``), and ``channel_id``.
Echo payloads (the user's mirrored input) carry ``role="user"``
messages; Bot Service channels can only send AS the bot, so the text
is delivered as a normal bot message.
"""
if self._http is None:
raise RuntimeError("ActivityProtocolChannel.push called before startup")
attrs = identity.attributes
service_url = str(attrs.get("service_url") or "").rstrip("/")
conversation = dict(attrs.get("conversation") or {"id": identity.native_id})
conversation_id = conversation.get("id") or identity.native_id
if not service_url:
raise ValueError("ActivityProtocolChannel.push requires 'service_url' in identity attributes")
# Re-validate the persisted ``service_url`` against the allow-list. The
# identity may have been recorded hours earlier (push runs out-of-band),
# so the allow-list could have narrowed or the store been tampered with
# since; never send a bearer token to a now-disallowed host.
if not self._is_service_url_allowed(service_url):
raise ValueError(f"ActivityProtocolChannel.push: service_url {service_url!r} is not in the allowed hosts")
text = getattr(payload.result, "text", None) or "(no response)"
activity = {
"type": "message",
"from": dict(attrs.get("bot") or {}),
"recipient": dict(attrs.get("user") or {}),
"conversation": conversation,
"channelId": attrs.get("channel_id"),
"serviceUrl": attrs.get("service_url"),
"text": text,
"textFormat": "markdown",
}
if attrs.get("locale"):
activity["locale"] = attrs["locale"]
url = f"{service_url}/v3/conversations/{conversation_id}/activities"
token = await self._get_token()
response = await self._http.post(url, json=activity, headers=self._auth_headers(token))
response.raise_for_status()
__all__ = ["ActivityProtocolChannel", "activity_protocol_isolation_key"]
@@ -9,16 +9,24 @@ streaming edits and certificate paths are out of scope here.
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import dataclass, replace
from typing import Any
from unittest.mock import AsyncMock, MagicMock
import pytest
from agent_framework_hosting import AgentFrameworkHost, HostedRunResult
from agent_framework_hosting import (
AgentFrameworkHost,
ChannelCommand,
ChannelCommandContext,
ChannelIdentity,
ChannelRequest,
ChannelSession,
HostedRunResult,
)
from starlette.testclient import TestClient
from agent_framework_hosting_activity_protocol import ActivityProtocolChannel, activity_protocol_isolation_key
from agent_framework_hosting_activity_protocol._channel import _parse_activity
from agent_framework_hosting_activity_protocol._channel import _command_text, _parse_activity, _text_result
def test_activity_protocol_isolation_key_format() -> None:
@@ -57,6 +65,74 @@ class TestParseActivity:
# No URI content survived.
assert not any(getattr(c, "uri", None) for c in msg.contents)
def test_skips_teams_text_html_inline_content(self) -> None:
# Teams attaches a text/html rendering whose inline ``content`` is raw
# HTML (not a URL). It must not be parsed as a URI.
msg = _parse_activity({
"type": "message",
"text": "hello there",
"attachments": [
{"contentType": "text/html", "content": "<p>hello there</p>"},
],
})
assert msg.text == "hello there"
assert not any(getattr(c, "uri", None) for c in msg.contents)
def test_skips_attachment_contenturl_without_scheme(self) -> None:
msg = _parse_activity({
"type": "message",
"text": "hi",
"attachments": [
{"contentType": "image/png", "contentUrl": "/relative/path.png"},
],
})
assert msg.text == "hi"
assert not any(getattr(c, "uri", None) for c in msg.contents)
class TestCommandText:
def test_plain_text_unchanged(self) -> None:
assert _command_text({"text": "/help"}) == "/help"
def test_non_string_text_returns_empty(self) -> None:
assert _command_text({"text": None}) == ""
assert _command_text({}) == ""
def test_strips_bot_mention(self) -> None:
activity = {
"text": "<at>Personal Assistant</at> /todos",
"recipient": {"id": "bot-1"},
"entities": [
{"type": "mention", "text": "<at>Personal Assistant</at>", "mentioned": {"id": "bot-1"}},
],
}
assert _command_text(activity) == "/todos"
def test_strips_bot_mention_without_space(self) -> None:
activity = {
"text": "<at>Bot</at>/help",
"recipient": {"id": "bot-1"},
"entities": [{"type": "mention", "text": "<at>Bot</at>", "mentioned": {"id": "bot-1"}}],
}
assert _command_text(activity) == "/help"
def test_keeps_other_user_mention(self) -> None:
activity = {
"text": "/whoami <at>Someone</at>",
"recipient": {"id": "bot-1"},
"entities": [{"type": "mention", "text": "<at>Someone</at>", "mentioned": {"id": "user-9"}}],
}
# Another user's mention must not be stripped.
assert _command_text(activity) == "/whoami <at>Someone</at>"
def test_malformed_entities_are_ignored(self) -> None:
activity = {
"text": "/help",
"recipient": {"id": "bot-1"},
"entities": ["not-a-mapping", {"type": "clientInfo"}, {"type": "mention"}],
}
assert _command_text(activity) == "/help"
@dataclass
class _FakeAgentResponse:
@@ -80,9 +156,11 @@ class _FakeAgent:
return _coro()
def _make_teams(stream: bool = False) -> tuple[ActivityProtocolChannel, _FakeAgent]:
def _make_teams(
stream: bool = False, *, path: str = "/activity/messages"
) -> tuple[ActivityProtocolChannel, _FakeAgent]:
agent = _FakeAgent("hi there")
ch = ActivityProtocolChannel(stream=stream, send_typing_action=False)
ch = ActivityProtocolChannel(path=path, stream=stream, send_typing_action=False)
fake_http = MagicMock()
response_mock = MagicMock()
response_mock.raise_for_status = MagicMock()
@@ -105,6 +183,11 @@ _VALID_ACTIVITY: dict[str, Any] = {
"serviceUrl": "https://smba.trafficmanager.net/amer/",
}
# Minimal request envelope for direct ``_stream_to_conversation`` calls. The
# channel only consults it for cross-channel fan-out, which is skipped when
# ``_ctx`` is unset (as in these unit tests).
_VALID_REQUEST = ChannelRequest(channel="activity", operation="message.create", input=[])
class TestTeamsWebhook:
def test_message_activity_dispatches_to_agent(self) -> None:
@@ -122,6 +205,14 @@ class TestTeamsWebhook:
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
assert body["text"] == "hi there"
def test_empty_path_mounts_at_app_root(self) -> None:
ch, agent = _make_teams(path="")
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post("/", json=_VALID_ACTIVITY)
assert r.status_code == 200
assert agent.runs, "expected the agent to be invoked"
def test_response_hook_can_rewrite_originating_reply(self) -> None:
contexts: list[Any] = []
@@ -181,6 +272,107 @@ class TestTeamsWebhook:
assert not agent.runs
class TestCommands:
def _make_with_commands(self, commands: list[ChannelCommand]) -> tuple[ActivityProtocolChannel, _FakeAgent]:
agent = _FakeAgent("hi there")
ch = ActivityProtocolChannel(send_typing_action=False, commands=commands)
fake_http = MagicMock()
response_mock = MagicMock()
response_mock.raise_for_status = MagicMock()
response_mock.json = MagicMock(return_value={"id": "act-1"})
fake_http.post = AsyncMock(return_value=response_mock)
fake_http.put = AsyncMock(return_value=response_mock)
fake_http.aclose = AsyncMock()
ch._http = fake_http
return ch, agent
def test_slash_command_bypasses_agent_and_replies(self) -> None:
seen: list[ChannelCommandContext] = []
async def handle(ctx: ChannelCommandContext) -> None:
seen.append(ctx)
await ctx.reply("listed")
ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)])
host = AgentFrameworkHost(target=agent, channels=[ch])
activity = dict(_VALID_ACTIVITY, text="/todos")
with TestClient(host.app) as client:
r = client.post("/activity/messages", json=activity)
assert r.status_code == 200
assert not agent.runs, "command must bypass the agent"
assert seen and seen[0].request.operation == "command.invoke"
assert seen[0].request.input == "/todos"
assert seen[0].request.session is not None
assert seen[0].request.session.isolation_key == activity_protocol_isolation_key("19:meeting_xyz@thread.v2")
assert ch._http is not None
assert ch._http.post.call_args[1]["json"]["text"] == "listed" # type: ignore[attr-defined]
def test_command_match_is_case_insensitive(self) -> None:
ran = False
async def handle(ctx: ChannelCommandContext) -> None:
nonlocal ran
ran = True
ch, agent = self._make_with_commands([ChannelCommand("New", "reset", handle)])
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/new"))
assert r.status_code == 200
assert ran
assert not agent.runs
def test_unknown_command_falls_through_to_agent(self) -> None:
async def handle(ctx: ChannelCommandContext) -> None: # pragma: no cover - never called
raise AssertionError("should not run")
ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)])
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/unknown"))
assert r.status_code == 200
assert agent.runs, "unknown /command must reach the agent"
def test_command_failure_does_not_retry(self) -> None:
async def handle(ctx: ChannelCommandContext) -> None:
raise RuntimeError("boom")
ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)])
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos"))
# Best-effort: a failing command is swallowed and acked with 200 so Bot
# Service does not retry (and re-run a non-idempotent command).
assert r.status_code == 200
assert not agent.runs
def test_run_hook_applied_to_command_request(self) -> None:
def hook(request: ChannelRequest, **_: Any) -> ChannelRequest:
return replace(request, session=ChannelSession(isolation_key="resolved-key"))
captured: list[str] = []
async def handle(ctx: ChannelCommandContext) -> None:
assert ctx.request.session is not None
captured.append(ctx.request.session.isolation_key)
agent = _FakeAgent("hi")
ch = ActivityProtocolChannel(send_typing_action=False, commands=[ChannelCommand("todos", "x", handle)])
ch._hook = hook
fake_http = MagicMock()
response_mock = MagicMock()
response_mock.raise_for_status = MagicMock()
response_mock.json = MagicMock(return_value={"id": "act-1"})
fake_http.post = AsyncMock(return_value=response_mock)
fake_http.aclose = AsyncMock()
ch._http = fake_http
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos"))
assert r.status_code == 200
assert captured == ["resolved-key"]
class TestOutbound:
async def test_send_message_posts_to_conversation_url(self) -> None:
ch, _agent = _make_teams()
@@ -193,6 +385,103 @@ class TestOutbound:
assert body["text"] == "hi"
class TestPush:
"""The channel implements ``host.ChannelPush`` so it can be a
non-originating destination for cross-channel fan-out / echo replay."""
def test_is_channel_push_instance(self) -> None:
from agent_framework_hosting import ChannelPush
ch, _agent = _make_teams()
assert isinstance(ch, ChannelPush)
def _identity(self) -> ChannelIdentity:
return ChannelIdentity(
channel="activity",
native_id="19:meeting_xyz@thread.v2",
attributes={
"service_url": "https://smba.trafficmanager.net/amer/",
"conversation": {"id": "19:meeting_xyz@thread.v2"},
"bot": {"id": "bot-1"},
"user": {"id": "user-1"},
"channel_id": "msteams",
"locale": "en-US",
},
)
async def test_push_posts_proactive_activity(self) -> None:
ch, _agent = _make_teams()
await ch.push(self._identity(), _text_result("broadcast hello"))
assert ch._http is not None
ch._http.post.assert_called() # type: ignore[attr-defined]
url = ch._http.post.call_args[0][0] # type: ignore[attr-defined]
assert url == ("https://smba.trafficmanager.net/amer/v3/conversations/19:meeting_xyz@thread.v2/activities")
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
assert body["text"] == "broadcast hello"
# Outbound activity speaks AS the bot: inbound recipient -> from,
# inbound from -> recipient.
assert body["from"] == {"id": "bot-1"}
assert body["recipient"] == {"id": "user-1"}
assert body["conversation"] == {"id": "19:meeting_xyz@thread.v2"}
async def test_push_requires_service_url(self) -> None:
ch, _agent = _make_teams()
identity = ChannelIdentity(
channel="activity",
native_id="conv-x",
attributes={"conversation": {"id": "conv-x"}},
)
with pytest.raises(ValueError, match="service_url"):
await ch.push(identity, _text_result("hi"))
async def test_push_rejects_disallowed_service_url(self) -> None:
# ``push`` runs out-of-band against a persisted identity, so it must
# re-validate the service_url against the allow-list rather than trust
# the value captured (possibly hours) earlier.
ch, _agent = _make_teams()
identity = ChannelIdentity(
channel="activity",
native_id="conv-x",
attributes={
"service_url": "https://attacker.example.com/",
"conversation": {"id": "conv-x"},
"bot": {"id": "bot-1"},
"user": {"id": "user-1"},
},
)
with pytest.raises(ValueError, match="not in the allowed hosts"):
await ch.push(identity, _text_result("hi"))
assert ch._http is not None
ch._http.post.assert_not_called() # type: ignore[attr-defined]
class TestIdentityRecording:
"""``_process_activity`` must stamp the inbound conversation reference
onto ``ChannelRequest.identity`` so the host can record it for fan-out."""
async def test_inbound_sets_request_identity(self) -> None:
ch, agent = _make_teams()
captured: dict[str, Any] = {}
async def hook(req: ChannelRequest, **_: Any) -> ChannelRequest:
captured["request"] = req
return req
ch._hook = hook # type: ignore[assignment]
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
assert r.status_code == 200
request = captured["request"]
assert request.identity is not None
assert request.identity.channel == "activity"
assert request.identity.native_id == "19:meeting_xyz@thread.v2"
attrs = request.identity.attributes
assert attrs["service_url"] == "https://smba.trafficmanager.net/amer/"
assert attrs["bot"] == {"id": "bot-1"}
assert attrs["user"] == {"id": "user-1"}
class TestConfig:
def test_rejects_both_secret_and_certificate(self) -> None:
with pytest.raises(ValueError, match="not both"):
@@ -371,7 +660,7 @@ class TestStreaming:
# Use a tight throttle so the test doesn't sit on `wait_for`.
ch._stream_edit_min_interval = 0.0
await ch._stream_to_conversation(_VALID_ACTIVITY, _Stream()) # type: ignore[arg-type]
await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()) # type: ignore[arg-type]
assert ch._http is not None
# Placeholder POST + at least one final PUT.
ch._http.post.assert_called() # type: ignore[attr-defined]
@@ -420,7 +709,7 @@ class TestStreaming:
import asyncio as _asyncio
await _asyncio.wait_for(
ch._stream_to_conversation(_VALID_ACTIVITY, _Stream()), # type: ignore[arg-type]
ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()), # type: ignore[arg-type]
timeout=2.0,
)
# Two POSTs total: placeholder (failed) + fallback final.
@@ -444,9 +733,150 @@ class TestStreaming:
return _FakeAgentResponse(text="")
ch._stream_edit_min_interval = 0.0
await ch._stream_to_conversation(_VALID_ACTIVITY, _EmptyStream()) # type: ignore[arg-type]
await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type]
# The placeholder PUT-replaces with "(no response)" so the user
# isn't left staring at "…".
assert ch._http is not None
last_put_body = ch._http.put.call_args[1]["json"] # type: ignore[attr-defined]
assert last_put_body["text"] == "(no response)"
async def test_non_edit_channel_buffers_and_posts_single_message(self) -> None:
# Web Chat (and every non-Teams channel) does not support
# PUT /activities/{id}; the channel must buffer the stream and POST
# a single final message rather than the placeholder+edit dance.
ch, _agent = _make_teams(stream=True)
webchat_activity = {**_VALID_ACTIVITY, "channelId": "webchat"}
@dataclass
class _Up:
text: str
class _Stream:
def __aiter__(self) -> Any:
async def gen() -> Any:
yield _Up("hel")
yield _Up("lo")
return gen()
async def get_final_response(self) -> Any:
return _FakeAgentResponse(text="hello")
ch._stream_edit_min_interval = 0.0
await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, _Stream()) # type: ignore[arg-type]
assert ch._http is not None
# No PUT (no editing); exactly one POST with the full text.
ch._http.put.assert_not_called() # type: ignore[attr-defined]
assert ch._http.post.await_count == 1 # type: ignore[attr-defined]
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
assert body["text"] == "hello"
async def test_non_edit_channel_empty_stream_posts_no_response(self) -> None:
ch, _agent = _make_teams(stream=True)
webchat_activity = {**_VALID_ACTIVITY, "channelId": "directline"}
class _EmptyStream:
def __aiter__(self) -> Any:
async def gen() -> Any:
if False:
yield None # type: ignore[unreachable]
return gen()
async def get_final_response(self) -> Any:
return _FakeAgentResponse(text="")
ch._stream_edit_min_interval = 0.0
await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type]
assert ch._http is not None
ch._http.put.assert_not_called() # type: ignore[attr-defined]
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
assert body["text"] == "(no response)"
async def test_buffer_empty_stream_consults_host_and_can_suppress(self) -> None:
# Empty streamed replies must still consult the host so that
# ``ResponseTarget.none`` (deliver_response -> False) suppresses the
# originating message instead of posting "(no response)".
ch, _agent = _make_teams(stream=True)
webchat_activity = {**_VALID_ACTIVITY, "channelId": "directline"}
ctx = MagicMock()
ctx.deliver_response = AsyncMock(return_value=False)
ch._ctx = ctx
class _EmptyStream:
def __aiter__(self) -> Any:
async def gen() -> Any:
if False:
yield None # type: ignore[unreachable]
return gen()
async def get_final_response(self) -> Any:
return _FakeAgentResponse(text="")
ch._stream_edit_min_interval = 0.0
await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type]
assert ch._http is not None
ctx.deliver_response.assert_awaited_once()
ch._http.post.assert_not_called() # type: ignore[attr-defined]
ch._http.put.assert_not_called() # type: ignore[attr-defined]
async def test_edit_empty_stream_consults_host_and_can_suppress(self) -> None:
# Same contract for the edit-capable (Teams) progressive path.
ch, _agent = _make_teams(stream=True)
ctx = MagicMock()
ctx.deliver_response = AsyncMock(return_value=False)
ch._ctx = ctx
class _EmptyStream:
def __aiter__(self) -> Any:
async def gen() -> Any:
if False:
yield None # type: ignore[unreachable]
return gen()
async def get_final_response(self) -> Any:
return _FakeAgentResponse(text="")
ch._stream_edit_min_interval = 0.0
await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type]
ctx.deliver_response.assert_awaited_once()
async def test_edit_405_falls_back_to_single_post(self) -> None:
# Defensive: a channel advertised as edit-capable that nonetheless
# rejects the PUT with 405 must stop editing and POST the final
# text as a fresh message instead of silently leaving "…".
import httpx as _httpx
ch, _agent = _make_teams(stream=True)
assert ch._http is not None
request_405 = _httpx.Request("PUT", "https://smba.trafficmanager.net/amer/v3/x")
response_405 = _httpx.Response(405, request=request_405)
ch._http.put = AsyncMock( # type: ignore[attr-defined]
side_effect=_httpx.HTTPStatusError("405", request=request_405, response=response_405)
)
@dataclass
class _Up:
text: str
class _Stream:
def __aiter__(self) -> Any:
async def gen() -> Any:
yield _Up("hel")
yield _Up("lo")
return gen()
async def get_final_response(self) -> Any:
return _FakeAgentResponse(text="hello")
ch._stream_edit_min_interval = 0.0
await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()) # type: ignore[arg-type]
# Placeholder POST + fallback final POST = 2 POSTs; the final one
# carries the full text.
assert ch._http.post.await_count == 2 # type: ignore[attr-defined]
final_body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
assert final_body["text"] == "hello"
@@ -92,7 +92,7 @@ class DiscordChannel:
public_key: str,
bot_token: str | None = None,
guild_id: str | None = None,
path: str = "/discord",
path: str = "/discord/interactions",
agent_command: str = "ask",
agent_command_description: str = "Ask the agent",
agent_command_option: str = "prompt",
@@ -120,8 +120,8 @@ class DiscordChannel:
guild_id: Optional guild id for guild-scoped slash command
registration. Recommended for development because global
command registration can take a long time to propagate.
path: Host mount path. The interaction route is contributed as
``/interactions`` below this path.
path: Interaction endpoint path on the host. Use ``""`` to expose
the interaction route at the app root.
agent_command: Slash command name that invokes the hosted agent.
agent_command_description: Description for the agent slash command.
agent_command_option: String option name that carries the prompt.
@@ -159,7 +159,7 @@ class DiscordChannel:
self.agent_command_description = agent_command_description
self.agent_command_option = agent_command_option
self.register_commands = register_commands
self._commands: set[ChannelCommand] = set(commands) or {} # type: ignore
self._commands = tuple(commands or ())
self._command_by_name = {command.name: command for command in self._commands}
self._run_hook = run_hook
self.response_hook = response_hook
@@ -184,7 +184,7 @@ class DiscordChannel:
"""Register the Discord interaction route and lifecycle hooks."""
self._ctx = context
return ChannelContribution(
routes=[Route("/interactions", self._handle, methods=["POST"])],
routes=[Route("/", self._handle, methods=["POST"])],
commands=self._commands,
on_startup=[self._on_startup],
on_shutdown=[self._on_shutdown],
@@ -126,9 +126,9 @@ def test_ping_requires_valid_signature_and_returns_pong() -> None:
body = json.dumps({"type": 1}).encode("utf-8")
with TestClient(app) as client:
ok = client.post("/interactions", content=body, headers=_headers(signing_key, body))
ok = client.post("/", content=body, headers=_headers(signing_key, body))
bad = client.post(
"/interactions",
"/",
content=body,
headers={
**_headers(signing_key, body),
@@ -159,11 +159,11 @@ def test_request_validation_errors() -> None:
unsupported_app = Starlette(routes=list(unsupported_channel.contribute(_FakeContext()).routes)) # type: ignore[arg-type]
with TestClient(app) as client:
too_large = client.post("/interactions", content=b"{}x")
invalid_json = client.post("/interactions", content=b"{")
too_large = client.post("/", content=b"{}x")
invalid_json = client.post("/", content=b"{")
with TestClient(unsupported_app) as client:
non_object = client.post("/interactions", json=[])
unsupported = client.post("/interactions", json={"type": 99})
non_object = client.post("/", json=[])
unsupported = client.post("/", json={"type": 99})
assert too_large.status_code == 413
assert invalid_json.status_code == 400
@@ -1,13 +1,13 @@
# agent-framework-hosting-invocations
Minimal `POST /invoke` channel for [agent-framework-hosting](../hosting). Useful
Minimal `POST /invocations` channel for [agent-framework-hosting](../hosting). Useful
for smoke-testing, durable-task drivers, and bespoke clients that don't speak
the OpenAI Responses protocol.
## Wire shape
```
POST /invocations/invoke
POST /invocations
{
"message": "hello",
"session_id": "user-42",
@@ -1,6 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.
"""Minimal ``POST /invoke`` channel for :mod:`agent_framework_hosting`."""
"""Minimal ``POST /invocations`` channel for :mod:`agent_framework_hosting`."""
from ._channel import InvocationsChannel
@@ -1,6 +1,6 @@
# Copyright (c) Microsoft. All rights reserved.
"""Minimal ``POST /invoke`` channel.
"""Minimal ``POST /invocations`` channel.
Inspired by ``agent-framework-foundry-hosting``'s ``InvocationsHostServer``.
A framework-agnostic surface for callers that just want to send a message and
@@ -32,7 +32,7 @@ from starlette.routing import Route
class InvocationsChannel:
"""Minimal ``POST /invoke`` surface.
"""Minimal ``POST /invocations`` surface.
A run hook can rewrite the channel request (e.g. inject a session, add
options) before the host invokes the agent. A stream-transform hook can
@@ -51,8 +51,8 @@ class InvocationsChannel:
) -> None:
"""Configure the invocations endpoint.
``path`` is the mount root the host prefixes when registering this
channel's routes (the actual handler is ``POST {path}/invoke``).
``path`` is the endpoint path the host uses when registering this
channel. Use ``""`` to expose the handler at the app root.
``run_hook`` may rewrite the :class:`ChannelRequest` before the host
invokes the target — typically to attach session metadata or
translate the wire payload into ``Message`` instances.
@@ -68,12 +68,12 @@ class InvocationsChannel:
self._ctx: ChannelContext | None = None
def contribute(self, context: ChannelContext) -> ChannelContribution:
"""Capture the host-supplied context and register ``POST /invoke``."""
"""Capture the host-supplied context and register the endpoint route."""
self._ctx = context
return ChannelContribution(routes=[Route("/invoke", self._handle, methods=["POST"])])
return ChannelContribution(routes=[Route("/", self._handle, methods=["POST"])])
async def _handle(self, request: Request) -> Response:
"""Handle a single ``POST /invoke`` call.
"""Handle a single Invocations call.
Validates the JSON body shape, builds a :class:`ChannelRequest`
(optionally with a ``ChannelSession`` keyed by ``session_id``),
@@ -1,6 +1,6 @@
[project]
name = "agent-framework-hosting-invocations"
description = "Minimal POST /invoke channel for agent-framework-hosting."
description = "Minimal POST /invocations channel for agent-framework-hosting."
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
@@ -60,9 +60,9 @@ class _FakeAgent:
return _coro()
def _make_client(agent: _FakeAgent | None = None) -> tuple[TestClient, _FakeAgent]:
def _make_client(agent: _FakeAgent | None = None, *, path: str = "/invocations") -> tuple[TestClient, _FakeAgent]:
agent = agent or _FakeAgent()
host = AgentFrameworkHost(target=agent, channels=[InvocationsChannel()])
host = AgentFrameworkHost(target=agent, channels=[InvocationsChannel(path=path)])
return TestClient(host.app), agent
@@ -70,14 +70,21 @@ class TestInvocations:
def test_post_invoke_returns_response(self) -> None:
client, _agent = _make_client(_FakeAgent(reply="pong"))
with client:
r = client.post("/invocations/invoke", json={"message": "ping"})
r = client.post("/invocations", json={"message": "ping"})
assert r.status_code == 200
assert r.json() == {"response": "pong", "session_id": None}
def test_empty_path_mounts_at_app_root(self) -> None:
client, _agent = _make_client(_FakeAgent(reply="pong"), path="")
with client:
r = client.post("/", json={"message": "ping"})
assert r.status_code == 200
assert r.json() == {"response": "pong", "session_id": None}
def test_session_id_propagates_to_target(self) -> None:
client, agent = _make_client()
with client:
r = client.post("/invocations/invoke", json={"message": "x", "session_id": "s1"})
r = client.post("/invocations", json={"message": "x", "session_id": "s1"})
assert r.status_code == 200
assert r.json()["session_id"] == "s1"
sess = agent.calls[0]["kwargs"].get("session")
@@ -90,7 +97,7 @@ class TestInvocations:
client, _ = _make_client()
with client:
r = client.post(
"/invocations/invoke",
"/invocations",
content=b"{not json",
headers={"content-type": "application/json"},
)
@@ -99,26 +106,26 @@ class TestInvocations:
def test_empty_message_returns_422(self) -> None:
client, _ = _make_client()
with client:
r = client.post("/invocations/invoke", json={"message": ""})
r = client.post("/invocations", json={"message": ""})
assert r.status_code == 422
def test_non_string_session_id_returns_422(self) -> None:
client, _ = _make_client()
with client:
r = client.post("/invocations/invoke", json={"message": "x", "session_id": 1})
r = client.post("/invocations", json={"message": "x", "session_id": 1})
assert r.status_code == 422
def test_non_object_body_returns_422(self) -> None:
client, _ = _make_client()
with client:
r = client.post("/invocations/invoke", json=[])
r = client.post("/invocations", json=[])
assert r.status_code == 422
def test_streaming_emits_data_lines_and_done(self) -> None:
agent = _FakeAgent(chunks=["hel", "lo"])
host = AgentFrameworkHost(target=agent, channels=[InvocationsChannel()])
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "x", "stream": True})
r = client.post("/invocations", json={"message": "x", "stream": True})
assert r.status_code == 200
body = r.text
assert "data: hel" in body
@@ -136,7 +143,7 @@ class TestInvocations:
agent = _FakeAgent(reply="ok")
host = AgentFrameworkHost(target=agent, channels=[InvocationsChannel(run_hook=hook)])
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "x", "stream": True})
r = client.post("/invocations", json={"message": "x", "stream": True})
assert r.status_code == 200
# Even though caller asked for stream=True, hook flipped it off — so
# we get JSON back, not SSE.
@@ -154,7 +161,7 @@ class TestInvocations:
host = AgentFrameworkHost(target=agent, channels=[InvocationsChannel(response_hook=hook)])
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "ping"})
r = client.post("/invocations", json={"message": "ping"})
assert r.status_code == 200
assert r.json() == {"response": "hooked:pong", "session_id": None}
@@ -174,7 +181,7 @@ class TestInvocations:
channels=[InvocationsChannel(stream_transform_hook=transform)],
)
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "x", "stream": True})
r = client.post("/invocations", json={"message": "x", "stream": True})
assert r.status_code == 200
body = r.text
assert "data: FOO" in body
@@ -192,7 +199,7 @@ class TestInvocations:
channels=[InvocationsChannel(stream_transform_hook=transform)],
)
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "x", "stream": True})
r = client.post("/invocations", json={"message": "x", "stream": True})
assert r.status_code == 200
body = r.text
assert "data: keep" in body
@@ -210,7 +217,7 @@ class TestInvocations:
channels=[InvocationsChannel(stream_transform_hook=transform)],
)
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "x", "stream": True})
r = client.post("/invocations", json={"message": "x", "stream": True})
assert r.status_code == 200
assert "data: aa!" in r.text
@@ -221,7 +228,7 @@ class TestInvocations:
agent = _FakeAgent(chunks=["line1\r\nline2"])
host = AgentFrameworkHost(target=agent, channels=[InvocationsChannel()])
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "x", "stream": True})
r = client.post("/invocations", json={"message": "x", "stream": True})
assert r.status_code == 200
body = r.text
assert "data: line1\n" in body
@@ -247,7 +254,7 @@ class TestInvocations:
agent = _AgentWithFailingFinal()
host = AgentFrameworkHost(target=agent, channels=[InvocationsChannel()])
with TestClient(host.app) as client:
r = client.post("/invocations/invoke", json={"message": "x", "stream": True})
r = client.post("/invocations", json={"message": "x", "stream": True})
assert r.status_code == 200
body = r.text
assert "data: partial" in body
@@ -77,7 +77,7 @@ class ResponsesChannel:
Mounts ``POST <path>/responses`` (default path ``/responses`` so the
full route is ``/responses/responses`` when the channel is prefixed,
or just ``/responses`` when ``path=""``).
or just ``/`` when ``path=""``).
"""
name = "responses"
@@ -85,7 +85,7 @@ class ResponsesChannel:
def __init__(
self,
*,
path: str = "",
path: str = "/responses",
run_hook: ChannelRunHook | None = None,
response_hook: ChannelResponseHook | None = None,
stream_transform_hook: ChannelStreamTransformHook | None = None,
@@ -94,9 +94,9 @@ class ResponsesChannel:
"""Create a Responses channel.
Keyword Args:
path: Mount prefix on the host. Default ``""`` mounts the
``POST /responses`` route at the app root, matching the
upstream OpenAI surface.
path: Endpoint path on the host. Default ``"/responses"`` matches
the upstream OpenAI surface; use ``""`` to expose this channel
at the app root.
run_hook: Optional :data:`ChannelRunHook` invoked with the
parsed :class:`ChannelRequest` before the agent target
runs. May return a replacement request.
@@ -145,12 +145,12 @@ class ResponsesChannel:
)
def contribute(self, context: ChannelContext) -> ChannelContribution:
"""Capture the host-supplied context and register ``POST /responses``."""
"""Capture the host-supplied context and register the endpoint route."""
self._ctx = context
return ChannelContribution(routes=[Route("/responses", self._handle, methods=["POST"])])
return ChannelContribution(routes=[Route("/", self._handle, methods=["POST"])])
async def _handle(self, request: Request) -> Response:
"""Handle a single ``POST /responses`` call.
"""Handle a single Responses API call.
Parses the OpenAI Responses-shaped body into ``Message`` /
``options`` / ``ChannelSession`` triples via :mod:`._parsing`,
@@ -91,9 +91,13 @@ class _RecordingPushChannel:
# --------------------------------------------------------------------------- #
def _make_client(agent: _FakeAgent | None = None) -> tuple[TestClient, AgentFrameworkHost, _FakeAgent]:
def _make_client(
agent: _FakeAgent | None = None,
*,
path: str = "/responses",
) -> tuple[TestClient, AgentFrameworkHost, _FakeAgent]:
agent = agent or _FakeAgent()
host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel()])
host = AgentFrameworkHost(target=agent, channels=[ResponsesChannel(path=path)])
return TestClient(host.app), host, agent
@@ -110,6 +114,13 @@ class TestResponsesChannelNonStreaming:
assert body["output"][0]["content"][0]["text"] == "hi back"
assert len(agent.calls) == 1
def test_empty_path_mounts_at_app_root(self) -> None:
client, _host, _agent = _make_client(_FakeAgent(reply="hi back"), path="")
with client:
r = client.post("/", json={"input": "hi"})
assert r.status_code == 200
assert r.json()["output"][0]["content"][0]["text"] == "hi back"
def test_invalid_json_returns_400(self) -> None:
client, *_ = _make_client()
with client:
@@ -88,6 +88,21 @@ def _text_result(text: str) -> HostedRunResult[AgentResponse]:
return HostedRunResult(AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text(text=text)])]))
def _is_echo_payload(payload: HostedRunResult[AgentResponse]) -> bool:
"""Return ``True`` when a push payload is an echoed user turn.
Per the :class:`~agent_framework_hosting.ChannelPush` contract the host
mirrors the originating user's input as a one-or-more message
:class:`~agent_framework.AgentResponse` with every ``role == "user"``,
delivered *before* the agent's (``role == "assistant"``) reply. Treating a
payload whose messages are all user-role as an echo lets the channel pick
echo-only delivery options (e.g. silent notifications) without the host
having to thread an explicit ``is_echo`` flag through ``push``.
"""
messages = getattr(payload.result, "messages", None) or []
return bool(messages) and all(getattr(m, "role", None) == "user" for m in messages)
def _telegram_media_file_id(message: Mapping[str, Any]) -> tuple[str, str] | None:
"""Return ``(file_id, fallback_media_type)`` for any media on the message."""
photo = message.get("photo")
@@ -151,7 +166,7 @@ class TelegramChannel:
self,
*,
bot_token: str,
path: str = "/telegram",
path: str = "/telegram/webhook",
commands: Sequence[ChannelCommand] = (),
register_native_commands: bool = True,
run_hook: ChannelRunHook | None = None,
@@ -159,6 +174,7 @@ class TelegramChannel:
api_base: str = "https://api.telegram.org",
webhook_url: str | None = None,
secret_token: str | None = None,
delete_webhook_on_shutdown: bool = False,
parse_mode: str | None = None,
send_typing_action: bool = True,
transport: Literal["auto", "polling", "webhook"] = "auto",
@@ -179,6 +195,7 @@ class TelegramChannel:
self._api = f"{api_base}/bot{bot_token}"
self._webhook_url = webhook_url
self._secret_token = secret_token
self._delete_webhook_on_shutdown = delete_webhook_on_shutdown
self._parse_mode = parse_mode
self._send_typing_action = send_typing_action
if transport == "auto":
@@ -210,7 +227,7 @@ class TelegramChannel:
self._ctx = context
routes: list[BaseRoute] = []
if self._transport == "webhook":
routes.append(Route("/webhook", self._handle, methods=["POST"]))
routes.append(Route("/", self._handle, methods=["POST"]))
return ChannelContribution(
routes=routes,
commands=self._commands,
@@ -258,7 +275,7 @@ class TelegramChannel:
logger.info("Telegram polling started (long-poll timeout=%ss)", self._polling_timeout)
async def _on_shutdown(self) -> None:
"""Stop the polling task, drain in-flight workers, drop the webhook, close HTTP.
"""Stop the polling task, drain in-flight workers, close HTTP.
Drain order:
1. Cancel the poll task so no new updates are admitted.
@@ -269,10 +286,17 @@ class TelegramChannel:
``_update_tasks`` (the webhook handler returns 200 immediately
and runs the agent in a background task, which the previous
shutdown ignored entirely).
4. Best-effort `deleteWebhook` and HTTP client close.
4. Close the HTTP client.
Webhook teardown is best-effort failures (e.g. revoked token at
shutdown) are logged but never raised so app shutdown can complete.
The webhook registration is intentionally **left in place** on
shutdown. A Telegram webhook is a single global resource, so
deleting it here races rolling redeploys: the new revision calls
``setWebhook`` on startup, then the old revision's shutdown would
delete it, silently breaking inbound delivery until the next boot.
``setWebhook`` is overwriting/idempotent, so the next startup
re-asserts it anyway. Set ``delete_webhook_on_shutdown=True`` to opt
into best-effort teardown (e.g. for a one-off/ephemeral deployment);
failures are logged but never raised so app shutdown can complete.
"""
if self._poll_task is not None:
self._poll_task.cancel()
@@ -296,7 +320,7 @@ class TelegramChannel:
await task
self._update_tasks.clear()
if self._http is not None:
if self._transport == "webhook":
if self._transport == "webhook" and self._delete_webhook_on_shutdown:
try:
await self._http.post(f"{self._api}/deleteWebhook")
except Exception: # pragma: no cover - best-effort cleanup
@@ -845,7 +869,17 @@ class TelegramChannel:
raise ValueError(f"Telegram push requires an int chat_id, got {identity.native_id!r}") from exc
if self._http is None:
raise RuntimeError("TelegramChannel.push called before startup")
await self._send(chat_id, payload.result.text)
# The Bot API can only ever send AS the bot, so there is no way to
# impersonate the user for an echo (the MTProto ``send_as`` field is
# not exposed to bots). The next best UX is to deliver echoes
# *silently* (``disable_notification``) so a mirrored input doesn't
# buzz the user's device the way a genuine reply does. Echo phases are
# identified per the ChannelPush contract: a payload whose messages are
# all ``role == "user"`` is the originating turn mirrored here.
extra: dict[str, Any] = {}
if _is_echo_payload(payload):
extra["disable_notification"] = True
await self._send(chat_id, payload.result.text, **extra)
async def _send_photo(self, chat_id: int, photo_url: str, caption: str | None = None) -> None:
"""POST a ``sendPhoto`` to Telegram with an optional caption."""
@@ -123,10 +123,13 @@ def _run_result(text: str) -> HostedRunResult[AgentResponse]:
return HostedRunResult(AgentResponse(messages=[Message(role="assistant", contents=[Content.from_text(text=text)])]))
def _make_telegram(stream_default: bool = False) -> tuple[TelegramChannel, _FakeAgent]:
def _make_telegram(
stream_default: bool = False, *, path: str = "/telegram/webhook"
) -> tuple[TelegramChannel, _FakeAgent]:
agent = _FakeAgent("hi")
ch = TelegramChannel(
bot_token="123:abc",
path=path,
webhook_url="https://example.com/hook",
secret_token="s3cr3t",
stream=stream_default,
@@ -158,6 +161,18 @@ class TestTelegramWebhook:
assert r.status_code == 200
assert agent.runs, "expected the agent to be invoked"
def test_empty_path_mounts_at_app_root(self) -> None:
ch, agent = _make_telegram(path="")
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app) as client:
r = client.post(
"/",
json={"update_id": 1, "message": {"chat": {"id": 99}, "text": "hello"}},
headers={"x-telegram-bot-api-secret-token": "s3cr3t"},
)
assert r.status_code == 200
assert agent.runs, "expected the agent to be invoked"
def test_webhook_rejects_bad_secret(self) -> None:
ch, agent = _make_telegram()
host = AgentFrameworkHost(target=agent, channels=[ch])
@@ -216,6 +231,23 @@ class TestPushAndCommand:
assert args[0].endswith("/sendMessage")
assert kwargs["json"]["chat_id"] in ("42", 42)
assert kwargs["json"]["text"] == "hi"
# Agent replies must stay loud: no silent flag on a non-echo push.
assert "disable_notification" not in kwargs["json"]
async def test_push_echo_is_silent(self) -> None:
ch, _agent = _make_telegram()
from agent_framework_hosting import ChannelIdentity
echo = HostedRunResult(
AgentResponse(messages=[Message(role="user", contents=[Content.from_text(text="said via X")])])
)
await ch.push(ChannelIdentity(channel="telegram", native_id="42"), echo)
assert ch._http is not None
_args, kwargs = ch._http.post.call_args # type: ignore[attr-defined]
# Bots cannot impersonate the user (no MTProto send_as), so the echo is
# delivered silently instead of buzzing the device like a real reply.
assert kwargs["json"]["disable_notification"] is True
assert kwargs["json"]["text"] == "said via X"
async def test_command_handler_invoked(self) -> None:
captured: list[ChannelCommandContext] = []
@@ -387,3 +419,39 @@ class TestShutdownDrainsWorkers:
await ch._on_shutdown()
assert not ch._chat_workers
assert not ch._update_tasks
def _deletewebhook_called(http_mock: MagicMock) -> bool:
return any(
call.args and str(call.args[0]).endswith("/deleteWebhook") for call in http_mock.post.call_args_list
)
class TestWebhookShutdownTeardown:
async def test_shutdown_keeps_webhook_by_default(self) -> None:
"""Default: shutdown must NOT delete the webhook (avoids redeploy races)."""
ch, _ = _make_telegram()
assert ch._transport == "webhook"
await ch._on_shutdown()
assert not _deletewebhook_called(ch._http) # type: ignore[arg-type]
ch._http.aclose.assert_awaited() # type: ignore[union-attr]
async def test_shutdown_deletes_webhook_when_opted_in(self) -> None:
"""Opt-in: ``delete_webhook_on_shutdown=True`` performs best-effort teardown."""
ch = TelegramChannel(
bot_token="123:abc",
webhook_url="https://example.com/hook",
secret_token="s3cr3t",
delete_webhook_on_shutdown=True,
stream=False,
)
fake_http = MagicMock()
response_mock = MagicMock()
response_mock.json = MagicMock(return_value={"ok": True, "result": {}})
fake_http.post = AsyncMock(return_value=response_mock)
fake_http.get = AsyncMock(return_value=response_mock)
fake_http.aclose = AsyncMock()
ch._http = fake_http
await ch._on_shutdown()
assert _deletewebhook_called(fake_http)
fake_http.aclose.assert_awaited()
@@ -48,7 +48,7 @@ from starlette.applications import Starlette
from starlette.middleware import Middleware
from starlette.requests import Request
from starlette.responses import PlainTextResponse
from starlette.routing import BaseRoute, Mount, Route
from starlette.routing import BaseRoute, Mount, Route, WebSocketRoute
from starlette.types import ASGIApp, Receive, Scope, Send
from ._authorization import (
@@ -110,6 +110,21 @@ _EPHEMERAL_RUNTIME_MARKERS: tuple[str, ...] = (
RuntimeMode = Literal["long_running", "ephemeral"]
def _exact_path_route(path: str, route: BaseRoute) -> BaseRoute | None:
"""Clone a root route so ``Mount('/x', Route('/'))`` also handles ``/x`` without a redirect."""
if isinstance(route, Route) and route.path == "/":
return Route(
path,
route.endpoint,
methods=route.methods,
name=route.name,
include_in_schema=route.include_in_schema,
)
if isinstance(route, WebSocketRoute) and route.path == "/":
return WebSocketRoute(path, route.endpoint, name=route.name)
return None
def _detect_runtime_mode(env: Mapping[str, str] | None = None) -> tuple[RuntimeMode, str | None]:
"""Inspect deployment markers and return ``(mode, matched_marker_or_None)``.
@@ -257,7 +272,7 @@ def _workflow_event_to_update(event: WorkflowEvent[Any]) -> AgentResponseUpdate
@asynccontextmanager
async def _suppress_already_consumed() -> AsyncIterator[None]: # noqa: RUF029
async def _suppress_already_consumed() -> AsyncIterator[None]:
"""Yield, swallowing finalizer failures so consumer cleanup never crashes the host.
The bridge stream calls ``get_final_response()`` after iterating the
@@ -1233,7 +1248,7 @@ class AgentFrameworkHost:
Mirrors the ``AgentServerHost`` convention from
``azure.ai.agentserver.core``: one INFO line that captures the
target type, every channel + its mount path, the bind address
target type, every channel + its endpoint path, the bind address
(when known), whether we're running inside a Foundry Hosted
Agents container, and the worker count. Keeps log noise low
while still giving an operator a single grep-able anchor when
@@ -1290,11 +1305,19 @@ class AgentFrameworkHost:
for channel in self.channels:
contribution = channel.contribute(context)
# Channels publish routes relative to their root; mount under channel.path.
# An empty path means "mount at the app root" — useful for single-channel hosts
# that don't want a prefix (e.g. ResponsesChannel exposing POST /responses directly).
# An empty path means "mount at the app root" — useful when an external
# platform requires the channel endpoint at "/" or at a route contributed
# by the channel.
if contribution.routes:
if channel.path:
routes.append(Mount(channel.path, routes=list(contribution.routes)))
channel_routes = list(contribution.routes)
exact_routes = [
exact_route
for route in channel_routes
if (exact_route := _exact_path_route(channel.path, route)) is not None
]
routes.extend(exact_routes)
routes.append(Mount(channel.path, routes=channel_routes))
else:
routes.extend(contribution.routes)
on_startup.extend(contribution.on_startup)
@@ -710,7 +710,7 @@ class Channel(Protocol):
"""
name: str
path: str # default mount path (e.g. "/responses"); use "" to mount routes at the app root
path: str # default endpoint path (e.g. "/responses"); use "" to mount contributed routes at the app root
def contribute(self, context: ChannelContext) -> ChannelContribution: ...
+13 -1
View File
@@ -96,7 +96,7 @@ class _RecordingChannel:
self.pushes: list[tuple[ChannelIdentity, HostedRunResult[Any]]] = []
self._push_raises: Exception | None = None
self._supports_push = supports_push
# Provide a single trivial route so contribute() exercises the mount path.
# Provide a single trivial route so contribute() exercises the endpoint path.
self._routes: Sequence[BaseRoute] = (Route("/ping", _ping),)
def contribute(self, context: ChannelContext) -> ChannelContribution:
@@ -239,6 +239,18 @@ class TestHostWiring:
assert r.status_code == 200
assert r.json() == {"ok": True}
def test_app_mounts_root_route_at_exact_channel_path(self) -> None:
agent = _FakeAgent()
ch = _RecordingChannel(path="/fake")
ch._routes = (Route("/", _ping),)
host = AgentFrameworkHost(target=agent, channels=[ch])
with TestClient(host.app, follow_redirects=False) as client:
r = client.get("/fake")
assert r.status_code == 200
assert r.json() == {"ok": True}
assert client.get("/fake/").status_code == 200
def test_app_mounts_at_root_when_path_is_empty(self) -> None:
agent = _FakeAgent()
ch = _RecordingChannel(path="")
@@ -15,6 +15,7 @@ its own package (`agent-framework-hosting-responses`,
| [`local_responses/`](./local_responses) | The minimal shape: one agent + one `@tool` + `ResponsesChannel` + a single `run_hook` that strips caller-supplied options and forces a `reasoning` preset. | **Local only.** Start here to learn the run-hook seam. |
| [`local_responses_workflow/`](./local_responses_workflow) | A 4-step `Workflow` (typed `SloganBrief` intake → writer → legal → formatter) hosted behind **both** the Responses and Invocations channels via a shared `run_hook` that parses inbound text/JSON into the workflow's typed input. The host writes per-conversation checkpoints via `checkpoint_location=…`. Demonstrates workflow targets + structured input adaptation + multi-channel + resume-across-turns. Includes a `call_server.rest` file with REST examples for both endpoints. | **Local only.** |
| [`foundry_hosted_agent/`](./foundry_hosted_agent) | One Foundry agent, **Responses + Invocations only** — the minimal shape that is **runtime-compatible with the Foundry Hosted Agents platform**. | Ships with `Dockerfile` + `agent.yaml` + `agent.manifest.yaml` + `azure.yaml` so the same image runs locally **or** as a Foundry Hosted Agent (`azd up`). |
| [`foundry_telegram_invocations_weather/`](./foundry_telegram_invocations_weather) | Experimental Telegram weather bot that mounts `TelegramChannel` at `POST /invocations`, registers the Foundry Hosted Agents Invocations URL as the Telegram webhook, and uses `FoundryHostedAgentHistoryProvider` for storage. | Ships with `Dockerfile` + `agent.yaml` + `agent.manifest.yaml` + `azure.yaml`; used to validate whether a non-Responses channel can run under Foundry Invocations. |
| [`local_telegram/`](./local_telegram) | Adds Telegram, a `@tool`, `FileHistoryProvider`, run hooks (per-user / per-chat session keying), extra Telegram commands, and `ResponseTarget` multicast. Runs under Hypercorn with multiple workers. | **Local only.** No Dockerfile / Foundry packaging. |
| [`local_identity_link/`](./local_identity_link) | Everything in `local_telegram/` plus Teams and the Entra identity-link sidecar (`/auth/start` + `/auth/callback`). Demonstrates linking a Telegram chat to an Entra user so multiple non-Entra channels can share one isolation key. | **Local only.** No Dockerfile / Foundry packaging. |
@@ -3,7 +3,7 @@
Smallest end-to-end hosting sample. One Foundry-backed agent, two
channels, no human-chat surface — and that minimal shape is the whole
point: a host configured with at least the **Responses** and
**Invocations** channels under their default mount roots is
**Invocations** channels under their default endpoints is
**runtime-compatible with the Foundry Hosted Agents platform**. The
same container image runs locally, behind any ASGI server, or as a
Hosted Agent — no protocol shim, no extra adapter.
@@ -11,7 +11,7 @@ Hosted Agent — no protocol shim, no extra adapter.
| Route | Channel | Used by |
| ------------------------------ | -------------------- | ------------------------------------------- |
| `POST /responses` | `ResponsesChannel` | OpenAI Responses clients (`call_server.py`) |
| `POST /invocations/invoke` | `InvocationsChannel` | Host-native JSON envelope (Hosted Agents) |
| `POST /invocations` | `InvocationsChannel` | Host-native JSON envelope (Hosted Agents) |
## Conversation history
@@ -4,7 +4,7 @@
This sample is intentionally minimal and is **runtime-compatible with the
Foundry Hosted Agents platform**: a host that exposes the Responses and
Invocations channels under their default mount roots can be packaged as a
Invocations channels under their default endpoints can be packaged as a
container image and deployed to Foundry Hosted Agents without any protocol
shim. The same image runs locally, behind any ASGI server, or as a Hosted
Agent.
@@ -52,7 +52,7 @@ Run
Routes
------
- ``POST /responses`` OpenAI Responses-shaped surface.
- ``POST /invocations/invoke`` host-native JSON envelope.
- ``POST /invocations`` host-native JSON envelope.
"""
from __future__ import annotations
@@ -3,7 +3,7 @@
"""Call the foundry_hosted_agent server three ways.
The foundry_hosted_agent host exposes ``POST /responses`` (OpenAI Responses-shaped) and
``POST /invocations/invoke`` (host-native), and that minimal contract is
``POST /invocations`` (host-native), and that minimal contract is
**runtime-compatible with the Foundry Hosted Agents platform** so the same
agent code that calls the local server also calls the same image deployed
as a Hosted Agent.
@@ -0,0 +1,19 @@
FROM ghcr.io/astral-sh/uv:python3.12-bookworm-slim
WORKDIR /app
# The sample depends on hosting packages from Git refs until they publish to
# PyPI, so the remote builder needs git available during `uv sync`.
RUN apt-get update \
&& apt-get install -y --no-install-recommends git \
&& rm -rf /var/lib/apt/lists/*
COPY pyproject.toml ./
COPY app.py ./
RUN uv sync --no-dev
ENV PORT=8000
EXPOSE 8000
CMD ["uv", "run", "python", "app.py"]
@@ -0,0 +1,4 @@
*
!app.py
!pyproject.toml
!Dockerfile
@@ -0,0 +1,66 @@
# foundry_telegram_invocations_weather
Telegram weather bot sample for validating a non-Responses channel on Foundry
Hosted Agents. The sample configures `TelegramChannel(path="/invocations")` so
the webhook handler runs at the container endpoint `POST /invocations`; Foundry
exposes that route publicly as:
```text
{FOUNDRY_PROJECT_ENDPOINT}/agents/agent-framework-telegram-invocations-weather/endpoint/protocols/invocations?api-version=2025-11-15-preview
```
| Route | Channel | Used by |
|---|---|---|
| `POST /responses` | `ResponsesChannel` | Quick hosted-agent sanity checks |
| `POST /invocations` | `TelegramChannel` | Telegram webhook payloads |
The agent uses `FoundryHostedAgentHistoryProvider` and a small
`lookup_weather` tool so Telegram requests exercise model calls, tool calls,
and Foundry-hosted storage.
## Important platform note
This is an intentional experiment. Current Foundry Hosted Agents behavior
requires Entra bearer auth before a request reaches the container. Telegram
cannot attach that bearer token to webhook deliveries, so webhook registration
can succeed while live Telegram deliveries fail at the Foundry front door with
`401`. Authenticated calls to the Invocations endpoint are still useful for
validating the channel and storage behavior inside the container.
The sample does not configure `TELEGRAM_WEBHOOK_SECRET` because prior probing
showed Foundry strips Telegram's `X-Telegram-Bot-Api-Secret-Token` header before
the request reaches the container.
## Run locally
```bash
export FOUNDRY_PROJECT_ENDPOINT=https://<your-project>.services.ai.azure.com
export MODEL_DEPLOYMENT_NAME=gpt-5.4-nano
export TELEGRAM_BOT_TOKEN=<telegram-bot-token>
export TELEGRAM_WEBHOOK_URL=https://<public-local-tunnel>/invocations
az login
uv sync
uv run python app.py
```
## Deploy
```bash
set -a
. ../../../../.env
set +a
azd env set TELEGRAM_BOT_TOKEN "$TELEGRAM_BOT_TOKEN"
azd env set MODEL_DEPLOYMENT_NAME "${MODEL_DEPLOYMENT_NAME:-gpt-5.4-nano}"
azd env set HOSTING_INVOCATIONS_API_VERSION 2025-11-15-preview
azd up
```
If you connect this sample to an existing Foundry project instead of running
`azd provision`, make sure the azd environment has `AZURE_AI_PROJECT_ID` and the
project's ACR connection values set before running `azd deploy`.
On startup, `TelegramChannel` calls `setWebhook` using the Foundry public
Invocations URL derived from `FOUNDRY_PROJECT_ENDPOINT` and
`FOUNDRY_AGENT_NAME`.
@@ -0,0 +1,38 @@
name: agent-framework-telegram-invocations-weather
description: >
Telegram weather bot sample hosted by Agent Framework. The Telegram webhook
handler is mounted at /invocations so the Foundry Hosted Agents Invocations
protocol endpoint can be registered as the bot's webhook URL.
metadata:
tags:
- Agent Framework
- AI Agent Hosting
- Azure AI AgentServer
- Responses Protocol
- Invocations Protocol
- Telegram
template:
name: agent-framework-telegram-invocations-weather
kind: hosted
protocols:
- protocol: responses
version: 1.0.0
- protocol: invocations
version: 1.0.0
environment_variables:
- name: MODEL_DEPLOYMENT_NAME
value: "{{MODEL_DEPLOYMENT_NAME}}"
- name: TELEGRAM_BOT_TOKEN
value: "{{TELEGRAM_BOT_TOKEN}}"
- name: HOSTING_INVOCATIONS_API_VERSION
value: "{{HOSTING_INVOCATIONS_API_VERSION}}"
resources:
- kind: model
id: gpt-5.4-nano
name: MODEL_DEPLOYMENT_NAME
parameters:
properties:
- name: TELEGRAM_BOT_TOKEN
secret: true
- name: HOSTING_INVOCATIONS_API_VERSION
secret: false
@@ -0,0 +1,31 @@
# yaml-language-server: $schema=https://raw.githubusercontent.com/microsoft/AgentSchema/refs/heads/main/schemas/v1.0/ContainerAgent.yaml
kind: hosted
name: agent-framework-telegram-invocations-weather
description: |
Telegram weather bot sample hosted by Agent Framework. The Telegram webhook
handler is mounted at /invocations so the Foundry Hosted Agents Invocations
protocol endpoint can be registered as the bot's webhook URL.
metadata:
tags:
- Agent Framework
- AI Agent Hosting
- Azure AI AgentServer
- Responses Protocol
- Invocations Protocol
- Telegram
protocols:
- protocol: responses
version: 1.0.0
- protocol: invocations
version: 1.0.0
resources:
cpu: "1"
memory: 2Gi
environment_variables:
- name: MODEL_DEPLOYMENT_NAME
value: ${MODEL_DEPLOYMENT_NAME}
- name: TELEGRAM_BOT_TOKEN
value: ${TELEGRAM_BOT_TOKEN}
- name: HOSTING_INVOCATIONS_API_VERSION
value: ${HOSTING_INVOCATIONS_API_VERSION}
@@ -0,0 +1,194 @@
# Copyright (c) Microsoft. All rights reserved.
"""Telegram weather bot hosted behind Foundry Hosted Agents Invocations.
This sample intentionally mounts the Telegram webhook handler at the container's
``/invocations`` route so the Foundry public Invocations protocol URL can be
registered as the Telegram webhook URL:
``{FOUNDRY_PROJECT_ENDPOINT}/agents/{FOUNDRY_AGENT_NAME}/endpoint/protocols/invocations``
It uses ``FoundryHostedAgentHistoryProvider`` for conversation history and a
small weather tool to validate that a normal channel can run under the
Hosted Agents runtime. The sample also exposes Responses for a quick platform
sanity check.
Sample output after sending "weather in Amsterdam" to the Telegram bot:
Assistant:> Amsterdam is cloudy with a high of 16 C.
"""
from __future__ import annotations
import logging
import os
from dataclasses import replace
from typing import Annotated
from agent_framework import Agent, tool
from agent_framework.observability import enable_instrumentation
from agent_framework_foundry import FoundryChatClient
from agent_framework_foundry_hosting import FoundryHostedAgentHistoryProvider, foundry_response_id
from agent_framework_hosting import (
AgentFrameworkHost,
ChannelCommand,
ChannelCommandContext,
ChannelRequest,
)
from agent_framework_hosting_responses import ResponsesChannel
from agent_framework_hosting_telegram import TelegramChannel, telegram_isolation_key
from azure.identity.aio import DefaultAzureCredential
AGENT_NAME = "agent-framework-telegram-invocations-weather"
DEFAULT_MODEL_DEPLOYMENT = "gpt-5.4-nano"
DEFAULT_INVOCATIONS_API_VERSION = "2025-11-15-preview"
logging.basicConfig(
level=os.environ.get("LOG_LEVEL", "INFO").upper(),
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
for _noisy in (
"httpx",
"httpcore",
"azure.core.pipeline.policies.http_logging_policy",
"urllib3",
):
logging.getLogger(_noisy).setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
@tool(approval_mode="never_require")
def lookup_weather(location: Annotated[str, "The city to look up weather for."]) -> str:
"""Return a deterministic weather report for a city."""
reports = {
"seattle": "Seattle is rainy with a high of 12 C.",
"amsterdam": "Amsterdam is cloudy with a high of 16 C.",
"tokyo": "Tokyo is clear with a high of 22 C.",
"london": "London is misty with a high of 11 C.",
}
normalized = location.strip().lower()
return reports.get(normalized, f"{location} is sunny with a high of 20 C.")
def _foundry_invocations_webhook_url() -> str:
"""Build the public Foundry Invocations URL used as Telegram's webhook."""
explicit = os.environ.get("TELEGRAM_WEBHOOK_URL")
if explicit:
return explicit
project_endpoint = os.environ["FOUNDRY_PROJECT_ENDPOINT"].rstrip("/")
agent_name = os.environ.get("FOUNDRY_AGENT_NAME", AGENT_NAME)
api_version = os.environ.get("HOSTING_INVOCATIONS_API_VERSION", DEFAULT_INVOCATIONS_API_VERSION)
return f"{project_endpoint}/agents/{agent_name}/endpoint/protocols/invocations?api-version={api_version}"
def _configure_observability() -> None:
"""Wire Azure Monitor OpenTelemetry when Foundry injects a connection string."""
conn_str = os.environ.get("APPLICATIONINSIGHTS_CONNECTION_STRING")
if not conn_str:
logger.info("APPLICATIONINSIGHTS_CONNECTION_STRING not set; skipping Azure Monitor export.")
return
from azure.monitor.opentelemetry import configure_azure_monitor # pyright: ignore[reportUnknownVariableType]
configure_azure_monitor(connection_string=conn_str)
logger.info("Azure Monitor OpenTelemetry configured.")
def telegram_hook(request: ChannelRequest, **_: object) -> ChannelRequest:
"""Clamp request options for Telegram-originating runs."""
options = dict(request.options or {})
options.pop("store", None)
options["reasoning"] = {"effort": "high", "summary": "auto"}
return replace(request, options=options)
def make_commands() -> list[ChannelCommand]:
"""Create Telegram slash commands used by the sample."""
async def handle_start(ctx: ChannelCommandContext) -> None:
await ctx.reply("Hi! Ask me for weather in Seattle, Amsterdam, Tokyo, London, or any city.")
async def handle_help(ctx: ChannelCommandContext) -> None:
await ctx.reply(
"/weather <city> - call the weather tool directly\n"
"/whoami - show your Telegram session key\n"
"/help - show this message"
)
async def handle_whoami(ctx: ChannelCommandContext) -> None:
await ctx.reply(f"Your session key is {telegram_isolation_key(ctx.request.attributes.get('chat_id'))}.")
async def handle_weather(ctx: ChannelCommandContext) -> None:
command_text = ctx.request.input if isinstance(ctx.request.input, str) else ""
_, _, location = command_text.partition(" ")
await ctx.reply(lookup_weather(location=(location.strip() or "Seattle")))
return [
ChannelCommand("start", "Introduce the bot", handle_start),
ChannelCommand("help", "List available commands", handle_help),
ChannelCommand("whoami", "Show the Telegram session key", handle_whoami),
ChannelCommand("weather", "Call the weather tool: /weather <city>", handle_weather),
]
def build_host() -> AgentFrameworkHost:
"""Build the Foundry-hosted Telegram weather agent."""
# 1. Create a shared credential for model calls and Foundry storage.
credential = DefaultAzureCredential()
project_endpoint = os.environ["FOUNDRY_PROJECT_ENDPOINT"]
# 2. Create the agent with a simple weather tool and Foundry-backed history.
agent = Agent(
client=FoundryChatClient(
project_endpoint=project_endpoint,
model=os.environ.get("MODEL_DEPLOYMENT_NAME", DEFAULT_MODEL_DEPLOYMENT),
credential=credential,
),
name="TelegramInvocationsWeatherAgent",
instructions=(
"You are a concise weather assistant. Use lookup_weather for weather questions "
"and answer in one short sentence."
),
tools=[lookup_weather],
context_providers=[
FoundryHostedAgentHistoryProvider(
credential=credential,
endpoint=project_endpoint,
),
],
)
# 3. Register Telegram at /invocations and keep Responses available for sanity checks.
return AgentFrameworkHost(
target=agent,
allow_in_process_runner=True,
channels=[
ResponsesChannel(response_id_factory=foundry_response_id),
TelegramChannel(
bot_token=os.environ["TELEGRAM_BOT_TOKEN"],
path="/invocations",
transport="webhook",
webhook_url=_foundry_invocations_webhook_url(),
parse_mode="Markdown",
commands=make_commands(),
run_hook=telegram_hook,
),
],
)
_configure_observability()
enable_instrumentation(enable_sensitive_data=True)
app = build_host().app
if __name__ == "__main__":
import asyncio
import hypercorn.asyncio
import hypercorn.config
config = hypercorn.config.Config()
config.bind = [f"0.0.0.0:{int(os.environ.get('PORT', '8000'))}"]
asyncio.run(hypercorn.asyncio.serve(app, config)) # type: ignore[arg-type]
@@ -0,0 +1,18 @@
# yaml-language-server: $schema=https://raw.githubusercontent.com/Azure/azure-dev/main/schemas/v1.0/azure.yaml.json
requiredVersions:
extensions:
azure.ai.agents: '>=0.1.0-preview'
name: ai-foundry-telegram-invocations-weather
services:
agent-framework-telegram-invocations-weather:
project: .
host: azure.ai.agent
language: docker
docker:
remoteBuild: true
config:
container:
resources:
cpu: "1"
memory: 2Gi
@@ -0,0 +1,26 @@
[project]
name = "agent-framework-hosting-foundry-telegram-invocations-weather"
version = "0.0.1"
description = "Foundry Hosted Agents Telegram weather sample using the Invocations path."
requires-python = ">=3.10"
dependencies = [
"agent-framework-foundry",
"agent-framework-foundry-hosting",
"agent-framework-hosting",
"agent-framework-hosting-responses",
"agent-framework-hosting-telegram",
"azure-identity",
"aiohttp>=3.13.5",
"hypercorn>=0.17",
"mcp>=1.24,<2",
"azure-monitor-opentelemetry>=1.6",
]
[tool.uv]
package = false
[tool.uv.sources]
agent-framework-foundry-hosting = { git = "https://github.com/microsoft/agent-framework.git", branch = "feature/python-hosting", subdirectory = "python/packages/foundry_hosting" }
agent-framework-hosting = { git = "https://github.com/microsoft/agent-framework.git", branch = "feature/python-hosting", subdirectory = "python/packages/hosting" }
agent-framework-hosting-responses = { git = "https://github.com/microsoft/agent-framework.git", branch = "feature/python-hosting", subdirectory = "python/packages/hosting-responses" }
agent-framework-hosting-telegram = { git = "https://github.com/microsoft/agent-framework.git", branch = "feature/python-hosting", subdirectory = "python/packages/hosting-telegram" }
@@ -15,7 +15,7 @@ of the workflow.
`Workflow` target and dispatches to `workflow.run(...)` (no
`Agent.create_session(...)`).
- Two channels are mounted side-by-side (`ResponsesChannel` at
`/responses`, `InvocationsChannel` at `/invocations/invoke`). Both
`/responses`, `InvocationsChannel` at `/invocations`). Both
share the **same `brief_hook`** that **adapts the channel-native
input into the workflow start executor's typed input** — Responses
delivers a `list[Message]`, Invocations delivers a `str`, but the
@@ -45,7 +45,7 @@ Content-Type: application/json
###
# 4. Invocations API — structured brief
POST {{host}}/invocations/invoke
POST {{host}}/invocations
Content-Type: application/json
{
@@ -55,7 +55,7 @@ Content-Type: application/json
###
# 5. Invocations API — plain topic
POST {{host}}/invocations/invoke
POST {{host}}/invocations
Content-Type: application/json
{
@@ -66,7 +66,7 @@ Content-Type: application/json
###
# 6. Invocations API — resume the same session_id to reuse the
# workflow's per-conversation checkpoint store.
POST {{host}}/invocations/invoke
POST {{host}}/invocations
Content-Type: application/json
{
@@ -77,7 +77,7 @@ Content-Type: application/json
###
# 7. Invocations API — streaming (SSE; one `data:` line per chunk,
# terminated by `data: [DONE]`).
POST {{host}}/invocations/invoke
POST {{host}}/invocations
Content-Type: application/json
Accept: text/event-stream