diff --git a/python/packages/core/agent_framework/_agents.py b/python/packages/core/agent_framework/_agents.py index 80e867a51d..55e0ae42e3 100644 --- a/python/packages/core/agent_framework/_agents.py +++ b/python/packages/core/agent_framework/_agents.py @@ -9,11 +9,14 @@ from itertools import chain from typing import Any, ClassVar, Literal, Protocol, TypeVar, cast, runtime_checkable from uuid import uuid4 +from mcp import types +from mcp.server.lowlevel import Server +from mcp.shared.exceptions import McpError from pydantic import BaseModel, Field, create_model from ._clients import BaseChatClient, ChatClientProtocol from ._logging import get_logger -from ._mcp import MCPTool +from ._mcp import LOG_LEVEL_MAPPING, MCPTool from ._memory import AggregateContextProvider, Context, ContextProvider from ._middleware import Middleware, use_agent_middleware from ._serialization import SerializationMixin @@ -431,7 +434,7 @@ class BaseAgent(SerializationMixin): name=tool_name, description=tool_description, func=agent_wrapper, - input_model=input_model, + input_model=input_model, # type: ignore ) def _normalize_messages( @@ -999,6 +1002,115 @@ class ChatAgent(BaseAgent): ) return AgentThread(context_provider=self.context_provider) + def as_mcp_server( + self, + *, + server_name: str = "Agent", + version: str | None = None, + instructions: str | None = None, + lifespan: Callable[["Server[Any]"], AbstractAsyncContextManager[Any]] | None = None, + **kwargs: Any, + ) -> "Server[Any]": + """Create an MCP server from an agent instance. + + This function automatically creates a MCP server from an agent instance, it uses the provided arguments to + configure the server and exposes the agent as a single MCP tool. + + Keyword Args: + server_name: The name of the server. + version: The version of the server. + instructions: The instructions to use for the server. + lifespan: The lifespan of the server. + **kwargs: Any extra arguments to pass to the server creation. + + Returns: + The MCP server instance. + """ + server_args: dict[str, Any] = { + "name": server_name, + "version": version, + "instructions": instructions, + } + if lifespan: + server_args["lifespan"] = lifespan + if kwargs: + server_args.update(kwargs) + + server: "Server[Any]" = Server(**server_args) # type: ignore[call-arg] + + agent_tool = self.as_tool(name=self._get_agent_name()) + + async def _log(level: types.LoggingLevel, data: Any) -> None: + """Log a message to the server and logger.""" + # Log to the local logger + logger.log(LOG_LEVEL_MAPPING[level], data) + if server and server.request_context and server.request_context.session: + try: + await server.request_context.session.send_log_message(level=level, data=data) + except Exception as e: + logger.error("Failed to send log message to server: %s", e) + + @server.list_tools() # type: ignore + async def _list_tools() -> list[types.Tool]: # type: ignore + """List all tools in the agent.""" + # Get the JSON schema from the Pydantic model + schema = agent_tool.input_model.model_json_schema() + + tool = types.Tool( + name=agent_tool.name, + description=agent_tool.description, + inputSchema={ + "type": "object", + "properties": schema.get("properties", {}), + "required": schema.get("required", []), + }, + ) + + await _log(level="debug", data=f"Agent tool: {agent_tool}") + return [tool] + + @server.call_tool() # type: ignore + async def _call_tool( # type: ignore + name: str, arguments: dict[str, Any] + ) -> Sequence[types.TextContent | types.ImageContent | types.AudioContent | types.EmbeddedResource]: + """Call a tool in the agent.""" + await _log(level="debug", data=f"Calling tool with args: {arguments}") + + if name != agent_tool.name: + raise McpError( + error=types.ErrorData( + code=types.INTERNAL_ERROR, + message=f"Tool {name} not found", + ), + ) + + # Create an instance of the input model with the arguments + try: + args_instance = agent_tool.input_model(**arguments) + result = await agent_tool.invoke(arguments=args_instance) + except Exception as e: + raise McpError( + error=types.ErrorData( + code=types.INTERNAL_ERROR, + message=f"Error calling tool {name}: {e}", + ), + ) from e + + # Convert result to MCP content + if isinstance(result, str): + return [types.TextContent(type="text", text=result)] + + return [types.TextContent(type="text", text=str(result))] + + @server.set_logging_level() # type: ignore + async def _set_logging_level(level: types.LoggingLevel) -> None: # type: ignore + """Set the logging level for the server.""" + logger.setLevel(LOG_LEVEL_MAPPING[level]) + # emit this log with the new minimum level + await _log(level=level, data=f"Log level set to {level}") + + return server + async def _update_thread_with_type_and_conversation_id( self, thread: AgentThread, response_conversation_id: str | None ) -> None: diff --git a/python/samples/getting_started/mcp/agent_as_mcp_server.py b/python/samples/getting_started/mcp/agent_as_mcp_server.py new file mode 100644 index 0000000000..da2feac9c2 --- /dev/null +++ b/python/samples/getting_started/mcp/agent_as_mcp_server.py @@ -0,0 +1,71 @@ +# Copyright (c) Microsoft. All rights reserved. + +from typing import Annotated, Any + +import anyio +from agent_framework.openai import OpenAIResponsesClient + +""" +This sample demonstrates how to expose an Agent as an MCP server. + +To run this sample, set up your MCP host (like Claude Desktop or VSCode Github Copilot Agents) +with the following configuration: +```json +{ + "servers": { + "agent-framework": { + "command": "uv", + "args": [ + "--directory=/agent-framework/python/samples/getting_started/mcp", + "run", + "agent_as_mcp_server.py" + ], + "env": { + "OPENAI_API_KEY": "", + "OPENAI_RESPONSES_MODEL_ID": "", + } + } + } +} +``` +""" + + +def get_specials() -> Annotated[str, "Returns the specials from the menu."]: + return """ + Special Soup: Clam Chowder + Special Salad: Cobb Salad + Special Drink: Chai Tea + """ + + +def get_item_price( + menu_item: Annotated[str, "The name of the menu item."], +) -> Annotated[str, "Returns the price of the menu item."]: + return "$9.99" + + +async def run() -> None: + # Define an agent + # Agent's name and description provide better context for AI model + agent = OpenAIResponsesClient().create_agent( + name="RestaurantAgent", + description="Answer questions about the menu.", + tools=[get_specials, get_item_price], + ) + + # Expose the agent as an MCP server + server = agent.as_mcp_server() + + # Run server + from mcp.server.stdio import stdio_server + + async def handle_stdin(stdin: Any | None = None, stdout: Any | None = None) -> None: + async with stdio_server() as (read_stream, write_stream): + await server.run(read_stream, write_stream, server.create_initialization_options()) + + await handle_stdin() + + +if __name__ == "__main__": + anyio.run(run)