Files
agent-framework/python/packages/hosting/tests/test_isolation.py
Eduard van Valkenburg 36ce0950e4 Simplify Python hosting core (#6492)
Remove linking, multicast, durable delivery, and host push machinery from the v1 hosting core. Keep those scenarios in a proposed follow-up ADR and update channel packages, samples, docs, tests, and workspace metadata around the smaller host/channel contract.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
2026-06-12 08:34:08 +02:00

304 lines
13 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
"""Tests for the per-request isolation contextvar surface in
:mod:`agent_framework_hosting._isolation`.
The isolation keys are the ONLY seam Foundry-aware providers use to
find partition keys, and the host's ASGI middleware lifts them off the
two well-known headers on every inbound HTTP request. A regression
that drops the lookup, mistypes a header name, or fails to reset the
contextvar would silently misroute writes / leak per-request state
across requests, with zero unit-test signal — so cover the surface
fully here.
"""
from __future__ import annotations
import asyncio
import pytest
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import BaseRoute, Route
from starlette.testclient import TestClient
from agent_framework_hosting import (
Channel,
ChannelContext,
ChannelContribution,
IsolationKeys,
get_current_isolation_keys,
reset_current_isolation_keys,
set_current_isolation_keys,
)
from agent_framework_hosting._isolation import ( # pyright: ignore[reportPrivateUsage]
ISOLATION_HEADER_CHAT,
ISOLATION_HEADER_USER,
current_isolation_keys,
)
class TestIsolationKeys:
def test_defaults_to_none_pair(self) -> None:
keys = IsolationKeys()
assert keys.user_key is None
assert keys.chat_key is None
assert keys.is_empty is True
def test_partial_with_only_user_is_not_empty(self) -> None:
keys = IsolationKeys(user_key="alice")
assert keys.user_key == "alice"
assert keys.chat_key is None
assert keys.is_empty is False
def test_partial_with_only_chat_is_not_empty(self) -> None:
keys = IsolationKeys(chat_key="general")
assert keys.is_empty is False
def test_full_pair_is_not_empty(self) -> None:
keys = IsolationKeys(user_key="alice", chat_key="general")
assert keys.is_empty is False
class TestContextVarHelpers:
def test_default_is_none(self) -> None:
# Each test gets a fresh contextvar value because pytest runs
# tests in fresh contexts. ``get`` returns the default.
assert get_current_isolation_keys() is None
def test_set_and_get_round_trip(self) -> None:
token = set_current_isolation_keys(IsolationKeys(user_key="alice", chat_key="general"))
try:
current = get_current_isolation_keys()
assert current is not None
assert current.user_key == "alice"
assert current.chat_key == "general"
finally:
reset_current_isolation_keys(token)
# Reset restores prior value (None in the default context).
assert get_current_isolation_keys() is None
def test_set_with_none_clears(self) -> None:
outer = set_current_isolation_keys(IsolationKeys(user_key="alice"))
try:
inner = set_current_isolation_keys(None)
try:
assert get_current_isolation_keys() is None
finally:
reset_current_isolation_keys(inner)
# Reset surfaces the outer value again.
current = get_current_isolation_keys()
assert current is not None
assert current.user_key == "alice"
finally:
reset_current_isolation_keys(outer)
def test_module_level_contextvar_is_the_same_instance(self) -> None:
"""Direct contextvar access (used by the ASGI middleware) and the
public `get_current_isolation_keys()` helper read from the SAME
underlying contextvar. A regression that introduced a second
contextvar would silently break the middleware → provider hop."""
token = current_isolation_keys.set(IsolationKeys(user_key="bob"))
try:
via_helper = get_current_isolation_keys()
assert via_helper is not None
assert via_helper.user_key == "bob"
finally:
current_isolation_keys.reset(token)
class TestHeaderConstants:
"""The two header names are part of the public contract — they
match the ones the Foundry Hosted Agents runtime stamps on every
inbound request. A typo here would silently misroute partition
writes."""
def test_user_header_value(self) -> None:
assert ISOLATION_HEADER_USER == "x-agent-user-isolation-key"
def test_chat_header_value(self) -> None:
assert ISOLATION_HEADER_CHAT == "x-agent-chat-isolation-key"
# --------------------------------------------------------------------------- #
# End-to-end: ASGI middleware lifts the headers into the contextvar.
# --------------------------------------------------------------------------- #
class _IsolationProbeChannel:
"""A minimal Channel that exposes a single GET route which captures
the contextvar value INSIDE the request and returns it as JSON.
Tests use this to exercise the full middleware → contextvar →
handler hop end-to-end.
"""
name = "probe"
path = ""
def __init__(self) -> None:
self.captured: list[IsolationKeys | None] = []
async def _handler(_request: Request) -> JSONResponse:
keys = get_current_isolation_keys()
self.captured.append(keys)
payload = (
{"user": keys.user_key, "chat": keys.chat_key}
if keys is not None
else {"user": None, "chat": None, "_present": False}
)
return JSONResponse(payload)
self._routes: list[BaseRoute] = [Route("/probe", _handler)]
def contribute(self, _context: ChannelContext) -> ChannelContribution:
return ChannelContribution(routes=self._routes)
def _make_host_with_probe() -> tuple[object, _IsolationProbeChannel]:
from agent_framework_hosting import AgentFrameworkHost
class _NoopAgent:
async def run(self, *_args: object, **_kwargs: object) -> object: # pragma: no cover - never called
raise RuntimeError("not invoked")
probe = _IsolationProbeChannel()
assert isinstance(probe, Channel)
host = AgentFrameworkHost(target=_NoopAgent(), channels=[probe]) # type: ignore[arg-type]
return host, probe
class TestIsolationMiddlewareEndToEnd:
def test_headers_ignored_outside_foundry_environment(self) -> None:
host, probe = _make_host_with_probe()
with TestClient(host.app) as client: # type: ignore[attr-defined]
r = client.get(
"/probe",
headers={
ISOLATION_HEADER_USER: "alice-uid",
ISOLATION_HEADER_CHAT: "general-cid",
},
)
assert r.status_code == 200
assert r.json() == {"user": None, "chat": None, "_present": False}
assert probe.captured == [None]
def test_both_headers_lifted_into_contextvar(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("FOUNDRY_HOSTING_ENVIRONMENT", "1")
host, probe = _make_host_with_probe()
with TestClient(host.app) as client: # type: ignore[attr-defined]
r = client.get(
"/probe",
headers={
ISOLATION_HEADER_USER: "alice-uid",
ISOLATION_HEADER_CHAT: "general-cid",
},
)
assert r.status_code == 200
assert r.json() == {"user": "alice-uid", "chat": "general-cid"}
assert len(probe.captured) == 1
captured = probe.captured[0]
assert captured is not None
assert captured.user_key == "alice-uid"
assert captured.chat_key == "general-cid"
def test_only_user_header_lifted(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""One-header-only branch: the middleware still binds (chat=None)."""
monkeypatch.setenv("FOUNDRY_HOSTING_ENVIRONMENT", "1")
host, probe = _make_host_with_probe()
with TestClient(host.app) as client: # type: ignore[attr-defined]
r = client.get("/probe", headers={ISOLATION_HEADER_USER: "alice-uid"})
assert r.status_code == 200
assert r.json() == {"user": "alice-uid", "chat": None}
def test_only_chat_header_lifted(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("FOUNDRY_HOSTING_ENVIRONMENT", "1")
host, probe = _make_host_with_probe()
with TestClient(host.app) as client: # type: ignore[attr-defined]
r = client.get("/probe", headers={ISOLATION_HEADER_CHAT: "general-cid"})
assert r.status_code == 200
assert r.json() == {"user": None, "chat": "general-cid"}
def test_no_headers_keeps_contextvar_none(self) -> None:
"""Local-dev path: with neither header present the middleware is
a no-op and the contextvar stays at its default ``None`` —
providers see "no isolation" and route to the in-memory
fallback rather than picking up stale per-request state."""
host, probe = _make_host_with_probe()
with TestClient(host.app) as client: # type: ignore[attr-defined]
r = client.get("/probe")
assert r.status_code == 200
assert r.json() == {"user": None, "chat": None, "_present": False}
assert probe.captured == [None]
def test_empty_header_value_treated_as_absent(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""A header that's present but empty must not bind an empty key —
``IsolationContext`` rejects empty strings on the read side."""
monkeypatch.setenv("FOUNDRY_HOSTING_ENVIRONMENT", "1")
host, probe = _make_host_with_probe()
with TestClient(host.app) as client: # type: ignore[attr-defined]
r = client.get(
"/probe",
headers={
ISOLATION_HEADER_USER: "",
ISOLATION_HEADER_CHAT: "general-cid",
},
)
assert r.status_code == 200
# Empty user header decodes to None; chat key stays bound.
assert r.json() == {"user": None, "chat": "general-cid"}
def test_contextvar_resets_after_request(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""The middleware must call ``reset_current_isolation_keys`` in
a ``finally`` so per-request state never leaks across requests
or back into the calling thread's context."""
monkeypatch.setenv("FOUNDRY_HOSTING_ENVIRONMENT", "1")
host, probe = _make_host_with_probe()
with TestClient(host.app) as client: # type: ignore[attr-defined]
r1 = client.get("/probe", headers={ISOLATION_HEADER_USER: "alice-uid"})
assert r1.status_code == 200
# Reading the contextvar OUTSIDE the request scope must see
# the default — not the value the prior request bound.
assert get_current_isolation_keys() is None
# And a follow-up request without headers gets a clean
# ``None`` rather than inheriting alice-uid.
r2 = client.get("/probe")
assert r2.json() == {"user": None, "chat": None, "_present": False}
def test_concurrent_requests_get_isolated_contextvars(self, monkeypatch: pytest.MonkeyPatch) -> None:
"""Different requests run in different async contexts; binding
from request A must NOT leak into a concurrent request B."""
monkeypatch.setenv("FOUNDRY_HOSTING_ENVIRONMENT", "1")
host, probe = _make_host_with_probe()
async def _drive() -> None:
# Run two requests in parallel asyncio tasks against the
# same TestClient and assert their captures don't bleed
# into each other.
async def _hit(user_key: str) -> dict[str, str | None]:
with TestClient(host.app) as client: # type: ignore[attr-defined]
r = client.get("/probe", headers={ISOLATION_HEADER_USER: user_key})
return r.json() # type: ignore[no-any-return]
r_alice, r_bob = await asyncio.gather(_hit("alice-uid"), _hit("bob-uid"))
assert r_alice == {"user": "alice-uid", "chat": None}
assert r_bob == {"user": "bob-uid", "chat": None}
asyncio.run(_drive())
class TestNonHttpScopesPassThrough:
"""The middleware intentionally only inspects ``http`` scopes;
lifespan / websocket scopes are forwarded untouched. A regression
that touched lifespan scopes here would crash boot."""
async def test_lifespan_scope_does_not_consult_headers(self) -> None:
# The TestClient context manager exercises the lifespan scope
# implicitly; if the middleware tried to decode headers on a
# non-http scope this would raise. Exercise it without binding
# any contextvar work.
host, _probe = _make_host_with_probe()
with TestClient(host.app): # type: ignore[attr-defined]
# Just enter / exit; no requests.
pass