Python: feat(python): Add MCP client OTel spans per GenAI semantic conventions (#6349)

* feat(python): Add MCP client OTel spans per GenAI semantic conventions

Implement MCP client spans per the OTel GenAI Semantic Conventions for MCP
(https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#client).

Operations instrumented:
- initialize: CLIENT span capturing MCP session setup
- tools/list: CLIENT span for tool listing (per-page)
- prompts/list: CLIENT span for prompt listing (per-page)
- tools/call: CLIENT span (nested under execute_tool when called via FunctionTool)
- prompts/get: CLIENT span

Span attributes follow the MCP semantic conventions:
- Required: mcp.method.name
- Conditional: error.type, gen_ai.tool.name, gen_ai.prompt.name
- Recommended: gen_ai.operation.name, mcp.protocol.version, mcp.session.id,
  network.transport, server.address, server.port

Transport-specific attributes per subclass:
- MCPStdioTool: network.transport=pipe
- MCPStreamableHTTPTool: network.transport=tcp, network.protocol.name=http
- MCPWebsocketTool: network.transport=tcp, network.protocol.name=websocket

All span creation gated behind OBSERVABILITY_SETTINGS.ENABLED.

Closes #3624
Closes #4697

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* refactor: simplify MCP spans — remove enrichment logic and protocol version caching

- Always create nested CLIENT spans for tools/call instead of enriching
  the parent execute_tool span
- Remove _ACTIVE_TOOL_EXECUTION_SPAN contextvar (no longer needed)
- Remove enrich_span_with_mcp_attributes() helper
- Remove _otel_error_type preservation in FunctionTool.invoke()
- Remove _mcp_protocol_version instance variable; protocol version is
  only set on the initialize span where it is available

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Refine copilot solution

* fix: enable automatic exception recording on MCP spans

Remove record_exception=False and set_status_on_exception=False from
create_mcp_client_span. Let OTel handle exception recording and status
setting automatically. The manual set_mcp_span_error calls for tools/call
still correctly set error.type (which OTel's automatic handling doesn't
touch), so tool_error is preserved.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Reduce number of lines

* Add comment to sample

* test: address PR review comments on MCP observability tests

- Fix initialize test to call mocked session.initialize() and read
  protocolVersion from the result instead of hardcoding it
- Add tools/call McpError error-path test
- Add prompts/get McpError error-path test

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix export error

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
Tao Chen
2026-06-05 12:23:01 -07:00
committed by GitHub
Unverified
parent 6bd2cfec03
commit dcc218dbac
4 changed files with 575 additions and 40 deletions
+132 -39
View File
@@ -19,6 +19,7 @@ from functools import partial
from typing import TYPE_CHECKING, Any, Literal, TypedDict, cast
from opentelemetry import propagate
from opentelemetry import trace as otel_trace
from ._feature_stage import ExperimentalFeature, experimental
from ._tools import FunctionTool
@@ -28,6 +29,11 @@ from ._types import (
Message,
)
from .exceptions import ToolException, ToolExecutionException
from .observability import (
OtelAttr,
create_mcp_client_span,
set_mcp_span_error,
)
if sys.version_info >= (3, 11):
from typing import Self # pragma: no cover
@@ -362,6 +368,13 @@ class MCPTool:
def __str__(self) -> str:
return f"MCPTool(name={self.name}, description={self.description})"
def _mcp_base_span_attributes(self) -> dict[str, Any]:
"""Return base MCP span attributes shared across all operations.
Subclasses override to add transport-specific attributes (server address, port, etc.).
"""
return {}
def _parse_prompt_result_from_mcp(
self,
mcp_type: types.GetPromptResult,
@@ -872,8 +885,10 @@ class MCPTool:
inner_exception=ex if isinstance(ex, Exception) else None,
) from ex
try:
initialize_result = await session.initialize()
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
with create_mcp_client_span("initialize", attributes=self._mcp_base_span_attributes()) as init_span:
initialize_result = await session.initialize()
init_span.set_attribute(OtelAttr.MCP_PROTOCOL_VERSION, initialize_result.protocolVersion)
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
except (Exception, asyncio.CancelledError) as ex:
if await self._close_and_check_cancelled(ex):
raise
@@ -891,8 +906,10 @@ class MCPTool:
self.session = session
elif self.session._request_id == 0: # type: ignore[attr-defined]
# If the session is not initialized, we need to reinitialize it
initialize_result = await self.session.initialize()
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
with create_mcp_client_span("initialize", attributes=self._mcp_base_span_attributes()) as init_span:
initialize_result = await self.session.initialize()
init_span.set_attribute(OtelAttr.MCP_PROTOCOL_VERSION, initialize_result.protocolVersion)
self._set_server_capabilities(getattr(initialize_result, "capabilities", None))
elif self._server_capabilities is None:
self._set_server_capabilities(getattr(self.session, "_server_capabilities", None))
logger.debug("Connected to MCP server: %s", self.session)
@@ -1136,7 +1153,8 @@ class MCPTool:
"Skipping MCP prompt loading because the server did not advertise prompts support."
)
return
prompt_list = await self.session.list_prompts(params=params) # type: ignore[union-attr]
with create_mcp_client_span("prompts/list", attributes=self._mcp_base_span_attributes()):
prompt_list = await self.session.list_prompts(params=params) # type: ignore[union-attr]
break
except ClosedResourceError as cl_ex:
if attempt == 0:
@@ -1222,7 +1240,8 @@ class MCPTool:
if not self._supports_tools:
logger.debug("Skipping MCP tool loading because the server did not advertise tools support.")
return
tool_list = await self.session.list_tools(params=params) # type: ignore[union-attr]
with create_mcp_client_span("tools/list", attributes=self._mcp_base_span_attributes()):
tool_list = await self.session.list_tools(params=params) # type: ignore[union-attr]
break
except ClosedResourceError as cl_ex:
if attempt == 0:
@@ -1422,9 +1441,6 @@ class MCPTool:
ToolExecutionException: If the MCP server is not connected, tools are not loaded,
or the tool call fails.
"""
from anyio import ClosedResourceError
from mcp.shared.exceptions import McpError
if not self.load_tools_flag:
raise ToolExecutionException(
"Tools are not loaded for this server, please set load_tools=True in the constructor."
@@ -1438,7 +1454,28 @@ class MCPTool:
filtered_kwargs, meta = self._prepare_call_kwargs(tool_name, kwargs)
parser = self.parse_tool_results or self._parse_tool_result_from_mcp
# Try the operation, reconnecting once if the connection is closed
# Build MCP span attributes for tools/call
mcp_span_attrs = self._mcp_base_span_attributes()
mcp_span_attrs.update({
OtelAttr.TOOL_NAME: tool_name,
OtelAttr.OPERATION: OtelAttr.TOOL_EXECUTION_OPERATION,
})
with create_mcp_client_span("tools/call", target=tool_name, attributes=mcp_span_attrs) as span: # type: ignore
return await self._call_tool_with_retries(tool_name, filtered_kwargs, meta, parser, span)
async def _call_tool_with_retries(
self,
tool_name: str,
filtered_kwargs: dict[str, Any],
meta: dict[str, Any] | None,
parser: Callable[..., str | list[Content]],
span: otel_trace.Span,
) -> str | list[Content]:
"""Execute the MCP tools/call RPC with retry logic."""
from anyio import ClosedResourceError
from mcp.shared.exceptions import McpError
for attempt in range(2):
try:
result = await self.session.call_tool(tool_name, arguments=filtered_kwargs, meta=meta) # type: ignore
@@ -1449,6 +1486,9 @@ class MCPTool:
if isinstance(parsed, list)
else str(parsed)
)
# Per OTel MCP semconv: set error.type="tool_error" for isError results
if span.is_recording():
set_mcp_span_error(span, "tool_error", text or str(parsed))
raise ToolExecutionException(text or str(parsed))
return parser(result)
except ToolExecutionException:
@@ -1460,6 +1500,8 @@ class MCPTool:
is_connection_lost = isinstance(call_ex, ClosedResourceError) or is_session_terminated
if not is_connection_lost:
error_message = call_ex.error.message if isinstance(call_ex, McpError) else str(call_ex)
if span.is_recording():
set_mcp_span_error(span, type(call_ex).__name__, error_message)
raise ToolExecutionException(error_message, inner_exception=call_ex) from call_ex
if attempt == 0:
@@ -1476,11 +1518,15 @@ class MCPTool:
# Second attempt also failed, give up.
logger.error("MCP connection closed unexpectedly after reconnection: %s", call_ex)
if span.is_recording():
set_mcp_span_error(span, type(call_ex).__name__, str(call_ex))
raise ToolExecutionException(
f"Failed to call tool '{tool_name}' - connection lost.",
inner_exception=call_ex,
) from call_ex
except Exception as ex:
if span.is_recording():
set_mcp_span_error(span, type(ex).__name__, str(ex))
raise ToolExecutionException(f"Failed to call tool '{tool_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to call tool '{tool_name}' after retries.")
@@ -1982,36 +2028,42 @@ class MCPTool:
)
parser = self.parse_prompt_results or self._parse_prompt_result_from_mcp
# Try the operation, reconnecting once if the connection is closed
for attempt in range(2):
try:
prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore
return parser(prompt_result)
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
mcp_span_attrs = self._mcp_base_span_attributes()
mcp_span_attrs.update({OtelAttr.PROMPT_NAME: prompt_name})
with create_mcp_client_span("prompts/get", target=prompt_name, attributes=mcp_span_attrs) as span:
for attempt in range(2):
try:
prompt_result = await self.session.get_prompt(prompt_name, arguments=kwargs) # type: ignore
return parser(prompt_result)
except ClosedResourceError as cl_ex:
if attempt == 0:
# First attempt failed, try reconnecting
logger.info("MCP connection closed unexpectedly. Reconnecting...")
try:
await self.connect(reset=True)
continue # Retry the operation
except Exception as reconn_ex:
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
set_mcp_span_error(span, type(cl_ex).__name__, str(cl_ex))
raise ToolExecutionException(
"Failed to reconnect to MCP server.",
inner_exception=reconn_ex,
) from reconn_ex
else:
# Second attempt also failed, give up
logger.error(f"MCP connection closed unexpectedly after reconnection: {cl_ex}")
raise ToolExecutionException(
f"Failed to call prompt '{prompt_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
error_message = mcp_exc.error.message
raise ToolExecutionException(error_message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.")
f"Failed to call prompt '{prompt_name}' - connection lost.",
inner_exception=cl_ex,
) from cl_ex
except McpError as mcp_exc:
error_message = mcp_exc.error.message
set_mcp_span_error(span, type(mcp_exc).__name__, error_message)
raise ToolExecutionException(error_message, inner_exception=mcp_exc) from mcp_exc
except Exception as ex:
set_mcp_span_error(span, type(ex).__name__, str(ex))
raise ToolExecutionException(f"Failed to call prompt '{prompt_name}'.", inner_exception=ex) from ex
raise ToolExecutionException(f"Failed to get prompt '{prompt_name}' after retries.")
async def __aenter__(self) -> Self:
"""Enter the async context manager.
@@ -2171,6 +2223,11 @@ class MCPStdioTool(MCPTool):
self.encoding = encoding
self._client_kwargs = kwargs
def _mcp_base_span_attributes(self) -> dict[str, Any]:
attrs = super()._mcp_base_span_attributes()
attrs[OtelAttr.NETWORK_TRANSPORT] = "pipe"
return attrs
def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
"""Get an MCP stdio client.
@@ -2315,6 +2372,24 @@ class MCPStreamableHTTPTool(MCPTool):
self._httpx_client: AsyncClient | None = http_client
self._header_provider = header_provider
def _mcp_base_span_attributes(self) -> dict[str, Any]:
attrs = super()._mcp_base_span_attributes()
attrs[OtelAttr.NETWORK_TRANSPORT] = "tcp"
attrs[OtelAttr.NETWORK_PROTOCOL_NAME] = "http"
try:
from httpx import URL
parsed = URL(self.url)
if parsed.host:
attrs[OtelAttr.ADDRESS] = parsed.host
port = parsed.port
if port is None:
port = 443 if parsed.scheme == "https" else 80
attrs[OtelAttr.PORT] = port
except Exception:
logger.debug("Failed to parse URL for MCP span transport attributes", exc_info=True)
return attrs
def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
"""Get an MCP streamable HTTP client.
@@ -2482,6 +2557,24 @@ class MCPWebsocketTool(MCPTool):
self.url = url
self._client_kwargs = kwargs
def _mcp_base_span_attributes(self) -> dict[str, Any]:
attrs = super()._mcp_base_span_attributes()
attrs[OtelAttr.NETWORK_TRANSPORT] = "tcp"
attrs[OtelAttr.NETWORK_PROTOCOL_NAME] = "websocket"
try:
from urllib.parse import urlparse
parsed = urlparse(self.url)
if parsed.hostname:
attrs[OtelAttr.ADDRESS] = parsed.hostname
port = parsed.port
if port is None:
port = 443 if parsed.scheme == "wss" else 80
attrs[OtelAttr.PORT] = port
except Exception:
logger.debug("Failed to parse URL for MCP span transport attributes", exc_info=True)
return attrs
def get_mcp_client(self) -> _AsyncGeneratorContextManager[Any, None]:
"""Get an MCP WebSocket client.
@@ -80,6 +80,7 @@ __all__ = [
"EmbeddingTelemetryLayer",
"OtelAttr",
"configure_otel_providers",
"create_mcp_client_span",
"create_metric_views",
"create_resource",
"disable_instrumentation",
@@ -87,6 +88,7 @@ __all__ = [
"enable_sensitive_telemetry",
"get_meter",
"get_tracer",
"set_mcp_span_error",
]
@@ -110,7 +112,6 @@ INNER_ACCUMULATED_USAGE: Final[contextvars.ContextVar[UsageDetails | None]] = co
"inner_accumulated_usage", default=None
)
OTEL_METRICS: Final[str] = "__otel_metrics__"
TOKEN_USAGE_BUCKET_BOUNDARIES: Final[tuple[float, ...]] = (
1,
@@ -292,6 +293,14 @@ class OtelAttr(str, Enum):
AGENT_CREATE_OPERATION = "create_agent"
AGENT_INVOKE_OPERATION = "invoke_agent"
# MCP attributes (https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/)
MCP_METHOD_NAME = "mcp.method.name"
MCP_PROTOCOL_VERSION = "mcp.protocol.version"
MCP_SESSION_ID = "mcp.session.id"
PROMPT_NAME = "gen_ai.prompt.name"
NETWORK_TRANSPORT = "network.transport"
NETWORK_PROTOCOL_NAME = "network.protocol.name"
# Agent Framework specific attributes
MEASUREMENT_FUNCTION_TAG_NAME = "agent_framework.function.name"
MEASUREMENT_FUNCTION_INVOCATION_DURATION = "agent_framework.function.invocation.duration"
@@ -2013,6 +2022,61 @@ def get_function_span(
)
# region MCP span helpers
@contextlib.contextmanager
def create_mcp_client_span(
method_name: str,
target: str | None = None,
attributes: dict[str, Any] | None = None,
) -> Generator[trace.Span, Any, Any]:
"""Create an MCP client span per OTel MCP semantic conventions.
Span name follows the format ``{mcp.method.name} {target}`` when a target
is available, otherwise just ``{mcp.method.name}``.
See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#client
Args:
method_name: The MCP method name (e.g. ``initialize``, ``tools/call``).
target: Optional low-cardinality target (tool name, prompt name).
attributes: Additional span attributes.
"""
span_name = f"{method_name} {target}" if target else method_name
attrs: dict[str, Any] = {OtelAttr.MCP_METHOD_NAME: method_name}
if attributes:
attrs.update(attributes)
tracer = get_tracer() if OBSERVABILITY_SETTINGS.ENABLED else trace.NoOpTracer()
span = tracer.start_span(span_name, kind=trace.SpanKind.CLIENT, attributes=attrs)
with trace.use_span(
span=span,
end_on_exit=True,
record_exception=True,
set_status_on_exception=True,
) as current_span:
yield current_span
def set_mcp_span_error(
span: trace.Span,
error_type: str,
description: str | None = None,
) -> None:
"""Set error status and ``error.type`` on an MCP span.
Args:
span: The span to mark as errored.
error_type: The error type string (e.g. ``tool_error``, exception class name).
description: Optional description (e.g. JSON-RPC error message).
"""
span.set_attribute(OtelAttr.ERROR_TYPE, error_type)
span.set_status(trace.StatusCode.ERROR, description=description)
# endregion
@contextlib.contextmanager
def _activate_span(span: trace.Span) -> Generator[None]:
"""Attach ``span`` as the current span in the OpenTelemetry context.
@@ -0,0 +1,376 @@
# Copyright (c) Microsoft. All rights reserved.
"""Tests for MCP client span instrumentation per OTel GenAI Semantic Conventions.
See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#client
"""
from __future__ import annotations
from typing import Any
from unittest.mock import AsyncMock, Mock
import pytest
from mcp import types
from mcp.shared.exceptions import McpError
from mcp.types import ErrorData
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import SpanKind, StatusCode
from agent_framework import MCPStdioTool, MCPStreamableHTTPTool, MCPWebsocketTool
from agent_framework._mcp import MCPTool
from agent_framework.exceptions import ToolExecutionException
from agent_framework.observability import OtelAttr
# region helpers
def _make_connected_mcp_tool(
name: str = "test-mcp",
*,
supports_tools: bool = True,
supports_prompts: bool = True,
) -> MCPTool:
"""Create an MCPTool with a mocked session, ready for testing."""
tool = MCPTool(name=name)
tool.session = AsyncMock()
tool.is_connected = True
tool._supports_tools = supports_tools
tool._supports_prompts = supports_prompts
tool.load_tools_flag = True
tool.load_prompts_flag = True
return tool
def _make_tool_list_result(
tools: list[dict[str, Any]] | None = None,
) -> Mock:
"""Create a mock ListToolsResult."""
if tools is None:
tools = [{"name": "get-weather", "description": "Get weather", "inputSchema": {"type": "object"}}]
result = Mock()
result.tools = [
types.Tool(name=t["name"], description=t.get("description", ""), inputSchema=t.get("inputSchema", {}))
for t in tools
]
result.nextCursor = None
return result
def _make_prompt_list_result(
prompts: list[dict[str, Any]] | None = None,
) -> Mock:
"""Create a mock ListPromptsResult."""
if prompts is None:
prompts = [{"name": "analyze-code", "description": "Analyze code"}]
result = Mock()
result.prompts = [
types.Prompt(name=p["name"], description=p.get("description", ""), arguments=None) for p in prompts
]
result.nextCursor = None
return result
def _make_call_tool_result(text: str = "result", is_error: bool = False) -> Mock:
"""Create a mock CallToolResult."""
result = Mock()
result.isError = is_error
result.content = [types.TextContent(type="text", text=text)]
return result
def _make_get_prompt_result(text: str = "prompt result") -> types.GetPromptResult:
"""Create a mock GetPromptResult."""
return types.GetPromptResult(
description="test prompt",
messages=[
types.PromptMessage(
role="user",
content=types.TextContent(type="text", text=text),
)
],
)
# endregion
# region initialize span
async def test_mcp_initialize_span(span_exporter: InMemorySpanExporter):
"""session.initialize() should produce an MCP CLIENT span named 'initialize'."""
tool = MCPTool(name="test-server")
mock_session_cls = AsyncMock()
init_result = Mock()
init_result.capabilities = None
init_result.protocolVersion = "2025-06-18"
mock_session_cls.initialize = AsyncMock(return_value=init_result)
# Create a mock transport context manager
mock_transport = AsyncMock()
mock_transport.__aenter__ = AsyncMock(return_value=(Mock(), Mock()))
mock_transport.__aexit__ = AsyncMock(return_value=False)
# Mock get_mcp_client and the session creation
tool.session = None
tool.load_tools_flag = False
tool.load_prompts_flag = False
span_exporter.clear()
with pytest.MonkeyPatch.context() as m:
m.setattr(tool, "get_mcp_client", lambda: mock_transport)
async def patched_connect(self_: Any, *, reset: bool = False, load_configured: bool = True) -> None:
# Simulate _connect_on_owner: create initialize span and call session.initialize()
from agent_framework._mcp import create_mcp_client_span
from agent_framework.observability import OtelAttr
with create_mcp_client_span("initialize", attributes=self_._mcp_base_span_attributes()) as init_span:
result = await mock_session_cls.initialize()
protocol_version = getattr(result, "protocolVersion", None)
if protocol_version:
init_span.set_attribute(OtelAttr.MCP_PROTOCOL_VERSION, protocol_version)
self_.session = mock_session_cls
self_.is_connected = True
m.setattr(MCPTool, "_connect_on_owner", patched_connect)
await tool.connect()
mock_session_cls.initialize.assert_awaited_once()
spans = span_exporter.get_finished_spans()
init_spans = [s for s in spans if s.name == "initialize"]
assert len(init_spans) == 1
span = init_spans[0]
assert span.kind == SpanKind.CLIENT
assert span.attributes[OtelAttr.MCP_METHOD_NAME] == "initialize"
assert span.attributes.get(OtelAttr.MCP_PROTOCOL_VERSION) == "2025-06-18"
# endregion
# region tools/list span
async def test_mcp_tools_list_span(span_exporter: InMemorySpanExporter):
"""session.list_tools() should produce an MCP CLIENT span named 'tools/list'."""
tool = _make_connected_mcp_tool()
tool.session.list_tools = AsyncMock(return_value=_make_tool_list_result())
span_exporter.clear()
await tool.load_tools()
spans = span_exporter.get_finished_spans()
list_spans = [s for s in spans if s.name == "tools/list"]
assert len(list_spans) == 1
span = list_spans[0]
assert span.kind == SpanKind.CLIENT
assert span.attributes[OtelAttr.MCP_METHOD_NAME] == "tools/list"
# endregion
# region prompts/list span
async def test_mcp_prompts_list_span(span_exporter: InMemorySpanExporter):
"""session.list_prompts() should produce an MCP CLIENT span named 'prompts/list'."""
tool = _make_connected_mcp_tool()
tool.session.list_prompts = AsyncMock(return_value=_make_prompt_list_result())
span_exporter.clear()
await tool.load_prompts()
spans = span_exporter.get_finished_spans()
list_spans = [s for s in spans if s.name == "prompts/list"]
assert len(list_spans) == 1
span = list_spans[0]
assert span.kind == SpanKind.CLIENT
assert span.attributes[OtelAttr.MCP_METHOD_NAME] == "prompts/list"
# endregion
# region tools/call span
async def test_mcp_tools_call_creates_client_span_when_no_parent(span_exporter: InMemorySpanExporter):
"""Direct call_tool() without FunctionTool wrapper creates new MCP CLIENT span."""
tool = _make_connected_mcp_tool()
tool.session.call_tool = AsyncMock(return_value=_make_call_tool_result("hello"))
span_exporter.clear()
result = await tool.call_tool("get-weather", city="Seattle")
assert result is not None
spans = span_exporter.get_finished_spans()
call_spans = [s for s in spans if "tools/call" in s.name]
assert len(call_spans) == 1
span = call_spans[0]
assert span.kind == SpanKind.CLIENT
assert span.name == "tools/call get-weather"
assert span.attributes[OtelAttr.MCP_METHOD_NAME] == "tools/call"
assert span.attributes[OtelAttr.TOOL_NAME] == "get-weather"
async def test_mcp_tools_call_tool_error_sets_error_type(span_exporter: InMemorySpanExporter):
"""When CallToolResult.isError is true, error.type should be 'tool_error' per MCP spec."""
tool = _make_connected_mcp_tool()
tool.session.call_tool = AsyncMock(return_value=_make_call_tool_result("bad input", is_error=True))
span_exporter.clear()
with pytest.raises(ToolExecutionException):
await tool.call_tool("get-weather", city="invalid")
spans = span_exporter.get_finished_spans()
call_spans = [s for s in spans if "tools/call" in s.name]
assert len(call_spans) == 1
span = call_spans[0]
assert span.attributes.get(OtelAttr.ERROR_TYPE) == "tool_error"
assert span.status.status_code == StatusCode.ERROR
async def test_mcp_tools_call_mcp_error_sets_error_type(span_exporter: InMemorySpanExporter):
"""When session.call_tool() raises McpError, error.type should be the exception class name."""
tool = _make_connected_mcp_tool()
tool.session.call_tool = AsyncMock(side_effect=McpError(ErrorData(code=-32600, message="invalid request")))
span_exporter.clear()
with pytest.raises(ToolExecutionException):
await tool.call_tool("get-weather")
spans = span_exporter.get_finished_spans()
call_spans = [s for s in spans if "tools/call" in s.name]
assert len(call_spans) == 1
span = call_spans[0]
assert span.attributes.get(OtelAttr.ERROR_TYPE) == "McpError"
assert span.status.status_code == StatusCode.ERROR
# endregion
# region prompts/get span
async def test_mcp_prompts_get_creates_client_span(span_exporter: InMemorySpanExporter):
"""get_prompt() should always create a new MCP CLIENT span (not enrich execute_tool)."""
tool = _make_connected_mcp_tool()
tool.session.get_prompt = AsyncMock(return_value=_make_get_prompt_result("code analysis"))
span_exporter.clear()
result = await tool.get_prompt("analyze-code", language="python")
assert "code analysis" in result
spans = span_exporter.get_finished_spans()
prompt_spans = [s for s in spans if "prompts/get" in s.name]
assert len(prompt_spans) == 1
span = prompt_spans[0]
assert span.kind == SpanKind.CLIENT
assert span.name == "prompts/get analyze-code"
assert span.attributes[OtelAttr.MCP_METHOD_NAME] == "prompts/get"
assert span.attributes[OtelAttr.PROMPT_NAME] == "analyze-code"
async def test_mcp_prompts_get_mcp_error_sets_error_type(span_exporter: InMemorySpanExporter):
"""When session.get_prompt() raises McpError, the span should have error.type and ERROR status."""
tool = _make_connected_mcp_tool()
tool.session.get_prompt = AsyncMock(
side_effect=McpError(ErrorData(code=-32602, message="prompt not found"))
)
span_exporter.clear()
with pytest.raises(ToolExecutionException):
await tool.get_prompt("missing-prompt")
spans = span_exporter.get_finished_spans()
prompt_spans = [s for s in spans if "prompts/get" in s.name]
assert len(prompt_spans) == 1
span = prompt_spans[0]
assert span.attributes.get(OtelAttr.ERROR_TYPE) == "McpError"
assert span.status.status_code == StatusCode.ERROR
# endregion
# region transport attributes
def test_mcp_stdio_tool_transport_attributes():
"""MCPStdioTool should have network.transport='pipe'."""
tool = MCPStdioTool(name="test", command="python")
attrs = tool._mcp_base_span_attributes()
assert attrs[OtelAttr.NETWORK_TRANSPORT] == "pipe"
assert OtelAttr.ADDRESS not in attrs
def test_mcp_http_tool_transport_attributes():
"""MCPStreamableHTTPTool should have tcp transport and URL-based server address/port."""
tool = MCPStreamableHTTPTool(name="test", url="https://api.example.com:8443/mcp")
attrs = tool._mcp_base_span_attributes()
assert attrs[OtelAttr.NETWORK_TRANSPORT] == "tcp"
assert attrs[OtelAttr.NETWORK_PROTOCOL_NAME] == "http"
assert attrs[OtelAttr.ADDRESS] == "api.example.com"
assert attrs[OtelAttr.PORT] == 8443
def test_mcp_http_tool_default_port():
"""MCPStreamableHTTPTool should default to 443 for https."""
tool = MCPStreamableHTTPTool(name="test", url="https://api.example.com/mcp")
attrs = tool._mcp_base_span_attributes()
assert attrs[OtelAttr.PORT] == 443
def test_mcp_http_tool_http_default_port():
"""MCPStreamableHTTPTool should default to 80 for http."""
tool = MCPStreamableHTTPTool(name="test", url="http://localhost/mcp")
attrs = tool._mcp_base_span_attributes()
assert attrs[OtelAttr.PORT] == 80
def test_mcp_websocket_tool_transport_attributes():
"""MCPWebsocketTool should have tcp transport and URL-based server address/port."""
tool = MCPWebsocketTool(name="test", url="wss://ws.example.com:9090/mcp")
attrs = tool._mcp_base_span_attributes()
assert attrs[OtelAttr.NETWORK_TRANSPORT] == "tcp"
assert attrs[OtelAttr.NETWORK_PROTOCOL_NAME] == "websocket"
assert attrs[OtelAttr.ADDRESS] == "ws.example.com"
assert attrs[OtelAttr.PORT] == 9090
def test_mcp_websocket_tool_default_port():
"""MCPWebsocketTool should default to 443 for wss."""
tool = MCPWebsocketTool(name="test", url="wss://ws.example.com/mcp")
attrs = tool._mcp_base_span_attributes()
assert attrs[OtelAttr.PORT] == 443
# endregion
# region observability disabled
@pytest.mark.parametrize("enable_instrumentation", [False], indirect=True)
async def test_mcp_spans_not_created_when_observability_disabled(span_exporter: InMemorySpanExporter):
"""No MCP spans should be created when observability is disabled."""
tool = _make_connected_mcp_tool()
tool.session.list_tools = AsyncMock(return_value=_make_tool_list_result())
tool.session.call_tool = AsyncMock(return_value=_make_call_tool_result("ok"))
span_exporter.clear()
await tool.load_tools()
await tool.call_tool("get-weather", city="Seattle")
spans = span_exporter.get_finished_spans()
assert len(spans) == 0
# endregion
@@ -46,6 +46,8 @@ async def github_mcp_example() -> None:
# The MCP tool manages the connection to the MCP server and makes its tools available
# Set approval_mode="never_require" to allow the MCP tool to execute without approval
client = OpenAIChatClient()
# Note that the tool created here will be executed remotely by OpenAI, not locally by
# your application.
github_mcp_tool = client.get_mcp_tool(
name="GitHub",
url="https://api.githubcopilot.com/mcp/",