Python: feat: Add ChatKit integration with a sample application (#1273)

* feat: Add ChatKit integration with a new frontend application

- Created a new frontend application using React and Vite for the ChatKit integration.
- Added essential files including package.json, vite.config.ts, and Tailwind CSS configuration.
- Implemented core components: App, Home, ChatKitPanel, ThemeToggle, and hooks for color scheme management.
- Established SQLite-based store implementation for ChatKit data persistence in store.py.
- Integrated theme toggling functionality for light and dark modes.
- Set up ESLint and TypeScript configurations for better development experience.

* git ignore

* fix mypy

* add mising file

* minimal frontend for chatkit sample

* update ignore files

* version

* set python version lowerbound on chatkit

* update project settings for chatkit

* update setup

* update setup

* update setup

* update setup

* weather widget

* add select city widget sample

* remove widget helper

* update chatkit to include file attachments and cover more thread item types

* update readme with mermaid diagram

* update diagram

* update instructions

* update chatkit dependency

* fix converter imports

* move to demos/

* move to demos/ -- rename references

* support multiple session instead of using global variable in sample

* support chunk streaming

* fix tests

* Update python/samples/demos/chatkit-integration/store.py

Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>

* use local host

---------

Co-authored-by: Evan Mattson <35585003+moonbox3@users.noreply.github.com>
This commit is contained in:
Eric Zhu
2025-11-04 18:11:40 -08:00
committed by GitHub
Unverified
parent bbde248839
commit 0c862e97a6
33 changed files with 4972 additions and 2 deletions
@@ -0,0 +1,25 @@
# Copyright (c) Microsoft. All rights reserved.
"""Agent Framework and ChatKit Integration.
This package provides an integration layer between Microsoft Agent Framework
and OpenAI ChatKit (Python). It mirrors the Agent SDK integration and provides
helpers to convert between Agent Framework and ChatKit types.
"""
import importlib.metadata
from ._converter import ThreadItemConverter, simple_to_agent_input
from ._streaming import stream_agent_response
try:
__version__ = importlib.metadata.version(__name__)
except importlib.metadata.PackageNotFoundError:
__version__ = "0.0.0" # Fallback for development mode
__all__ = [
"ThreadItemConverter",
"__version__",
"simple_to_agent_input",
"stream_agent_response",
]
@@ -0,0 +1,603 @@
# Copyright (c) Microsoft. All rights reserved.
"""Converter utilities for converting ChatKit thread items to Agent Framework messages."""
import logging
import sys
from collections.abc import Awaitable, Callable, Sequence
if sys.version_info >= (3, 11):
from typing import assert_never
else:
from typing_extensions import assert_never
from agent_framework import (
ChatMessage,
DataContent,
FunctionCallContent,
FunctionResultContent,
Role,
TextContent,
UriContent,
)
from chatkit.types import (
AssistantMessageItem,
Attachment,
ClientToolCallItem,
EndOfTurnItem,
HiddenContextItem,
ImageAttachment,
TaskItem,
ThreadItem,
UserMessageItem,
UserMessageTagContent,
UserMessageTextContent,
WidgetItem,
WorkflowItem,
)
logger = logging.getLogger(__name__)
class ThreadItemConverter:
"""Helper class to convert ChatKit thread items to Agent Framework ChatMessage objects.
This class provides a base implementation for converting ChatKit thread items
to Agent Framework messages. It can be extended to handle attachments,
@-mentions, hidden context items, and custom thread item formats.
Args:
attachment_data_fetcher: Optional async function to fetch attachment binary data.
If provided, it should take an attachment ID and return the binary data as bytes.
If not provided, attachments will be converted to UriContent using available URLs.
"""
def __init__(
self,
attachment_data_fetcher: Callable[[str], Awaitable[bytes]] | None = None,
) -> None:
"""Initialize the converter.
Args:
attachment_data_fetcher: Optional async function to fetch attachment data by ID.
"""
self.attachment_data_fetcher = attachment_data_fetcher
async def user_message_to_input(
self, item: UserMessageItem, is_last_message: bool = True
) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit UserMessageItem to Agent Framework ChatMessage(s).
This method is called internally by `to_agent_input()`. Override this method
to customize how user messages are converted.
Args:
item: The ChatKit user message item to convert.
is_last_message: Whether this is the last message in the thread (used for quoted_text handling).
Returns:
A ChatMessage, list of messages, or None to skip.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
"""
# Extract text content from the user message
text_content = ""
if item.content:
for content_part in item.content:
if isinstance(content_part, UserMessageTextContent):
text_content += content_part.text
# Convert attachments to DataContent or UriContent
data_contents: list[DataContent | UriContent] = []
if item.attachments:
for attachment in item.attachments:
content = await self.attachment_to_message_content(attachment)
if content is not None:
data_contents.append(content)
# Create the message with text and attachments
if not text_content.strip() and not data_contents:
return None
# If only text and no attachments, use text parameter for simplicity
if text_content.strip() and not data_contents:
user_message = ChatMessage(role=Role.USER, text=text_content.strip())
else:
# Build contents list with both text and attachments
contents: list[TextContent | DataContent | UriContent] = []
if text_content.strip():
contents.append(TextContent(text=text_content.strip()))
contents.extend(data_contents)
user_message = ChatMessage(role=Role.USER, contents=contents)
# Handle quoted text if this is the last message
messages = [user_message]
if item.quoted_text and is_last_message:
quoted_context = ChatMessage(
role=Role.USER,
text=f"The user is referring to this in particular:\n{item.quoted_text}",
)
# Prepend quoted context before the main message
messages.insert(0, quoted_context)
return messages
async def attachment_to_message_content(self, attachment: Attachment) -> DataContent | UriContent | None:
"""Convert a ChatKit attachment to Agent Framework content.
This method is called internally by `user_message_to_input()` to handle attachments.
Override this method to customize attachment handling for your storage backend.
The default implementation provides two strategies:
1. If an attachment_data_fetcher was provided, it fetches the binary data
and creates a DataContent object
2. Otherwise, for ImageAttachment with preview_url, it creates a UriContent object
For FileAttachment without a data fetcher, returns None (attachment is skipped).
Args:
attachment: The ChatKit attachment to convert (FileAttachment or ImageAttachment).
Returns:
DataContent if binary data is available, UriContent if only URL is available,
or None if the attachment cannot be converted.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types including attachments within user messages.
Examples:
.. code-block:: python
# With data fetcher
async def fetch_data(attachment_id: str) -> bytes:
return await my_storage.get_file(attachment_id)
converter = ThreadItemConverter(attachment_data_fetcher=fetch_data)
messages = await converter.to_agent_input(thread_items)
# Without data fetcher (uses URLs for images)
converter = ThreadItemConverter()
messages = await converter.to_agent_input(thread_items)
"""
# If we have a data fetcher, use it to get binary data
if self.attachment_data_fetcher is not None:
try:
data = await self.attachment_data_fetcher(attachment.id)
return DataContent(data=data, media_type=attachment.mime_type)
except Exception as e:
# If fetch fails, fall through to URL-based approach
logger.debug(f"Failed to fetch attachment data for {attachment.id}: {e}")
# For ImageAttachment, try to use preview_url
if isinstance(attachment, ImageAttachment) and attachment.preview_url:
return UriContent(uri=str(attachment.preview_url), media_type=attachment.mime_type)
# For FileAttachment without data fetcher, skip the attachment
# Subclasses can override this method to provide custom handling
return None
def hidden_context_to_input(self, item: HiddenContextItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit HiddenContextItem to Agent Framework ChatMessage(s).
This method is called internally by `to_agent_input()`. Override this method
to customize how hidden context is converted.
The default implementation wraps the hidden context in XML tags and returns
a system message. This allows the model to distinguish hidden context from
regular conversation.
Args:
item: The ChatKit hidden context item to convert.
Returns:
A ChatMessage with system role, a list of messages, or None to skip.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
Examples:
.. code-block:: python
# Default behavior
converter = ThreadItemConverter()
hidden_item = HiddenContextItem(
id="ctx_1",
thread_id="thread_1",
created_at=datetime.now(),
content="User's email: user@example.com",
)
message = converter.hidden_context_to_input(hidden_item)
# Returns: ChatMessage(role=SYSTEM, text="<HIDDEN_CONTEXT>User's email: ...</HIDDEN_CONTEXT>")
"""
return ChatMessage(role=Role.SYSTEM, text=f"<HIDDEN_CONTEXT>{item.content}</HIDDEN_CONTEXT>")
def tag_to_message_content(self, tag: UserMessageTagContent) -> TextContent:
"""Convert a ChatKit tag (@-mention) to Agent Framework content.
This method is called internally by `user_message_to_input()` to handle tags.
Override this method to customize tag conversion for your application.
The default implementation extracts the tag's display name and wraps it in
XML tags to provide context to the model about the @-mention.
Args:
tag: The ChatKit tag content to convert.
Returns:
TextContent with the tag information.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types including tags within user messages.
Examples:
.. code-block:: python
# Default behavior
converter = ThreadItemConverter()
tag = UserMessageTagContent(
type="input_tag", id="tag_1", text="john", data={"name": "John Doe"}, interactive=False
)
content = converter.tag_to_message_content(tag)
# Returns: TextContent(text="<TAG>Name:John Doe</TAG>")
"""
name = getattr(tag.data, "name", tag.text if hasattr(tag, "text") else "unknown")
return TextContent(text=f"<TAG>Name:{name}</TAG>")
def task_to_input(self, item: TaskItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit TaskItem to Agent Framework ChatMessage(s).
This method is called internally by `to_agent_input()`. Override this method
to customize how tasks are converted.
The default implementation converts custom tasks with title/content into
a user message explaining what task was displayed to the user.
Args:
item: The ChatKit task item to convert.
Returns:
A ChatMessage, a list of messages, or None to skip the task.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
Examples:
.. code-block:: python
# Task with both title and content
from chatkit.types import Task
task_item = TaskItem(
id="task_1",
thread_id="thread_1",
created_at=datetime.now(),
task=Task(type="custom", title="Data Analysis", content="Analyzed sales data"),
)
message = converter.task_to_input(task_item)
# Returns message explaining the task was performed
"""
if item.task.type != "custom" or (not item.task.title and not item.task.content):
return None
title = item.task.title or ""
content = item.task.content or ""
task_text = f"{title}: {content}" if title and content else title or content
text = (
f"A message was displayed to the user that the following task was performed:\n<Task>\n{task_text}\n</Task>"
)
return ChatMessage(role=Role.USER, text=text)
def workflow_to_input(self, item: WorkflowItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit WorkflowItem to Agent Framework ChatMessage(s).
This method is called internally by `to_agent_input()`. Override this method
to customize how workflows are converted.
The default implementation converts each custom task in the workflow into
a separate user message explaining what tasks were performed.
Args:
item: The ChatKit workflow item to convert.
Returns:
A list of ChatMessages (one per task), a single message, or None to skip.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
Examples:
.. code-block:: python
# Workflow with multiple tasks
from chatkit.types import Workflow, Task
workflow_item = WorkflowItem(
id="wf_1",
thread_id="thread_1",
created_at=datetime.now(),
workflow=Workflow(
type="custom",
tasks=[
Task(type="custom", title="Step 1", content="Gathered data"),
Task(type="custom", title="Step 2", content="Analyzed results"),
],
),
)
messages = converter.workflow_to_input(workflow_item)
# Returns list of messages for each task
"""
messages: list[ChatMessage] = []
for task in item.workflow.tasks:
if task.type != "custom" or (not task.title and not task.content):
continue
title = task.title or ""
content = task.content or ""
task_text = f"{title}: {content}" if title and content else title or content
text = (
"A message was displayed to the user that the following task was performed:\n"
f"<Task>\n{task_text}\n</Task>"
)
messages.append(ChatMessage(role=Role.USER, text=text))
return messages if messages else None
def widget_to_input(self, item: WidgetItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit WidgetItem to Agent Framework ChatMessage(s).
This method is called internally by `to_agent_input()`. Override this method
to customize how widgets are converted.
The default implementation converts the widget to a JSON representation
and includes it in a user message, allowing the model to understand what
UI element was displayed to the user.
Args:
item: The ChatKit widget item to convert.
Returns:
A ChatMessage describing the widget, or None to skip.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
Examples:
.. code-block:: python
# Widget item
from chatkit.widgets import Card, Text
widget_item = WidgetItem(
id="widget_1",
thread_id="thread_1",
created_at=datetime.now(),
widget=Card(children=[Text(value="Hello")]),
)
message = converter.widget_to_input(widget_item)
# Returns message with JSON representation of the widget
"""
try:
widget_json = item.widget.model_dump_json(exclude_unset=True, exclude_none=True)
text = f"The following graphical UI widget (id: {item.id}) was displayed to the user:{widget_json}"
return ChatMessage(role=Role.USER, text=text)
except Exception:
# If JSON serialization fails, skip the widget
return None
async def assistant_message_to_input(self, item: AssistantMessageItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit AssistantMessageItem to Agent Framework ChatMessage(s).
The default implementation extracts text from all content parts and creates
an assistant message.
Args:
item: The ChatKit assistant message item to convert.
Returns:
A ChatMessage with assistant role, or None to skip.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
"""
# Extract text from all content parts
text_parts = [content.text for content in item.content]
if not text_parts:
return None
return ChatMessage(role=Role.ASSISTANT, text="".join(text_parts))
async def client_tool_call_to_input(self, item: ClientToolCallItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit ClientToolCallItem to Agent Framework ChatMessage(s).
The default implementation converts completed tool calls into function call
and result content.
Args:
item: The ChatKit client tool call item to convert.
Returns:
A list containing function call and result messages, or None for pending calls.
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
"""
if item.status == "pending":
# Skip pending tool calls - they cannot be sent to the model
return None
import json
# Create function call message
function_call_msg = ChatMessage(
role=Role.ASSISTANT,
contents=[
FunctionCallContent(
call_id=item.call_id,
name=item.name,
arguments=json.dumps(item.arguments),
)
],
)
# Create function result message
function_result_msg = ChatMessage(
role=Role.TOOL,
contents=[
FunctionResultContent(
call_id=item.call_id,
result=json.dumps(item.output) if item.output is not None else "",
)
],
)
return [function_call_msg, function_result_msg]
async def end_of_turn_to_input(self, item: EndOfTurnItem) -> ChatMessage | list[ChatMessage] | None:
"""Convert a ChatKit EndOfTurnItem to Agent Framework ChatMessage(s).
The default implementation skips end-of-turn markers as they are only UI hints.
Args:
item: The ChatKit end-of-turn item to convert.
Returns:
None (end-of-turn items are not converted).
Note:
Instead of calling this method directly, use `to_agent_input()` which handles
all ThreadItem types and provides proper message ordering.
"""
# End-of-turn is only used for UI hints - skip it
return None
async def _thread_item_to_input_item(
self,
item: ThreadItem,
is_last_message: bool = True,
) -> list[ChatMessage]:
"""Internal method to convert a single ThreadItem to ChatMessage(s).
Args:
item: The thread item to convert.
is_last_message: Whether this is the last item in the thread.
Returns:
A list of ChatMessage objects (may be empty).
"""
match item:
case UserMessageItem():
out = await self.user_message_to_input(item, is_last_message) or []
return out if isinstance(out, list) else [out]
case AssistantMessageItem():
out = await self.assistant_message_to_input(item) or []
return out if isinstance(out, list) else [out]
case ClientToolCallItem():
out = await self.client_tool_call_to_input(item) or []
return out if isinstance(out, list) else [out]
case EndOfTurnItem():
out = await self.end_of_turn_to_input(item) or []
return out if isinstance(out, list) else [out]
case WidgetItem():
out = self.widget_to_input(item) or []
return out if isinstance(out, list) else [out]
case WorkflowItem():
out = self.workflow_to_input(item) or []
return out if isinstance(out, list) else [out]
case TaskItem():
out = self.task_to_input(item) or []
return out if isinstance(out, list) else [out]
case HiddenContextItem():
out = self.hidden_context_to_input(item) or []
return out if isinstance(out, list) else [out]
case _:
assert_never(item)
async def to_agent_input(
self,
thread_items: Sequence[ThreadItem] | ThreadItem,
) -> list[ChatMessage]:
"""Convert ChatKit thread items to Agent Framework ChatMessages.
This is the main entry point for converting ChatKit thread items. It handles
all ThreadItem types (UserMessageItem, AssistantMessageItem, TaskItem, etc.)
and calls the appropriate conversion method for each.
Args:
thread_items: A single ThreadItem or a sequence of ThreadItems to convert.
Returns:
A list of ChatMessage objects that can be sent to an Agent Framework agent.
Examples:
.. code-block:: python
from agent_framework_chatkit import ThreadItemConverter
converter = ThreadItemConverter()
# Convert a single thread item
messages = await converter.to_agent_input(user_message_item)
# Convert multiple thread items
messages = await converter.to_agent_input([user_message_item, assistant_message_item, task_item])
# Use with agent
from agent_framework import ChatAgent
agent = ChatAgent(...)
response = await agent.run_stream(messages)
"""
thread_items = list(thread_items) if isinstance(thread_items, Sequence) else [thread_items]
output: list[ChatMessage] = []
for item in thread_items:
output.extend(
await self._thread_item_to_input_item(
item,
is_last_message=item is thread_items[-1],
)
)
return output
# Default converter instance
_DEFAULT_CONVERTER = ThreadItemConverter()
async def simple_to_agent_input(thread_items: Sequence[ThreadItem] | ThreadItem) -> list[ChatMessage]:
"""Helper function that uses the default ThreadItemConverter.
This function provides a quick way to get started with ChatKit integration
without needing to create a custom ThreadItemConverter instance.
Args:
thread_items: A single ThreadItem or a sequence of ThreadItems to convert.
Returns:
A list of ChatMessage objects that can be sent to an Agent Framework agent.
Examples:
.. code-block:: python
from agent_framework_chatkit import simple_to_agent_input
# Convert a single item
messages = await simple_to_agent_input(user_message_item)
# Convert multiple items
messages = await simple_to_agent_input([user_message_item, assistant_message_item, task_item])
"""
return await _DEFAULT_CONVERTER.to_agent_input(thread_items)
@@ -0,0 +1,104 @@
# Copyright (c) Microsoft. All rights reserved.
"""Streaming utilities for converting Agent Framework responses to ChatKit events."""
import uuid
from collections.abc import AsyncIterable, AsyncIterator, Callable
from datetime import datetime
from agent_framework import AgentRunResponseUpdate, TextContent
from chatkit.types import (
AssistantMessageContent,
AssistantMessageContentPartTextDelta,
AssistantMessageItem,
ThreadItemAddedEvent,
ThreadItemDoneEvent,
ThreadItemUpdated,
ThreadStreamEvent,
)
async def stream_agent_response(
response_stream: AsyncIterable[AgentRunResponseUpdate],
thread_id: str,
generate_id: Callable[[str], str] | None = None,
) -> AsyncIterator[ThreadStreamEvent]:
"""Convert a streamed AgentRunResponseUpdate from Agent Framework to ChatKit events.
This helper function takes a stream of AgentRunResponseUpdate objects from
a Microsoft Agent Framework agent and converts them to ChatKit ThreadStreamEvent
objects that can be consumed by the ChatKit UI.
The function supports real-time token-by-token streaming by emitting
ThreadItemUpdated events with AssistantMessageContentPartTextDelta for each
text chunk as it arrives from the agent.
Args:
response_stream: An async iterable of AgentRunResponseUpdate objects
from an Agent Framework agent.
thread_id: The ChatKit thread ID for the conversation.
generate_id: Optional function to generate IDs for ChatKit items.
If not provided, simple incremental IDs will be used.
Yields:
ThreadStreamEvent: ChatKit events representing the agent's response,
including incremental text deltas for streaming display.
"""
# Use provided ID generator or create default one
if generate_id is None:
def _default_id_generator(item_type: str) -> str:
return f"{item_type}_{uuid.uuid4().hex[:8]}"
message_id = _default_id_generator("msg")
else:
message_id = generate_id("msg")
# Track if we've started the message
message_started = False
accumulated_text = ""
content_index = 0
async for update in response_stream:
# Start the assistant message if not already started
if not message_started:
assistant_message = AssistantMessageItem(
id=message_id,
thread_id=thread_id,
type="assistant_message",
content=[],
created_at=datetime.now(),
)
yield ThreadItemAddedEvent(type="thread.item.added", item=assistant_message)
message_started = True
# Process the update content
if update.contents:
for content in update.contents:
# Handle text content - only TextContent has a text attribute
if isinstance(content, TextContent) and content.text is not None:
# Yield incremental text delta for streaming display
yield ThreadItemUpdated(
type="thread.item.updated",
item_id=message_id,
update=AssistantMessageContentPartTextDelta(
content_index=content_index,
delta=content.text,
),
)
accumulated_text += content.text
# Finalize the message
if message_started:
final_message = AssistantMessageItem(
id=message_id,
thread_id=thread_id,
type="assistant_message",
content=[AssistantMessageContent(type="output_text", text=accumulated_text, annotations=[])]
if accumulated_text
else [],
created_at=datetime.now(),
)
yield ThreadItemDoneEvent(type="thread.item.done", item=final_message)