Python: Added method to expose agent as MCP server (#1248)

* Added method to expose agent as MCP server

* Update python/packages/core/agent_framework/_agents.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Update python/packages/core/agent_framework/_agents.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* Addressed PR feedback

* Updated doc string

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
Dmytro Struk
2025-10-07 10:21:53 -07:00
committed by GitHub
Unverified
parent 1cff86b4ff
commit a45e2ab28e
2 changed files with 185 additions and 2 deletions
+114 -2
View File
@@ -9,11 +9,14 @@ from itertools import chain
from typing import Any, ClassVar, Literal, Protocol, TypeVar, cast, runtime_checkable
from uuid import uuid4
from mcp import types
from mcp.server.lowlevel import Server
from mcp.shared.exceptions import McpError
from pydantic import BaseModel, Field, create_model
from ._clients import BaseChatClient, ChatClientProtocol
from ._logging import get_logger
from ._mcp import MCPTool
from ._mcp import LOG_LEVEL_MAPPING, MCPTool
from ._memory import AggregateContextProvider, Context, ContextProvider
from ._middleware import Middleware, use_agent_middleware
from ._serialization import SerializationMixin
@@ -431,7 +434,7 @@ class BaseAgent(SerializationMixin):
name=tool_name,
description=tool_description,
func=agent_wrapper,
input_model=input_model,
input_model=input_model, # type: ignore
)
def _normalize_messages(
@@ -999,6 +1002,115 @@ class ChatAgent(BaseAgent):
)
return AgentThread(context_provider=self.context_provider)
def as_mcp_server(
self,
*,
server_name: str = "Agent",
version: str | None = None,
instructions: str | None = None,
lifespan: Callable[["Server[Any]"], AbstractAsyncContextManager[Any]] | None = None,
**kwargs: Any,
) -> "Server[Any]":
"""Create an MCP server from an agent instance.
This function automatically creates a MCP server from an agent instance, it uses the provided arguments to
configure the server and exposes the agent as a single MCP tool.
Keyword Args:
server_name: The name of the server.
version: The version of the server.
instructions: The instructions to use for the server.
lifespan: The lifespan of the server.
**kwargs: Any extra arguments to pass to the server creation.
Returns:
The MCP server instance.
"""
server_args: dict[str, Any] = {
"name": server_name,
"version": version,
"instructions": instructions,
}
if lifespan:
server_args["lifespan"] = lifespan
if kwargs:
server_args.update(kwargs)
server: "Server[Any]" = Server(**server_args) # type: ignore[call-arg]
agent_tool = self.as_tool(name=self._get_agent_name())
async def _log(level: types.LoggingLevel, data: Any) -> None:
"""Log a message to the server and logger."""
# Log to the local logger
logger.log(LOG_LEVEL_MAPPING[level], data)
if server and server.request_context and server.request_context.session:
try:
await server.request_context.session.send_log_message(level=level, data=data)
except Exception as e:
logger.error("Failed to send log message to server: %s", e)
@server.list_tools() # type: ignore
async def _list_tools() -> list[types.Tool]: # type: ignore
"""List all tools in the agent."""
# Get the JSON schema from the Pydantic model
schema = agent_tool.input_model.model_json_schema()
tool = types.Tool(
name=agent_tool.name,
description=agent_tool.description,
inputSchema={
"type": "object",
"properties": schema.get("properties", {}),
"required": schema.get("required", []),
},
)
await _log(level="debug", data=f"Agent tool: {agent_tool}")
return [tool]
@server.call_tool() # type: ignore
async def _call_tool( # type: ignore
name: str, arguments: dict[str, Any]
) -> Sequence[types.TextContent | types.ImageContent | types.AudioContent | types.EmbeddedResource]:
"""Call a tool in the agent."""
await _log(level="debug", data=f"Calling tool with args: {arguments}")
if name != agent_tool.name:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Tool {name} not found",
),
)
# Create an instance of the input model with the arguments
try:
args_instance = agent_tool.input_model(**arguments)
result = await agent_tool.invoke(arguments=args_instance)
except Exception as e:
raise McpError(
error=types.ErrorData(
code=types.INTERNAL_ERROR,
message=f"Error calling tool {name}: {e}",
),
) from e
# Convert result to MCP content
if isinstance(result, str):
return [types.TextContent(type="text", text=result)]
return [types.TextContent(type="text", text=str(result))]
@server.set_logging_level() # type: ignore
async def _set_logging_level(level: types.LoggingLevel) -> None: # type: ignore
"""Set the logging level for the server."""
logger.setLevel(LOG_LEVEL_MAPPING[level])
# emit this log with the new minimum level
await _log(level=level, data=f"Log level set to {level}")
return server
async def _update_thread_with_type_and_conversation_id(
self, thread: AgentThread, response_conversation_id: str | None
) -> None: