improvement(mcp): 收窄外部工具为只读访问

This commit is contained in:
2977094657
2026-06-12 16:44:12 +08:00
Unverified
parent ff556e3d9a
commit b221b6f55c
2 changed files with 330 additions and 812 deletions
+200 -89
View File
@@ -1,8 +1,9 @@
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
from unittest.mock import patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
@@ -14,6 +15,75 @@ sys.path.insert(0, str(ROOT / "src"))
class TestMcpRouter(unittest.TestCase):
TEST_TOKEN = "test-mcp-token-1234567890"
REMOVED_MCP_TOOLS = {
"wechat.setup.get_saved_keys",
"wechat.setup.get_database_key",
"wechat.setup.get_image_key",
"wechat.setup.decrypt_databases",
"wechat.setup.get_decrypt_stream_url",
"wechat.setup.preview_import_decrypted",
"wechat.setup.get_import_decrypted_stream_url",
"wechat.setup.cancel_import_decrypted",
"wechat.setup.save_media_keys",
"wechat.setup.decrypt_all_media",
"wechat.setup.get_decrypt_all_media_stream_url",
"wechat.setup.get_download_all_emojis_stream_url",
"wechat.contacts.export_contacts",
"wechat.chat.get_realtime_status",
"wechat.chat.sync_realtime_session",
"wechat.chat.sync_realtime_all_sessions",
"wechat.chat.get_realtime_events_url",
"wechat.moments.sync_latest",
"wechat.editing.list_edited_sessions",
"wechat.editing.list_edited_messages",
"wechat.editing.get_message_edit_status",
"wechat.editing.edit_message",
"wechat.editing.repair_message_sender",
"wechat.editing.flip_message_direction",
"wechat.editing.reset_message_edit",
"wechat.editing.reset_session_edits",
"wechat.export.preview_chat_targets",
"wechat.export.create_chat_export",
"wechat.export.list_chat_exports",
"wechat.export.get_chat_export",
"wechat.export.cancel_chat_export",
"wechat.export.get_chat_export_download",
"wechat.export.get_chat_export_events_url",
"wechat.export.create_moments_export",
"wechat.export.list_moments_exports",
"wechat.export.get_moments_export",
"wechat.export.cancel_moments_export",
"wechat.export.get_moments_export_download",
"wechat.export.get_moments_export_events_url",
"wechat.export.create_account_archive",
"wechat.export.get_account_archive",
"wechat.export.cancel_account_archive",
"wechat.export.get_account_archive_download",
"wechat.mobile.export_job",
"wechat.admin.detect_wechat_installation",
"wechat.admin.get_current_wechat_account",
"wechat.admin.get_wechat_runtime_status",
"wechat.admin.delete_account_data",
"wechat.system.api_root",
"wechat.system.health_check",
"wechat.system.get_backend_log_file",
"wechat.system.open_backend_log_file",
"wechat.system.log_frontend_server_error",
"wechat.system.get_backend_port",
"wechat.system.set_backend_port_setting",
"wechat.system.set_backend_port_and_restart",
"wechat.system.get_mcp_lan_access",
"wechat.system.set_mcp_lan_access",
"wechat.system.get_img_helper_status",
"wechat.system.toggle_img_helper",
"wechat.system.pick_directory",
"wechat.chat.get_search_index_status",
"wechat.chat.build_search_index",
"wechat.chat.get_session_last_message_cache_status",
"wechat.chat.build_session_last_message_cache",
"wechat.media.download_chat_emoji",
"wechat.media.open_chat_media_folder",
}
def setUp(self):
self._old_mcp_token = os.environ.get("WECHAT_TOOL_MCP_TOKEN")
@@ -59,13 +129,11 @@ class TestMcpRouter(unittest.TestCase):
self.assertIn("wechat.chat.list_search_senders", names)
self.assertIn("wechat.chat.resolve_chat_history", names)
self.assertIn("wechat.chat.resolve_app_message", names)
self.assertIn("wechat.contacts.export_contacts", names)
self.assertIn("wechat.export.create_chat_export", names)
self.assertIn("wechat.export.get_account_archive_download", names)
self.assertIn("wechat.moments.get_remote_video_url", names)
self.assertNotIn("search_memory", names)
self.assertNotIn("transcribe_voice_message", names)
self.assertNotIn("transcribe_audio_file", names)
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
def test_mcp_requires_token(self):
client = self._client(auth=False)
@@ -122,6 +190,11 @@ class TestMcpRouter(unittest.TestCase):
self.assertIn("bundleText", payload)
self.assertIn("WeChat MCP Copilot", payload["bundleText"])
self.assertTrue(any(ref["path"] == "references/mobile.md" for ref in payload["references"]))
self.assertFalse(any(ref["path"] == "references/system.md" for ref in payload["references"]))
self.assertFalse(any(ref["path"] == "references/setup-system.md" for ref in payload["references"]))
self.assertFalse(any(ref["path"] == "references/export.md" for ref in payload["references"]))
for tool_name in self.REMOVED_MCP_TOOLS:
self.assertNotIn(tool_name, payload["bundleText"])
def test_skill_text_can_be_loaded_over_http(self):
client = self._client()
@@ -292,6 +365,26 @@ class TestMcpRouter(unittest.TestCase):
payload = resp.json()
self.assertEqual(payload["error"]["code"], -32601)
def test_removed_mcp_tools_are_not_listed_or_callable(self):
client = self._client()
tools_resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(tools_resp.status_code, 200)
names = {tool["name"] for tool in tools_resp.json()["result"]["tools"]}
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
for tool_name in sorted(self.REMOVED_MCP_TOOLS):
with self.subTest(tool_name=tool_name):
direct_resp = client.post("/mcp", json=self._rpc(tool_name, {}))
call_resp = client.post(
"/mcp",
json=self._rpc("tools/call", {"name": tool_name, "arguments": {}}),
)
self.assertEqual(direct_resp.status_code, 200)
self.assertEqual(call_resp.status_code, 200)
self.assertEqual(direct_resp.json()["error"]["code"], -32601)
self.assertEqual(call_resp.json()["error"]["code"], -32601)
def test_missing_tool_name_returns_invalid_params(self):
client = self._client()
@@ -347,6 +440,8 @@ class TestMcpRouter(unittest.TestCase):
self.assertIn("/api/sns/media?", moments["url"])
self.assertEqual(moments["params"]["post_id"], "post-a")
self.assertEqual(moments["params"]["media_id"], "media-a")
self.assertNotIn("use_cache", moments["params"])
self.assertNotIn("use_cache", moments["url"])
def test_completed_mcp_packages_and_mobile_facade_are_listed(self):
client = self._client()
@@ -357,95 +452,36 @@ class TestMcpRouter(unittest.TestCase):
tools = resp.json()["result"]["tools"]
names = {tool["name"] for tool in tools}
expected = {
"wechat.setup.get_saved_keys",
"wechat.setup.decrypt_databases",
"wechat.setup.get_decrypt_stream_url",
"wechat.setup.preview_import_decrypted",
"wechat.setup.get_decrypt_all_media_stream_url",
"wechat.system.health_check",
"wechat.system.get_backend_port",
"wechat.system.get_mcp_lan_access",
"wechat.system.set_mcp_lan_access",
"wechat.system.get_img_helper_status",
"wechat.system.open_backend_log_file",
"wechat.system.pick_directory",
"wechat.system.set_backend_port_and_restart",
"wechat.media.get_decrypted_resource_url",
"wechat.media.get_proxy_image_url",
"wechat.media.get_favicon_url",
"wechat.media.open_chat_media_folder",
"wechat.export.get_chat_export_events_url",
"wechat.export.get_moments_export_events_url",
"wechat.chat.get_realtime_events_url",
"wechat.admin.delete_account_data",
"wechat.mobile.get_overview",
"wechat.mobile.resolve_target",
"wechat.mobile.search_chat",
"wechat.mobile.get_chat_context",
"wechat.mobile.search_moments",
"wechat.mobile.get_media_links",
"wechat.mobile.export_job",
}
self.assertTrue(expected.issubset(names))
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
self.assertNotIn("search_memory", names)
self.assertNotIn("transcribe_voice_message", names)
self.assertNotIn("transcribe_audio_file", names)
packages = {tool["annotations"]["package"] for tool in tools}
self.assertTrue({"wechat.setup", "wechat.system", "wechat.mobile"}.issubset(packages))
self.assertTrue({"wechat.core", "wechat.mobile", "wechat.media"}.issubset(packages))
self.assertFalse({"wechat.setup", "wechat.export", "wechat.editing", "wechat.system", "wechat.admin"} & packages)
def test_new_url_helpers_return_urls_and_params(self):
client = self._client()
checks = [
(
"wechat.setup.get_decrypt_stream_url",
{"key": "a" * 64, "db_storage_path": r"D:\WeChat\db_storage"},
"streamUrl",
"/api/decrypt_stream?",
),
(
"wechat.setup.get_import_decrypted_stream_url",
{"import_path": r"D:\backup\wxid_a", "job_id": "job-1"},
"streamUrl",
"/api/import_decrypted?",
),
(
"wechat.setup.get_decrypt_all_media_stream_url",
{"account": "wxid_a", "xor_key": "0xA5", "concurrency": 3},
"streamUrl",
"/api/media/decrypt_all_stream?",
),
(
"wechat.setup.get_download_all_emojis_stream_url",
{"account": "wxid_a", "force": True, "concurrency": 4},
"streamUrl",
"/api/media/emoji/download_all_stream?",
),
(
"wechat.media.get_decrypted_resource_url",
{"account": "wxid_a", "md5": "a" * 32},
"url",
"/api/media/resource/",
),
(
"wechat.chat.get_realtime_events_url",
{"account": "wxid_a", "interval_ms": 300},
"streamUrl",
"/api/chat/realtime/stream?",
),
(
"wechat.export.get_chat_export_events_url",
{"export_id": "exp-1"},
"streamUrl",
"/api/chat/exports/exp-1/events",
),
(
"wechat.export.get_moments_export_events_url",
{"export_id": "sns-1"},
"streamUrl",
"/api/sns/exports/sns-1/events",
),
]
for tool_name, args, url_key, path_part in checks:
@@ -456,26 +492,82 @@ class TestMcpRouter(unittest.TestCase):
self.assertEqual(structured["status"], "success")
self.assertIn(path_part, structured[url_key])
def test_setup_and_system_wrappers_call_underlying_router(self):
def test_exposed_mcp_tools_are_read_only(self):
client = self._client()
keys_router = Mock()
keys_router.get_saved_keys = AsyncMock(return_value={"status": "success", "keys": {"db_key": "k"}})
system_router = Mock()
system_router.get_img_helper_status = AsyncMock(return_value={"enabled": False})
with patch("wechat_decrypt_tool.mcp.tools._keys_router", return_value=keys_router), patch(
"wechat_decrypt_tool.mcp.tools._system_router", return_value=system_router
):
keys_resp = client.post(
"/mcp",
json=self._rpc("wechat.setup.get_saved_keys", {"account": "wxid_a", "wxid_dir": r"D:\WeChat\wxid_a"}),
)
helper_resp = client.post("/mcp", json=self._rpc("wechat.system.get_img_helper_status"))
resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(keys_resp.status_code, 200)
self.assertEqual(keys_resp.json()["result"]["structuredContent"]["keys"]["db_key"], "k")
keys_router.get_saved_keys.assert_awaited_once_with(account="wxid_a", db_storage_path=None, wxid_dir=r"D:\WeChat\wxid_a")
self.assertEqual(helper_resp.json()["result"]["structuredContent"], {"enabled": False})
self.assertEqual(resp.status_code, 200)
tools = resp.json()["result"]["tools"]
self.assertTrue(tools)
for tool in tools:
with self.subTest(tool_name=tool["name"]):
annotations = tool.get("annotations") or {}
self.assertTrue(annotations.get("readOnlyHint"))
self.assertFalse(annotations.get("destructiveHint"))
def test_analytics_schema_does_not_expose_refresh(self):
client = self._client()
resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(resp.status_code, 200)
tools = {tool["name"]: tool for tool in resp.json()["result"]["tools"]}
for tool_name in [
"wechat.analytics.get_wrapped_meta",
"wechat.analytics.get_wrapped_card",
"wechat.analytics.get_wrapped_annual",
]:
with self.subTest(tool_name=tool_name):
properties = tools[tool_name]["inputSchema"].get("properties") or {}
self.assertNotIn("refresh", properties)
def test_analytics_tools_are_cache_only(self):
client = self._client()
class FakeWrappedService:
_CACHE_VERSION = 26
_IMPLEMENTED_UPTO_ID = 7
_WRAPPED_CARD_MANIFEST = ({"id": 0, "title": "Overview"},)
@staticmethod
def _default_year():
return 2025
def build_wrapped_annual_meta(self, **_kwargs):
raise AssertionError("MCP analytics must not build wrapped meta.")
def build_wrapped_annual_card(self, **_kwargs):
raise AssertionError("MCP analytics must not build wrapped card.")
def build_wrapped_annual_response(self, **_kwargs):
raise AssertionError("MCP analytics must not build wrapped annual data.")
with tempfile.TemporaryDirectory() as tmp:
account_dir = Path(tmp) / "wxid_a"
account_dir.mkdir()
with patch("wechat_decrypt_tool.mcp.tools._resolve_account_dir", return_value=account_dir), patch(
"wechat_decrypt_tool.mcp.tools._wrapped_service", return_value=FakeWrappedService()
):
card_resp = client.post(
"/mcp",
json=self._rpc("wechat.analytics.get_wrapped_card", {"account": "wxid_a", "year": 2025, "card_id": 0}),
)
annual_resp = client.post(
"/mcp",
json=self._rpc("wechat.analytics.get_wrapped_annual", {"account": "wxid_a", "year": 2025}),
)
self.assertFalse((account_dir / "_wrapped" / "cache").exists())
for resp in [card_resp, annual_resp]:
self.assertEqual(resp.status_code, 200)
result = resp.json()["result"]
self.assertTrue(result["isError"])
structured = result["structuredContent"]
self.assertEqual(structured["status"], "error")
self.assertTrue(structured["cacheOnly"])
self.assertEqual(structured["message"], "Wrapped cache not found. Open the app to generate it first.")
def test_mobile_overview_uses_compact_facade(self):
client = self._client()
@@ -486,12 +578,6 @@ class TestMcpRouter(unittest.TestCase):
), patch(
"wechat_decrypt_tool.mcp.tools._list_sessions",
return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]},
), patch(
"wechat_decrypt_tool.mcp.tools._chat_realtime_status", return_value={"status": "success", "available": True}
), patch(
"wechat_decrypt_tool.mcp.tools._search_index_status", return_value={"status": "ready"}
), patch(
"wechat_decrypt_tool.mcp.tools._session_last_message_status", return_value={"status": "ready"}
):
resp = client.post(
"/mcp",
@@ -504,6 +590,31 @@ class TestMcpRouter(unittest.TestCase):
self.assertTrue(structured["ready"])
self.assertEqual(structured["defaultAccount"], "wxid_a")
self.assertIn("wechat.mobile.search_chat", structured["suggestedTools"])
self.assertNotIn("wechat.mobile.export_job", structured["suggestedTools"])
self.assertNotIn("realtime", structured["health"])
self.assertNotIn("indexes", structured["health"])
def test_mobile_overview_does_not_expose_realtime_status(self):
client = self._client()
with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=["wxid_a"]), patch(
"wechat_decrypt_tool.mcp.tools._get_account_info",
return_value={"status": "success", "account": "wxid_a", "databaseCount": 3},
), patch(
"wechat_decrypt_tool.mcp.tools._list_sessions",
return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]},
):
resp = client.post(
"/mcp",
json=self._rpc("wechat.mobile.get_overview", {"account": "wxid_a", "session_limit": 5}),
)
self.assertEqual(resp.status_code, 200)
payload = resp.json()
self.assertNotIn("error", payload)
structured = payload["result"]["structuredContent"]
self.assertNotIn("realtime", structured["health"])
self.assertNotIn("indexes", structured["health"])
def test_mobile_resolve_target_normalizes_candidates(self):
client = self._client()