Python: Add BaseAgent implementation for GitHub Copilot SDK (#3404)

* Added GithubCopilotAgent

* Fixed errors

* Updated examples

* Updated naming and tests

* Resolved comments and more tests

* Updated tool handling

* Updated tool handling

* Small fixes

* Removed default permission handler

* Resolved comments

* Updated positional args

* Updated docstrings

* Small fixes and more examples

* Added example with MCP
This commit is contained in:
Dmytro Struk
2026-01-26 10:09:04 -08:00
committed by GitHub
Unverified
parent a3a9147e61
commit 407fb3025e
21 changed files with 2207 additions and 0 deletions
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) Microsoft Corporation.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
+11
View File
@@ -0,0 +1,11 @@
# Get Started with Microsoft Agent Framework GitHub Copilot
Please install this package via pip:
```bash
pip install agent-framework-github-copilot --pre
```
## GitHub Copilot Agent
The GitHub Copilot agent enables integration with GitHub Copilot, allowing you to interact with Copilot's agentic capabilities through the Agent Framework.
@@ -0,0 +1,18 @@
# Copyright (c) Microsoft. All rights reserved.
import importlib.metadata
from ._agent import GithubCopilotAgent, GithubCopilotOptions
from ._settings import GithubCopilotSettings
try:
__version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0"
__all__ = [
"GithubCopilotAgent",
"GithubCopilotOptions",
"GithubCopilotSettings",
"__version__",
]
@@ -0,0 +1,528 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import contextlib
import logging
import sys
from collections.abc import AsyncIterable, Callable, MutableMapping, Sequence
from typing import Any, ClassVar, Generic, TypedDict
from agent_framework import (
AgentMiddlewareTypes,
AgentResponse,
AgentResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Content,
ContextProvider,
Role,
normalize_messages,
)
from agent_framework._tools import AIFunction, ToolProtocol
from agent_framework._types import normalize_tools
from agent_framework.exceptions import ServiceException, ServiceInitializationError
from copilot import CopilotClient, CopilotSession
from copilot.generated.session_events import SessionEvent, SessionEventType
from copilot.types import (
CopilotClientOptions,
MCPServerConfig,
PermissionRequest,
PermissionRequestResult,
ResumeSessionConfig,
SessionConfig,
ToolInvocation,
ToolResult,
)
from copilot.types import Tool as CopilotTool
from pydantic import ValidationError
from ._settings import GithubCopilotSettings
if sys.version_info >= (3, 13):
from typing import TypeVar
else:
from typing_extensions import TypeVar
DEFAULT_TIMEOUT_SECONDS: float = 60.0
"""Default timeout in seconds for Copilot requests."""
PermissionHandlerType = Callable[[PermissionRequest, dict[str, str]], PermissionRequestResult]
"""Type for permission request handlers."""
logger = logging.getLogger("agent_framework.github_copilot")
class GithubCopilotOptions(TypedDict, total=False):
"""GitHub Copilot-specific options."""
instructions: str
"""System message to append to the session."""
cli_path: str
"""Path to the Copilot CLI executable. Defaults to GITHUB_COPILOT_CLI_PATH environment variable
or 'copilot' in PATH."""
model: str
"""Model to use (e.g., "gpt-5", "claude-sonnet-4"). Defaults to GITHUB_COPILOT_MODEL environment variable."""
timeout: float
"""Request timeout in seconds. Defaults to GITHUB_COPILOT_TIMEOUT environment variable or 60 seconds."""
log_level: str
"""CLI log level. Defaults to GITHUB_COPILOT_LOG_LEVEL environment variable."""
on_permission_request: PermissionHandlerType
"""Permission request handler.
Called when Copilot requests permission to perform an action (shell, read, write, etc.).
Takes a PermissionRequest and context dict, returns PermissionRequestResult.
If not provided, all permission requests will be denied by default.
"""
mcp_servers: dict[str, MCPServerConfig]
"""MCP (Model Context Protocol) server configurations.
A dictionary mapping server names to their configurations.
Supports both local (stdio) and remote (HTTP/SSE) servers.
"""
TOptions = TypeVar(
"TOptions",
bound=TypedDict, # type: ignore[valid-type]
default="GithubCopilotOptions",
covariant=True,
)
class GithubCopilotAgent(BaseAgent, Generic[TOptions]):
"""A GitHub Copilot Agent.
This agent wraps the GitHub Copilot SDK to provide Copilot agentic capabilities
within the Agent Framework. It supports both streaming and non-streaming responses,
custom tools, and session management.
The agent can be used as an async context manager to ensure proper cleanup:
Examples:
Basic usage:
.. code-block:: python
async with GithubCopilotAgent() as agent:
response = await agent.run("Hello, world!")
print(response)
With explicitly typed options:
.. code-block:: python
from agent_framework_github_copilot import GithubCopilotAgent, GithubCopilotOptions
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
default_options={"model": "claude-sonnet-4", "timeout": 120}
)
With tools:
.. code-block:: python
def get_weather(city: str) -> str:
return f"Weather in {city} is sunny"
async with GithubCopilotAgent(tools=[get_weather]) as agent:
response = await agent.run("What's the weather in Seattle?")
"""
AGENT_PROVIDER_NAME: ClassVar[str] = "github.copilot"
def __init__(
self,
*,
client: CopilotClient | None = None,
id: str | None = None,
name: str | None = None,
description: str | None = None,
context_provider: ContextProvider | None = None,
middleware: Sequence[AgentMiddlewareTypes] | None = None,
tools: ToolProtocol
| Callable[..., Any]
| MutableMapping[str, Any]
| Sequence[ToolProtocol | Callable[..., Any] | MutableMapping[str, Any]]
| None = None,
default_options: TOptions | None = None,
env_file_path: str | None = None,
env_file_encoding: str | None = None,
) -> None:
"""Initialize the GitHub Copilot Agent.
Keyword Args:
client: Optional pre-configured CopilotClient instance. If not provided,
a new client will be created using the other parameters.
id: ID of the GithubCopilotAgent.
name: Name of the GithubCopilotAgent.
description: Description of the GithubCopilotAgent.
context_provider: Context Provider, to be used by the agent.
middleware: Agent middleware used by the agent.
tools: Tools to use for the agent. Can be functions, ToolProtocol instances,
or tool definition dicts. These are converted to Copilot SDK tools internally.
default_options: Default options for the agent. Can include cli_path, model,
timeout, log_level, etc.
env_file_path: Optional path to .env file for loading configuration.
env_file_encoding: Encoding of the .env file, defaults to 'utf-8'.
Raises:
ServiceInitializationError: If required configuration is missing or invalid.
"""
super().__init__(
id=id,
name=name,
description=description,
context_provider=context_provider,
middleware=list(middleware) if middleware else None,
)
self._client = client
self._owns_client = client is None
# Parse options
opts: dict[str, Any] = dict(default_options) if default_options else {}
instructions = opts.pop("instructions", None)
cli_path = opts.pop("cli_path", None)
model = opts.pop("model", None)
timeout = opts.pop("timeout", None)
log_level = opts.pop("log_level", None)
on_permission_request: PermissionHandlerType | None = opts.pop("on_permission_request", None)
mcp_servers: dict[str, MCPServerConfig] | None = opts.pop("mcp_servers", None)
try:
self._settings = GithubCopilotSettings(
cli_path=cli_path,
model=model,
timeout=timeout,
log_level=log_level,
env_file_path=env_file_path,
env_file_encoding=env_file_encoding,
)
except ValidationError as ex:
raise ServiceInitializationError("Failed to create GitHub Copilot settings.", ex) from ex
self._instructions = instructions
self._tools = normalize_tools(tools)
self._permission_handler = on_permission_request
self._mcp_servers = mcp_servers
self._default_options = opts
self._started = False
async def __aenter__(self) -> "GithubCopilotAgent[TOptions]":
"""Start the agent when entering async context."""
await self.start()
return self
async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Stop the agent when exiting async context."""
await self.stop()
async def start(self) -> None:
"""Start the Copilot client.
This method initializes the Copilot client and establishes a connection
to the Copilot CLI server. It is called automatically when using the
agent as an async context manager.
Raises:
ServiceException: If the client fails to start.
"""
if self._started:
return
if self._client is None:
client_options: CopilotClientOptions = {}
if self._settings.cli_path:
client_options["cli_path"] = self._settings.cli_path
if self._settings.log_level:
client_options["log_level"] = self._settings.log_level # type: ignore[typeddict-item]
self._client = CopilotClient(client_options if client_options else None)
try:
await self._client.start()
self._started = True
except Exception as ex:
raise ServiceException(f"Failed to start GitHub Copilot client: {ex}") from ex
async def stop(self) -> None:
"""Stop the Copilot client and clean up resources.
Stops the Copilot client if owned by this agent. The client handles
session cleanup internally. Called automatically when using the agent
as an async context manager.
"""
if self._client and self._owns_client:
with contextlib.suppress(Exception):
await self._client.stop()
self._started = False
async def run(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
options: TOptions | None = None,
**kwargs: Any,
) -> AgentResponse:
"""Get a response from the agent.
This method returns the final result of the agent's execution
as a single AgentResponse object. The caller is blocked until
the final result is available.
Args:
messages: The message(s) to send to the agent.
Keyword Args:
thread: The conversation thread associated with the message(s).
options: Runtime options (model, timeout, etc.).
kwargs: Additional keyword arguments.
Returns:
An agent response item.
Raises:
ServiceException: If the request fails.
"""
if not self._started:
await self.start()
if not thread:
thread = self.get_new_thread()
opts: dict[str, Any] = dict(options) if options else {}
timeout = opts.pop("timeout", None) or self._settings.timeout or DEFAULT_TIMEOUT_SECONDS
session = await self._get_or_create_session(thread, streaming=False)
input_messages = normalize_messages(messages)
prompt = "\n".join([message.text for message in input_messages])
try:
response_event = await session.send_and_wait({"prompt": prompt}, timeout=timeout)
except Exception as ex:
raise ServiceException(f"GitHub Copilot request failed: {ex}") from ex
response_messages: list[ChatMessage] = []
response_id: str | None = None
# send_and_wait returns only the final ASSISTANT_MESSAGE event;
# other events (deltas, tool calls) are handled internally by the SDK.
if response_event and response_event.type == SessionEventType.ASSISTANT_MESSAGE:
message_id = response_event.data.message_id
if response_event.data.content:
response_messages.append(
ChatMessage(
role=Role.ASSISTANT,
contents=[Content.from_text(response_event.data.content)],
message_id=message_id,
raw_representation=response_event,
)
)
response_id = message_id
return AgentResponse(messages=response_messages, response_id=response_id)
async def run_stream(
self,
messages: str | ChatMessage | Sequence[str | ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
options: TOptions | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentResponseUpdate]:
"""Run the agent as a stream.
This method will return the intermediate steps and final results of the
agent's execution as a stream of AgentResponseUpdate objects to the caller.
Args:
messages: The message(s) to send to the agent.
Keyword Args:
thread: The conversation thread associated with the message(s).
options: Runtime options (model, timeout, etc.).
kwargs: Additional keyword arguments.
Yields:
An agent response update for each delta.
Raises:
ServiceException: If the request fails.
"""
if not self._started:
await self.start()
if not thread:
thread = self.get_new_thread()
session = await self._get_or_create_session(thread, streaming=True)
input_messages = normalize_messages(messages)
prompt = "\n".join([message.text for message in input_messages])
queue: asyncio.Queue[AgentResponseUpdate | Exception | None] = asyncio.Queue()
def event_handler(event: SessionEvent) -> None:
if event.type == SessionEventType.ASSISTANT_MESSAGE_DELTA:
if event.data.delta_content:
update = AgentResponseUpdate(
role=Role.ASSISTANT,
contents=[Content.from_text(event.data.delta_content)],
response_id=event.data.message_id,
message_id=event.data.message_id,
raw_representation=event,
)
queue.put_nowait(update)
elif event.type == SessionEventType.SESSION_IDLE:
queue.put_nowait(None)
elif event.type == SessionEventType.SESSION_ERROR:
error_msg = event.data.message or "Unknown error"
queue.put_nowait(ServiceException(f"GitHub Copilot session error: {error_msg}"))
unsubscribe = session.on(event_handler)
try:
await session.send({"prompt": prompt})
while (item := await queue.get()) is not None:
if isinstance(item, Exception):
raise item
yield item
finally:
unsubscribe()
def _prepare_tools(
self,
tools: list[ToolProtocol | MutableMapping[str, Any]],
) -> list[CopilotTool]:
"""Convert Agent Framework tools to Copilot SDK tools.
Args:
tools: List of Agent Framework tools.
Returns:
List of Copilot SDK tools.
"""
copilot_tools: list[CopilotTool] = []
for tool in tools:
if isinstance(tool, ToolProtocol):
match tool:
case AIFunction():
copilot_tools.append(self._ai_function_to_copilot_tool(tool)) # type: ignore
case _:
logger.debug(f"Unsupported tool type: {type(tool)}")
elif isinstance(tool, CopilotTool):
copilot_tools.append(tool)
return copilot_tools
def _ai_function_to_copilot_tool(self, ai_func: AIFunction[Any, Any]) -> CopilotTool:
"""Convert an AIFunction to a Copilot SDK tool."""
async def handler(invocation: ToolInvocation) -> ToolResult:
args = invocation.get("arguments", {})
try:
if ai_func.input_model:
args_instance = ai_func.input_model(**args)
result = await ai_func.invoke(arguments=args_instance)
else:
result = await ai_func.invoke(arguments=args)
return ToolResult(
textResultForLlm=str(result),
resultType="success",
)
except Exception as e:
return ToolResult(
textResultForLlm=f"Error: {e}",
resultType="failure",
error=str(e),
)
return CopilotTool(
name=ai_func.name,
description=ai_func.description,
handler=handler,
parameters=ai_func.parameters(),
)
async def _get_or_create_session(
self,
thread: AgentThread,
streaming: bool = False,
) -> CopilotSession:
"""Get an existing session or create a new one for the thread.
Args:
thread: The conversation thread.
streaming: Whether to enable streaming for the session.
Returns:
A CopilotSession instance.
Raises:
ServiceException: If the session cannot be created.
"""
if not self._client:
raise ServiceException("GitHub Copilot client not initialized. Call start() first.")
try:
if thread.service_thread_id:
return await self._resume_session(thread.service_thread_id, streaming)
session = await self._create_session(streaming)
thread.service_thread_id = session.session_id
return session
except Exception as ex:
raise ServiceException(f"Failed to create GitHub Copilot session: {ex}") from ex
async def _create_session(self, streaming: bool) -> CopilotSession:
"""Create a new Copilot session."""
if not self._client:
raise ServiceException("GitHub Copilot client not initialized. Call start() first.")
config: SessionConfig = {"streaming": streaming}
if self._settings.model:
config["model"] = self._settings.model # type: ignore[typeddict-item]
if self._instructions:
config["system_message"] = {"mode": "append", "content": self._instructions}
if self._tools:
config["tools"] = self._prepare_tools(self._tools)
if self._permission_handler:
config["on_permission_request"] = self._permission_handler
if self._mcp_servers:
config["mcp_servers"] = self._mcp_servers
return await self._client.create_session(config)
async def _resume_session(self, session_id: str, streaming: bool) -> CopilotSession:
"""Resume an existing Copilot session by ID."""
if not self._client:
raise ServiceException("GitHub Copilot client not initialized. Call start() first.")
config: ResumeSessionConfig = {"streaming": streaming}
if self._tools:
config["tools"] = self._prepare_tools(self._tools)
if self._permission_handler:
config["on_permission_request"] = self._permission_handler
if self._mcp_servers:
config["mcp_servers"] = self._mcp_servers
return await self._client.resume_session(session_id, config)
@@ -0,0 +1,49 @@
# Copyright (c) Microsoft. All rights reserved.
from typing import ClassVar
from agent_framework._pydantic import AFBaseSettings
class GithubCopilotSettings(AFBaseSettings):
"""GitHub Copilot model settings.
The settings are first loaded from environment variables with the prefix 'GITHUB_COPILOT_'.
If the environment variables are not found, the settings can be loaded from a .env file
with the encoding 'utf-8'. If the settings are not found in the .env file, the settings
are ignored; however, validation will fail alerting that the settings are missing.
Keyword Args:
cli_path: Path to the Copilot CLI executable.
Can be set via environment variable GITHUB_COPILOT_CLI_PATH.
model: Model to use (e.g., "gpt-5", "claude-sonnet-4").
Can be set via environment variable GITHUB_COPILOT_MODEL.
timeout: Request timeout in seconds.
Can be set via environment variable GITHUB_COPILOT_TIMEOUT.
log_level: CLI log level.
Can be set via environment variable GITHUB_COPILOT_LOG_LEVEL.
env_file_path: If provided, the .env settings are read from this file path location.
env_file_encoding: The encoding of the .env file, defaults to 'utf-8'.
Examples:
.. code-block:: python
from agent_framework_github_copilot import GithubCopilotSettings
# Using environment variables
# Set GITHUB_COPILOT_MODEL=gpt-5
settings = GithubCopilotSettings()
# Or passing parameters directly
settings = GithubCopilotSettings(model="claude-sonnet-4", timeout=120)
# Or loading from a .env file
settings = GithubCopilotSettings(env_file_path="path/to/.env")
"""
env_prefix: ClassVar[str] = "GITHUB_COPILOT_"
cli_path: str | None = None
model: str | None = None
timeout: float | None = None
log_level: str | None = None
@@ -0,0 +1,89 @@
[project]
name = "agent-framework-github-copilot"
description = "GitHub Copilot integration for Microsoft Agent Framework."
authors = [{ name = "Microsoft", email = "af-support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "1.0.0b260116"
license-files = ["LICENSE"]
urls.homepage = "https://aka.ms/agent-framework"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true"
urls.issues = "https://github.com/microsoft/agent-framework/issues"
classifiers = [
"License :: OSI Approved :: MIT License",
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Typing :: Typed",
]
dependencies = [
"agent-framework-core",
"github-copilot-sdk>=0.1.0",
]
[tool.uv]
prerelease = "if-necessary-or-explicit"
environments = [
"sys_platform == 'darwin'",
"sys_platform == 'linux'",
"sys_platform == 'win32'"
]
[tool.uv-dynamic-versioning]
fallback-version = "0.0.0"
[tool.pytest.ini_options]
testpaths = 'tests'
addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
"ignore:Support for class-based `config` is deprecated:DeprecationWarning:pydantic.*"
]
timeout = 120
[tool.ruff]
extend = "../../pyproject.toml"
[tool.coverage.run]
omit = [
"**/__init__.py"
]
[tool.pyright]
extends = "../../pyproject.toml"
[tool.mypy]
plugins = ['pydantic.mypy']
strict = true
python_version = "3.10"
ignore_missing_imports = true
disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
[tool.bandit]
targets = ["agent_framework_github_copilot"]
exclude_dirs = ["tests"]
[tool.poe]
executor.type = "uv"
include = "../../shared_tasks.toml"
[tool.poe.tasks]
mypy = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_github_copilot"
test = "pytest --cov=agent_framework_github_copilot --cov-report=term-missing:skip-covered tests"
[build-system]
requires = ["flit-core >= 3.11,<4.0"]
build-backend = "flit_core.buildapi"
@@ -0,0 +1 @@
# Copyright (c) Microsoft. All rights reserved.
@@ -0,0 +1,871 @@
# Copyright (c) Microsoft. All rights reserved.
import unittest.mock
from datetime import datetime, timezone
from typing import Any
from unittest.mock import AsyncMock, MagicMock, patch
from uuid import uuid4
import pytest
from agent_framework import AgentResponse, AgentResponseUpdate, AgentThread, ChatMessage, Content, Role
from agent_framework.exceptions import ServiceException
from copilot.generated.session_events import Data, SessionEvent, SessionEventType
from agent_framework_github_copilot import GithubCopilotAgent, GithubCopilotOptions
def create_session_event(
event_type: SessionEventType,
content: str | None = None,
delta_content: str | None = None,
message_id: str | None = None,
error_message: str | None = None,
) -> SessionEvent:
"""Create a mock session event for testing."""
data = Data(
content=content,
delta_content=delta_content,
message_id=message_id or str(uuid4()),
message=error_message,
)
return SessionEvent(
data=data,
id=uuid4(),
timestamp=datetime.now(timezone.utc),
type=event_type,
)
@pytest.fixture
def mock_session() -> MagicMock:
"""Create a mock CopilotSession."""
session = MagicMock()
session.session_id = "test-session-id"
session.send = AsyncMock(return_value="test-message-id")
session.send_and_wait = AsyncMock()
session.destroy = AsyncMock()
session.on = MagicMock(return_value=lambda: None)
return session
@pytest.fixture
def mock_client(mock_session: MagicMock) -> MagicMock:
"""Create a mock CopilotClient."""
client = MagicMock()
client.start = AsyncMock()
client.stop = AsyncMock(return_value=[])
client.create_session = AsyncMock(return_value=mock_session)
client.resume_session = AsyncMock(return_value=mock_session)
return client
@pytest.fixture
def assistant_message_event() -> SessionEvent:
"""Create a mock assistant message event."""
return create_session_event(
SessionEventType.ASSISTANT_MESSAGE,
content="Test response",
message_id="test-msg-id",
)
@pytest.fixture
def assistant_delta_event() -> SessionEvent:
"""Create a mock assistant message delta event."""
return create_session_event(
SessionEventType.ASSISTANT_MESSAGE_DELTA,
delta_content="Hello",
message_id="test-msg-id",
)
@pytest.fixture
def session_idle_event() -> SessionEvent:
"""Create a mock session idle event."""
return create_session_event(SessionEventType.SESSION_IDLE)
@pytest.fixture
def session_error_event() -> SessionEvent:
"""Create a mock session error event."""
return create_session_event(
SessionEventType.SESSION_ERROR,
error_message="Test error",
)
class TestGithubCopilotAgentInit:
"""Test cases for GithubCopilotAgent initialization."""
def test_init_with_client(self, mock_client: MagicMock) -> None:
"""Test initialization with pre-configured client."""
agent = GithubCopilotAgent(client=mock_client)
assert agent._client == mock_client # type: ignore
assert agent._owns_client is False # type: ignore
assert agent.id is not None
def test_init_without_client(self) -> None:
"""Test initialization without client creates settings."""
agent = GithubCopilotAgent()
assert agent._client is None # type: ignore
assert agent._owns_client is True # type: ignore
assert agent._settings is not None # type: ignore
def test_init_with_default_options(self) -> None:
"""Test initialization with default_options parameter."""
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
default_options={"model": "claude-sonnet-4", "timeout": 120}
)
assert agent._settings.model == "claude-sonnet-4" # type: ignore
assert agent._settings.timeout == 120 # type: ignore
def test_init_with_tools(self) -> None:
"""Test initialization with function tools."""
def my_tool(arg: str) -> str:
return f"Result: {arg}"
agent = GithubCopilotAgent(tools=[my_tool])
assert len(agent._tools) == 1 # type: ignore
def test_init_with_instructions(self) -> None:
"""Test initialization with custom instructions."""
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
default_options={"instructions": "You are a helpful assistant."}
)
assert agent._instructions == "You are a helpful assistant." # type: ignore
class TestGithubCopilotAgentLifecycle:
"""Test cases for agent lifecycle management."""
async def test_start_creates_client(self) -> None:
"""Test that start creates a client if none provided."""
with patch("agent_framework_github_copilot._agent.CopilotClient") as MockClient:
mock_client = MagicMock()
mock_client.start = AsyncMock()
MockClient.return_value = mock_client
agent = GithubCopilotAgent()
await agent.start()
MockClient.assert_called_once()
mock_client.start.assert_called_once()
assert agent._started is True # type: ignore
async def test_start_uses_existing_client(self, mock_client: MagicMock) -> None:
"""Test that start uses provided client."""
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
mock_client.start.assert_called_once()
assert agent._started is True # type: ignore
async def test_start_idempotent(self, mock_client: MagicMock) -> None:
"""Test that calling start multiple times is safe."""
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
await agent.start()
mock_client.start.assert_called_once()
async def test_stop_cleans_up(self, mock_client: MagicMock, mock_session: MagicMock) -> None:
"""Test that stop resets started state."""
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
await agent.stop()
assert agent._started is False # type: ignore
async def test_context_manager(self, mock_client: MagicMock) -> None:
"""Test async context manager usage."""
async with GithubCopilotAgent(client=mock_client) as agent:
assert agent._started is True # type: ignore
# When client is provided externally, agent doesn't own it and won't stop it
mock_client.stop.assert_not_called()
assert agent._started is False # type: ignore
async def test_stop_calls_client_stop_when_agent_owns_client(self) -> None:
"""Test that stop calls client.stop() when agent created the client."""
with patch("agent_framework_github_copilot._agent.CopilotClient") as MockClient:
mock_client = MagicMock()
mock_client.start = AsyncMock()
mock_client.stop = AsyncMock()
MockClient.return_value = mock_client
agent = GithubCopilotAgent()
await agent.start()
await agent.stop()
mock_client.stop.assert_called_once()
async def test_start_creates_client_with_options(self) -> None:
"""Test that start creates client with cli_path and log_level from settings."""
with patch("agent_framework_github_copilot._agent.CopilotClient") as MockClient:
mock_client = MagicMock()
mock_client.start = AsyncMock()
MockClient.return_value = mock_client
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
default_options={"cli_path": "/custom/path", "log_level": "debug"}
)
await agent.start()
call_args = MockClient.call_args[0][0]
assert call_args["cli_path"] == "/custom/path"
assert call_args["log_level"] == "debug"
class TestGithubCopilotAgentRun:
"""Test cases for run method."""
async def test_run_string_message(
self,
mock_client: MagicMock,
mock_session: MagicMock,
assistant_message_event: SessionEvent,
) -> None:
"""Test run method with string message."""
mock_session.send_and_wait.return_value = assistant_message_event
agent = GithubCopilotAgent(client=mock_client)
response = await agent.run("Hello")
assert isinstance(response, AgentResponse)
assert len(response.messages) == 1
assert response.messages[0].role == Role.ASSISTANT
assert response.messages[0].contents[0].text == "Test response"
async def test_run_chat_message(
self,
mock_client: MagicMock,
mock_session: MagicMock,
assistant_message_event: SessionEvent,
) -> None:
"""Test run method with ChatMessage."""
mock_session.send_and_wait.return_value = assistant_message_event
agent = GithubCopilotAgent(client=mock_client)
chat_message = ChatMessage(role=Role.USER, contents=[Content.from_text("Hello")])
response = await agent.run(chat_message)
assert isinstance(response, AgentResponse)
assert len(response.messages) == 1
async def test_run_with_thread(
self,
mock_client: MagicMock,
mock_session: MagicMock,
assistant_message_event: SessionEvent,
) -> None:
"""Test run method with existing thread."""
mock_session.send_and_wait.return_value = assistant_message_event
agent = GithubCopilotAgent(client=mock_client)
thread = AgentThread()
response = await agent.run("Hello", thread=thread)
assert isinstance(response, AgentResponse)
assert thread.service_thread_id == mock_session.session_id
async def test_run_with_runtime_options(
self,
mock_client: MagicMock,
mock_session: MagicMock,
assistant_message_event: SessionEvent,
) -> None:
"""Test run method with runtime options."""
mock_session.send_and_wait.return_value = assistant_message_event
agent = GithubCopilotAgent(client=mock_client)
response = await agent.run("Hello", options={"timeout": 30})
assert isinstance(response, AgentResponse)
async def test_run_empty_response(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test run method with no response event."""
mock_session.send_and_wait.return_value = None
agent = GithubCopilotAgent(client=mock_client)
response = await agent.run("Hello")
assert isinstance(response, AgentResponse)
assert len(response.messages) == 0
async def test_run_auto_starts(
self,
mock_client: MagicMock,
mock_session: MagicMock,
assistant_message_event: SessionEvent,
) -> None:
"""Test that run auto-starts the agent if not started."""
mock_session.send_and_wait.return_value = assistant_message_event
agent = GithubCopilotAgent(client=mock_client)
assert agent._started is False # type: ignore
await agent.run("Hello")
assert agent._started is True # type: ignore
mock_client.start.assert_called_once()
class TestGithubCopilotAgentRunStream:
"""Test cases for run_stream method."""
async def test_run_stream_basic(
self,
mock_client: MagicMock,
mock_session: MagicMock,
assistant_delta_event: SessionEvent,
session_idle_event: SessionEvent,
) -> None:
"""Test basic streaming response."""
events = [assistant_delta_event, session_idle_event]
def mock_on(handler: Any) -> Any:
for event in events:
handler(event)
return lambda: None
mock_session.on = mock_on
agent = GithubCopilotAgent(client=mock_client)
responses: list[AgentResponseUpdate] = []
async for update in agent.run_stream("Hello"):
responses.append(update)
assert len(responses) == 1
assert isinstance(responses[0], AgentResponseUpdate)
assert responses[0].role == Role.ASSISTANT
assert responses[0].contents[0].text == "Hello"
async def test_run_stream_with_thread(
self,
mock_client: MagicMock,
mock_session: MagicMock,
session_idle_event: SessionEvent,
) -> None:
"""Test streaming with existing thread."""
def mock_on(handler: Any) -> Any:
handler(session_idle_event)
return lambda: None
mock_session.on = mock_on
agent = GithubCopilotAgent(client=mock_client)
thread = AgentThread()
async for _ in agent.run_stream("Hello", thread=thread):
pass
assert thread.service_thread_id == mock_session.session_id
async def test_run_stream_error(
self,
mock_client: MagicMock,
mock_session: MagicMock,
session_error_event: SessionEvent,
) -> None:
"""Test streaming error handling."""
def mock_on(handler: Any) -> Any:
handler(session_error_event)
return lambda: None
mock_session.on = mock_on
agent = GithubCopilotAgent(client=mock_client)
with pytest.raises(ServiceException, match="session error"):
async for _ in agent.run_stream("Hello"):
pass
async def test_run_stream_auto_starts(
self,
mock_client: MagicMock,
mock_session: MagicMock,
session_idle_event: SessionEvent,
) -> None:
"""Test that run_stream auto-starts the agent if not started."""
def mock_on(handler: Any) -> Any:
handler(session_idle_event)
return lambda: None
mock_session.on = mock_on
agent = GithubCopilotAgent(client=mock_client)
assert agent._started is False # type: ignore
async for _ in agent.run_stream("Hello"):
pass
assert agent._started is True # type: ignore
mock_client.start.assert_called_once()
class TestGithubCopilotAgentSessionManagement:
"""Test cases for session management."""
async def test_session_resumed_for_same_thread(
self,
mock_client: MagicMock,
mock_session: MagicMock,
assistant_message_event: SessionEvent,
) -> None:
"""Test that subsequent calls on the same thread resume the session."""
mock_session.send_and_wait.return_value = assistant_message_event
agent = GithubCopilotAgent(client=mock_client)
thread = AgentThread()
await agent.run("Hello", thread=thread)
await agent.run("World", thread=thread)
mock_client.create_session.assert_called_once()
mock_client.resume_session.assert_called_once_with(mock_session.session_id, unittest.mock.ANY)
async def test_session_config_includes_model(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that session config includes model setting."""
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
client=mock_client, default_options={"model": "claude-sonnet-4"}
)
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert config["model"] == "claude-sonnet-4"
async def test_session_config_includes_instructions(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that session config includes instructions."""
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
client=mock_client,
default_options={"instructions": "You are a helpful assistant."},
)
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert config["system_message"]["mode"] == "append"
assert config["system_message"]["content"] == "You are a helpful assistant."
async def test_session_config_includes_streaming_flag(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that session config includes the streaming flag."""
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
await agent._get_or_create_session(AgentThread(), streaming=True) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert config["streaming"] is True
async def test_resume_session_with_existing_service_thread_id(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that session is resumed when thread has a service_thread_id."""
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
thread = AgentThread()
thread.service_thread_id = "existing-session-id"
await agent._get_or_create_session(thread) # type: ignore
mock_client.create_session.assert_not_called()
mock_client.resume_session.assert_called_once()
call_args = mock_client.resume_session.call_args
assert call_args[0][0] == "existing-session-id"
async def test_resume_session_includes_tools_and_permissions(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that resumed session config includes tools and permission handler."""
from copilot.types import PermissionRequest, PermissionRequestResult
def my_handler(request: PermissionRequest, context: dict[str, str]) -> PermissionRequestResult:
return PermissionRequestResult(kind="approved")
def my_tool(arg: str) -> str:
"""A test tool."""
return arg
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
client=mock_client,
tools=[my_tool],
default_options={"on_permission_request": my_handler},
)
await agent.start()
thread = AgentThread()
thread.service_thread_id = "existing-session-id"
await agent._get_or_create_session(thread) # type: ignore
mock_client.resume_session.assert_called_once()
call_args = mock_client.resume_session.call_args
config = call_args[0][1]
assert "tools" in config
assert "on_permission_request" in config
class TestGithubCopilotAgentMCPServers:
"""Test cases for MCP server configuration."""
async def test_mcp_servers_passed_to_create_session(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that mcp_servers are passed through to create_session config."""
from copilot.types import MCPServerConfig
mcp_servers: dict[str, MCPServerConfig] = {
"filesystem": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "."],
"tools": ["*"],
},
"remote": {
"type": "http",
"url": "https://example.com/mcp",
"tools": ["*"],
},
}
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
client=mock_client,
default_options={"mcp_servers": mcp_servers},
)
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert "mcp_servers" in config
assert "filesystem" in config["mcp_servers"]
assert "remote" in config["mcp_servers"]
assert config["mcp_servers"]["filesystem"]["command"] == "npx"
assert config["mcp_servers"]["remote"]["url"] == "https://example.com/mcp"
async def test_mcp_servers_passed_to_resume_session(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that mcp_servers are passed through to resume_session config."""
from copilot.types import MCPServerConfig
mcp_servers: dict[str, MCPServerConfig] = {
"test-server": {
"type": "stdio",
"command": "echo",
"args": ["hello"],
"tools": ["*"],
},
}
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
client=mock_client,
default_options={"mcp_servers": mcp_servers},
)
await agent.start()
thread = AgentThread()
thread.service_thread_id = "existing-session-id"
await agent._get_or_create_session(thread) # type: ignore
mock_client.resume_session.assert_called_once()
call_args = mock_client.resume_session.call_args
config = call_args[0][1]
assert "mcp_servers" in config
assert "test-server" in config["mcp_servers"]
async def test_session_config_excludes_mcp_servers_when_not_set(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that session config does not include mcp_servers when not set."""
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert "mcp_servers" not in config
class TestGithubCopilotAgentToolConversion:
"""Test cases for tool conversion."""
async def test_function_tool_conversion(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that function tools are converted to Copilot tools."""
def my_tool(arg: str) -> str:
"""A test tool."""
return f"Result: {arg}"
agent = GithubCopilotAgent(client=mock_client, tools=[my_tool])
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert "tools" in config
assert len(config["tools"]) == 1
assert config["tools"][0].name == "my_tool"
assert config["tools"][0].description == "A test tool."
async def test_tool_handler_returns_success_result(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that tool handler returns success result on successful invocation."""
def my_tool(arg: str) -> str:
"""A test tool."""
return f"Result: {arg}"
agent = GithubCopilotAgent(client=mock_client, tools=[my_tool])
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
copilot_tool = config["tools"][0]
result = await copilot_tool.handler({"arguments": {"arg": "test"}})
assert result["resultType"] == "success"
assert result["textResultForLlm"] == "Result: test"
async def test_tool_handler_returns_failure_result_on_error(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that tool handler returns failure result when invocation raises exception."""
def failing_tool(arg: str) -> str:
"""A tool that fails."""
raise ValueError("Something went wrong")
agent = GithubCopilotAgent(client=mock_client, tools=[failing_tool])
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
copilot_tool = config["tools"][0]
result = await copilot_tool.handler({"arguments": {"arg": "test"}})
assert result["resultType"] == "failure"
assert "Something went wrong" in result["textResultForLlm"]
assert "Something went wrong" in result["error"]
def test_copilot_tool_passthrough(
self,
mock_client: MagicMock,
) -> None:
"""Test that CopilotTool instances are passed through as-is."""
from copilot.types import Tool as CopilotTool
async def tool_handler(invocation: Any) -> Any:
return {"textResultForLlm": "result", "resultType": "success"}
copilot_tool = CopilotTool(
name="direct_tool",
description="A direct CopilotTool",
handler=tool_handler,
parameters={"type": "object", "properties": {}},
)
agent = GithubCopilotAgent(client=mock_client)
result = agent._prepare_tools([copilot_tool]) # type: ignore
assert len(result) == 1
assert result[0] == copilot_tool
def test_mixed_tools_conversion(
self,
mock_client: MagicMock,
) -> None:
"""Test that mixed tool types are handled correctly."""
from agent_framework._tools import ai_function
from copilot.types import Tool as CopilotTool
@ai_function
def my_function(arg: str) -> str:
"""A function tool."""
return arg
async def tool_handler(invocation: Any) -> Any:
return {"textResultForLlm": "result", "resultType": "success"}
copilot_tool = CopilotTool(
name="direct_tool",
description="A direct CopilotTool",
handler=tool_handler,
)
agent = GithubCopilotAgent(client=mock_client)
result = agent._prepare_tools([my_function, copilot_tool]) # type: ignore
assert len(result) == 2
# First tool is converted AIFunction
assert result[0].name == "my_function"
# Second tool is CopilotTool passthrough
assert result[1] == copilot_tool
class TestGithubCopilotAgentErrorHandling:
"""Test cases for error handling."""
async def test_start_raises_on_client_error(self, mock_client: MagicMock) -> None:
"""Test that start raises ServiceException when client fails to start."""
mock_client.start.side_effect = Exception("Connection failed")
agent = GithubCopilotAgent(client=mock_client)
with pytest.raises(ServiceException, match="Failed to start GitHub Copilot client"):
await agent.start()
async def test_run_raises_on_send_error(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that run raises ServiceException when send_and_wait fails."""
mock_session.send_and_wait.side_effect = Exception("Request timeout")
agent = GithubCopilotAgent(client=mock_client)
with pytest.raises(ServiceException, match="GitHub Copilot request failed"):
await agent.run("Hello")
async def test_get_or_create_session_raises_on_create_error(
self,
mock_client: MagicMock,
) -> None:
"""Test that _get_or_create_session raises ServiceException when create_session fails."""
mock_client.create_session.side_effect = Exception("Session creation failed")
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
with pytest.raises(ServiceException, match="Failed to create GitHub Copilot session"):
await agent._get_or_create_session(AgentThread()) # type: ignore
async def test_get_or_create_session_raises_when_client_not_initialized(self) -> None:
"""Test that _get_or_create_session raises ServiceException when client is not initialized."""
agent = GithubCopilotAgent()
# Don't call start() - client remains None
with pytest.raises(ServiceException, match="GitHub Copilot client not initialized"):
await agent._get_or_create_session(AgentThread()) # type: ignore
class TestGithubCopilotAgentPermissions:
"""Test cases for permission handling."""
def test_no_permission_handler_when_not_provided(self) -> None:
"""Test that no handler is set when on_permission_request is not provided."""
agent = GithubCopilotAgent()
assert agent._permission_handler is None # type: ignore
def test_permission_handler_set_when_provided(self) -> None:
"""Test that a handler is set when on_permission_request is provided."""
from copilot.types import PermissionRequest, PermissionRequestResult
def approve_shell(request: PermissionRequest, context: dict[str, str]) -> PermissionRequestResult:
if request.get("kind") == "shell":
return PermissionRequestResult(kind="approved")
return PermissionRequestResult(kind="denied-interactively-by-user")
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
default_options={"on_permission_request": approve_shell}
)
assert agent._permission_handler is not None # type: ignore
async def test_session_config_includes_permission_handler(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that session config includes permission handler when provided."""
from copilot.types import PermissionRequest, PermissionRequestResult
def approve_shell_read(request: PermissionRequest, context: dict[str, str]) -> PermissionRequestResult:
if request.get("kind") in ("shell", "read"):
return PermissionRequestResult(kind="approved")
return PermissionRequestResult(kind="denied-interactively-by-user")
agent: GithubCopilotAgent[GithubCopilotOptions] = GithubCopilotAgent(
client=mock_client,
default_options={"on_permission_request": approve_shell_read},
)
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert "on_permission_request" in config
assert config["on_permission_request"] is not None
async def test_session_config_excludes_permission_handler_when_not_set(
self,
mock_client: MagicMock,
mock_session: MagicMock,
) -> None:
"""Test that session config does not include permission handler when not set."""
agent = GithubCopilotAgent(client=mock_client)
await agent.start()
await agent._get_or_create_session(AgentThread()) # type: ignore
call_args = mock_client.create_session.call_args
config = call_args[0][0]
assert "on_permission_request" not in config