Python: Added Copilot Studio Agent (#722)

* Small fix in dotnet conformance tests

* Added CopilotStudioAgent implementation

* Added examples

* Updated package README

* Small fixes

* Small improvements

* Fixed dotnet tests

* Add unit tests

* Updated tests

* Small updates

* Small test fixes

* Revert "Small test fixes"

This reverts commit 983ac44a70.

* Small fixes in documentation

* Updated test configuration

* Revert "Updated test configuration"

This reverts commit 2a16fea815.

* Small fix

* Reverted TODO item

* Small suppressions

* More fixes

* Small fixes

* Fixed tests

* Removed disallow_any_unimported rule in all packages

* Fixes
This commit is contained in:
Dmytro Struk
2025-09-15 16:21:07 -07:00
committed by GitHub
Unverified
parent 74879489a4
commit e7cd03b32e
34 changed files with 3778 additions and 2181 deletions
@@ -16,28 +16,20 @@ internal static class ActivityProcessor
{
await foreach (IActivity activity in activities.ConfigureAwait(false))
{
switch (activity.Type)
// TODO: Prototype a custom AIContent type for CardActions, where the user is instructed to
// pick from a list of actions.
// The activity text doesn't make sense without the actions, as the message
// is often instructing the user to pick from the provided list of actions.
if (!string.IsNullOrWhiteSpace(activity.Text))
{
case "message":
// For streaming scenarios, we sometimes receive intermediate text via "typing" activities, but not always.
// In some cases the response is also returned multiple times via "typing" activities, so the only reliable
// way to get the final response is to wait for a "message" activity.
// TODO: Prototype a custom AIContent type for CardActions, where the user is instructed to
// pick from a list of actions.
// The activity text doesn't make sense without the actions, as the message
// is often instructing the user to pick from the provided list of actions.
if ((activity.Type == "message" && !streaming) || (activity.Type == "typing" && streaming))
{
yield return CreateChatMessageFromActivity(activity, [new TextContent(activity.Text)]);
break;
case "typing":
case "event":
// TODO: Revisit usage of TextReasoningContent here, to evaluate whether all are really reasoning
// or whether simply an AIContent base type would be more appropriate.
yield return CreateChatMessageFromActivity(activity, [new TextReasoningContent(activity.Text)]);
break;
default:
}
else
{
logger.LogWarning("Unknown activity type '{ActivityType}' received.", activity.Type);
break;
}
}
}
}
+5
View File
@@ -17,3 +17,8 @@ AGENT_FRAMEWORK_ENABLE_SENSITIVE_DATA=true
AGENT_FRAMEWORK_WORKFLOW_ENABLE_OTEL=true
# Mem0
MEM0_API_KEY=""
# Copilot Studio
COPILOTSTUDIOAGENT__ENVIRONMENTID=""
COPILOTSTUDIOAGENT__SCHEMANAME=""
COPILOTSTUDIOAGENT__TENANTID=""
COPILOTSTUDIOAGENT__AGENTAPPID=""
-1
View File
@@ -70,7 +70,6 @@ show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_any_unimported = true
[tool.bandit]
targets = ["agent_framework_azure"]
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) Microsoft Corporation.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE
+97
View File
@@ -0,0 +1,97 @@
# Get Started with Microsoft Agent Framework Copilot Studio
Please install this package as the extra for `agent-framework`:
```bash
pip install agent-framework[copilotstudio]
```
## Copilot Studio Agent
The Copilot Studio agent enables integration with Microsoft Copilot Studio, allowing you to interact with published copilots through the Agent Framework.
### Prerequisites
Before using the Copilot Studio agent, you need:
1. **Copilot Studio Environment**: Access to a Microsoft Copilot Studio environment with a published copilot
2. **App Registration**: An Azure AD App Registration with appropriate permissions for Power Platform API
3. **Environment Configuration**: Set the required environment variables or pass them as parameters
### Environment Variables
The following environment variables are used for configuration:
- `COPILOTSTUDIOAGENT__ENVIRONMENTID` - Your Copilot Studio environment ID
- `COPILOTSTUDIOAGENT__SCHEMANAME` - Your copilot's agent identifier/schema name
- `COPILOTSTUDIOAGENT__AGENTAPPID` - Your App Registration client ID
- `COPILOTSTUDIOAGENT__TENANTID` - Your Azure AD tenant ID
### Basic Usage Example
```python
import asyncio
from agent_framework.copilotstudio import CopilotStudioAgent
async def main():
# Create agent using environment variables
agent = CopilotStudioAgent()
# Run a simple query
result = await agent.run("What is the capital of France?")
print(result)
asyncio.run(main())
```
### Explicit Configuration Example
```python
import asyncio
import os
from agent_framework.copilotstudio import CopilotStudioAgent, acquire_token
from microsoft_agents.copilotstudio.client import ConnectionSettings, CopilotClient, PowerPlatformCloud, AgentType
async def main():
# Acquire authentication token
token = acquire_token(
client_id=os.environ["COPILOTSTUDIOAGENT__AGENTAPPID"],
tenant_id=os.environ["COPILOTSTUDIOAGENT__TENANTID"]
)
# Create connection settings
settings = ConnectionSettings(
environment_id=os.environ["COPILOTSTUDIOAGENT__ENVIRONMENTID"],
agent_identifier=os.environ["COPILOTSTUDIOAGENT__SCHEMANAME"],
cloud=PowerPlatformCloud.PROD,
copilot_agent_type=AgentType.PUBLISHED
)
# Create client and agent
client = CopilotClient(settings=settings, token=token)
agent = CopilotStudioAgent(client=client)
# Run a query
result = await agent.run("What is the capital of Italy?")
print(result)
asyncio.run(main())
```
### Authentication
The package uses MSAL (Microsoft Authentication Library) for authentication with interactive flows when needed. Ensure your App Registration has:
- **API Permissions**: Power Platform API permissions (https://api.powerplatform.com/.default)
- **Redirect URIs**: Configured appropriately for your authentication method
- **Public Client Flows**: Enabled if using interactive authentication
### Examples
For more comprehensive examples, see the [Copilot Studio examples](https://github.com/microsoft/agent-framework/tree/main/python/samples/getting_started/agents/copilotstudio/) which demonstrate:
- Basic non-streaming and streaming execution
- Explicit settings and manual token acquisition
- Different authentication patterns
- Error handling and troubleshooting
@@ -0,0 +1,13 @@
# Copyright (c) Microsoft. All rights reserved.
import importlib.metadata
from ._acquire_token import acquire_token
from ._agent import CopilotStudioAgent
try:
__version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0" # Fallback for development mode
__all__ = ["CopilotStudioAgent", "__version__", "acquire_token"]
@@ -0,0 +1,94 @@
# Copyright (c) Microsoft. All rights reserved.
# pyright: reportUnknownMemberType = false
# pyright: reportUnknownVariableType = false
# pyright: reportUnknownArgumentType = false
import logging
from typing import Any
from agent_framework.exceptions import ServiceException
from msal import PublicClientApplication
logger = logging.getLogger(__name__)
# Default scopes for Power Platform API
DEFAULT_SCOPES = ["https://api.powerplatform.com/.default"]
def acquire_token(
client_id: str,
tenant_id: str,
username: str | None = None,
token_cache: Any | None = None,
scopes: list[str] | None = None,
) -> str:
"""Acquire an authentication token using MSAL Public Client Application.
This function attempts to acquire a token silently first (using cached tokens),
and falls back to interactive authentication if needed.
Args:
client_id: The client ID of the application.
tenant_id: The tenant ID for authentication.
username: Optional username to filter accounts.
token_cache: Optional token cache for storing tokens.
scopes: Optional list of scopes. Defaults to Power Platform API scopes.
Returns:
The access token string.
Raises:
ServiceException: If authentication token cannot be acquired.
"""
if not client_id:
raise ServiceException("Client ID is required for token acquisition.")
if not tenant_id:
raise ServiceException("Tenant ID is required for token acquisition.")
authority = f"https://login.microsoftonline.com/{tenant_id}"
target_scopes = scopes or DEFAULT_SCOPES
pca = PublicClientApplication(client_id=client_id, authority=authority, token_cache=token_cache)
accounts = pca.get_accounts(username=username)
token: str | None = None
# Try silent token acquisition first if we have cached accounts
if accounts:
try:
logger.debug("Attempting silent token acquisition")
response = pca.acquire_token_silent(scopes=target_scopes, account=accounts[0])
if response and "access_token" in response:
token = str(response["access_token"]) # type: ignore[assignment]
logger.debug("Successfully acquired token silently")
elif response and "error" in response:
logger.warning(
"Silent token acquisition failed: %s - %s", response.get("error"), response.get("error_description")
)
except Exception as ex:
logger.warning("Silent token acquisition failed with exception: %s", ex)
# Fall back to interactive authentication if silent acquisition failed
if not token:
try:
logger.debug("Attempting interactive token acquisition")
response = pca.acquire_token_interactive(scopes=target_scopes)
if response and "access_token" in response:
token = str(response["access_token"]) # type: ignore[assignment]
logger.debug("Successfully acquired token interactively")
elif response and "error" in response:
logger.error(
"Interactive token acquisition failed: %s - %s",
response.get("error"),
response.get("error_description"),
)
except Exception as ex:
logger.error("Interactive token acquisition failed with exception: %s", ex)
raise ServiceException(f"Failed to acquire authentication token: {ex}") from ex
if not token:
raise ServiceException("Authentication token cannot be acquired.")
return token
@@ -0,0 +1,312 @@
# Copyright (c) Microsoft. All rights reserved.
from collections.abc import AsyncIterable
from typing import Any, ClassVar
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
BaseAgent,
ChatMessage,
Role,
TextContent,
)
from agent_framework._pydantic import AFBaseSettings
from agent_framework.exceptions import ServiceException, ServiceInitializationError
from microsoft_agents.copilotstudio.client import AgentType, ConnectionSettings, CopilotClient, PowerPlatformCloud
from pydantic import ValidationError
from ._acquire_token import acquire_token
class CopilotStudioSettings(AFBaseSettings):
"""Copilot Studio model settings.
The settings are first loaded from environment variables with the prefix 'COPILOTSTUDIOAGENT__'.
If the environment variables are not found, the settings can be loaded from a .env file
with the encoding 'utf-8'. If the settings are not found in the .env file, the settings
are ignored; however, validation will fail alerting that the settings are missing.
Attributes:
environmentid: Environment ID of environment with the Copilot Studio App..
(Env var COPILOTSTUDIOAGENT__ENVIRONMENTID)
schemaname: The agent identifier or schema name of the Copilot to use.
(Env var COPILOTSTUDIOAGENT__SCHEMANAME)
agentappid: The app ID of the App Registration used to login.
(Env var COPILOTSTUDIOAGENT__AGENTAPPID)
tenantid: The tenant ID of the App Registration used to login.
(Env var COPILOTSTUDIOAGENT__TENANTID)
Parameters:
env_file_path: If provided, the .env settings are read from this file path location.
env_file_encoding: The encoding of the .env file, defaults to 'utf-8'.
"""
env_prefix: ClassVar[str] = "COPILOTSTUDIOAGENT__"
environmentid: str | None = None
schemaname: str | None = None
agentappid: str | None = None
tenantid: str | None = None
class CopilotStudioAgent(BaseAgent):
"""A Copilot Studio Agent."""
client: CopilotClient
settings: ConnectionSettings | None
token: str | None
cloud: PowerPlatformCloud | None
agent_type: AgentType | None
custom_power_platform_cloud: str | None
username: str | None
token_cache: Any | None
scopes: list[str] | None
def __init__(
self,
client: CopilotClient | None = None,
settings: ConnectionSettings | None = None,
environment_id: str | None = None,
agent_identifier: str | None = None,
client_id: str | None = None,
tenant_id: str | None = None,
token: str | None = None,
cloud: PowerPlatformCloud | None = None,
agent_type: AgentType | None = None,
custom_power_platform_cloud: str | None = None,
username: str | None = None,
token_cache: Any | None = None,
scopes: list[str] | None = None,
env_file_path: str | None = None,
env_file_encoding: str | None = None,
) -> None:
"""Initialize the Copilot Studio Agent.
Args:
client: Optional pre-configured CopilotClient instance. If not provided,
a new client will be created using the other parameters.
settings: Optional pre-configured ConnectionSettings. If not provided,
settings will be created from the other parameters.
environment_id: Environment ID of the Power Platform environment containing
the Copilot Studio app. Can also be set via COPILOTSTUDIOAGENT__ENVIRONMENTID
environment variable.
agent_identifier: The agent identifier or schema name of the Copilot to use.
Can also be set via COPILOTSTUDIOAGENT__SCHEMANAME environment variable.
client_id: The app ID of the App Registration used for authentication.
Can also be set via COPILOTSTUDIOAGENT__AGENTAPPID environment variable.
tenant_id: The tenant ID of the App Registration used for authentication.
Can also be set via COPILOTSTUDIOAGENT__TENANTID environment variable.
token: Optional pre-acquired authentication token. If not provided,
token acquisition will be attempted using MSAL.
cloud: The Power Platform cloud to use (Public, GCC, etc.).
agent_type: The type of Copilot Studio agent (Copilot, Agent, etc.).
custom_power_platform_cloud: Custom Power Platform cloud URL if using
a custom environment.
username: Optional username for token acquisition.
token_cache: Optional token cache for storing authentication tokens.
scopes: Optional list of authentication scopes. Defaults to Power Platform
API scopes if not provided.
env_file_path: Optional path to .env file for loading configuration.
env_file_encoding: Encoding of the .env file, defaults to 'utf-8'.
Raises:
ServiceInitializationError: If required configuration is missing or invalid.
"""
if not client:
try:
copilot_studio_settings = CopilotStudioSettings(
environmentid=environment_id,
schemaname=agent_identifier,
agentappid=client_id,
tenantid=tenant_id,
env_file_path=env_file_path,
env_file_encoding=env_file_encoding,
)
except ValidationError as ex:
raise ServiceInitializationError("Failed to create Copilot Studio settings.", ex) from ex
if not settings:
if not copilot_studio_settings.environmentid:
raise ServiceInitializationError(
"Copilot Studio environment ID is required. Set via 'environment_id' parameter "
"or 'COPILOTSTUDIOAGENT__ENVIRONMENTID' environment variable."
)
if not copilot_studio_settings.schemaname:
raise ServiceInitializationError(
"Copilot Studio agent identifier/schema name is required. Set via 'agent_identifier' parameter "
"or 'COPILOTSTUDIOAGENT__SCHEMANAME' environment variable."
)
settings = ConnectionSettings(
environment_id=copilot_studio_settings.environmentid,
agent_identifier=copilot_studio_settings.schemaname,
cloud=cloud,
copilot_agent_type=agent_type,
custom_power_platform_cloud=custom_power_platform_cloud,
)
if not token:
if not copilot_studio_settings.agentappid:
raise ServiceInitializationError(
"Copilot Studio client ID is required. Set via 'client_id' parameter "
"or 'COPILOTSTUDIOAGENT__AGENTAPPID' environment variable."
)
if not copilot_studio_settings.tenantid:
raise ServiceInitializationError(
"Copilot Studio tenant ID is required. Set via 'tenant_id' parameter "
"or 'COPILOTSTUDIOAGENT__TENANTID' environment variable."
)
token = acquire_token(
client_id=copilot_studio_settings.agentappid,
tenant_id=copilot_studio_settings.tenantid,
username=username,
token_cache=token_cache,
scopes=scopes,
)
client = CopilotClient(settings=settings, token=token)
super().__init__(
client=client, # type: ignore[reportCallIssue]
settings=settings, # type: ignore[reportCallIssue]
token=token, # type: ignore[reportCallIssue]
cloud=cloud, # type: ignore[reportCallIssue]
agent_type=agent_type, # type: ignore[reportCallIssue]
custom_power_platform_cloud=custom_power_platform_cloud, # type: ignore[reportCallIssue]
username=username, # type: ignore[reportCallIssue]
token_cache=token_cache, # type: ignore[reportCallIssue]
scopes=scopes, # type: ignore[reportCallIssue]
)
async def run(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AgentRunResponse:
"""Get a response from the agent.
This method returns the final result of the agent's execution
as a single AgentRunResponse object. The caller is blocked until
the final result is available.
Note: For streaming responses, use the run_stream method, which returns
intermediate steps and the final result as a stream of AgentRunResponseUpdate
objects. Streaming only the final result is not feasible because the timing of
the final result's availability is unknown, and blocking the caller until then
is undesirable in streaming scenarios.
Args:
messages: The message(s) to send to the agent.
thread: The conversation thread associated with the message(s).
kwargs: Additional keyword arguments.
Returns:
An agent response item.
"""
if not thread:
thread = self.get_new_thread()
thread.service_thread_id = await self._start_new_conversation()
input_messages = self._normalize_messages(messages)
question = "\n".join([message.text for message in input_messages])
activities = self.client.ask_question(question, thread.service_thread_id)
response_messages: list[ChatMessage] = []
response_id: str | None = None
response_messages = [message async for message in self._process_activities(activities, streaming=False)]
response_id = response_messages[0].message_id if response_messages else None
return AgentRunResponse(messages=response_messages, response_id=response_id)
async def run_stream(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
*,
thread: AgentThread | None = None,
**kwargs: Any,
) -> AsyncIterable[AgentRunResponseUpdate]:
"""Run the agent as a stream.
This method will return the intermediate steps and final results of the
agent's execution as a stream of AgentRunResponseUpdate objects to the caller.
Note: An AgentRunResponseUpdate object contains a chunk of a message.
Args:
messages: The message(s) to send to the agent.
thread: The conversation thread associated with the message(s).
kwargs: Additional keyword arguments.
Yields:
An agent response item.
"""
if not thread:
thread = self.get_new_thread()
thread.service_thread_id = await self._start_new_conversation()
input_messages = self._normalize_messages(messages)
question = "\n".join([message.text for message in input_messages])
activities = self.client.ask_question(question, thread.service_thread_id)
async for message in self._process_activities(activities, streaming=True):
yield AgentRunResponseUpdate(
role=message.role,
contents=message.contents,
additional_properties=message.additional_properties,
author_name=message.author_name,
raw_representation=message.raw_representation,
response_id=message.message_id,
message_id=message.message_id,
)
async def _start_new_conversation(self) -> str:
"""Start a new conversation with the Copilot Studio agent.
Returns:
The conversation ID for the new conversation.
Raises:
ServiceException: If the conversation could not be started.
"""
conversation_id: str | None = None
async for activity in self.client.start_conversation(emit_start_conversation_event=True):
if activity and activity.conversation and activity.conversation.id:
conversation_id = activity.conversation.id
if not conversation_id:
raise ServiceException("Failed to start a new conversation.")
return conversation_id
async def _process_activities(self, activities: AsyncIterable[Any], streaming: bool) -> AsyncIterable[ChatMessage]:
"""Process activities from the Copilot Studio agent.
Args:
activities: Stream of activities from the agent.
streaming: Whether to process activities for streaming (typing activities)
or non-streaming (message activities) responses.
Yields:
ChatMessage objects created from the activities.
"""
async for activity in activities:
if activity.text and (
(activity.type == "message" and not streaming) or (activity.type == "typing" and streaming)
):
yield ChatMessage(
role=Role.ASSISTANT,
contents=[TextContent(activity.text)],
author_name=activity.from_property.name if activity.from_property else None,
message_id=activity.id,
raw_representation=activity,
)
@@ -0,0 +1,93 @@
[project]
name = "agent-framework-copilotstudio"
description = "Copilot Studio integration for Microsoft Agent Framework."
authors = [{ name = "Microsoft", email = "SK-Support@microsoft.com"}]
readme = "README.md"
requires-python = ">=3.10"
version = "0.1.0b1"
license-files = ["LICENSE"]
urls.homepage = "https://learn.microsoft.com/en-us/semantic-kernel/overview/"
urls.source = "https://github.com/microsoft/agent-framework/tree/main/python"
urls.release_notes = "https://github.com/microsoft/agent-framework/releases?q=tag%3Apython-1&expanded=true"
urls.issues = "https://github.com/microsoft/agent-framework/issues"
classifiers = [
"License :: OSI Approved :: MIT License",
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Framework :: Pydantic :: 2",
"Typing :: Typed",
]
dependencies = [
"agent-framework",
"microsoft-agents-copilotstudio-client>=0.3.1",
]
[tool.uv]
prerelease = "if-necessary-or-explicit"
environments = [
"sys_platform == 'darwin'",
"sys_platform == 'linux'",
"sys_platform == 'win32'"
]
[tool.uv-dynamic-versioning]
fallback-version = "0.0.0"
[tool.pytest.ini_options]
testpaths = 'tests'
addopts = "-ra -q -r fEX"
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
filterwarnings = [
"ignore:Support for class-based `config` is deprecated:DeprecationWarning:pydantic.*"
]
timeout = 120
[tool.ruff]
extend = "../../pyproject.toml"
[tool.coverage.run]
omit = [
"**/__init__.py"
]
[tool.pyright]
extend = "../../pyproject.toml"
exclude = ['tests']
[tool.mypy]
plugins = ['pydantic.mypy']
strict = true
python_version = "3.10"
ignore_missing_imports = true
disallow_untyped_defs = true
no_implicit_optional = true
check_untyped_defs = true
warn_return_any = true
show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
[tool.bandit]
targets = ["agent_framework_copilotstudio"]
exclude_dirs = ["tests"]
[tool.poe]
executor.type = "uv"
include = "../../shared_tasks.toml"
[tool.poe.tasks]
mypy = "mypy --config-file $POE_ROOT/pyproject.toml agent_framework_copilotstudio"
test = "pytest --cov=agent_framework_copilotstudio --cov-report=term-missing:skip-covered tests"
[tool.uv.build-backend]
module-name = "agent_framework_copilotstudio"
module-root = ""
[build-system]
requires = ["uv_build>=0.8.2,<0.9.0"]
build-backend = "uv_build"
@@ -0,0 +1,91 @@
# Copyright (c) Microsoft. All rights reserved.
from typing import Any
from unittest.mock import MagicMock
import pytest
from microsoft_agents.copilotstudio.client import CopilotClient
@pytest.fixture
def exclude_list(request: Any) -> list[str]:
"""Fixture that returns a list of environment variables to exclude."""
return request.param if hasattr(request, "param") else []
@pytest.fixture
def override_env_param_dict(request: Any) -> dict[str, str]:
"""Fixture that returns a dict of environment variables to override."""
return request.param if hasattr(request, "param") else {}
@pytest.fixture()
def copilot_studio_unit_test_env(monkeypatch, exclude_list, override_env_param_dict): # type: ignore
"""Fixture to set environment variables for CopilotStudioSettings."""
if exclude_list is None:
exclude_list = []
if override_env_param_dict is None:
override_env_param_dict = {}
env_vars = {
"COPILOTSTUDIOAGENT__ENVIRONMENTID": "test-environment-id",
"COPILOTSTUDIOAGENT__SCHEMANAME": "test-schema-name",
"COPILOTSTUDIOAGENT__AGENTAPPID": "test-client-id",
"COPILOTSTUDIOAGENT__TENANTID": "test-tenant-id",
}
env_vars.update(override_env_param_dict) # type: ignore
for key, value in env_vars.items():
if key in exclude_list:
monkeypatch.delenv(key, raising=False) # type: ignore
continue
monkeypatch.setenv(key, value) # type: ignore
return env_vars
@pytest.fixture
def mock_copilot_client() -> MagicMock:
"""Mock CopilotClient for testing."""
return MagicMock(spec=CopilotClient)
@pytest.fixture
def mock_pca() -> MagicMock:
"""Mock PublicClientApplication for testing."""
mock_pca = MagicMock()
# Mock successful token response
mock_token_response = {
"access_token": "test-access-token-12345",
"token_type": "Bearer",
"expires_in": 3600,
}
mock_pca.get_accounts.return_value = []
mock_pca.acquire_token_interactive.return_value = mock_token_response
mock_pca.acquire_token_silent.return_value = mock_token_response
return mock_pca
@pytest.fixture
def mock_activity() -> MagicMock:
"""Mock Activity for testing."""
mock_activity = MagicMock()
mock_activity.text = "Test response"
mock_activity.type = "message"
mock_activity.id = "test-activity-id"
mock_activity.from_property.name = "Test Bot"
return mock_activity
@pytest.fixture
def mock_conversation() -> MagicMock:
"""Mock conversation for testing."""
mock_conversation = MagicMock()
mock_conversation.id = "test-conversation-id"
return mock_conversation
@@ -0,0 +1,243 @@
# Copyright (c) Microsoft. All rights reserved.
from unittest.mock import MagicMock, patch
import pytest
from agent_framework.exceptions import ServiceException
from agent_framework_copilotstudio._acquire_token import DEFAULT_SCOPES, acquire_token
class TestAcquireToken:
"""Test class for token acquisition functionality."""
def test_acquire_token_missing_client_id(self) -> None:
"""Test that acquire_token raises ServiceException when client_id is missing."""
with pytest.raises(ServiceException, match="Client ID is required for token acquisition"):
acquire_token(client_id="", tenant_id="test-tenant-id")
def test_acquire_token_missing_tenant_id(self) -> None:
"""Test that acquire_token raises ServiceException when tenant_id is missing."""
with pytest.raises(ServiceException, match="Tenant ID is required for token acquisition"):
acquire_token(client_id="test-client-id", tenant_id="")
def test_acquire_token_none_client_id(self) -> None:
"""Test that acquire_token raises ServiceException when client_id is None."""
with pytest.raises(ServiceException, match="Client ID is required for token acquisition"):
acquire_token(client_id=None, tenant_id="test-tenant-id") # type: ignore
def test_acquire_token_none_tenant_id(self) -> None:
"""Test that acquire_token raises ServiceException when tenant_id is None."""
with pytest.raises(ServiceException, match="Tenant ID is required for token acquisition"):
acquire_token(client_id="test-client-id", tenant_id=None) # type: ignore
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_silent_success(self, mock_pca_class: MagicMock) -> None:
"""Test successful silent token acquisition."""
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_account = MagicMock()
mock_pca.get_accounts.return_value = [mock_account]
mock_token_response = {"access_token": "test-access-token-12345"}
mock_pca.acquire_token_silent.return_value = mock_token_response
result = acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
)
assert result == "test-access-token-12345"
mock_pca_class.assert_called_once_with(
client_id="test-client-id",
authority="https://login.microsoftonline.com/test-tenant-id",
token_cache=None,
)
mock_pca.get_accounts.assert_called_once_with(username=None)
mock_pca.acquire_token_silent.assert_called_once_with(scopes=DEFAULT_SCOPES, account=mock_account)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_silent_success_with_username(self, mock_pca_class: MagicMock) -> None:
"""Test successful silent token acquisition with username."""
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_account = MagicMock()
mock_pca.get_accounts.return_value = [mock_account]
mock_token_response = {"access_token": "test-access-token-12345"}
mock_pca.acquire_token_silent.return_value = mock_token_response
result = acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
username="test-user@example.com",
)
assert result == "test-access-token-12345"
mock_pca.get_accounts.assert_called_once_with(username="test-user@example.com")
mock_pca.acquire_token_silent.assert_called_once_with(scopes=DEFAULT_SCOPES, account=mock_account)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_silent_success_with_custom_scopes(self, mock_pca_class: MagicMock) -> None:
"""Test successful silent token acquisition with custom scopes."""
# Setup
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_account = MagicMock()
mock_pca.get_accounts.return_value = [mock_account]
mock_token_response = {"access_token": "test-access-token-12345"}
mock_pca.acquire_token_silent.return_value = mock_token_response
custom_scopes = ["https://custom.api.com/.default"]
result = acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
scopes=custom_scopes,
)
assert result == "test-access-token-12345"
mock_pca.acquire_token_silent.assert_called_once_with(scopes=custom_scopes, account=mock_account)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_interactive_success_no_accounts(self, mock_pca_class: MagicMock) -> None:
"""Test successful interactive token acquisition when no cached accounts exist."""
# Setup
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_pca.get_accounts.return_value = [] # No cached accounts
mock_token_response = {"access_token": "test-interactive-token-67890"}
mock_pca.acquire_token_interactive.return_value = mock_token_response
result = acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
)
assert result == "test-interactive-token-67890"
mock_pca.acquire_token_interactive.assert_called_once_with(scopes=DEFAULT_SCOPES)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_fallback_to_interactive_after_silent_fails(self, mock_pca_class: MagicMock) -> None:
"""Test fallback to interactive authentication when silent acquisition fails."""
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_account = MagicMock()
mock_pca.get_accounts.return_value = [mock_account]
# Silent acquisition fails with error response
mock_silent_error_response = {"error": "invalid_grant", "error_description": "Token expired"}
mock_pca.acquire_token_silent.return_value = mock_silent_error_response
# Interactive acquisition succeeds
mock_interactive_response = {"access_token": "test-interactive-token-67890"}
mock_pca.acquire_token_interactive.return_value = mock_interactive_response
result = acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
)
assert result == "test-interactive-token-67890"
mock_pca.acquire_token_silent.assert_called_once_with(scopes=DEFAULT_SCOPES, account=mock_account)
mock_pca.acquire_token_interactive.assert_called_once_with(scopes=DEFAULT_SCOPES)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_fallback_to_interactive_after_silent_exception(self, mock_pca_class: MagicMock) -> None:
"""Test fallback to interactive authentication when silent acquisition throws exception."""
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_account = MagicMock()
mock_pca.get_accounts.return_value = [mock_account]
# Silent acquisition throws exception
mock_pca.acquire_token_silent.side_effect = Exception("Network error")
# Interactive acquisition succeeds
mock_interactive_response = {"access_token": "test-interactive-token-67890"}
mock_pca.acquire_token_interactive.return_value = mock_interactive_response
result = acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
)
assert result == "test-interactive-token-67890"
mock_pca.acquire_token_silent.assert_called_once_with(scopes=DEFAULT_SCOPES, account=mock_account)
mock_pca.acquire_token_interactive.assert_called_once_with(scopes=DEFAULT_SCOPES)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_interactive_error_response(self, mock_pca_class: MagicMock) -> None:
"""Test that acquire_token handles error responses from interactive authentication."""
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_pca.get_accounts.return_value = [] # No cached accounts
# Interactive acquisition returns error
mock_error_response = {"error": "access_denied", "error_description": "User denied consent"}
mock_pca.acquire_token_interactive.return_value = mock_error_response
with pytest.raises(ServiceException, match="Authentication token cannot be acquired"):
acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_interactive_exception(self, mock_pca_class: MagicMock) -> None:
"""Test that acquire_token handles exceptions from interactive authentication."""
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_pca.get_accounts.return_value = [] # No cached accounts
# Interactive acquisition throws exception
mock_pca.acquire_token_interactive.side_effect = Exception("Authentication service unavailable")
with pytest.raises(ServiceException, match="Failed to acquire authentication token"):
acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
)
@patch("agent_framework_copilotstudio._acquire_token.PublicClientApplication")
def test_acquire_token_with_token_cache(self, mock_pca_class: MagicMock) -> None:
"""Test acquire_token with custom token cache."""
mock_pca = MagicMock()
mock_pca_class.return_value = mock_pca
mock_account = MagicMock()
mock_pca.get_accounts.return_value = [mock_account]
mock_token_response = {"access_token": "test-cached-token"}
mock_pca.acquire_token_silent.return_value = mock_token_response
mock_token_cache = MagicMock()
result = acquire_token(
client_id="test-client-id",
tenant_id="test-tenant-id",
token_cache=mock_token_cache,
)
assert result == "test-cached-token"
mock_pca_class.assert_called_once_with(
client_id="test-client-id",
authority="https://login.microsoftonline.com/test-tenant-id",
token_cache=mock_token_cache,
)
def test_default_scopes_constant(self) -> None:
"""Test that DEFAULT_SCOPES constant is properly defined."""
assert DEFAULT_SCOPES == ["https://api.powerplatform.com/.default"]
assert isinstance(DEFAULT_SCOPES, list)
assert len(DEFAULT_SCOPES) == 1
@@ -0,0 +1,325 @@
# Copyright (c) Microsoft. All rights reserved.
from typing import Any
from unittest.mock import MagicMock, patch
import pytest
from agent_framework import (
AgentRunResponse,
AgentRunResponseUpdate,
AgentThread,
ChatMessage,
Role,
TextContent,
)
from agent_framework.exceptions import ServiceException, ServiceInitializationError
from microsoft_agents.copilotstudio.client import CopilotClient
from agent_framework_copilotstudio import CopilotStudioAgent
def create_async_generator(items: list[Any]) -> Any:
"""Helper to create async generator mock."""
async def async_gen() -> Any:
for item in items:
yield item
return async_gen()
class TestCopilotStudioAgent:
"""Test cases for CopilotStudioAgent."""
@pytest.fixture
def mock_activity(self) -> MagicMock:
activity = MagicMock()
activity.text = "Test response"
activity.type = "message"
activity.id = "test-id"
activity.from_property.name = "Test Bot"
return activity
@pytest.fixture
def mock_copilot_client(self) -> MagicMock:
return MagicMock(spec=CopilotClient)
@patch("agent_framework_copilotstudio._acquire_token.acquire_token")
@patch("agent_framework_copilotstudio._agent.CopilotStudioSettings")
def test_init_missing_environment_id(self, mock_settings: MagicMock, mock_acquire_token: MagicMock) -> None:
mock_acquire_token.return_value = "fake-token"
mock_settings.return_value.environmentid = None
mock_settings.return_value.schemaname = "test-bot"
mock_settings.return_value.tenantid = "test-tenant"
mock_settings.return_value.agentappid = "test-client"
with pytest.raises(ServiceInitializationError, match="environment ID is required"):
CopilotStudioAgent()
@patch("agent_framework_copilotstudio._acquire_token.acquire_token")
@patch("agent_framework_copilotstudio._agent.CopilotStudioSettings")
def test_init_missing_bot_id(self, mock_settings: MagicMock, mock_acquire_token: MagicMock) -> None:
mock_acquire_token.return_value = "fake-token"
mock_settings.return_value.environmentid = "test-env"
mock_settings.return_value.schemaname = None
mock_settings.return_value.tenantid = "test-tenant"
mock_settings.return_value.agentappid = "test-client"
with pytest.raises(ServiceInitializationError, match="agent identifier"):
CopilotStudioAgent()
@patch("agent_framework_copilotstudio._acquire_token.acquire_token")
@patch("agent_framework_copilotstudio._agent.CopilotStudioSettings")
def test_init_missing_tenant_id(self, mock_settings: MagicMock, mock_acquire_token: MagicMock) -> None:
mock_acquire_token.return_value = "fake-token"
mock_settings.return_value.environmentid = "test-env"
mock_settings.return_value.schemaname = "test-bot"
mock_settings.return_value.tenantid = None
mock_settings.return_value.agentappid = "test-client"
with pytest.raises(ServiceInitializationError, match="tenant ID is required"):
CopilotStudioAgent()
@patch("agent_framework_copilotstudio._acquire_token.acquire_token")
@patch("agent_framework_copilotstudio._agent.CopilotStudioSettings")
def test_init_missing_client_id(self, mock_settings: MagicMock, mock_acquire_token: MagicMock) -> None:
mock_acquire_token.return_value = "fake-token"
mock_settings.return_value.environmentid = "test-env"
mock_settings.return_value.schemaname = "test-bot"
mock_settings.return_value.tenantid = "test-tenant"
mock_settings.return_value.agentappid = None
with pytest.raises(ServiceInitializationError, match="client ID is required"):
CopilotStudioAgent()
def test_init_with_client(self, mock_copilot_client: MagicMock) -> None:
agent = CopilotStudioAgent(client=mock_copilot_client)
assert agent.client == mock_copilot_client
assert agent.id is not None
@patch("agent_framework_copilotstudio._acquire_token.acquire_token")
def test_init_empty_environment_id(self, mock_acquire_token: MagicMock) -> None:
mock_acquire_token.return_value = "fake-token"
with patch("agent_framework_copilotstudio._agent.CopilotStudioSettings") as mock_settings:
mock_settings.return_value.environmentid = ""
mock_settings.return_value.schemaname = "test-bot"
mock_settings.return_value.tenantid = "test-tenant"
mock_settings.return_value.agentappid = "test-client"
with pytest.raises(ServiceInitializationError, match="environment ID is required"):
CopilotStudioAgent()
@patch("agent_framework_copilotstudio._acquire_token.acquire_token")
def test_init_empty_schema_name(self, mock_acquire_token: MagicMock) -> None:
mock_acquire_token.return_value = "fake-token"
with patch("agent_framework_copilotstudio._agent.CopilotStudioSettings") as mock_settings:
mock_settings.return_value.environmentid = "test-env"
mock_settings.return_value.schemaname = ""
mock_settings.return_value.tenantid = "test-tenant"
mock_settings.return_value.agentappid = "test-client"
with pytest.raises(ServiceInitializationError, match="agent identifier"):
CopilotStudioAgent()
@pytest.mark.asyncio
async def test_run_with_string_message(self, mock_copilot_client: MagicMock, mock_activity: MagicMock) -> None:
"""Test run method with string message."""
agent = CopilotStudioAgent(client=mock_copilot_client)
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([mock_activity])
response = await agent.run("test message")
assert isinstance(response, AgentRunResponse)
assert len(response.messages) == 1
content = response.messages[0].contents[0]
assert isinstance(content, TextContent)
assert content.text == "Test response"
assert response.messages[0].role == Role.ASSISTANT
@pytest.mark.asyncio
async def test_run_with_chat_message(self, mock_copilot_client: MagicMock, mock_activity: MagicMock) -> None:
"""Test run method with ChatMessage."""
agent = CopilotStudioAgent(client=mock_copilot_client)
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([mock_activity])
chat_message = ChatMessage(role=Role.USER, contents=[TextContent("test message")])
response = await agent.run(chat_message)
assert isinstance(response, AgentRunResponse)
assert len(response.messages) == 1
content = response.messages[0].contents[0]
assert isinstance(content, TextContent)
assert content.text == "Test response"
assert response.messages[0].role == Role.ASSISTANT
@pytest.mark.asyncio
async def test_run_with_thread(self, mock_copilot_client: MagicMock, mock_activity: MagicMock) -> None:
"""Test run method with existing thread."""
agent = CopilotStudioAgent(client=mock_copilot_client)
thread = AgentThread()
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([mock_activity])
response = await agent.run("test message", thread=thread)
assert isinstance(response, AgentRunResponse)
assert len(response.messages) == 1
assert thread.service_thread_id == "test-conversation-id"
@pytest.mark.asyncio
async def test_run_start_conversation_failure(self, mock_copilot_client: MagicMock) -> None:
"""Test run method when conversation start fails."""
agent = CopilotStudioAgent(client=mock_copilot_client)
mock_copilot_client.start_conversation.return_value = create_async_generator([])
with pytest.raises(ServiceException, match="Failed to start a new conversation"):
await agent.run("test message")
@pytest.mark.asyncio
async def test_run_stream_with_string_message(self, mock_copilot_client: MagicMock) -> None:
"""Test run_stream method with string message."""
agent = CopilotStudioAgent(client=mock_copilot_client)
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
typing_activity = MagicMock()
typing_activity.text = "Streaming response"
typing_activity.type = "typing"
typing_activity.id = "test-typing-id"
typing_activity.from_property.name = "Test Bot"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([typing_activity])
response_count = 0
async for response in agent.run_stream("test message"):
assert isinstance(response, AgentRunResponseUpdate)
content = response.contents[0]
assert isinstance(content, TextContent)
assert content.text == "Streaming response"
response_count += 1
assert response_count == 1
@pytest.mark.asyncio
async def test_run_stream_with_thread(self, mock_copilot_client: MagicMock) -> None:
"""Test run_stream method with existing thread."""
agent = CopilotStudioAgent(client=mock_copilot_client)
thread = AgentThread()
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
typing_activity = MagicMock()
typing_activity.text = "Streaming response"
typing_activity.type = "typing"
typing_activity.id = "test-typing-id"
typing_activity.from_property.name = "Test Bot"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([typing_activity])
response_count = 0
async for response in agent.run_stream("test message", thread=thread):
assert isinstance(response, AgentRunResponseUpdate)
content = response.contents[0]
assert isinstance(content, TextContent)
assert content.text == "Streaming response"
response_count += 1
assert response_count == 1
assert thread.service_thread_id == "test-conversation-id"
@pytest.mark.asyncio
async def test_run_stream_no_typing_activity(self, mock_copilot_client: MagicMock) -> None:
"""Test run_stream method with non-typing activity."""
agent = CopilotStudioAgent(client=mock_copilot_client)
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
message_activity = MagicMock()
message_activity.text = "Message response"
message_activity.type = "message"
message_activity.id = "test-message-id"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([message_activity])
response_count = 0
async for _response in agent.run_stream("test message"):
response_count += 1
assert response_count == 0
@pytest.mark.asyncio
async def test_run_multiple_activities(self, mock_copilot_client: MagicMock) -> None:
"""Test run method with multiple message activities."""
agent = CopilotStudioAgent(client=mock_copilot_client)
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
activity1 = MagicMock()
activity1.text = "First response"
activity1.type = "message"
activity1.id = "test-id-1"
activity1.from_property.name = "Test Bot"
activity2 = MagicMock()
activity2.text = "Second response"
activity2.type = "message"
activity2.id = "test-id-2"
activity2.from_property.name = "Test Bot"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([activity1, activity2])
response = await agent.run("test message")
assert isinstance(response, AgentRunResponse)
assert len(response.messages) == 2
@pytest.mark.asyncio
async def test_run_list_of_messages(self, mock_copilot_client: MagicMock, mock_activity: MagicMock) -> None:
"""Test run method with list of messages."""
agent = CopilotStudioAgent(client=mock_copilot_client)
conversation_activity = MagicMock()
conversation_activity.conversation.id = "test-conversation-id"
mock_copilot_client.start_conversation.return_value = create_async_generator([conversation_activity])
mock_copilot_client.ask_question.return_value = create_async_generator([mock_activity])
messages = ["Hello", "How are you?"]
response = await agent.run(messages)
assert isinstance(response, AgentRunResponse)
assert len(response.messages) == 1
@pytest.mark.asyncio
async def test_run_stream_start_conversation_failure(self, mock_copilot_client: MagicMock) -> None:
"""Test run_stream method when conversation start fails."""
agent = CopilotStudioAgent(client=mock_copilot_client)
mock_copilot_client.start_conversation.return_value = create_async_generator([])
with pytest.raises(ServiceException, match="Failed to start a new conversation"):
async for _ in agent.run_stream("test message"):
pass
@@ -0,0 +1,28 @@
# Copyright (c) Microsoft. All rights reserved.
def test_self_through_main() -> None:
try:
from agent_framework.copilotstudio import __version__
except ImportError:
__version__ = None
assert __version__ is not None
def test_self() -> None:
try:
from agent_framework_copilotstudio import __version__
except ImportError:
__version__ = None
assert __version__ is not None
def test_agent_framework() -> None:
try:
from agent_framework import __version__
except ImportError:
__version__ = None
assert __version__ is not None
-1
View File
@@ -72,7 +72,6 @@ show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_any_unimported = true
[tool.bandit]
targets = ["agent_framework_foundry"]
@@ -421,23 +421,6 @@ def test_foundry_chat_client_create_run_options_with_image_content(mock_ai_proje
assert len(message.content) == 1
def test_foundry_chat_client_convert_function_results_to_tool_output(mock_ai_project_client: MagicMock) -> None:
"""Test _convert_function_results_to_tool_output method."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client)
function_results = [
FunctionResultContent(call_id='["run_123", "call_456"]', result="Result 1"),
FunctionResultContent(call_id='["run_123", "call_789"]', result="Result 2"),
]
run_id, tool_outputs = chat_client._convert_function_results_to_tool_output(function_results) # type: ignore
assert run_id == "run_123"
assert tool_outputs is not None
assert len(tool_outputs) == 2
def test_foundry_chat_client_convert_function_results_to_tool_output_none(mock_ai_project_client: MagicMock) -> None:
"""Test _convert_function_results_to_tool_output with None input."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client)
@@ -566,30 +549,6 @@ async def test_foundry_chat_client_get_agent_id_or_create_with_run_options(
assert "response_format" in call_args
async def test_foundry_chat_client_create_agent_stream_with_tool_results(mock_ai_project_client: MagicMock) -> None:
"""Test _create_agent_stream when tool results match active thread run."""
chat_client = create_test_foundry_chat_client(
mock_ai_project_client, agent_id="test-agent", thread_id="test-thread"
)
mock_thread_run = MagicMock()
mock_thread_run.id = "run_123"
mock_thread_run.thread_id = "test-thread"
with patch.object(chat_client, "_get_active_thread_run", return_value=mock_thread_run):
tool_results = [FunctionResultContent(call_id='["run_123", "call_456"]', result="Result")]
mock_handler = MagicMock()
mock_ai_project_client.agents.runs.submit_tool_outputs_stream = AsyncMock(return_value=None)
with patch("agent_framework_foundry._chat_client.AsyncAgentEventHandler", return_value=mock_handler):
stream, thread_id = await chat_client._create_agent_stream("test-thread", "test-agent", {}, tool_results) # type: ignore
assert stream is mock_handler
assert thread_id == "test-thread"
mock_ai_project_client.agents.runs.submit_tool_outputs_stream.assert_called_once()
async def test_foundry_chat_client_prepare_thread_cancels_active_run(mock_ai_project_client: MagicMock) -> None:
"""Test _prepare_thread cancels active thread run when provided."""
chat_client = create_test_foundry_chat_client(mock_ai_project_client, agent_id="test-agent")
+15 -15
View File
@@ -170,6 +170,21 @@ class BaseAgent(AFBaseModel):
await deserialize_thread_state(thread, serialized_thread, **kwargs)
return thread
def _normalize_messages(
self,
messages: str | ChatMessage | Sequence[str] | Sequence[ChatMessage] | None = None,
) -> list[ChatMessage]:
if messages is None:
return []
if isinstance(messages, str):
return [ChatMessage(role=Role.USER, text=messages)]
if isinstance(messages, ChatMessage):
return [messages]
return [ChatMessage(role=Role.USER, text=msg) if isinstance(msg, str) else msg for msg in messages]
# region ChatAgent
@@ -669,21 +684,6 @@ class ChatAgent(BaseAgent):
messages.extend(input_messages or [])
return thread, messages
def _normalize_messages(
self,
messages: str | ChatMessage | Sequence[str] | Sequence[ChatMessage] | None = None,
) -> list[ChatMessage]:
if messages is None:
return []
if isinstance(messages, str):
return [ChatMessage(role=Role.USER, text=messages)]
if isinstance(messages, ChatMessage):
return [messages]
return [ChatMessage(role=Role.USER, text=msg) if isinstance(msg, str) else msg for msg in messages]
def _get_agent_name(self) -> str:
return self.name or "UnnamedAgent"
@@ -0,0 +1,24 @@
# Copyright (c) Microsoft. All rights reserved.
import importlib
from typing import Any
PACKAGE_NAME = "agent_framework_copilotstudio"
PACKAGE_EXTRA = "copilotstudio"
_IMPORTS = ["CopilotStudioAgent", "__version__", "acquire_token"]
def __getattr__(name: str) -> Any:
if name in _IMPORTS:
try:
return getattr(importlib.import_module(PACKAGE_NAME), name)
except ModuleNotFoundError as exc:
raise ModuleNotFoundError(
f"The '{PACKAGE_EXTRA}' extra is not installed, "
f"please do `pip install agent-framework[{PACKAGE_EXTRA}]`"
) from exc
raise AttributeError(f"Module {PACKAGE_NAME} has no attribute {name}.")
def __dir__() -> list[str]:
return _IMPORTS
@@ -0,0 +1,5 @@
# Copyright (c) Microsoft. All rights reserved.
from agent_framework_copilotstudio import CopilotStudioAgent, __version__, acquire_token
__all__ = ["CopilotStudioAgent", "__version__", "acquire_token"]
-1
View File
@@ -94,7 +94,6 @@ show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_any_unimported = true
[tool.bandit]
targets = ["agent_framework"]
@@ -6,6 +6,7 @@ from typing import Any, Final
from agent_framework import ChatMessage, Context, ContextProvider, TextContent
from agent_framework.exceptions import ServiceInitializationError
from mem0 import AsyncMemoryClient
from pydantic import PrivateAttr
if sys.version_info >= (3, 11):
@@ -18,6 +19,7 @@ DEFAULT_CONTEXT_PROMPT: Final[str] = "## Memories\nConsider the following memori
class Mem0Provider(ContextProvider):
mem0_client: AsyncMemoryClient
api_key: str | None = None
application_id: str | None = None
agent_id: str | None = None
@@ -25,8 +27,6 @@ class Mem0Provider(ContextProvider):
user_id: str | None = None
scope_to_per_operation_thread_id: bool = False
context_prompt: str = DEFAULT_CONTEXT_PROMPT
# Use Any to avoid forward reference issues with AsyncMemoryClient
mem0_client: Any = None
_should_close_client: bool = PrivateAttr(default=False) # Track whether we should close client connection
@@ -39,7 +39,7 @@ class Mem0Provider(ContextProvider):
user_id: str | None = None,
scope_to_per_operation_thread_id: bool = False,
context_prompt: str = DEFAULT_CONTEXT_PROMPT,
mem0_client: Any = None,
mem0_client: AsyncMemoryClient | None = None,
) -> None:
"""Initializes a new instance of the Mem0Provider class.
@@ -56,8 +56,6 @@ class Mem0Provider(ContextProvider):
"""
should_close_client = False
if mem0_client is None:
from mem0 import AsyncMemoryClient
mem0_client = AsyncMemoryClient(api_key=api_key)
should_close_client = True
@@ -84,7 +82,7 @@ class Mem0Provider(ContextProvider):
async def __aexit__(self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any) -> None:
"""Async context manager exit."""
if self._should_close_client and self.mem0_client:
await self.mem0_client.__aexit__(exc_type, exc_val, exc_tb)
await self.mem0_client.__aexit__(exc_type, exc_val, exc_tb) # type: ignore
async def thread_created(self, thread_id: str | None = None) -> None:
"""Called when a new thread is created.
-1
View File
@@ -72,7 +72,6 @@ show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_any_unimported = true
[tool.bandit]
targets = ["agent_framework_mem0"]
+47 -39
View File
@@ -9,7 +9,7 @@ from agent_framework.exceptions import ServiceInitializationError
from agent_framework.mem0 import Mem0Provider
def test_mem0_provider_import():
def test_mem0_provider_import() -> None:
"""Test that Mem0Provider can be imported."""
assert Mem0Provider is not None
@@ -17,7 +17,9 @@ def test_mem0_provider_import():
@pytest.fixture
def mock_mem0_client() -> AsyncMock:
"""Create a mock Mem0 AsyncMemoryClient."""
mock_client = AsyncMock()
from mem0 import AsyncMemoryClient
mock_client = AsyncMock(spec=AsyncMemoryClient)
mock_client.add = AsyncMock()
mock_client.search = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
@@ -40,7 +42,7 @@ def sample_messages() -> list[ChatMessage]:
class TestMem0ProviderInitialization:
"""Test initialization and configuration of Mem0Provider."""
def test_init_with_all_ids(self, mock_mem0_client: AsyncMock):
def test_init_with_all_ids(self, mock_mem0_client: AsyncMock) -> None:
"""Test initialization with all IDs provided."""
provider = Mem0Provider(
user_id="user123",
@@ -54,7 +56,7 @@ class TestMem0ProviderInitialization:
assert provider.application_id == "app123"
assert provider.thread_id == "thread123"
def test_init_without_filters_succeeds(self, mock_mem0_client: AsyncMock):
def test_init_without_filters_succeeds(self, mock_mem0_client: AsyncMock) -> None:
"""Test that initialization succeeds even without filters (validation happens during invocation)."""
provider = Mem0Provider(mem0_client=mock_mem0_client)
assert provider.user_id is None
@@ -62,13 +64,13 @@ class TestMem0ProviderInitialization:
assert provider.application_id is None
assert provider.thread_id is None
def test_init_with_custom_context_prompt(self, mock_mem0_client: AsyncMock):
def test_init_with_custom_context_prompt(self, mock_mem0_client: AsyncMock) -> None:
"""Test initialization with custom context prompt."""
custom_prompt = "## Custom Memories\nConsider these memories:"
provider = Mem0Provider(user_id="user123", context_prompt=custom_prompt, mem0_client=mock_mem0_client)
assert provider.context_prompt == custom_prompt
def test_init_with_scope_to_per_operation_thread_id(self, mock_mem0_client: AsyncMock):
def test_init_with_scope_to_per_operation_thread_id(self, mock_mem0_client: AsyncMock) -> None:
"""Test initialization with scope_to_per_operation_thread_id enabled."""
provider = Mem0Provider(
user_id="user123",
@@ -77,10 +79,12 @@ class TestMem0ProviderInitialization:
)
assert provider.scope_to_per_operation_thread_id is True
@patch("mem0.AsyncMemoryClient")
def test_init_creates_default_client_when_none_provided(self, mock_memory_client_class: AsyncMock):
@patch("agent_framework_mem0._provider.AsyncMemoryClient")
def test_init_creates_default_client_when_none_provided(self, mock_memory_client_class: AsyncMock) -> None:
"""Test that a default client is created when none is provided."""
mock_client = AsyncMock()
from mem0 import AsyncMemoryClient
mock_client = AsyncMock(spec=AsyncMemoryClient)
mock_memory_client_class.return_value = mock_client
provider = Mem0Provider(user_id="user123", api_key="test_api_key")
@@ -89,7 +93,7 @@ class TestMem0ProviderInitialization:
assert provider.mem0_client == mock_client
assert provider._should_close_client is True
def test_init_with_provided_client_should_not_close(self, mock_mem0_client: AsyncMock):
def test_init_with_provided_client_should_not_close(self, mock_mem0_client: AsyncMock) -> None:
"""Test that provided client should not be closed by provider."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
assert provider._should_close_client is False
@@ -98,21 +102,23 @@ class TestMem0ProviderInitialization:
class TestMem0ProviderAsyncContextManager:
"""Test async context manager behavior."""
async def test_async_context_manager_entry(self, mock_mem0_client: AsyncMock):
async def test_async_context_manager_entry(self, mock_mem0_client: AsyncMock) -> None:
"""Test async context manager entry returns self."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
async with provider as ctx:
assert ctx is provider
async def test_async_context_manager_exit_closes_client_when_should_close(self):
async def test_async_context_manager_exit_closes_client_when_should_close(self) -> None:
"""Test that async context manager closes client when it should."""
mock_client = AsyncMock()
from mem0 import AsyncMemoryClient
mock_client = AsyncMock(spec=AsyncMemoryClient)
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock()
mock_client.async_client = AsyncMock()
mock_client.async_client.aclose = AsyncMock()
with patch("mem0.AsyncMemoryClient", return_value=mock_client):
with patch("agent_framework_mem0._provider.AsyncMemoryClient", return_value=mock_client):
provider = Mem0Provider(user_id="user123", api_key="test_key")
assert provider._should_close_client is True
@@ -121,7 +127,7 @@ class TestMem0ProviderAsyncContextManager:
mock_client.__aexit__.assert_called_once()
async def test_async_context_manager_exit_does_not_close_provided_client(self, mock_mem0_client: AsyncMock):
async def test_async_context_manager_exit_does_not_close_provided_client(self, mock_mem0_client: AsyncMock) -> None:
"""Test that async context manager does not close provided client."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
assert provider._should_close_client is False
@@ -135,7 +141,7 @@ class TestMem0ProviderAsyncContextManager:
class TestMem0ProviderThreadMethods:
"""Test thread lifecycle methods."""
async def test_thread_created_sets_per_operation_thread_id(self, mock_mem0_client: AsyncMock):
async def test_thread_created_sets_per_operation_thread_id(self, mock_mem0_client: AsyncMock) -> None:
"""Test that thread_created sets per-operation thread ID."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
@@ -143,7 +149,7 @@ class TestMem0ProviderThreadMethods:
assert provider._per_operation_thread_id == "thread123"
async def test_thread_created_with_existing_thread_id(self, mock_mem0_client: AsyncMock):
async def test_thread_created_with_existing_thread_id(self, mock_mem0_client: AsyncMock) -> None:
"""Test thread_created when thread ID already exists."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
provider._per_operation_thread_id = "existing_thread"
@@ -153,7 +159,7 @@ class TestMem0ProviderThreadMethods:
# Should not overwrite existing thread ID
assert provider._per_operation_thread_id == "existing_thread"
async def test_thread_created_validation_with_scope_enabled(self, mock_mem0_client: AsyncMock):
async def test_thread_created_validation_with_scope_enabled(self, mock_mem0_client: AsyncMock) -> None:
"""Test thread_created validation when scope_to_per_operation_thread_id is enabled."""
provider = Mem0Provider(
user_id="user123",
@@ -169,7 +175,7 @@ class TestMem0ProviderThreadMethods:
async def test_messages_adding_sets_per_operation_thread_id(
self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]
):
) -> None:
"""Test that messages_adding sets per-operation thread ID."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
@@ -181,7 +187,7 @@ class TestMem0ProviderThreadMethods:
class TestMem0ProviderMessagesAdding:
"""Test messages_adding method."""
async def test_messages_adding_fails_without_filters(self, mock_mem0_client: AsyncMock):
async def test_messages_adding_fails_without_filters(self, mock_mem0_client: AsyncMock) -> None:
"""Test that messages_adding fails when no filters are provided."""
provider = Mem0Provider(mem0_client=mock_mem0_client)
message = ChatMessage(role=Role.USER, text="Hello!")
@@ -191,7 +197,7 @@ class TestMem0ProviderMessagesAdding:
assert "At least one of the filters" in str(exc_info.value)
async def test_messages_adding_single_message(self, mock_mem0_client: AsyncMock):
async def test_messages_adding_single_message(self, mock_mem0_client: AsyncMock) -> None:
"""Test adding a single message."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
message = ChatMessage(role=Role.USER, text="Hello!")
@@ -205,7 +211,7 @@ class TestMem0ProviderMessagesAdding:
async def test_messages_adding_multiple_messages(
self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]
):
) -> None:
"""Test adding multiple messages."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
@@ -220,7 +226,9 @@ class TestMem0ProviderMessagesAdding:
]
assert call_args.kwargs["messages"] == expected_messages
async def test_messages_adding_with_agent_id(self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]):
async def test_messages_adding_with_agent_id(
self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]
) -> None:
"""Test adding messages with agent_id."""
provider = Mem0Provider(agent_id="agent123", mem0_client=mock_mem0_client)
@@ -232,7 +240,7 @@ class TestMem0ProviderMessagesAdding:
async def test_messages_adding_with_application_id(
self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]
):
) -> None:
"""Test adding messages with application_id in metadata."""
provider = Mem0Provider(user_id="user123", application_id="app123", mem0_client=mock_mem0_client)
@@ -243,7 +251,7 @@ class TestMem0ProviderMessagesAdding:
async def test_messages_adding_with_scope_to_per_operation_thread_id(
self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]
):
) -> None:
"""Test adding messages with scope_to_per_operation_thread_id enabled."""
provider = Mem0Provider(
user_id="user123",
@@ -260,7 +268,7 @@ class TestMem0ProviderMessagesAdding:
async def test_messages_adding_without_scope_uses_base_thread_id(
self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]
):
) -> None:
"""Test adding messages without scope uses base thread_id."""
provider = Mem0Provider(
user_id="user123",
@@ -274,7 +282,7 @@ class TestMem0ProviderMessagesAdding:
call_args = mock_mem0_client.add.call_args
assert call_args.kwargs["run_id"] == "base_thread"
async def test_messages_adding_filters_empty_messages(self, mock_mem0_client: AsyncMock):
async def test_messages_adding_filters_empty_messages(self, mock_mem0_client: AsyncMock) -> None:
"""Test that empty or invalid messages are filtered out."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
messages = [
@@ -289,7 +297,7 @@ class TestMem0ProviderMessagesAdding:
# Should only include the valid message
assert call_args.kwargs["messages"] == [{"role": "user", "content": "Valid message"}]
async def test_messages_adding_skips_when_no_valid_messages(self, mock_mem0_client: AsyncMock):
async def test_messages_adding_skips_when_no_valid_messages(self, mock_mem0_client: AsyncMock) -> None:
"""Test that mem0 client is not called when no valid messages exist."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
messages = [
@@ -305,7 +313,7 @@ class TestMem0ProviderMessagesAdding:
class TestMem0ProviderModelInvoking:
"""Test model_invoking method."""
async def test_model_invoking_fails_without_filters(self, mock_mem0_client: AsyncMock):
async def test_model_invoking_fails_without_filters(self, mock_mem0_client: AsyncMock) -> None:
"""Test that model_invoking fails when no filters are provided."""
provider = Mem0Provider(mem0_client=mock_mem0_client)
message = ChatMessage(role=Role.USER, text="What's the weather?")
@@ -315,7 +323,7 @@ class TestMem0ProviderModelInvoking:
assert "At least one of the filters" in str(exc_info.value)
async def test_model_invoking_single_message(self, mock_mem0_client: AsyncMock):
async def test_model_invoking_single_message(self, mock_mem0_client: AsyncMock) -> None:
"""Test model_invoking with a single message."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
message = ChatMessage(role=Role.USER, text="What's the weather?")
@@ -345,7 +353,7 @@ class TestMem0ProviderModelInvoking:
async def test_model_invoking_multiple_messages(
self, mock_mem0_client: AsyncMock, sample_messages: list[ChatMessage]
):
) -> None:
"""Test model_invoking with multiple messages."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
@@ -357,7 +365,7 @@ class TestMem0ProviderModelInvoking:
expected_query = "Hello, how are you?\nI'm doing well, thank you!\nYou are a helpful assistant"
assert call_args.kwargs["query"] == expected_query
async def test_model_invoking_with_agent_id(self, mock_mem0_client: AsyncMock):
async def test_model_invoking_with_agent_id(self, mock_mem0_client: AsyncMock) -> None:
"""Test model_invoking with agent_id."""
provider = Mem0Provider(agent_id="agent123", mem0_client=mock_mem0_client)
message = ChatMessage(role=Role.USER, text="Hello")
@@ -370,7 +378,7 @@ class TestMem0ProviderModelInvoking:
assert call_args.kwargs["agent_id"] == "agent123"
assert call_args.kwargs["user_id"] is None
async def test_model_invoking_with_scope_to_per_operation_thread_id(self, mock_mem0_client: AsyncMock):
async def test_model_invoking_with_scope_to_per_operation_thread_id(self, mock_mem0_client: AsyncMock) -> None:
"""Test model_invoking with scope_to_per_operation_thread_id enabled."""
provider = Mem0Provider(
user_id="user123",
@@ -388,7 +396,7 @@ class TestMem0ProviderModelInvoking:
call_args = mock_mem0_client.search.call_args
assert call_args.kwargs["run_id"] == "operation_thread"
async def test_model_invoking_no_memories_returns_none_instructions(self, mock_mem0_client: AsyncMock):
async def test_model_invoking_no_memories_returns_none_instructions(self, mock_mem0_client: AsyncMock) -> None:
"""Test that no memories returns context with None instructions."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
message = ChatMessage(role=Role.USER, text="Hello")
@@ -400,7 +408,7 @@ class TestMem0ProviderModelInvoking:
assert isinstance(context, Context)
assert not context.contents
async def test_model_invoking_filters_empty_message_text(self, mock_mem0_client: AsyncMock):
async def test_model_invoking_filters_empty_message_text(self, mock_mem0_client: AsyncMock) -> None:
"""Test that empty message text is filtered out from query."""
provider = Mem0Provider(user_id="user123", mem0_client=mock_mem0_client)
messages = [
@@ -416,7 +424,7 @@ class TestMem0ProviderModelInvoking:
call_args = mock_mem0_client.search.call_args
assert call_args.kwargs["query"] == "Valid message"
async def test_model_invoking_custom_context_prompt(self, mock_mem0_client: AsyncMock):
async def test_model_invoking_custom_context_prompt(self, mock_mem0_client: AsyncMock) -> None:
"""Test model_invoking with custom context prompt."""
custom_prompt = "## Custom Context\nRemember these details:"
provider = Mem0Provider(
@@ -439,7 +447,7 @@ class TestMem0ProviderModelInvoking:
class TestMem0ProviderValidation:
"""Test validation methods."""
def test_validate_per_operation_thread_id_success(self, mock_mem0_client: AsyncMock):
def test_validate_per_operation_thread_id_success(self, mock_mem0_client: AsyncMock) -> None:
"""Test successful validation of per-operation thread ID."""
provider = Mem0Provider(
user_id="user123",
@@ -454,7 +462,7 @@ class TestMem0ProviderValidation:
# Should not raise exception for None
provider._validate_per_operation_thread_id(None)
def test_validate_per_operation_thread_id_failure(self, mock_mem0_client: AsyncMock):
def test_validate_per_operation_thread_id_failure(self, mock_mem0_client: AsyncMock) -> None:
"""Test validation failure for conflicting thread IDs."""
provider = Mem0Provider(
user_id="user123",
@@ -468,7 +476,7 @@ class TestMem0ProviderValidation:
assert "can only be used with one thread at a time" in str(exc_info.value)
def test_validate_per_operation_thread_id_disabled_scope(self, mock_mem0_client: AsyncMock):
def test_validate_per_operation_thread_id_disabled_scope(self, mock_mem0_client: AsyncMock) -> None:
"""Test that validation is skipped when scope is disabled."""
provider = Mem0Provider(
user_id="user123",
-1
View File
@@ -69,7 +69,6 @@ show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_any_unimported = true
[tool.bandit]
targets = ["agent_framework_runtime"]
@@ -2,7 +2,7 @@
import logging
import uuid
from collections.abc import AsyncIterable
from collections.abc import AsyncIterable, Sequence
from datetime import datetime
from typing import TYPE_CHECKING, Any, ClassVar, TypedDict, cast
@@ -199,7 +199,7 @@ class WorkflowAgent(BaseAgent):
def _normalize_messages(
self,
messages: str | ChatMessage | list[str] | list[ChatMessage] | None = None,
messages: str | ChatMessage | Sequence[str] | Sequence[ChatMessage] | None = None,
) -> list[ChatMessage]:
"""Normalize input messages to a list of ChatMessage objects."""
if messages is None:
@@ -211,7 +211,7 @@ class WorkflowAgent(BaseAgent):
if isinstance(messages, ChatMessage):
return [messages]
normalized = []
normalized: list[ChatMessage] = []
for msg in messages:
if isinstance(msg, str):
normalized.append(ChatMessage(role=Role.USER, contents=[TextContent(text=msg)]))
-1
View File
@@ -74,7 +74,6 @@ show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_any_unimported = true
[tool.bandit]
targets = ["agent_framework_workflow"]
+3 -2
View File
@@ -6,6 +6,7 @@ requires-python = ">=3.10"
dependencies = [
"agent-framework",
"agent-framework-azure",
"agent-framework-copilotstudio",
"agent-framework-foundry",
"agent-framework-mem0",
"agent-framework-workflow",
@@ -63,6 +64,7 @@ exclude = [ "packages/agent_framework_project.egg-info" ]
[tool.uv.sources]
agent-framework = { workspace = true }
agent-framework-azure = { workspace = true }
agent-framework-copilotstudio = { workspace = true }
agent-framework-foundry = { workspace = true }
agent-framework-mem0 = { workspace = true }
agent-framework-runtime = { workspace = true }
@@ -164,7 +166,6 @@ show_error_codes = true
warn_unused_ignores = false
disallow_incomplete_defs = true
disallow_untyped_decorators = true
disallow_any_unimported = true
[tool.bandit]
targets = ["agent_framework"]
@@ -193,7 +194,7 @@ build = "python run_tasks_in_packages_if_exists.py build"
# combined checks
check = ["fmt", "lint", "pyright", "mypy", "test", "markdown-code-lint", "samples-code-check"]
pre-commit-check = ["fmt", "lint", "pyright", "markdown-code-lint", "samples-code-check"]
all-tests = "pytest --import-mode=importlib --cov=agent_framework --cov=agent_framework_azure --cov=agent_framework_foundry --cov=agent_framework_mem0 --cov=agent_framework_workflow --cov-report=term-missing:skip-covered packages/azure/tests packages/foundry/tests packages/main/tests packages/mem0/tests packages/workflow/tests"
all-tests = "pytest --import-mode=importlib --cov=agent_framework --cov=agent_framework_azure --cov=agent_framework_copilotstudio --cov=agent_framework_foundry --cov=agent_framework_mem0 --cov=agent_framework_workflow --cov-report=term-missing:skip-covered packages/azure/tests packages/copilotstudio/tests packages/foundry/tests packages/main/tests packages/mem0/tests packages/workflow/tests"
[tool.poe.tasks.venv]
cmd = "uv venv --clear --python $python"
@@ -10,6 +10,12 @@ This folder contains examples demonstrating how to create and use agents with di
|--------|-------------|
| **[`foundry/`](foundry/)** | Create agents using Azure AI Foundry |
### Microsoft Copilot Studio Examples
| Folder | Description |
|--------|-------------|
| **[`copilotstudio/`](copilotstudio/)** | Create agents using Microsoft Copilot Studio |
### Azure OpenAI Examples
| Folder | Description |
@@ -0,0 +1,96 @@
# Copilot Studio Agent Examples
This folder contains examples demonstrating how to create and use agents with Microsoft Copilot Studio using the Agent Framework.
## Prerequisites
Before running these examples, you need:
1. **Copilot Studio Environment**: Access to a Microsoft Copilot Studio environment with a published copilot
2. **App Registration**: An Azure AD App Registration with appropriate permissions
3. **Environment Variables**: Set the following environment variables:
- `COPILOTSTUDIOAGENT__ENVIRONMENTID` - Your Copilot Studio environment ID
- `COPILOTSTUDIOAGENT__SCHEMANAME` - Your copilot's agent identifier/schema name
- `COPILOTSTUDIOAGENT__AGENTAPPID` - Your App Registration client ID
- `COPILOTSTUDIOAGENT__TENANTID` - Your Azure AD tenant ID
## Examples
| Example | Description |
|---------|-------------|
| **[`copilotstudio_basic.py`](copilotstudio_basic.py)** | Basic non-streaming and streaming execution with simple questions |
| **[`copilotstudio_with_explicit_settings.py`](copilotstudio_with_explicit_settings.py)** | Example with explicit settings and manual token acquisition |
## Authentication
The examples use MSAL (Microsoft Authentication Library) for authentication. The first time you run an example, you may need to complete an interactive authentication flow in your browser.
### App Registration Setup
Your Azure AD App Registration should have:
1. **API Permissions**:
- Power Platform API permissions (https://api.powerplatform.com/.default)
- Appropriate delegated permissions for your organization
2. **Redirect URIs**:
- For public client flows: `http://localhost`
- Configure as appropriate for your authentication method
3. **Authentication**:
- Enable "Allow public client flows" if using interactive authentication
## Usage Patterns
### Basic Usage with Environment Variables
```python
from agent_framework.copilotstudio import CopilotStudioAgent
# Uses environment variables for configuration
agent = CopilotStudioAgent()
result = await agent.run("What is the capital of France?")
```
### Explicit Configuration
```python
from agent_framework.copilotstudio import CopilotStudioAgent, acquire_token
from microsoft_agents.copilotstudio.client import ConnectionSettings, CopilotClient, PowerPlatformCloud, AgentType
# Acquire token manually
token = acquire_token(
client_id="your-client-id",
tenant_id="your-tenant-id"
)
# Create settings and client
settings = ConnectionSettings(
environment_id="your-environment-id",
agent_identifier="your-agent-schema-name",
cloud=PowerPlatformCloud.PROD,
copilot_agent_type=AgentType.PUBLISHED
)
client = CopilotClient(settings=settings, token=token)
agent = CopilotStudioAgent(client=client)
```
## Troubleshooting
### Common Issues
1. **Authentication Errors**:
- Verify your App Registration has correct permissions
- Ensure environment variables are set correctly
- Check that your tenant ID and client ID are valid
2. **Environment/Agent Not Found**:
- Verify your environment ID is correct
- Ensure your copilot is published and the schema name is correct
- Check that you have access to the specified environment
3. **Token Acquisition Failures**:
- Interactive authentication may require browser access
- Corporate firewalls may block authentication flows
- Try running with appropriate proxy settings if needed
@@ -0,0 +1,47 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from agent_framework.copilotstudio import CopilotStudioAgent
# Environment variables needed:
# COPILOTSTUDIOAGENT__ENVIRONMENTID - Environment ID where your copilot is deployed
# COPILOTSTUDIOAGENT__SCHEMANAME - Agent identifier/schema name of your copilot
# COPILOTSTUDIOAGENT__AGENTAPPID - Client ID for authentication
# COPILOTSTUDIOAGENT__TENANTID - Tenant ID for authentication
async def non_streaming_example() -> None:
"""Example of non-streaming response (get the complete result at once)."""
print("=== Non-streaming Response Example ===")
agent = CopilotStudioAgent()
query = "What is the capital of France?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result}\n")
async def streaming_example() -> None:
"""Example of streaming response (get results as they are generated)."""
print("=== Streaming Response Example ===")
agent = CopilotStudioAgent()
query = "What is the capital of Spain?"
print(f"User: {query}")
print("Agent: ", end="", flush=True)
async for chunk in agent.run_stream(query):
if chunk.text:
print(chunk.text, end="", flush=True)
print("\n")
async def main() -> None:
await non_streaming_example()
await streaming_example()
if __name__ == "__main__":
asyncio.run(main())
@@ -0,0 +1,87 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import os
from agent_framework.copilotstudio import CopilotStudioAgent, acquire_token
from microsoft_agents.copilotstudio.client import AgentType, ConnectionSettings, CopilotClient, PowerPlatformCloud
# Environment variables needed:
# COPILOTSTUDIOAGENT__ENVIRONMENTID - Environment ID where your copilot is deployed
# COPILOTSTUDIOAGENT__SCHEMANAME - Agent identifier/schema name of your copilot
# COPILOTSTUDIOAGENT__AGENTAPPID - Client ID for authentication
# COPILOTSTUDIOAGENT__TENANTID - Tenant ID for authentication
async def example_with_connection_settings() -> None:
"""Example using explicit ConnectionSettings and CopilotClient."""
print("=== Copilot Studio Agent with Connection Settings ===")
# Configuration from environment variables
environment_id = os.environ["COPILOTSTUDIOAGENT__ENVIRONMENTID"]
agent_identifier = os.environ["COPILOTSTUDIOAGENT__SCHEMANAME"]
client_id = os.environ["COPILOTSTUDIOAGENT__AGENTAPPID"]
tenant_id = os.environ["COPILOTSTUDIOAGENT__TENANTID"]
# Acquire token using the acquire_token function
token = acquire_token(
client_id=client_id,
tenant_id=tenant_id,
)
# Create connection settings
settings = ConnectionSettings(
environment_id=environment_id,
agent_identifier=agent_identifier,
cloud=PowerPlatformCloud.PROD, # Or PowerPlatformCloud.GOV, PowerPlatformCloud.HIGH, etc.
copilot_agent_type=AgentType.PUBLISHED, # Or AgentType.PREBUILT
custom_power_platform_cloud=None, # Optional: for custom cloud endpoints
)
# Create CopilotClient with explicit settings
client = CopilotClient(settings=settings, token=token)
# Create agent with explicit client
agent = CopilotStudioAgent(client=client)
# Run a simple query
query = "What is the capital of Italy?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result}")
async def example_with_explicit_parameters() -> None:
"""Example using CopilotStudioAgent with all parameters explicitly provided."""
print("\n=== Copilot Studio Agent with All Explicit Parameters ===")
# Configuration from environment variables
environment_id = os.environ["COPILOTSTUDIOAGENT__ENVIRONMENTID"]
agent_identifier = os.environ["COPILOTSTUDIOAGENT__SCHEMANAME"]
client_id = os.environ["COPILOTSTUDIOAGENT__AGENTAPPID"]
tenant_id = os.environ["COPILOTSTUDIOAGENT__TENANTID"]
# Create agent with all parameters explicitly
agent = CopilotStudioAgent(
environment_id=environment_id,
agent_identifier=agent_identifier,
client_id=client_id,
tenant_id=tenant_id,
cloud=PowerPlatformCloud.PROD,
agent_type=AgentType.PUBLISHED,
)
# Run a simple query
query = "What is the capital of Japan?"
print(f"User: {query}")
result = await agent.run(query)
print(f"Agent: {result}")
async def main() -> None:
await example_with_connection_settings()
await example_with_explicit_parameters()
if __name__ == "__main__":
asyncio.run(main())
+2108 -2050
View File
File diff suppressed because it is too large Load Diff