Thread interfacce (#50)

* Thread interfacce

* Update to address comments

* Update

* Update
This commit is contained in:
Eric Zhu
2025-06-02 16:02:21 -07:00
committed by GitHub
Unverified
parent 6089446f04
commit 448f637f5e
2 changed files with 175 additions and 44 deletions
+77 -41
View File
@@ -29,20 +29,28 @@ custom agents easy to implement, we can remove this agent.__
- `ResponsesAgent`: an agent that is backed by OpenAI's Responses API.
- `A2AAgent`: an agent that is backed by the [A2A Protocol](https://google.github.io/A2A/documentation/).
## `Agent` protocol
## `Agent` base class
```python
class Agent(Protocol):
"""The protocol for all agents in the framework."""
TInThread = TypeVar("TInThread", bound="AgentThread", contravariant=True)
TNewThread = TypeVar("TOutThread", bound="AgentThread", covariant=True)
class Agent(ABC, Generic[TInThread, TNewThread]):
"""The base class for all agents in the framework."""
@abstractmethod
async def run(
self,
thread: Thread,
context: Context,
messages: list[Message],
thread: TInThread,
context: RunContext,
) -> Result:
"""The method to run the agent on a thread of messages, and return the result.
Args:
messages: The list of new messages to process that have not been added
to the thread yet. The agent may use these messages and append
new messages to the thread as part of its processing.
thread: The thread of messages to process: it may be a local thread
or a stub thread that is backed by a remote service.
context: The context for the current invocation of the agent, providing
@@ -52,15 +60,30 @@ class Agent(Protocol):
The result of running the agent, which includes the final response.
"""
...
@classmethod
@abstractmethod
async def create_thread(self) -> TNewThread:
"""Create a new thread for the agent to use.
Returns:
A new thread that is compatible with the agent.
"""
...
@dataclass
class Context:
class RunContext:
"""The context for the current invocation of the agent."""
event_handler: EventHandler
"""The event consumer for handling events emitted by the agent."""
event_handler: EventHandler | Callable[[Message], Awaitable[None]]
"""The event consumer for handling events emitted by the agent. Could be
a callable that takes a message and returns an awaitable, or an instance of
`EventHandler` that handles events emitted by the agent."""
user_input_source: UserInputSource
"""The user input source for requesting for user input during the agent run."""
... # Other fields, could be extended to include more for application-specific needs.
@@ -78,7 +101,11 @@ The `ToolCallingAgent` implements the `Agent` base class and
it implements the `run` method to process incoming messages and call tools if needed.
```python
class ToolCallingAgent(Agent):
TInThread = TypeVar("TInThread", bound="ChatMessageThread", contravariant=True)
TNewThread = TypeVar("TOuthread", bound="ChatMessageThread", covariant=True)
class ToolCallingAgent(Agent[TInThread, TNewThread]):
def __init__(
self,
model_client: ModelClient,
@@ -87,9 +114,11 @@ class ToolCallingAgent(Agent):
self.model_client = model_client
self.tools = tools
async def run(self, thread: Thread, context: Context) -> Result:
async def run(self, messages: list[Message], thread: TInThread, context: RunContext) -> Result:
# Apply the messages to the thread.
await thread.on_new_messages(messages)
# Create a response using the model client, passing the thread and context.
create_result = await self.model_client.create(thread, context, tools=self.tools)
create_result = await self.model_client.create(thread.messages, context, tools=self.tools)
# Emit the event to notify the workflow consumer of a model response.
await context.emit(ModelResponseEvent(create_result))
if create_result.is_tool_call():
@@ -103,7 +132,7 @@ class ToolCallingAgent(Agent):
# Emit the event to notify the workflow consumer of a tool call.
await context.emit(ToolCallEvent(tool_result))
# Update the thread with the tool result.
await thread.append(tool_result.to_messages())
await thread.on_new_messages(tool_result.to_messages())
# Return the tool result as the response.
return Result(
final_response=tool_result,
@@ -113,6 +142,14 @@ class ToolCallingAgent(Agent):
return Result(
final_response=create_result,
)
@classmethod
async def create_thread(self) -> TNewThread:
"""Create a new thread for the agent to use.
NOTE: this could be part of a new base class for this type of agent.
"""
return await ChatMessageThread.create()
```
Things to note in the implementation of the `run` method:
@@ -120,20 +157,23 @@ Things to note in the implementation of the `run` method:
- Components such as `thread` and `model_client` interacts smoothly with little boilerplate code.
- The `context` parameter provides convenient access to the workflow run fixtures such as event channel.
In practice, the developer likely will inherit from `ChatAgent` to
customize the `run` method, so they don't need to implement the boilerplate code
for creating a thread.
An agent doesn't need to use components provided by the framework to implement the agent interface.
For example, in a multi-agent workflow, we may need a verification agent in a using deterministic
logic to critic another agent's response.
```python
class CriticAgent(Agent):
class CriticAgent(ChatAgent):
def __init__(self) -> None:
self.verification_logic = ... # Some verification logic, e.g. a set of rules.
async def run(self, thread: Thread, context: Context) -> Result:
# Use the verification logic to verify the last message in the thread.
response = thread.get_last_message()
is_verified = self.verification_logic.verify(response)
async def run(self, messages: list[Message], thread: ChatMessageThread, context: RunContext) -> Result:
# Use the verification logic to verify the messages.
is_verified = self.verification_logic.verify(messages)
if is_verified:
final_response = Message("The response is verified.")
else:
@@ -165,17 +205,14 @@ agent = ToolCallingAgent(
)
# Create a thread for the current task.
thread = [
Message("Hello"),
Message("Can you find the file 'foo.txt' for me?"),
]
thread = await ChatMessageThread.create()
# Create a context that uses a handler that prints emitted events to the console,
# and a user input source that reads from the console.
context = Context(event_handler=ConsoleEventHandler(), user_input_source=ConsoleUserInputSource())
context = RunContext(event_handler=ConsoleEventHandler(), user_input_source=ConsoleUserInputSource())
# Run the agent with the thread and context.
result = await agent.run(thread, context)
result = await agent.run([Message("Can you find the file 'foo.txt' for me?")], thread, context)
```
## User session
@@ -220,19 +257,13 @@ we can run the same instance of the agent concurrently.
```python
# Create threads for concurrent tasks.
thread1 = [
Message("Hello"),
Message("Can you find the file 'foo.txt' for me?"),
]
thread2 = [
Message("Hello"),
Message("Can you find the file 'bar.txt' for me?"),
]
thread1 = ChatMessageThread.create()
thread2 = ChatMessageThread.create()
# Run the agent concurrently on multiple threads.
results = await asyncio.gather(
agent.run(thread1, context),
agent.run(thread2, context),
agent.run([Message(...)], thread1, context),
agent.run([Message(...)], thread2, context),
)
# The `context`'s event handlers will emit events from both runs.
```
@@ -263,7 +294,7 @@ agent = FoundryAgent(
thread = FoundryThread(thread_id="my_thread_id")
# Run the agent on the thread and an new context that emits events to the console.
result = await agent.run(thread, RunContext(event_channel="console"))
result = await agent.run([Message(...)], thread, RunRunContext(event_channel="console"))
```
## Alternative agent abstractions
@@ -287,12 +318,12 @@ public methods for the orchestration code to manipulate its conversation state
indirectly.
```python
class Agent(Protocol):
class Agent(ABC):
async def run(
self,
messages: list[Message],
context: Context,
context: RunContext,
) -> Result:
"""The method to run the agent and return the result.
@@ -317,16 +348,18 @@ For agent without conversation state, the agent is invoked with a thread
and the agent is responsible for processing the messages in the thread.
```python
class Agent(Protocol):
class Agent(ABC, Generic[TThread]):
async def run(
self,
thread: Thread,
context: Context,
messages: list[Message],
thread: TThread,
context: RunContext,
) -> Result:
"""The method to run the agent on a thread of messages, and return the result.
Args:
messages: The list of new messages to process.
thread: The current conversation state.
context: The context for the current invocation of the agent, providing
access to the event channel, and human-in-the-loop (HITL) features.
@@ -343,7 +376,7 @@ the a state in addition to components like model client and tools, which could b
or a custom state object that the agent uses to manage its conversation state.
```python
class CustomAgent(Agent):
class CustomAgent(Agent[ChatMessageThread]):
def __init__(self,
model_client: ModelClient,
tools: list[Tool],
@@ -359,7 +392,7 @@ For agent without conversation state, the agent is initialized with
the components it needs to process messages, such as a model client and tools.
```python
class CustomAgent(Agent):
class CustomAgent(Agent[ChatMessageThread]):
def __init__(
self,
model_client: ModelClient,
@@ -473,4 +506,7 @@ Another factor to consider is that Semantic Kernel already has agent abstraction
that passes a thread per invocation, so it is easier for us to migrate to the
new interface.
**Decision**: We will use the agent abstraction without conversation state
as the interface for agents in the framework.
> **We should continue to question this decision as we implement more agents and workflows, and revisit the design.**
+98 -3
View File
@@ -7,16 +7,15 @@ resume a previous user session.
Thread should use message and content types as defined in [Core Data Types](types.md).
A thread can contain sub-threads as a dictionary of threads.
For workflows, a thread can contain sub-threads as a dictionary of threads.
This is to ensure agents in a workflow can run concurrently on different threads.
The default thread has the key `main` and the sub-threads having keys that are usually
corresponding to the agents in a workflow.
For workflows, thread should also support the concept of execution state, which includes:
For workflows, a thread should also support the concept of execution state, which includes:
- The history of steps taken.
- The current step in the workflow.
- The next steps to be taken.
This is to ensure the workflow can be resumed from where it left off, without losing
the state of execution.
@@ -27,3 +26,99 @@ The framework should provides default implementations of a thread class that:
- Can be serialized and deserialized to/from JSON.
- Can support checkpointing, rollback, and time travel, for both agent and workflow.
- Can automantically export truncated views to be used by model clients to keep the context size within limits.
## `AgentThread` base class
```python
class AgentThread(ABC):
"""The base class for all threads defining the minimum interface."""
# ---------- Message-handling ----------
@abstractmethod
async def on_new_messages(self, messages: list["Message"]) -> None:
"""Handle a new message added to the thread."""
...
# ---------- Lifecycle management ----------
@classmethod
@abstractmethod
async def create(self) -> "AgentThread":
"""Create a new thread of the same type."""
...
# For delete and release resources, subclass should override built-in Python `del` method.
```
## `ChatMessageThread` class
The most common thread type is going to be the `ChatMessageThread`, which is a thread that stores the messages in a list. This thread type works well with `ChatCompletionAgent` and its subclasses.
```python
class ChatMessageThread(AgentThread):
"""A thread that stores the messages in a list."""
def __init__(self):
# NOTE: We should have some way to prevent direct calling of the constructor from the base class
# and enforce using the `create` class method.
if ThreadCreationContext.is_active:
raise RuntimeError("Cannot instantiate ChatMessageThread directly. Use ChatMessageThread.create() instead.")
self._messages: list["Message"] = []
@property
def messages(self) -> list["Message"]:
"""Get the list of messages in the thread."""
return self._messages
async def on_new_messages(self, messages: list["Message"]) -> None:
"""Handle a list of new messages added to the thread."""
self._messages.extend(messages)
async def fork(self, message_id: str | None = None) -> "ChatMessageThread":
"""Create a fork of the thread starting from the given message ID.
NOTE: we may need to create a new base class / protocol for this behavior.
If no message ID is provided, the fork will start from the latest message."""
new_thread = ChatMessageThread()
if message_id is None:
new_thread._messages = self._messages.copy()
else:
index = next((i for i, msg in enumerate(self._messages) if msg.id == message_id), -1)
new_thread._messages = self._messages[index + 1:] if index != -1 else []
return new_thread
@classmethod
async def create(cls) -> "ChatMessageThread":
"""Create a new chat history thread."""
with ThreadCreationContext.activate():
return cls()
async def delete(self) -> None:
"""Delete the thread. It will not be recoverable."""
self._messages.clear()
```
## `WorkflowThread` class
The `WorkflowThread` is a specialized thread that manages the execution state of a workflow. It extends the base `Thread` class and provides additional functionality to handle the workflow's execution steps and sub-threads.
```python
class WorkflowThread(AgentThread):
"""A thread that manages the execution state of a workflow."""
# ----------- Execution state management -----------
# TBD
# ----------- Lifecycle management -----------
async def create_sub_thread(self, agent: Agent, key: str) -> "AgentThread":
"""Create a sub-thread for the given agent with the given key."""
pass
async def delete_sub_thread(self, key: str) -> None:
"""Delete the sub-thread with the given key."""
pass
async def get_sub_thread(self, key: str) -> "AgentThread":
"""Get the sub-thread with the given key."""
pass