# Copyright (c) Microsoft. All rights reserved. """Unit tests for :mod:`agent_framework_hosting_activity_protocol`. The Bot Framework outbound calls and azure-identity credentials are mocked out so the suite never touches the network. Live token acquisition, streaming edits and certificate paths are out of scope here. """ from __future__ import annotations from dataclasses import dataclass from typing import Any from unittest.mock import AsyncMock, MagicMock import pytest from agent_framework_hosting import ( AgentFrameworkHost, ChannelCommand, ChannelCommandContext, ChannelRequest, HostedRunResult, ) from starlette.testclient import TestClient from agent_framework_hosting_activity_protocol import ActivityProtocolChannel, activity_protocol_isolation_key from agent_framework_hosting_activity_protocol._channel import _command_text, _parse_activity def test_activity_protocol_isolation_key_format() -> None: assert activity_protocol_isolation_key("19:meeting_xyz@thread.v2") == "activity:19:meeting_xyz@thread.v2" assert activity_protocol_isolation_key(123) == "activity:123" class TestParseActivity: def test_text_only(self) -> None: msg = _parse_activity({"type": "message", "text": "hello"}) assert msg.role == "user" assert msg.text == "hello" def test_with_attachment(self) -> None: msg = _parse_activity({ "type": "message", "text": "see this", "attachments": [ {"contentType": "image/png", "contentUrl": "https://example.com/x.png"}, ], }) assert msg.text == "see this" assert any((getattr(c, "uri", None) or "").endswith("/x.png") for c in msg.contents) def test_skips_invalid_attachments(self) -> None: msg = _parse_activity({ "type": "message", "text": "hi", "attachments": [ "not-a-mapping", {"contentType": "image/png"}, # no url {"contentUrl": "https://example.com/y", "contentType": "no-slash"}, ], }) assert msg.text == "hi" # No URI content survived. assert not any(getattr(c, "uri", None) for c in msg.contents) def test_skips_teams_text_html_inline_content(self) -> None: # Teams attaches a text/html rendering whose inline ``content`` is raw # HTML (not a URL). It must not be parsed as a URI. msg = _parse_activity({ "type": "message", "text": "hello there", "attachments": [ {"contentType": "text/html", "content": "

hello there

"}, ], }) assert msg.text == "hello there" assert not any(getattr(c, "uri", None) for c in msg.contents) def test_skips_attachment_contenturl_without_scheme(self) -> None: msg = _parse_activity({ "type": "message", "text": "hi", "attachments": [ {"contentType": "image/png", "contentUrl": "/relative/path.png"}, ], }) assert msg.text == "hi" assert not any(getattr(c, "uri", None) for c in msg.contents) class TestCommandText: def test_plain_text_unchanged(self) -> None: assert _command_text({"text": "/help"}) == "/help" def test_non_string_text_returns_empty(self) -> None: assert _command_text({"text": None}) == "" assert _command_text({}) == "" def test_strips_bot_mention(self) -> None: activity = { "text": "Personal Assistant /todos", "recipient": {"id": "bot-1"}, "entities": [ {"type": "mention", "text": "Personal Assistant", "mentioned": {"id": "bot-1"}}, ], } assert _command_text(activity) == "/todos" def test_strips_bot_mention_without_space(self) -> None: activity = { "text": "Bot/help", "recipient": {"id": "bot-1"}, "entities": [{"type": "mention", "text": "Bot", "mentioned": {"id": "bot-1"}}], } assert _command_text(activity) == "/help" def test_keeps_other_user_mention(self) -> None: activity = { "text": "/whoami Someone", "recipient": {"id": "bot-1"}, "entities": [{"type": "mention", "text": "Someone", "mentioned": {"id": "user-9"}}], } # Another user's mention must not be stripped. assert _command_text(activity) == "/whoami Someone" def test_malformed_entities_are_ignored(self) -> None: activity = { "text": "/help", "recipient": {"id": "bot-1"}, "entities": ["not-a-mapping", {"type": "clientInfo"}, {"type": "mention"}], } assert _command_text(activity) == "/help" @dataclass class _FakeAgentResponse: text: str @dataclass class _FakeUpdate: text: str class _FakeStream: def __init__(self, chunks: list[str]) -> None: self._chunks = chunks def __aiter__(self) -> Any: async def gen() -> Any: for chunk in self._chunks: yield _FakeUpdate(chunk) return gen() async def get_final_response(self) -> _FakeAgentResponse: return _FakeAgentResponse(text="".join(self._chunks)) class _FakeAgent: def __init__(self, reply: str = "ok") -> None: self._reply = reply self.runs: list[Any] = [] def create_session(self, *, session_id: str | None = None) -> Any: return {"session_id": session_id} def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any: self.runs.append({"messages": messages, "stream": stream, "kwargs": kwargs}) if stream: return _FakeStream([self._reply]) async def _coro() -> _FakeAgentResponse: return _FakeAgentResponse(text=self._reply) return _coro() def _make_teams( stream: bool = False, *, path: str = "/activity/messages" ) -> tuple[ActivityProtocolChannel, _FakeAgent]: agent = _FakeAgent("hi there") ch = ActivityProtocolChannel(path=path, stream=stream, send_typing_action=False) fake_http = MagicMock() response_mock = MagicMock() response_mock.raise_for_status = MagicMock() response_mock.json = MagicMock(return_value={"id": "act-1"}) fake_http.post = AsyncMock(return_value=response_mock) fake_http.put = AsyncMock(return_value=response_mock) fake_http.aclose = AsyncMock() ch._http = fake_http return ch, agent _VALID_ACTIVITY: dict[str, Any] = { "type": "message", "id": "in-1", "text": "hello bot", "conversation": {"id": "19:meeting_xyz@thread.v2"}, "from": {"id": "user-1"}, "recipient": {"id": "bot-1"}, "channelId": "msteams", "serviceUrl": "https://smba.trafficmanager.net/amer/", } # Minimal request envelope for direct ``_stream_to_conversation`` calls. _VALID_REQUEST = ChannelRequest(channel="activity", operation="message.create", input=[]) class TestTeamsWebhook: def test_message_activity_dispatches_to_agent(self) -> None: ch, agent = _make_teams() host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) assert r.status_code == 200 assert agent.runs, "expected the agent to be invoked" # And the channel posted a reply back to the conversation URL. assert ch._http is not None ch._http.post.assert_called() # type: ignore[attr-defined] url, _ = ch._http.post.call_args[0], ch._http.post.call_args[1] # type: ignore[attr-defined] # noqa: F841 assert "/v3/conversations/" in ch._http.post.call_args[0][0] # type: ignore[attr-defined] body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined] assert body["text"] == "hi there" def test_empty_path_mounts_at_app_root(self) -> None: ch, agent = _make_teams(path="") host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/", json=_VALID_ACTIVITY) assert r.status_code == 200 assert agent.runs, "expected the agent to be invoked" def test_response_hook_can_rewrite_originating_reply(self) -> None: seen_kwargs: list[dict[str, Any]] = [] def hook(result: HostedRunResult, **kwargs: Any) -> HostedRunResult: seen_kwargs.append(dict(kwargs)) return HostedRunResult(_FakeAgentResponse(text=result.result.text.upper()), session=result.session) ch, agent = _make_teams() ch.response_hook = hook host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) assert r.status_code == 200 assert ch._http is not None body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined] assert body["text"] == "HI THERE" assert seen_kwargs assert seen_kwargs[0]["channel_name"] == "activity" def test_non_message_activities_are_acked(self) -> None: ch, agent = _make_teams() host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post( "/activity/messages", json={"type": "conversationUpdate", "conversation": {"id": "x"}}, ) assert r.status_code == 202 assert not agent.runs def test_invalid_json_returns_400(self) -> None: ch, agent = _make_teams() host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post( "/activity/messages", content=b"not-json", headers={"content-type": "application/json"}, ) assert r.status_code == 400 assert not agent.runs def test_message_missing_serviceurl_is_dropped(self) -> None: ch, agent = _make_teams() host = AgentFrameworkHost(target=agent, channels=[ch]) bad = dict(_VALID_ACTIVITY) bad.pop("serviceUrl") with TestClient(host.app) as client: r = client.post("/activity/messages", json=bad) # No serviceUrl → fails the allow-list check (None doesn't match # any allowed host suffix), surfaced as 400 so a misconfigured # caller knows the activity was structurally invalid. assert r.status_code == 400 assert not agent.runs class TestCommands: def _make_with_commands(self, commands: list[ChannelCommand]) -> tuple[ActivityProtocolChannel, _FakeAgent]: agent = _FakeAgent("hi there") ch = ActivityProtocolChannel(send_typing_action=False, commands=commands) fake_http = MagicMock() response_mock = MagicMock() response_mock.raise_for_status = MagicMock() response_mock.json = MagicMock(return_value={"id": "act-1"}) fake_http.post = AsyncMock(return_value=response_mock) fake_http.put = AsyncMock(return_value=response_mock) fake_http.aclose = AsyncMock() ch._http = fake_http return ch, agent def test_slash_command_bypasses_agent_and_replies(self) -> None: seen: list[ChannelCommandContext] = [] async def handle(ctx: ChannelCommandContext) -> None: seen.append(ctx) await ctx.reply("listed") ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)]) host = AgentFrameworkHost(target=agent, channels=[ch]) activity = dict(_VALID_ACTIVITY, text="/todos") with TestClient(host.app) as client: r = client.post("/activity/messages", json=activity) assert r.status_code == 200 assert not agent.runs, "command must bypass the agent" assert seen and seen[0].request.operation == "command.invoke" assert seen[0].request.input == "/todos" assert seen[0].request.session is not None assert seen[0].request.session.isolation_key == activity_protocol_isolation_key("19:meeting_xyz@thread.v2") assert ch._http is not None assert ch._http.post.call_args[1]["json"]["text"] == "listed" # type: ignore[attr-defined] def test_command_match_is_case_insensitive(self) -> None: ran = False async def handle(ctx: ChannelCommandContext) -> None: nonlocal ran ran = True ch, agent = self._make_with_commands([ChannelCommand("New", "reset", handle)]) host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/new")) assert r.status_code == 200 assert ran assert not agent.runs def test_unknown_command_falls_through_to_agent(self) -> None: async def handle(ctx: ChannelCommandContext) -> None: # pragma: no cover - never called raise AssertionError("should not run") ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)]) host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/unknown")) assert r.status_code == 200 assert agent.runs, "unknown /command must reach the agent" def test_command_failure_does_not_retry(self) -> None: async def handle(ctx: ChannelCommandContext) -> None: raise RuntimeError("boom") ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)]) host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos")) # Best-effort: a failing command is swallowed and acked with 200 so Bot # Service does not retry (and re-run a non-idempotent command). assert r.status_code == 200 assert not agent.runs def test_command_request_uses_activity_session(self) -> None: captured: list[str] = [] async def handle(ctx: ChannelCommandContext) -> None: assert ctx.request.session is not None captured.append(ctx.request.session.isolation_key) agent = _FakeAgent("hi") ch = ActivityProtocolChannel(send_typing_action=False, commands=[ChannelCommand("todos", "x", handle)]) fake_http = MagicMock() response_mock = MagicMock() response_mock.raise_for_status = MagicMock() response_mock.json = MagicMock(return_value={"id": "act-1"}) fake_http.post = AsyncMock(return_value=response_mock) fake_http.aclose = AsyncMock() ch._http = fake_http host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos")) assert r.status_code == 200 assert captured == [activity_protocol_isolation_key("19:meeting_xyz@thread.v2")] class TestOutbound: async def test_send_message_posts_to_conversation_url(self) -> None: ch, _agent = _make_teams() await ch._send_message(_VALID_ACTIVITY, "hi") assert ch._http is not None ch._http.post.assert_called() # type: ignore[attr-defined] url = ch._http.post.call_args[0][0] # type: ignore[attr-defined] assert "/v3/conversations/" in url body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined] assert body["text"] == "hi" class TestIdentityRecording: """``_process_activity`` must stamp the inbound conversation reference onto ``ChannelRequest.identity`` so hooks and commands can inspect it.""" async def test_inbound_sets_request_identity(self) -> None: ch, agent = _make_teams() captured: dict[str, Any] = {} async def hook(req: ChannelRequest, **_: Any) -> ChannelRequest: captured["request"] = req return req ch._hook = hook # type: ignore[assignment] host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) assert r.status_code == 200 request = captured["request"] assert request.identity is not None assert request.identity.channel == "activity" assert request.identity.native_id == "19:meeting_xyz@thread.v2" attrs = request.identity.attributes assert attrs["service_url"] == "https://smba.trafficmanager.net/amer/" assert attrs["bot"] == {"id": "bot-1"} assert attrs["user"] == {"id": "user-1"} class TestConfig: def test_rejects_both_secret_and_certificate(self) -> None: with pytest.raises(ValueError, match="not both"): ActivityProtocolChannel( app_id="x", app_password="s", certificate_path="/tmp/does-not-exist.pem", ) def test_dev_mode_no_credential(self) -> None: ch = ActivityProtocolChannel() assert ch._credential is None class TestServiceUrlAllowList: """``serviceUrl`` is supplied by the inbound activity and the channel POSTs a real bearer token to it — anything outside the Bot Framework host suffixes must be rejected so a malicious caller can't redirect outbound replies to an attacker-controlled host.""" def test_default_allows_smba_trafficmanager(self) -> None: ch = ActivityProtocolChannel() assert ch._is_service_url_allowed("https://smba.trafficmanager.net/amer/") assert ch._is_service_url_allowed("https://emea.smba.trafficmanager.net/") assert ch._is_service_url_allowed("https://api.botframework.com/") def test_default_rejects_arbitrary_host(self) -> None: ch = ActivityProtocolChannel() assert not ch._is_service_url_allowed("https://attacker.example.com/") assert not ch._is_service_url_allowed("https://botframework.com.attacker.com/") assert not ch._is_service_url_allowed("") assert not ch._is_service_url_allowed(None) def test_custom_allowlist(self) -> None: ch = ActivityProtocolChannel(service_url_allowed_hosts=("internal.contoso.com",)) assert ch._is_service_url_allowed("https://internal.contoso.com/v3/") assert ch._is_service_url_allowed("https://eu.internal.contoso.com/") assert not ch._is_service_url_allowed("https://smba.trafficmanager.net/") def test_empty_allowlist_disables_check(self) -> None: ch = ActivityProtocolChannel(service_url_allowed_hosts=()) assert ch._is_service_url_allowed("https://anywhere.example.org/") def test_webhook_rejects_disallowed_serviceurl(self) -> None: ch, agent = _make_teams() host = AgentFrameworkHost(target=agent, channels=[ch]) bad = dict(_VALID_ACTIVITY) bad["serviceUrl"] = "https://attacker.example.com/v3/" with TestClient(host.app) as client: r = client.post("/activity/messages", json=bad) assert r.status_code == 400 assert not agent.runs # No outbound POST attempted with a bearer token. assert ch._http is not None ch._http.post.assert_not_called() # type: ignore[attr-defined] class TestInboundAuthValidator: def test_allow_passes_through(self) -> None: async def allow(_req: Any) -> bool: return True ch, agent = _make_teams() ch._inbound_auth_validator = allow host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) assert r.status_code == 200 assert agent.runs def test_reject_returns_401(self) -> None: async def deny(_req: Any) -> bool: return False ch, agent = _make_teams() ch._inbound_auth_validator = deny host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) assert r.status_code == 401 assert not agent.runs def test_validator_raises_returns_401(self) -> None: async def boom(_req: Any) -> bool: raise RuntimeError("validator broke") ch, agent = _make_teams() ch._inbound_auth_validator = boom host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) assert r.status_code == 401 assert not agent.runs class TestOutboundAuthHeader: async def test_no_credential_sends_no_authorization_header(self) -> None: ch, _agent = _make_teams() # Default _make_teams has no credential — dev mode. await ch._send_message(_VALID_ACTIVITY, "hi") assert ch._http is not None headers = ch._http.post.call_args[1]["headers"] # type: ignore[attr-defined] assert "Authorization" not in headers async def test_with_credential_sends_bearer_token(self) -> None: ch, _agent = _make_teams() # Inject a fake credential with a fixed token. token_obj = MagicMock() token_obj.token = "tok-abc123" cred = MagicMock() cred.get_token = AsyncMock(return_value=token_obj) ch._credential = cred # type: ignore[assignment] await ch._send_message(_VALID_ACTIVITY, "hi") assert ch._http is not None headers = ch._http.post.call_args[1]["headers"] # type: ignore[attr-defined] assert headers.get("Authorization") == "Bearer tok-abc123" class TestRetrySignal: """Distinguish transient outbound failures (network / 5xx) — which must surface 502 so Bot Service retries — from deterministic agent failures (which must return 200 to avoid retry loops).""" def test_outbound_http_error_returns_502(self) -> None: import httpx as _httpx ch, agent = _make_teams() # Make _send_message raise a transient httpx error. assert ch._http is not None ch._http.post = AsyncMock(side_effect=_httpx.ConnectError("nope")) # type: ignore[attr-defined] host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) assert r.status_code == 502 def test_deterministic_agent_failure_returns_200(self) -> None: ch, agent = _make_teams() def boom(messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any: async def _coro() -> Any: raise ValueError("agent crashed") return _coro() agent.run = boom # type: ignore[assignment] host = AgentFrameworkHost(target=agent, channels=[ch]) with TestClient(host.app) as client: r = client.post("/activity/messages", json=_VALID_ACTIVITY) # Deterministic failure → 200 (Bot Service does not retry the same # broken activity in a loop). assert r.status_code == 200 class TestStreaming: async def test_stream_sends_placeholder_and_edits(self) -> None: ch, _agent = _make_teams(stream=True) # Build a fake stream that emits two text chunks then finalizes. @dataclass class _Up: text: str class _Stream: def __init__(self) -> None: self._chunks = ["hel", "lo"] def __aiter__(self) -> Any: async def gen() -> Any: for c in self._chunks: yield _Up(c) return gen() async def get_final_response(self) -> Any: return _FakeAgentResponse(text="hello") # Use a tight throttle so the test doesn't sit on `wait_for`. ch._stream_edit_min_interval = 0.0 await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()) # type: ignore[arg-type] assert ch._http is not None # Placeholder POST + at least one final PUT. ch._http.post.assert_called() # type: ignore[attr-defined] ch._http.put.assert_called() # type: ignore[attr-defined] # Final edit body carries the full accumulated text. last_put_body = ch._http.put.call_args[1]["json"] # type: ignore[attr-defined] assert last_put_body["text"] == "hello" async def test_stream_placeholder_failure_falls_back_to_single_post(self) -> None: # The bug: when send_initial_placeholder fails, activity_id stays # None, the edit_worker can never reach its exit condition # (`accumulated == last_sent` while no PUT possible) and the # whole conversation deadlocks. After the fix we fall back to # buffering the stream and POSTing a single final activity. ch, _agent = _make_teams(stream=True) # Make the FIRST POST (placeholder) raise; subsequent POST (final # fallback) succeeds. import httpx as _httpx ok_response = MagicMock() ok_response.raise_for_status = MagicMock() ok_response.json = MagicMock(return_value={"id": "act-final"}) ok_response.content = b"{}" post_mock = AsyncMock(side_effect=[_httpx.HTTPError("boom"), ok_response]) assert ch._http is not None ch._http.post = post_mock # type: ignore[attr-defined] @dataclass class _Up: text: str class _Stream: def __aiter__(self) -> Any: async def gen() -> Any: yield _Up("partial-1") yield _Up("-partial-2") return gen() async def get_final_response(self) -> Any: return _FakeAgentResponse(text="partial-1-partial-2") ch._stream_edit_min_interval = 0.0 # Should NOT hang. Use asyncio.wait_for with a small timeout to # guard the test against future regressions of the deadlock. import asyncio as _asyncio await _asyncio.wait_for( ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()), # type: ignore[arg-type] timeout=2.0, ) # Two POSTs total: placeholder (failed) + fallback final. assert post_mock.await_count == 2 # Fallback POST contains the full accumulated text. fallback_body = post_mock.call_args[1]["json"] assert fallback_body["text"] == "partial-1-partial-2" async def test_stream_with_no_text_replaces_placeholder(self) -> None: ch, _agent = _make_teams(stream=True) class _EmptyStream: def __aiter__(self) -> Any: async def gen() -> Any: if False: yield None # type: ignore[unreachable] return gen() async def get_final_response(self) -> Any: return _FakeAgentResponse(text="") ch._stream_edit_min_interval = 0.0 await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type] # The placeholder PUT-replaces with "(no response)" so the user # isn't left staring at "…". assert ch._http is not None last_put_body = ch._http.put.call_args[1]["json"] # type: ignore[attr-defined] assert last_put_body["text"] == "(no response)" async def test_non_edit_channel_buffers_and_posts_single_message(self) -> None: # Web Chat (and every non-Teams channel) does not support # PUT /activities/{id}; the channel must buffer the stream and POST # a single final message rather than the placeholder+edit dance. ch, _agent = _make_teams(stream=True) webchat_activity = {**_VALID_ACTIVITY, "channelId": "webchat"} @dataclass class _Up: text: str class _Stream: def __aiter__(self) -> Any: async def gen() -> Any: yield _Up("hel") yield _Up("lo") return gen() async def get_final_response(self) -> Any: return _FakeAgentResponse(text="hello") ch._stream_edit_min_interval = 0.0 await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, _Stream()) # type: ignore[arg-type] assert ch._http is not None # No PUT (no editing); exactly one POST with the full text. ch._http.put.assert_not_called() # type: ignore[attr-defined] assert ch._http.post.await_count == 1 # type: ignore[attr-defined] body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined] assert body["text"] == "hello" async def test_non_edit_channel_empty_stream_posts_no_response(self) -> None: ch, _agent = _make_teams(stream=True) webchat_activity = {**_VALID_ACTIVITY, "channelId": "directline"} class _EmptyStream: def __aiter__(self) -> Any: async def gen() -> Any: if False: yield None # type: ignore[unreachable] return gen() async def get_final_response(self) -> Any: return _FakeAgentResponse(text="") ch._stream_edit_min_interval = 0.0 await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type] assert ch._http is not None ch._http.put.assert_not_called() # type: ignore[attr-defined] body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined] assert body["text"] == "(no response)" async def test_edit_405_falls_back_to_single_post(self) -> None: # Defensive: a channel advertised as edit-capable that nonetheless # rejects the PUT with 405 must stop editing and POST the final # text as a fresh message instead of silently leaving "…". import httpx as _httpx ch, _agent = _make_teams(stream=True) assert ch._http is not None request_405 = _httpx.Request("PUT", "https://smba.trafficmanager.net/amer/v3/x") response_405 = _httpx.Response(405, request=request_405) ch._http.put = AsyncMock( # type: ignore[attr-defined] side_effect=_httpx.HTTPStatusError("405", request=request_405, response=response_405) ) @dataclass class _Up: text: str class _Stream: def __aiter__(self) -> Any: async def gen() -> Any: yield _Up("hel") yield _Up("lo") return gen() async def get_final_response(self) -> Any: return _FakeAgentResponse(text="hello") ch._stream_edit_min_interval = 0.0 await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()) # type: ignore[arg-type] # Placeholder POST + fallback final POST = 2 POSTs; the final one # carries the full text. assert ch._http.post.await_count == 2 # type: ignore[attr-defined] final_body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined] assert final_body["text"] == "hello"