Files
Eduard van Valkenburg 6b822853eb Python: add hosting Channels sample apps (#5645)
* samples(hosting): add hosting Channels sample apps under samples/04-hosting/af-hosting

Adds five end-to-end sample apps under
``python/samples/04-hosting/af-hosting/`` that exercise the
``agent-framework-hosting`` Channels stack from the simplest single-channel
case up to a multi-channel deployment with cross-channel identity linking.

Samples (ordered by complexity)
-------------------------------

* ``foundry_hosted_agent/`` — minimal Responses + Invocations host with a
  Foundry-backed agent and ``FoundryHostedAgentHistoryProvider``.
  ``agd``-deployable; bundles a ``Dockerfile`` and
  ``scripts/vendor-packages.sh`` that copies workspace packages into
  ``_vendor/`` for self-contained builds. ``_vendor/`` is gitignored.
* ``local_responses/`` — single-channel Responses host with a
  ``run_hook`` that strips caller-supplied options and forces a
  reasoning preset. Demonstrates the hook seam over the uniform
  ``ChannelRequest`` envelope.
* ``local_responses_workflow/`` — Responses + Invocations exposing a
  three-agent workflow with per-conversation checkpoint storage.
* ``local_telegram/`` — Responses + Telegram with a ``@tool``,
  ``FileHistoryProvider``, hooks, and a ``ResponseTarget`` multicast
  variant (``call_server_multicast.py``) that pushes a single Responses
  reply to a separate Telegram chat.
* ``local_identity_link/`` — full surface: Responses + Invocations +
  Telegram + Activity Protocol (Teams) + the ``EntraIdentityLinkChannel``
  sidecar. Resolves per-channel ids onto a single Entra object id so a
  user's history follows them across surfaces.

Notes
-----

* Samples that use Telegram/Teams via Activity Protocol depend on the
  renamed ``agent-framework-hosting-activity-protocol`` package (see the
  PR-5 series).
* All samples use ``[tool.uv.sources]`` editable workspace deps, except
  ``foundry_hosted_agent/`` which uses the ``./_vendor/`` self-contained
  layout for ``azd`` Docker builds.
* Each sample includes a ``README.md`` with run instructions and an
  ``app.py`` ASGI entrypoint plus a ``call_server.py`` client harness.

Depends on the prior hosting PRs (foundry-hosted-agent refactor +
hosting-core + the per-channel packages). After those merge, this
branch can be rebased onto ``main`` cleanly.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* samples(hosting): point sample deps at the feature/python-hosting GitHub branch

Switches every sample's ``[tool.uv.sources]`` from in-monorepo
editable path deps (which only resolve when running inside the
agent-framework workspace) to git refs targeting the
``feature/python-hosting`` branch on
``microsoft/agent-framework``. Samples now install standalone outside
the monorepo while the ``agent-framework-hosting*`` packages are still
pre-PyPI; once they publish, the ``[tool.uv.sources]`` block can be
dropped and the declared deps resolve from PyPI.

Cleanup
-------

* Drops ``foundry_hosted_agent/scripts/vendor-packages.sh``,
  ``_vendor/`` from ``.gitignore``, the ``hooks.prepackage`` block in
  ``azure.yaml`` and the ``COPY _vendor/`` step in the Dockerfile —
  vendoring is no longer needed because git refs make the deps
  network-resolvable from any context.
* Drops obsolete ``workspace.pyproject.toml`` reference and ``scripts/``
  / ``workspace.pyproject.toml`` entries from
  ``Dockerfile.dockerignore``.
* Updates the foundry sample's Dockerfile to ``uv sync --no-dev``
  (no ``--frozen``) so it locks fresh against the GitHub-hosted deps
  at build time.
* Drops every committed ``uv.lock`` because the resolver needs network
  access to ``feature/python-hosting`` to lock — they regenerate the
  first time a user runs ``uv sync`` after the branch lands.
* Refreshes the per-sample READMEs to mention the GitHub install path
  instead of "in-tree workspace packages".

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* samples(hosting): address PR #5645 review comments

- foundry_hosted_agent/call_server.py: replace hard-coded
  project_endpoint and service_session_id with FOUNDRY_PROJECT_ENDPOINT,
  FOUNDRY_HOSTED_AGENT_NAME, and optional FOUNDRY_HOSTED_SESSION_ID
  environment variables. Session-id is now optional so the sample
  exercises the new-conversation path by default.

- local_identity_link/app.py:
  * make_telegram_hook: apply the reasoning bump regardless of
    identity-link state (the previous early-return on linked chats
    silently dropped the high-effort preset for the very flow the
    sample exists to demonstrate).
  * make_responses_hook: add a prominent DEV-ONLY warning that the
    client-supplied entra_oid shortcut bypasses identity verification
    and must be replaced by a JWT validator in production.
  * /link command: early-return when chat_id is missing instead of
    minting an authorize URL keyed on "telegram:None" (which would
    poison the link store with a binding any future chat_id-less
    update would collapse onto).
  * Switch ENTRA_CERT_PATH / ENTRA_CERT_PASSWORD env vars to the
    longer ENTRA_CERTIFICATE_PATH / ENTRA_CERTIFICATE_PASSWORD names
    that the README already documents.
  * channels: Sequence[Channel] -> list[Channel] (the next line
    appends, which a Sequence type doesn't expose).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore(hosting-samples): apply sample formatting

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(hosting-samples): guard command input text

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-05-28 14:57:46 +02:00

228 lines
8.7 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Advanced multi-channel hosting sample.
Builds on ``app.py`` to demonstrate:
- a function ``@tool`` on the agent (``lookup_weather``),
- per-isolation-key history persisted via ``FileHistoryProvider``,
- a ``ResponsesChannel`` ``run_hook`` that clamps caller-supplied
``ChatOptions`` and honours the OpenAI ``previous_response_id`` field as
the ``AgentSession`` id — so a Responses caller can resume a Telegram
chat by passing ``previous_response_id="telegram:<chat_id>"`` (or any
other isolation key written by another channel),
- a ``TelegramChannel`` ``run_hook`` that bumps ``temperature`` for a
chattier Telegram persona,
- a richer Telegram command catalog including a ``/new`` command that resets
the cached session for the chat.
Required env: ``FOUNDRY_PROJECT_ENDPOINT``, ``FOUNDRY_MODEL``,
``TELEGRAM_BOT_TOKEN``. Auth uses ``DefaultAzureCredential``.
Run
---
This module exposes ``app`` as the canonical ASGI surface. Recommended
production launch is **Hypercorn**::
hypercorn app:app --bind 0.0.0.0:8000 --workers 4
The ``__main__`` block below uses ``host.serve(...)`` (single-process
Hypercorn) as a local-dev fallback.
Note
----
``FileHistoryProvider`` provides only in-process file-write locking. Running
multiple Hypercorn workers against the same ``./sessions`` directory is fine
for this sample, but a production deployment should swap it for a store with
cross-process consistency.
"""
from __future__ import annotations
import os
from dataclasses import replace
from pathlib import Path
from random import randint
from typing import Annotated
from agent_framework import Agent, FileHistoryProvider, tool
from agent_framework_foundry import FoundryChatClient
from agent_framework_hosting import (
AgentFrameworkHost,
ChannelCommand,
ChannelCommandContext,
ChannelRequest,
ChannelSession,
)
from agent_framework_hosting_responses import ResponsesChannel
from agent_framework_hosting_telegram import TelegramChannel, telegram_isolation_key
from azure.identity.aio import DefaultAzureCredential
# import logging
# logging.basicConfig(level=logging.DEBUG)
SESSIONS_DIR = Path(__file__).resolve().parent / "storage" / "sessions"
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
# --------------------------------------------------------------------------- #
# Tools the agent can call
# --------------------------------------------------------------------------- #
@tool(approval_mode="never_require")
def lookup_weather(
location: Annotated[str, "The city to look up weather for."],
) -> str:
"""Return a deterministic weather report for a city."""
high_temp = randint(5, 25)
reports = {
"Seattle": f"Seattle is rainy with a high of {high_temp}°C.",
"Amsterdam": f"Amsterdam is cloudy with a high of {high_temp}°C.",
"Tokyo": f"Tokyo is clear with a high of {high_temp}°C.",
}
return reports.get(location, f"{location} is sunny with a high of {high_temp}°C.")
# --------------------------------------------------------------------------- #
# Responses channel run hook
# --------------------------------------------------------------------------- #
def responses_hook(request: ChannelRequest, *, protocol_request: dict | None = None, **_: object) -> ChannelRequest:
"""Validate, rewrite, and key the channel-built ChannelRequest before invocation.
The spec calls this out as the developer's runtime escape hatch over the
uniform ``ChannelRequest`` envelope. Things this hook does:
- **strip** ``store`` and ``temperature`` (the agent owns persistence via ``FileHistoryProvider``),
- **inject a session** keyed on the request body. The OpenAI Responses
``previous_response_id`` field doubles as our isolation key — the
``ResponsesChannel`` already lifts it onto ``request.session``, so any
caller can resume an arbitrary AgentSession (including one written by
another channel, e.g. ``telegram:8741188429``) by passing it as
``previous_response_id``. When the caller doesn't pass one, fall back
to a key derived from the OpenAI ``safety_identifier`` field
(``responses:<id>``).
"""
options = dict(request.options or {})
# this agent will only run with models that do not support Temperature, so removing it.
options.pop("temperature", None)
options.pop("store", None)
body = protocol_request or {}
if request.session is not None and request.session.isolation_key:
# Caller supplied ``previous_response_id`` — the channel already
# used it as the AgentSession id. Keep it as-is.
session = request.session
else:
safety_id = body.get("safety_identifier") or "anonymous"
session = ChannelSession(isolation_key=f"responses:{safety_id}")
return replace(
request,
session=session,
options=options or None,
)
def telegram_hook(request: ChannelRequest, **_: object) -> ChannelRequest:
"""Telegram users get a chattier model — bump temperature on every turn."""
options = dict(request.options or {})
options["reasoning"] = {"effort": "high", "summary": "detailed"}
return replace(request, options=options)
# --------------------------------------------------------------------------- #
# Telegram commands
# --------------------------------------------------------------------------- #
def _isolation_key(ctx: ChannelCommandContext) -> str:
return telegram_isolation_key(ctx.request.attributes.get("chat_id"))
def make_commands(host_ref: dict[str, AgentFrameworkHost]) -> list[ChannelCommand]:
"""Build commands that close over the host so ``/new`` can reset state."""
async def handle_start(ctx: ChannelCommandContext) -> None:
await ctx.reply("Hi! I'm a multi-channel agent.\nCommands: /new, /whoami, /weather <city>, /help.")
async def handle_help(ctx: ChannelCommandContext) -> None:
await ctx.reply(
"/new — start a fresh conversation\n"
"/whoami — show your isolation key\n"
"/weather <city> — call the weather tool directly\n"
"/help — this message"
)
async def handle_new(ctx: ChannelCommandContext) -> None:
host_ref["host"].reset_session(_isolation_key(ctx))
await ctx.reply("New session started. Previous history is cleared for this chat.")
async def handle_whoami(ctx: ChannelCommandContext) -> None:
await ctx.reply(f"Your isolation key on this host is: {_isolation_key(ctx)}")
async def handle_weather(ctx: ChannelCommandContext) -> None:
# Bypass the agent and call the tool directly to demonstrate that
# commands have full control over how they reply.
command_text = ctx.request.input if isinstance(ctx.request.input, str) else ""
_, _, location = command_text.partition(" ")
location = location.strip() or "Seattle"
await ctx.reply(lookup_weather(location=location))
return [
ChannelCommand("start", "Introduce the bot", handle_start),
ChannelCommand("help", "List available commands", handle_help),
ChannelCommand("new", "Start a new session for this chat", handle_new),
ChannelCommand("whoami", "Show the isolation key for this chat", handle_whoami),
ChannelCommand("weather", "Call the weather tool: /weather <city>", handle_weather),
]
# --------------------------------------------------------------------------- #
# Host wiring
# --------------------------------------------------------------------------- #
def build_host() -> AgentFrameworkHost:
agent = Agent(
client=FoundryChatClient(credential=DefaultAzureCredential()),
name="WeatherAgent",
instructions=(
"You are a friendly weather assistant. Use the lookup_weather tool "
"for any weather question and answer in one short sentence."
),
tools=[lookup_weather],
context_providers=[FileHistoryProvider(SESSIONS_DIR)],
default_options={"store": False},
)
host_ref: dict[str, AgentFrameworkHost] = {}
host = AgentFrameworkHost(
target=agent,
channels=[
ResponsesChannel(run_hook=responses_hook),
TelegramChannel(
bot_token=os.environ["TELEGRAM_BOT_TOKEN"],
webhook_url=os.environ.get("TELEGRAM_WEBHOOK_URL"),
secret_token=os.environ.get("TELEGRAM_WEBHOOK_SECRET"),
parse_mode="Markdown",
commands=make_commands(host_ref),
run_hook=telegram_hook,
),
],
debug=True,
)
host_ref["host"] = host
return host
app = build_host().app
if __name__ == "__main__":
build_host().serve(host="0.0.0.0", port=int(os.environ.get("PORT", "8000")))