Files
Shubham Kumar b00465d7be Python: feat: Add Agent Framework to A2A bridge support (#2403)
* feat: Add Agent Framework to A2A bridge support

- Implement A2A event adapter for converting agent messages to A2A protocol
- Add A2A execution context for managing agent execution state
- Implement A2A executor for running agents in A2A environment
- Add comprehensive unit tests for event adapter, execution context, and executor
- Update agent framework core A2A module exports and type stubs
- Integrate thread management utilities for async execution
- Add getting started sample for A2A agent framework integration
- Update dependencies in uv.lock

This integration enables agent framework agents to communicate and execute within the A2A (Agent to Agent) infrastructure.

* fix: Update references from agent_thread_storage to _agent_thread_storage in A2A executor tests

* Refactor A2A agent framework and improve code structure

- Reordered imports in various files for consistency and clarity.
- Updated `__all__` definitions to maintain a consistent order across modules.
- Simplified method signatures by removing unnecessary line breaks.
- Enhanced readability by adjusting formatting in several sections.
- Removed redundant comments and example scenarios in the execution context.
- Improved handling of agent messages in the event adapter.
- Added type hints for better clarity and type checking.
- Cleaned up test cases for better organization and readability.

* fix: Lint fix new line added

* test: Add unit tests for AgentThreadStorage and InMemoryAgentThreadStorage

* refactor: Update type hints to use new syntax for Union and List

* fix: Validate RequestContext for context_id and message before execution

* Refactor tests and remove A2aExecutionContext references

- Deleted the test file for A2aExecutionContext as it is no longer needed.
- Updated A2aExecutor tests to remove dependencies on A2aExecutionContext and adjusted method calls accordingly.
- Modified event adapter tests to use ChatMessage instead of AgentRunResponseUpdate.
- Removed A2aExecutionContext from imports in agent_framework.a2a module and updated type hints accordingly.

* Refactor A2AExecutor tests and remove event adapter

- Updated test cases to use A2AExecutor instead of A2aExecutor for consistency.
- Removed mock_event_adapter fixture and related tests as A2aEventAdapter is deprecated.
- Consolidated event handling tests into TestA2AExecutorEventAdapter.
- Adjusted imports in various files to reflect the removal of deprecated components.
- Ensured all references to A2aExecutor are updated to A2AExecutor across the codebase.

* refactor: Remove AgentThreadStorage and InMemoryAgentThreadStorage classes from threads and tests

* feat: A2AExecutor to have its own override able save and get threads methods for persistent storage.

* fix: linter bugs

* removed unnecessary changes form core package

* new line added

* Refactor A2AExecutor tests and update imports

- Consolidated mock agent fixtures in test_a2a_executor.py to simplify agent mocking.
- Removed redundant tests related to thread storage and agent types, focusing on A2AExecutor's core functionality.
- Updated test assertions to reflect changes in message handling with new Message and Content classes.
- Enhanced integration tests to ensure compatibility with the new agent framework structure.
- Added A2AExecutor to the module exports in __init__.py and __init__.pyi for better accessibility.

* Update A2A documentation: enhance usage examples for A2AAgent and A2AExecutor

* Updated uv lock

* Fix metadata assertion in TestA2AExecutorHandleEvents and reorder load_dotenv call in agent_framework_to_a2a.py

* Update agent card configuration: add default input and output modes, and fix agent creation method

* Fix assertion for metadata in TestA2AExecutorHandleEvents

* Fix formatting issues in TestA2AExecutorExecute and TestA2AExecutorIntegration

* Enhance A2AExecutor documentation with examples and clarify agent execution process

* Revert uv lock to main

* Refactor A2AExecutor: Improve formatting and streamline constructor parameters

* Apply suggestions from code review

Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>

* Refactor A2AExecutor to use SupportsAgentRun and enhance logging; update agent framework sample for flight and hotel booking capabilities

* Enhance A2AExecutor with streaming support and custom run arguments; update tests for initialization and execution scenarios

* Enhance A2AExecutor event handling with streamed artifact tracking; update tests for new behavior

* Refactor A2AExecutor to enforce type hints for stream and run_kwargs attributes

* Refactor A2AExecutor and tests: replace AsyncMock with MagicMock for response stream handling; clean up imports in agent_framework_to_a2a.py

* refactor: streamline imports and improve code readability across multiple files

* feat: enhance A2AExecutor cancel method with context validation and fixed review comments

* feat: implement get_uri_data utility function for extracting base64 data from data URIs and update references

* fix: update import path for get_uri_data utility function in A2AExecutor and A2AAgent

* fix: correct error message handling in A2AExecutor and update test assertions

---------

Co-authored-by: Eduard van Valkenburg <eavanvalkenburg@users.noreply.github.com>
2026-04-24 08:35:40 +00:00

42 lines
1.5 KiB
Python

# Copyright (c) Microsoft. All rights reserved.
import pytest
from agent_framework_a2a._utils import get_uri_data
def test_get_uri_data_valid() -> None:
"""Test get_uri_data with valid data URIs."""
# Simple text/plain
uri = "data:text/plain;base64,SGVsbG8sIFdvcmxkIQ=="
assert get_uri_data(uri) == "SGVsbG8sIFdvcmxkIQ=="
# Image png
uri = "data:image/png;base64,iVBORw0KGgoAAAANSUhEUgfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg=="
assert get_uri_data(uri) == "iVBORw0KGgoAAAANSUhEUgfFcSJAAAADUlEQVR42mP8/5+hHgAHggJ/PchI7wAAAABJRU5ErkJggg=="
# Application octet-stream
uri = "data:application/octet-stream;base64,AQIDBA=="
assert get_uri_data(uri) == "AQIDBA=="
def test_get_uri_data_invalid_format() -> None:
"""Test get_uri_data with invalid URI formats."""
invalid_uris = [
"not-a-uri",
"http://example.com",
"data:text/plain;SGVsbG8sIFdvcmxkIQ==", # Missing base64 marker
"data:base64,SGVsbG8sIFdvcmxkIQ==", # Missing media type
"data:text/plain;charset=utf-8;base64,SGVsbG8sIFdvcmxkIQ==", # Extra parameters (current regex doesn't support)
"data:text/plain;base64,SGVsbG8sIFdvcmxkIQ== extra",
]
for uri in invalid_uris:
with pytest.raises(ValueError, match="Invalid data URI format"):
get_uri_data(uri)
def test_get_uri_data_empty() -> None:
"""Test get_uri_data with empty string."""
with pytest.raises(ValueError, match="Invalid data URI format"):
get_uri_data("")