feat(mcp): 新增微信数据 MCP 服务

新增 MCP JSON-RPC over HTTP 入口,支持 initialize、tools/list、tools/call、批量请求与通知响应。

注册微信账号、联系人、聊天、朋友圈、媒体、导出、年度总结、系统管理等 MCP 工具,并为结果提供 structuredContent。

增加 MCP token 鉴权,支持 Bearer、X-MCP-Token 和 query token。
This commit is contained in:
2977094657
2026-06-11 14:58:09 +08:00
Unverified
parent 9be38589f2
commit d74e8415f8
7 changed files with 2422 additions and 0 deletions
+2
View File
@@ -31,6 +31,7 @@ from .routers.admin import router as _admin_router
from .routers.account_archive_export import router as _account_archive_export_router
from .routers.keys import router as _keys_router
from .routers.media import router as _media_router
from .routers.mcp import router as _mcp_router
from .routers.sns import router as _sns_router
from .routers.sns_export import router as _sns_export_router
from .routers.wechat_detection import router as _wechat_detection_router
@@ -73,6 +74,7 @@ app.include_router(_import_decrypted_router)
app.include_router(_decrypt_router)
app.include_router(_keys_router)
app.include_router(_media_router)
app.include_router(_mcp_router)
app.include_router(_chat_router)
app.include_router(_chat_contacts_router)
app.include_router(_chat_export_router)
+2
View File
@@ -0,0 +1,2 @@
"""MCP integration for WeChatDataAnalysis."""
+35
View File
@@ -0,0 +1,35 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from fastapi import HTTPException
JSONRPC_PARSE_ERROR = -32700
JSONRPC_INVALID_REQUEST = -32600
JSONRPC_METHOD_NOT_FOUND = -32601
JSONRPC_INVALID_PARAMS = -32602
JSONRPC_INTERNAL_ERROR = -32603
@dataclass
class McpError(Exception):
code: int
message: str
data: Any = None
def error_from_exception(exc: Exception) -> McpError:
if isinstance(exc, McpError):
return exc
if isinstance(exc, HTTPException):
return McpError(
JSONRPC_INVALID_PARAMS if int(exc.status_code or 500) < 500 else JSONRPC_INTERNAL_ERROR,
str(exc.detail or "HTTP error"),
{"httpStatus": int(exc.status_code or 500)},
)
if isinstance(exc, (ValueError, TypeError, KeyError)):
return McpError(JSONRPC_INVALID_PARAMS, str(exc) or "Invalid params")
return McpError(JSONRPC_INTERNAL_ERROR, "Internal error", {"detail": str(exc)})
+102
View File
@@ -0,0 +1,102 @@
from __future__ import annotations
from typing import Any
from .errors import (
JSONRPC_INVALID_REQUEST,
JSONRPC_METHOD_NOT_FOUND,
JSONRPC_PARSE_ERROR,
McpError,
error_from_exception,
)
from .registry import McpToolContext, McpToolRegistry
PROTOCOL_VERSION = "2025-06-18"
def jsonrpc_result(request_id: Any, result: Any) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": request_id, "result": result}
def jsonrpc_error(request_id: Any, error: McpError) -> dict[str, Any]:
payload: dict[str, Any] = {"code": error.code, "message": error.message}
if error.data is not None:
payload["data"] = error.data
return {"jsonrpc": "2.0", "id": request_id, "error": payload}
def initialize_result() -> dict[str, Any]:
return {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": "wechat-data-analysis-mcp", "version": "1.0.0"},
"instructions": (
"Use this MCP server to inspect local WeChatDataAnalysis data. "
"Prefer resolve tools before broad message queries. Keep list limits small and expand details only when needed."
),
}
async def handle_jsonrpc_payload(payload: Any, registry: McpToolRegistry, context: McpToolContext) -> Any:
if isinstance(payload, list):
if not payload:
return jsonrpc_error(None, McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
responses = []
for item in payload:
response = await handle_jsonrpc_request(item, registry, context)
if response is not None:
responses.append(response)
return responses or None
return await handle_jsonrpc_request(payload, registry, context)
async def handle_jsonrpc_request(request_obj: Any, registry: McpToolRegistry, context: McpToolContext) -> dict[str, Any] | None:
if not isinstance(request_obj, dict):
return jsonrpc_error(None, McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
if "method" not in request_obj:
if request_obj.get("jsonrpc") == "2.0" and ("result" in request_obj or "error" in request_obj):
return None
return jsonrpc_error(request_obj.get("id"), McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
request_id = request_obj.get("id")
is_notification = "id" not in request_obj
method_value = request_obj.get("method")
method = method_value.strip() if isinstance(method_value, str) else ""
if request_obj.get("jsonrpc") != "2.0" or not method:
return None if is_notification else jsonrpc_error(request_id, McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
if is_notification and method == "notifications/initialized":
return None
try:
params = request_obj.get("params")
if method == "initialize":
result = initialize_result()
elif method == "ping":
result = {}
elif method == "tools/list":
params_dict = params if isinstance(params, dict) else {}
result = registry.list_tools(
cursor=params_dict.get("cursor"),
limit=params_dict.get("limit"),
)
elif method == "tools/call":
if not isinstance(params, dict):
raise ValueError("params is required.")
name = str(params.get("name") or "").strip()
if not name:
raise ValueError("Tool name is required.")
arguments = params["arguments"] if "arguments" in params else {}
result = await registry.call_tool(name, arguments, context)
elif registry.has_tool(method):
result = await registry.call_tool(method, {} if params is None else params, context)
else:
raise McpError(JSONRPC_METHOD_NOT_FOUND, "Method not found")
return None if is_notification else jsonrpc_result(request_id, result)
except Exception as exc:
return None if is_notification else jsonrpc_error(request_id, error_from_exception(exc))
def parse_error_response() -> dict[str, Any]:
return jsonrpc_error(None, McpError(JSONRPC_PARSE_ERROR, "Parse error"))
+158
View File
@@ -0,0 +1,158 @@
from __future__ import annotations
import inspect
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable
from fastapi.encoders import jsonable_encoder
from .errors import JSONRPC_METHOD_NOT_FOUND, McpError
ToolHandler = Callable[[dict[str, Any], "McpToolContext"], Any | Awaitable[Any]]
@dataclass(frozen=True)
class McpToolContext:
request: Any
@property
def base_url(self) -> str:
try:
return str(self.request.base_url).rstrip("/")
except Exception:
return ""
@dataclass(frozen=True)
class McpTool:
name: str
description: str
input_schema: dict[str, Any]
handler: ToolHandler
package: str = "wechat"
annotations: dict[str, Any] | None = None
def to_public_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema,
}
if self.annotations:
payload["annotations"] = self.annotations
return payload
class McpToolRegistry:
def __init__(self) -> None:
self._tools: dict[str, McpTool] = {}
def register(self, tool: McpTool) -> None:
if not tool.name:
raise ValueError("Tool name is required.")
if tool.name in self._tools:
raise ValueError(f"Duplicate MCP tool: {tool.name}")
self._tools[tool.name] = tool
def tool_names(self) -> list[str]:
return sorted(self._tools)
def list_tools(self, *, cursor: str | None = None, limit: int | None = None) -> dict[str, Any]:
names = sorted(self._tools)
start = 0
if cursor:
try:
start = int(cursor)
except Exception as exc:
raise ValueError("Invalid cursor.") from exc
if start < 0 or start > len(names):
raise ValueError("Invalid cursor.")
if limit is None:
page_names = names[start:]
next_cursor = None
else:
page_size = max(1, min(100, int(limit)))
page_names = names[start : start + page_size]
next_index = start + page_size
next_cursor = str(next_index) if next_index < len(names) else None
payload: dict[str, Any] = {
"tools": [self._tools[name].to_public_dict() for name in page_names],
"count": len(page_names),
"total": len(names),
}
if next_cursor is not None:
payload["nextCursor"] = next_cursor
return payload
def has_tool(self, name: str) -> bool:
return name in self._tools
async def call_tool(self, name: str, arguments: Any, context: McpToolContext) -> dict[str, Any]:
tool = self._tools.get(str(name or "").strip())
if tool is None:
raise McpError(JSONRPC_METHOD_NOT_FOUND, f"Unknown tool: {name}")
if arguments is None:
args: dict[str, Any] = {}
elif isinstance(arguments, dict):
args = dict(arguments)
else:
raise ValueError("Tool arguments must be an object.")
result = tool.handler(args, context)
if inspect.isawaitable(result):
result = await result
encoded = jsonable_encoder(result)
text = json.dumps(encoded, ensure_ascii=False, indent=2)
is_error = isinstance(encoded, dict) and str(encoded.get("status") or "").lower() == "error"
return {
"content": [{"type": "text", "text": text}],
"structuredContent": encoded,
"isError": is_error,
}
def object_schema(
properties: dict[str, Any] | None = None,
*,
required: list[str] | None = None,
additional_properties: bool = False,
) -> dict[str, Any]:
schema: dict[str, Any] = {
"type": "object",
"properties": properties or {},
"additionalProperties": additional_properties,
}
if required:
schema["required"] = required
return schema
def string_schema(description: str, *, enum: list[str] | None = None) -> dict[str, Any]:
out: dict[str, Any] = {"type": "string", "description": description}
if enum:
out["enum"] = enum
return out
def int_schema(description: str, *, minimum: int | None = None, maximum: int | None = None) -> dict[str, Any]:
out: dict[str, Any] = {"type": "integer", "description": description}
if minimum is not None:
out["minimum"] = minimum
if maximum is not None:
out["maximum"] = maximum
return out
def bool_schema(description: str, *, default: bool | None = None) -> dict[str, Any]:
out: dict[str, Any] = {"type": "boolean", "description": description}
if default is not None:
out["default"] = default
return out
def array_schema(description: str, items: dict[str, Any]) -> dict[str, Any]:
return {"type": "array", "description": description, "items": items}
File diff suppressed because it is too large Load Diff
+119
View File
@@ -0,0 +1,119 @@
from __future__ import annotations
import hmac
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from ..mcp.protocol import handle_jsonrpc_payload, parse_error_response
from ..mcp.registry import McpToolContext
from ..mcp.tools import MCP_REGISTRY
from ..path_fix import PathFixRoute
from ..runtime_settings import ensure_mcp_token
router = APIRouter(route_class=PathFixRoute)
PROJECT_ROOT = Path(__file__).resolve().parents[3]
SKILL_ROOT = PROJECT_ROOT / "skills" / "wechat-mcp-copilot"
def _extract_mcp_token(request: Request) -> str:
auth = str(request.headers.get("authorization") or "").strip()
if auth.lower().startswith("bearer "):
return auth[7:].strip()
header_token = str(request.headers.get("x-mcp-token") or "").strip()
if header_token:
return header_token
return str(request.query_params.get("token") or "").strip()
def _mcp_unauthorized() -> JSONResponse:
return JSONResponse(
{"status": "error", "message": "Invalid or missing MCP token."},
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
def _verify_mcp_token(request: Request) -> bool:
expected, _ = ensure_mcp_token()
provided = _extract_mcp_token(request)
if not expected or not provided:
return False
return hmac.compare_digest(provided, expected)
def _read_skill_bundle() -> dict[str, Any]:
entry_path = SKILL_ROOT / "SKILL.md"
if not entry_path.is_file():
return {
"status": "error",
"message": "Skill not found.",
"path": str(entry_path),
}
references = []
bundle_parts = [entry_path.read_text(encoding="utf-8")]
references_dir = SKILL_ROOT / "references"
if references_dir.is_dir():
for ref_path in sorted(references_dir.glob("*.md")):
content = ref_path.read_text(encoding="utf-8")
rel_path = ref_path.relative_to(SKILL_ROOT).as_posix()
references.append({"path": rel_path, "content": content})
bundle_parts.append(f"\n\n---\n# {rel_path}\n\n{content}")
return {
"status": "success",
"name": "wechat-mcp-copilot",
"version": "1.0.0",
"entry": "SKILL.md",
"entryContent": bundle_parts[0],
"references": references,
"bundleText": "".join(bundle_parts),
}
@router.get("/mcp", summary="MCP endpoint")
async def mcp_get(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
return PlainTextResponse("Use POST with JSON-RPC 2.0.", status_code=405, headers={"Allow": "POST"})
@router.get("/mcp/skill/bundle", summary="MCP skill bundle")
async def mcp_skill_bundle(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
payload = _read_skill_bundle()
status_code = 200 if payload.get("status") == "success" else 404
return JSONResponse(payload, status_code=status_code)
@router.get("/mcp/skill", summary="MCP skill text")
async def mcp_skill_text(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
payload = _read_skill_bundle()
if payload.get("status") != "success":
return PlainTextResponse(str(payload.get("message") or "Skill not found."), status_code=404)
return PlainTextResponse(str(payload.get("bundleText") or ""), media_type="text/markdown; charset=utf-8")
@router.post("/mcp", summary="MCP JSON-RPC endpoint")
async def mcp_post(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
try:
payload: Any = await request.json()
except Exception:
return JSONResponse(parse_error_response(), status_code=400)
result = await handle_jsonrpc_payload(payload, MCP_REGISTRY, McpToolContext(request=request))
if result is None:
return PlainTextResponse("", status_code=202)
return JSONResponse(result)