mirror of
https://github.com/microsoft/agent-framework.git
synced 2026-06-16 21:04:09 +08:00
36ce0950e4
Remove linking, multicast, durable delivery, and host push machinery from the v1 hosting core. Keep those scenarios in a proposed follow-up ADR and update channel packages, samples, docs, tests, and workspace metadata around the smaller host/channel contract. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
776 lines
31 KiB
Python
776 lines
31 KiB
Python
# Copyright (c) Microsoft. All rights reserved.
|
|
|
|
"""Unit tests for :mod:`agent_framework_hosting_activity_protocol`.
|
|
|
|
The Bot Framework outbound calls and azure-identity credentials are mocked
|
|
out so the suite never touches the network. Live token acquisition,
|
|
streaming edits and certificate paths are out of scope here.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from typing import Any
|
|
from unittest.mock import AsyncMock, MagicMock
|
|
|
|
import pytest
|
|
from agent_framework_hosting import (
|
|
AgentFrameworkHost,
|
|
ChannelCommand,
|
|
ChannelCommandContext,
|
|
ChannelRequest,
|
|
HostedRunResult,
|
|
)
|
|
from starlette.testclient import TestClient
|
|
|
|
from agent_framework_hosting_activity_protocol import ActivityProtocolChannel, activity_protocol_isolation_key
|
|
from agent_framework_hosting_activity_protocol._channel import _command_text, _parse_activity
|
|
|
|
|
|
def test_activity_protocol_isolation_key_format() -> None:
|
|
assert activity_protocol_isolation_key("19:meeting_xyz@thread.v2") == "activity:19:meeting_xyz@thread.v2"
|
|
assert activity_protocol_isolation_key(123) == "activity:123"
|
|
|
|
|
|
class TestParseActivity:
|
|
def test_text_only(self) -> None:
|
|
msg = _parse_activity({"type": "message", "text": "hello"})
|
|
assert msg.role == "user"
|
|
assert msg.text == "hello"
|
|
|
|
def test_with_attachment(self) -> None:
|
|
msg = _parse_activity({
|
|
"type": "message",
|
|
"text": "see this",
|
|
"attachments": [
|
|
{"contentType": "image/png", "contentUrl": "https://example.com/x.png"},
|
|
],
|
|
})
|
|
assert msg.text == "see this"
|
|
assert any((getattr(c, "uri", None) or "").endswith("/x.png") for c in msg.contents)
|
|
|
|
def test_skips_invalid_attachments(self) -> None:
|
|
msg = _parse_activity({
|
|
"type": "message",
|
|
"text": "hi",
|
|
"attachments": [
|
|
"not-a-mapping",
|
|
{"contentType": "image/png"}, # no url
|
|
{"contentUrl": "https://example.com/y", "contentType": "no-slash"},
|
|
],
|
|
})
|
|
assert msg.text == "hi"
|
|
# No URI content survived.
|
|
assert not any(getattr(c, "uri", None) for c in msg.contents)
|
|
|
|
def test_skips_teams_text_html_inline_content(self) -> None:
|
|
# Teams attaches a text/html rendering whose inline ``content`` is raw
|
|
# HTML (not a URL). It must not be parsed as a URI.
|
|
msg = _parse_activity({
|
|
"type": "message",
|
|
"text": "hello there",
|
|
"attachments": [
|
|
{"contentType": "text/html", "content": "<p>hello there</p>"},
|
|
],
|
|
})
|
|
assert msg.text == "hello there"
|
|
assert not any(getattr(c, "uri", None) for c in msg.contents)
|
|
|
|
def test_skips_attachment_contenturl_without_scheme(self) -> None:
|
|
msg = _parse_activity({
|
|
"type": "message",
|
|
"text": "hi",
|
|
"attachments": [
|
|
{"contentType": "image/png", "contentUrl": "/relative/path.png"},
|
|
],
|
|
})
|
|
assert msg.text == "hi"
|
|
assert not any(getattr(c, "uri", None) for c in msg.contents)
|
|
|
|
|
|
class TestCommandText:
|
|
def test_plain_text_unchanged(self) -> None:
|
|
assert _command_text({"text": "/help"}) == "/help"
|
|
|
|
def test_non_string_text_returns_empty(self) -> None:
|
|
assert _command_text({"text": None}) == ""
|
|
assert _command_text({}) == ""
|
|
|
|
def test_strips_bot_mention(self) -> None:
|
|
activity = {
|
|
"text": "<at>Personal Assistant</at> /todos",
|
|
"recipient": {"id": "bot-1"},
|
|
"entities": [
|
|
{"type": "mention", "text": "<at>Personal Assistant</at>", "mentioned": {"id": "bot-1"}},
|
|
],
|
|
}
|
|
assert _command_text(activity) == "/todos"
|
|
|
|
def test_strips_bot_mention_without_space(self) -> None:
|
|
activity = {
|
|
"text": "<at>Bot</at>/help",
|
|
"recipient": {"id": "bot-1"},
|
|
"entities": [{"type": "mention", "text": "<at>Bot</at>", "mentioned": {"id": "bot-1"}}],
|
|
}
|
|
assert _command_text(activity) == "/help"
|
|
|
|
def test_keeps_other_user_mention(self) -> None:
|
|
activity = {
|
|
"text": "/whoami <at>Someone</at>",
|
|
"recipient": {"id": "bot-1"},
|
|
"entities": [{"type": "mention", "text": "<at>Someone</at>", "mentioned": {"id": "user-9"}}],
|
|
}
|
|
# Another user's mention must not be stripped.
|
|
assert _command_text(activity) == "/whoami <at>Someone</at>"
|
|
|
|
def test_malformed_entities_are_ignored(self) -> None:
|
|
activity = {
|
|
"text": "/help",
|
|
"recipient": {"id": "bot-1"},
|
|
"entities": ["not-a-mapping", {"type": "clientInfo"}, {"type": "mention"}],
|
|
}
|
|
assert _command_text(activity) == "/help"
|
|
|
|
|
|
@dataclass
|
|
class _FakeAgentResponse:
|
|
text: str
|
|
|
|
|
|
@dataclass
|
|
class _FakeUpdate:
|
|
text: str
|
|
|
|
|
|
class _FakeStream:
|
|
def __init__(self, chunks: list[str]) -> None:
|
|
self._chunks = chunks
|
|
|
|
def __aiter__(self) -> Any:
|
|
async def gen() -> Any:
|
|
for chunk in self._chunks:
|
|
yield _FakeUpdate(chunk)
|
|
|
|
return gen()
|
|
|
|
async def get_final_response(self) -> _FakeAgentResponse:
|
|
return _FakeAgentResponse(text="".join(self._chunks))
|
|
|
|
|
|
class _FakeAgent:
|
|
def __init__(self, reply: str = "ok") -> None:
|
|
self._reply = reply
|
|
self.runs: list[Any] = []
|
|
|
|
def create_session(self, *, session_id: str | None = None) -> Any:
|
|
return {"session_id": session_id}
|
|
|
|
def run(self, messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any:
|
|
self.runs.append({"messages": messages, "stream": stream, "kwargs": kwargs})
|
|
if stream:
|
|
return _FakeStream([self._reply])
|
|
|
|
async def _coro() -> _FakeAgentResponse:
|
|
return _FakeAgentResponse(text=self._reply)
|
|
|
|
return _coro()
|
|
|
|
|
|
def _make_teams(
|
|
stream: bool = False, *, path: str = "/activity/messages"
|
|
) -> tuple[ActivityProtocolChannel, _FakeAgent]:
|
|
agent = _FakeAgent("hi there")
|
|
ch = ActivityProtocolChannel(path=path, stream=stream, send_typing_action=False)
|
|
fake_http = MagicMock()
|
|
response_mock = MagicMock()
|
|
response_mock.raise_for_status = MagicMock()
|
|
response_mock.json = MagicMock(return_value={"id": "act-1"})
|
|
fake_http.post = AsyncMock(return_value=response_mock)
|
|
fake_http.put = AsyncMock(return_value=response_mock)
|
|
fake_http.aclose = AsyncMock()
|
|
ch._http = fake_http
|
|
return ch, agent
|
|
|
|
|
|
_VALID_ACTIVITY: dict[str, Any] = {
|
|
"type": "message",
|
|
"id": "in-1",
|
|
"text": "hello bot",
|
|
"conversation": {"id": "19:meeting_xyz@thread.v2"},
|
|
"from": {"id": "user-1"},
|
|
"recipient": {"id": "bot-1"},
|
|
"channelId": "msteams",
|
|
"serviceUrl": "https://smba.trafficmanager.net/amer/",
|
|
}
|
|
|
|
# Minimal request envelope for direct ``_stream_to_conversation`` calls.
|
|
_VALID_REQUEST = ChannelRequest(channel="activity", operation="message.create", input=[])
|
|
|
|
|
|
class TestTeamsWebhook:
|
|
def test_message_activity_dispatches_to_agent(self) -> None:
|
|
ch, agent = _make_teams()
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
assert r.status_code == 200
|
|
assert agent.runs, "expected the agent to be invoked"
|
|
# And the channel posted a reply back to the conversation URL.
|
|
assert ch._http is not None
|
|
ch._http.post.assert_called() # type: ignore[attr-defined]
|
|
url, _ = ch._http.post.call_args[0], ch._http.post.call_args[1] # type: ignore[attr-defined] # noqa: F841
|
|
assert "/v3/conversations/" in ch._http.post.call_args[0][0] # type: ignore[attr-defined]
|
|
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert body["text"] == "hi there"
|
|
|
|
def test_empty_path_mounts_at_app_root(self) -> None:
|
|
ch, agent = _make_teams(path="")
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/", json=_VALID_ACTIVITY)
|
|
assert r.status_code == 200
|
|
assert agent.runs, "expected the agent to be invoked"
|
|
|
|
def test_response_hook_can_rewrite_originating_reply(self) -> None:
|
|
seen_kwargs: list[dict[str, Any]] = []
|
|
|
|
def hook(result: HostedRunResult, **kwargs: Any) -> HostedRunResult:
|
|
seen_kwargs.append(dict(kwargs))
|
|
return HostedRunResult(_FakeAgentResponse(text=result.result.text.upper()), session=result.session)
|
|
|
|
ch, agent = _make_teams()
|
|
ch.response_hook = hook
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
|
|
assert r.status_code == 200
|
|
assert ch._http is not None
|
|
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert body["text"] == "HI THERE"
|
|
assert seen_kwargs
|
|
assert seen_kwargs[0]["channel_name"] == "activity"
|
|
|
|
def test_non_message_activities_are_acked(self) -> None:
|
|
ch, agent = _make_teams()
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post(
|
|
"/activity/messages",
|
|
json={"type": "conversationUpdate", "conversation": {"id": "x"}},
|
|
)
|
|
assert r.status_code == 202
|
|
assert not agent.runs
|
|
|
|
def test_invalid_json_returns_400(self) -> None:
|
|
ch, agent = _make_teams()
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post(
|
|
"/activity/messages",
|
|
content=b"not-json",
|
|
headers={"content-type": "application/json"},
|
|
)
|
|
assert r.status_code == 400
|
|
assert not agent.runs
|
|
|
|
def test_message_missing_serviceurl_is_dropped(self) -> None:
|
|
ch, agent = _make_teams()
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
bad = dict(_VALID_ACTIVITY)
|
|
bad.pop("serviceUrl")
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=bad)
|
|
# No serviceUrl → fails the allow-list check (None doesn't match
|
|
# any allowed host suffix), surfaced as 400 so a misconfigured
|
|
# caller knows the activity was structurally invalid.
|
|
assert r.status_code == 400
|
|
assert not agent.runs
|
|
|
|
|
|
class TestCommands:
|
|
def _make_with_commands(self, commands: list[ChannelCommand]) -> tuple[ActivityProtocolChannel, _FakeAgent]:
|
|
agent = _FakeAgent("hi there")
|
|
ch = ActivityProtocolChannel(send_typing_action=False, commands=commands)
|
|
fake_http = MagicMock()
|
|
response_mock = MagicMock()
|
|
response_mock.raise_for_status = MagicMock()
|
|
response_mock.json = MagicMock(return_value={"id": "act-1"})
|
|
fake_http.post = AsyncMock(return_value=response_mock)
|
|
fake_http.put = AsyncMock(return_value=response_mock)
|
|
fake_http.aclose = AsyncMock()
|
|
ch._http = fake_http
|
|
return ch, agent
|
|
|
|
def test_slash_command_bypasses_agent_and_replies(self) -> None:
|
|
seen: list[ChannelCommandContext] = []
|
|
|
|
async def handle(ctx: ChannelCommandContext) -> None:
|
|
seen.append(ctx)
|
|
await ctx.reply("listed")
|
|
|
|
ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)])
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
activity = dict(_VALID_ACTIVITY, text="/todos")
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=activity)
|
|
assert r.status_code == 200
|
|
assert not agent.runs, "command must bypass the agent"
|
|
assert seen and seen[0].request.operation == "command.invoke"
|
|
assert seen[0].request.input == "/todos"
|
|
assert seen[0].request.session is not None
|
|
assert seen[0].request.session.isolation_key == activity_protocol_isolation_key("19:meeting_xyz@thread.v2")
|
|
assert ch._http is not None
|
|
assert ch._http.post.call_args[1]["json"]["text"] == "listed" # type: ignore[attr-defined]
|
|
|
|
def test_command_match_is_case_insensitive(self) -> None:
|
|
ran = False
|
|
|
|
async def handle(ctx: ChannelCommandContext) -> None:
|
|
nonlocal ran
|
|
ran = True
|
|
|
|
ch, agent = self._make_with_commands([ChannelCommand("New", "reset", handle)])
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/new"))
|
|
assert r.status_code == 200
|
|
assert ran
|
|
assert not agent.runs
|
|
|
|
def test_unknown_command_falls_through_to_agent(self) -> None:
|
|
async def handle(ctx: ChannelCommandContext) -> None: # pragma: no cover - never called
|
|
raise AssertionError("should not run")
|
|
|
|
ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)])
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/unknown"))
|
|
assert r.status_code == 200
|
|
assert agent.runs, "unknown /command must reach the agent"
|
|
|
|
def test_command_failure_does_not_retry(self) -> None:
|
|
async def handle(ctx: ChannelCommandContext) -> None:
|
|
raise RuntimeError("boom")
|
|
|
|
ch, agent = self._make_with_commands([ChannelCommand("todos", "List", handle)])
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos"))
|
|
# Best-effort: a failing command is swallowed and acked with 200 so Bot
|
|
# Service does not retry (and re-run a non-idempotent command).
|
|
assert r.status_code == 200
|
|
assert not agent.runs
|
|
|
|
def test_command_request_uses_activity_session(self) -> None:
|
|
captured: list[str] = []
|
|
|
|
async def handle(ctx: ChannelCommandContext) -> None:
|
|
assert ctx.request.session is not None
|
|
captured.append(ctx.request.session.isolation_key)
|
|
|
|
agent = _FakeAgent("hi")
|
|
ch = ActivityProtocolChannel(send_typing_action=False, commands=[ChannelCommand("todos", "x", handle)])
|
|
fake_http = MagicMock()
|
|
response_mock = MagicMock()
|
|
response_mock.raise_for_status = MagicMock()
|
|
response_mock.json = MagicMock(return_value={"id": "act-1"})
|
|
fake_http.post = AsyncMock(return_value=response_mock)
|
|
fake_http.aclose = AsyncMock()
|
|
ch._http = fake_http
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=dict(_VALID_ACTIVITY, text="/todos"))
|
|
assert r.status_code == 200
|
|
assert captured == [activity_protocol_isolation_key("19:meeting_xyz@thread.v2")]
|
|
|
|
|
|
class TestOutbound:
|
|
async def test_send_message_posts_to_conversation_url(self) -> None:
|
|
ch, _agent = _make_teams()
|
|
await ch._send_message(_VALID_ACTIVITY, "hi")
|
|
assert ch._http is not None
|
|
ch._http.post.assert_called() # type: ignore[attr-defined]
|
|
url = ch._http.post.call_args[0][0] # type: ignore[attr-defined]
|
|
assert "/v3/conversations/" in url
|
|
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert body["text"] == "hi"
|
|
|
|
|
|
class TestIdentityRecording:
|
|
"""``_process_activity`` must stamp the inbound conversation reference
|
|
onto ``ChannelRequest.identity`` so hooks and commands can inspect it."""
|
|
|
|
async def test_inbound_sets_request_identity(self) -> None:
|
|
ch, agent = _make_teams()
|
|
captured: dict[str, Any] = {}
|
|
|
|
async def hook(req: ChannelRequest, **_: Any) -> ChannelRequest:
|
|
captured["request"] = req
|
|
return req
|
|
|
|
ch._hook = hook # type: ignore[assignment]
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
assert r.status_code == 200
|
|
request = captured["request"]
|
|
assert request.identity is not None
|
|
assert request.identity.channel == "activity"
|
|
assert request.identity.native_id == "19:meeting_xyz@thread.v2"
|
|
attrs = request.identity.attributes
|
|
assert attrs["service_url"] == "https://smba.trafficmanager.net/amer/"
|
|
assert attrs["bot"] == {"id": "bot-1"}
|
|
assert attrs["user"] == {"id": "user-1"}
|
|
|
|
|
|
class TestConfig:
|
|
def test_rejects_both_secret_and_certificate(self) -> None:
|
|
with pytest.raises(ValueError, match="not both"):
|
|
ActivityProtocolChannel(
|
|
app_id="x",
|
|
app_password="s",
|
|
certificate_path="/tmp/does-not-exist.pem",
|
|
)
|
|
|
|
def test_dev_mode_no_credential(self) -> None:
|
|
ch = ActivityProtocolChannel()
|
|
assert ch._credential is None
|
|
|
|
|
|
class TestServiceUrlAllowList:
|
|
"""``serviceUrl`` is supplied by the inbound activity and the channel
|
|
POSTs a real bearer token to it — anything outside the Bot Framework
|
|
host suffixes must be rejected so a malicious caller can't redirect
|
|
outbound replies to an attacker-controlled host."""
|
|
|
|
def test_default_allows_smba_trafficmanager(self) -> None:
|
|
ch = ActivityProtocolChannel()
|
|
assert ch._is_service_url_allowed("https://smba.trafficmanager.net/amer/")
|
|
assert ch._is_service_url_allowed("https://emea.smba.trafficmanager.net/")
|
|
assert ch._is_service_url_allowed("https://api.botframework.com/")
|
|
|
|
def test_default_rejects_arbitrary_host(self) -> None:
|
|
ch = ActivityProtocolChannel()
|
|
assert not ch._is_service_url_allowed("https://attacker.example.com/")
|
|
assert not ch._is_service_url_allowed("https://botframework.com.attacker.com/")
|
|
assert not ch._is_service_url_allowed("")
|
|
assert not ch._is_service_url_allowed(None)
|
|
|
|
def test_custom_allowlist(self) -> None:
|
|
ch = ActivityProtocolChannel(service_url_allowed_hosts=("internal.contoso.com",))
|
|
assert ch._is_service_url_allowed("https://internal.contoso.com/v3/")
|
|
assert ch._is_service_url_allowed("https://eu.internal.contoso.com/")
|
|
assert not ch._is_service_url_allowed("https://smba.trafficmanager.net/")
|
|
|
|
def test_empty_allowlist_disables_check(self) -> None:
|
|
ch = ActivityProtocolChannel(service_url_allowed_hosts=())
|
|
assert ch._is_service_url_allowed("https://anywhere.example.org/")
|
|
|
|
def test_webhook_rejects_disallowed_serviceurl(self) -> None:
|
|
ch, agent = _make_teams()
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
bad = dict(_VALID_ACTIVITY)
|
|
bad["serviceUrl"] = "https://attacker.example.com/v3/"
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=bad)
|
|
assert r.status_code == 400
|
|
assert not agent.runs
|
|
# No outbound POST attempted with a bearer token.
|
|
assert ch._http is not None
|
|
ch._http.post.assert_not_called() # type: ignore[attr-defined]
|
|
|
|
|
|
class TestInboundAuthValidator:
|
|
def test_allow_passes_through(self) -> None:
|
|
async def allow(_req: Any) -> bool:
|
|
return True
|
|
|
|
ch, agent = _make_teams()
|
|
ch._inbound_auth_validator = allow
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
assert r.status_code == 200
|
|
assert agent.runs
|
|
|
|
def test_reject_returns_401(self) -> None:
|
|
async def deny(_req: Any) -> bool:
|
|
return False
|
|
|
|
ch, agent = _make_teams()
|
|
ch._inbound_auth_validator = deny
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
assert r.status_code == 401
|
|
assert not agent.runs
|
|
|
|
def test_validator_raises_returns_401(self) -> None:
|
|
async def boom(_req: Any) -> bool:
|
|
raise RuntimeError("validator broke")
|
|
|
|
ch, agent = _make_teams()
|
|
ch._inbound_auth_validator = boom
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
assert r.status_code == 401
|
|
assert not agent.runs
|
|
|
|
|
|
class TestOutboundAuthHeader:
|
|
async def test_no_credential_sends_no_authorization_header(self) -> None:
|
|
ch, _agent = _make_teams()
|
|
# Default _make_teams has no credential — dev mode.
|
|
await ch._send_message(_VALID_ACTIVITY, "hi")
|
|
assert ch._http is not None
|
|
headers = ch._http.post.call_args[1]["headers"] # type: ignore[attr-defined]
|
|
assert "Authorization" not in headers
|
|
|
|
async def test_with_credential_sends_bearer_token(self) -> None:
|
|
ch, _agent = _make_teams()
|
|
# Inject a fake credential with a fixed token.
|
|
token_obj = MagicMock()
|
|
token_obj.token = "tok-abc123"
|
|
cred = MagicMock()
|
|
cred.get_token = AsyncMock(return_value=token_obj)
|
|
ch._credential = cred # type: ignore[assignment]
|
|
await ch._send_message(_VALID_ACTIVITY, "hi")
|
|
assert ch._http is not None
|
|
headers = ch._http.post.call_args[1]["headers"] # type: ignore[attr-defined]
|
|
assert headers.get("Authorization") == "Bearer tok-abc123"
|
|
|
|
|
|
class TestRetrySignal:
|
|
"""Distinguish transient outbound failures (network / 5xx) — which
|
|
must surface 502 so Bot Service retries — from deterministic agent
|
|
failures (which must return 200 to avoid retry loops)."""
|
|
|
|
def test_outbound_http_error_returns_502(self) -> None:
|
|
import httpx as _httpx
|
|
|
|
ch, agent = _make_teams()
|
|
# Make _send_message raise a transient httpx error.
|
|
assert ch._http is not None
|
|
ch._http.post = AsyncMock(side_effect=_httpx.ConnectError("nope")) # type: ignore[attr-defined]
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
assert r.status_code == 502
|
|
|
|
def test_deterministic_agent_failure_returns_200(self) -> None:
|
|
ch, agent = _make_teams()
|
|
|
|
def boom(messages: Any = None, *, stream: bool = False, **kwargs: Any) -> Any:
|
|
async def _coro() -> Any:
|
|
raise ValueError("agent crashed")
|
|
|
|
return _coro()
|
|
|
|
agent.run = boom # type: ignore[assignment]
|
|
host = AgentFrameworkHost(target=agent, channels=[ch])
|
|
with TestClient(host.app) as client:
|
|
r = client.post("/activity/messages", json=_VALID_ACTIVITY)
|
|
# Deterministic failure → 200 (Bot Service does not retry the same
|
|
# broken activity in a loop).
|
|
assert r.status_code == 200
|
|
|
|
|
|
class TestStreaming:
|
|
async def test_stream_sends_placeholder_and_edits(self) -> None:
|
|
ch, _agent = _make_teams(stream=True)
|
|
|
|
# Build a fake stream that emits two text chunks then finalizes.
|
|
@dataclass
|
|
class _Up:
|
|
text: str
|
|
|
|
class _Stream:
|
|
def __init__(self) -> None:
|
|
self._chunks = ["hel", "lo"]
|
|
|
|
def __aiter__(self) -> Any:
|
|
async def gen() -> Any:
|
|
for c in self._chunks:
|
|
yield _Up(c)
|
|
|
|
return gen()
|
|
|
|
async def get_final_response(self) -> Any:
|
|
return _FakeAgentResponse(text="hello")
|
|
|
|
# Use a tight throttle so the test doesn't sit on `wait_for`.
|
|
ch._stream_edit_min_interval = 0.0
|
|
await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()) # type: ignore[arg-type]
|
|
assert ch._http is not None
|
|
# Placeholder POST + at least one final PUT.
|
|
ch._http.post.assert_called() # type: ignore[attr-defined]
|
|
ch._http.put.assert_called() # type: ignore[attr-defined]
|
|
# Final edit body carries the full accumulated text.
|
|
last_put_body = ch._http.put.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert last_put_body["text"] == "hello"
|
|
|
|
async def test_stream_placeholder_failure_falls_back_to_single_post(self) -> None:
|
|
# The bug: when send_initial_placeholder fails, activity_id stays
|
|
# None, the edit_worker can never reach its exit condition
|
|
# (`accumulated == last_sent` while no PUT possible) and the
|
|
# whole conversation deadlocks. After the fix we fall back to
|
|
# buffering the stream and POSTing a single final activity.
|
|
ch, _agent = _make_teams(stream=True)
|
|
# Make the FIRST POST (placeholder) raise; subsequent POST (final
|
|
# fallback) succeeds.
|
|
import httpx as _httpx
|
|
|
|
ok_response = MagicMock()
|
|
ok_response.raise_for_status = MagicMock()
|
|
ok_response.json = MagicMock(return_value={"id": "act-final"})
|
|
ok_response.content = b"{}"
|
|
post_mock = AsyncMock(side_effect=[_httpx.HTTPError("boom"), ok_response])
|
|
assert ch._http is not None
|
|
ch._http.post = post_mock # type: ignore[attr-defined]
|
|
|
|
@dataclass
|
|
class _Up:
|
|
text: str
|
|
|
|
class _Stream:
|
|
def __aiter__(self) -> Any:
|
|
async def gen() -> Any:
|
|
yield _Up("partial-1")
|
|
yield _Up("-partial-2")
|
|
|
|
return gen()
|
|
|
|
async def get_final_response(self) -> Any:
|
|
return _FakeAgentResponse(text="partial-1-partial-2")
|
|
|
|
ch._stream_edit_min_interval = 0.0
|
|
# Should NOT hang. Use asyncio.wait_for with a small timeout to
|
|
# guard the test against future regressions of the deadlock.
|
|
import asyncio as _asyncio
|
|
|
|
await _asyncio.wait_for(
|
|
ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()), # type: ignore[arg-type]
|
|
timeout=2.0,
|
|
)
|
|
# Two POSTs total: placeholder (failed) + fallback final.
|
|
assert post_mock.await_count == 2
|
|
# Fallback POST contains the full accumulated text.
|
|
fallback_body = post_mock.call_args[1]["json"]
|
|
assert fallback_body["text"] == "partial-1-partial-2"
|
|
|
|
async def test_stream_with_no_text_replaces_placeholder(self) -> None:
|
|
ch, _agent = _make_teams(stream=True)
|
|
|
|
class _EmptyStream:
|
|
def __aiter__(self) -> Any:
|
|
async def gen() -> Any:
|
|
if False:
|
|
yield None # type: ignore[unreachable]
|
|
|
|
return gen()
|
|
|
|
async def get_final_response(self) -> Any:
|
|
return _FakeAgentResponse(text="")
|
|
|
|
ch._stream_edit_min_interval = 0.0
|
|
await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type]
|
|
# The placeholder PUT-replaces with "(no response)" so the user
|
|
# isn't left staring at "…".
|
|
assert ch._http is not None
|
|
last_put_body = ch._http.put.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert last_put_body["text"] == "(no response)"
|
|
|
|
async def test_non_edit_channel_buffers_and_posts_single_message(self) -> None:
|
|
# Web Chat (and every non-Teams channel) does not support
|
|
# PUT /activities/{id}; the channel must buffer the stream and POST
|
|
# a single final message rather than the placeholder+edit dance.
|
|
ch, _agent = _make_teams(stream=True)
|
|
webchat_activity = {**_VALID_ACTIVITY, "channelId": "webchat"}
|
|
|
|
@dataclass
|
|
class _Up:
|
|
text: str
|
|
|
|
class _Stream:
|
|
def __aiter__(self) -> Any:
|
|
async def gen() -> Any:
|
|
yield _Up("hel")
|
|
yield _Up("lo")
|
|
|
|
return gen()
|
|
|
|
async def get_final_response(self) -> Any:
|
|
return _FakeAgentResponse(text="hello")
|
|
|
|
ch._stream_edit_min_interval = 0.0
|
|
await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, _Stream()) # type: ignore[arg-type]
|
|
assert ch._http is not None
|
|
# No PUT (no editing); exactly one POST with the full text.
|
|
ch._http.put.assert_not_called() # type: ignore[attr-defined]
|
|
assert ch._http.post.await_count == 1 # type: ignore[attr-defined]
|
|
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert body["text"] == "hello"
|
|
|
|
async def test_non_edit_channel_empty_stream_posts_no_response(self) -> None:
|
|
ch, _agent = _make_teams(stream=True)
|
|
webchat_activity = {**_VALID_ACTIVITY, "channelId": "directline"}
|
|
|
|
class _EmptyStream:
|
|
def __aiter__(self) -> Any:
|
|
async def gen() -> Any:
|
|
if False:
|
|
yield None # type: ignore[unreachable]
|
|
|
|
return gen()
|
|
|
|
async def get_final_response(self) -> Any:
|
|
return _FakeAgentResponse(text="")
|
|
|
|
ch._stream_edit_min_interval = 0.0
|
|
await ch._stream_to_conversation(webchat_activity, _VALID_REQUEST, _EmptyStream()) # type: ignore[arg-type]
|
|
assert ch._http is not None
|
|
ch._http.put.assert_not_called() # type: ignore[attr-defined]
|
|
body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert body["text"] == "(no response)"
|
|
|
|
async def test_edit_405_falls_back_to_single_post(self) -> None:
|
|
# Defensive: a channel advertised as edit-capable that nonetheless
|
|
# rejects the PUT with 405 must stop editing and POST the final
|
|
# text as a fresh message instead of silently leaving "…".
|
|
import httpx as _httpx
|
|
|
|
ch, _agent = _make_teams(stream=True)
|
|
assert ch._http is not None
|
|
|
|
request_405 = _httpx.Request("PUT", "https://smba.trafficmanager.net/amer/v3/x")
|
|
response_405 = _httpx.Response(405, request=request_405)
|
|
ch._http.put = AsyncMock( # type: ignore[attr-defined]
|
|
side_effect=_httpx.HTTPStatusError("405", request=request_405, response=response_405)
|
|
)
|
|
|
|
@dataclass
|
|
class _Up:
|
|
text: str
|
|
|
|
class _Stream:
|
|
def __aiter__(self) -> Any:
|
|
async def gen() -> Any:
|
|
yield _Up("hel")
|
|
yield _Up("lo")
|
|
|
|
return gen()
|
|
|
|
async def get_final_response(self) -> Any:
|
|
return _FakeAgentResponse(text="hello")
|
|
|
|
ch._stream_edit_min_interval = 0.0
|
|
await ch._stream_to_conversation(_VALID_ACTIVITY, _VALID_REQUEST, _Stream()) # type: ignore[arg-type]
|
|
# Placeholder POST + fallback final POST = 2 POSTs; the final one
|
|
# carries the full text.
|
|
assert ch._http.post.await_count == 2 # type: ignore[attr-defined]
|
|
final_body = ch._http.post.call_args[1]["json"] # type: ignore[attr-defined]
|
|
assert final_body["text"] == "hello"
|