diff --git a/src/wechat_decrypt_tool/mcp/tools.py b/src/wechat_decrypt_tool/mcp/tools.py index 44ee3c3..4d12dc0 100644 --- a/src/wechat_decrypt_tool/mcp/tools.py +++ b/src/wechat_decrypt_tool/mcp/tools.py @@ -2,15 +2,12 @@ from __future__ import annotations import json import sqlite3 -import asyncio -from pathlib import Path from typing import Any, Callable, Optional from urllib.parse import urlencode -from fastapi import BackgroundTasks, Request +from fastapi import Request from .. import __version__ as APP_VERSION -from ..chat_export_service import CHAT_EXPORT_MANAGER, get_chat_export_targets_preview from ..chat_helpers import ( _iter_message_db_paths, _list_decrypted_accounts, @@ -18,11 +15,7 @@ from ..chat_helpers import ( _resolve_account_dir, _resolve_msg_table_name, ) -from ..chat_search_index import get_chat_search_index_status, start_chat_search_index_build from ..database_filters import list_countable_database_names -from ..session_last_message import build_session_last_message_table, get_session_last_message_status -from ..sns_export_service import SNS_EXPORT_MANAGER -from ..wcdb_realtime import WCDB_REALTIME from .registry import ( McpTool, McpToolContext, @@ -68,60 +61,6 @@ def _chat_media_router(): return chat_media -def _account_archive_router(): - from ..routers import account_archive_export - - return account_archive_export - - -def _wechat_detection_router(): - from ..routers import wechat_detection - - return wechat_detection - - -def _decrypt_router(): - from ..routers import decrypt - - return decrypt - - -def _keys_router(): - from ..routers import keys - - return keys - - -def _media_router(): - from ..routers import media - - return media - - -def _import_decrypted_router(): - from ..routers import import_decrypted - - return import_decrypted - - -def _admin_router(): - from ..routers import admin - - return admin - - -def _system_router(): - from ..routers import system - - return system - - -def _health_router(): - from ..routers import health - - return health - - def _wrapped_service(): from ..wrapped import service @@ -257,22 +196,12 @@ class _JsonRequest: return self._payload -class _LoopbackRequest: - client = type("_Client", (), {"host": "127.0.0.1"})() - - def _request(ctx: McpToolContext, payload: dict[str, Any] | None = None) -> Request | _JsonRequest: if payload is None: return ctx.request return _JsonRequest(payload, base_url=(ctx.base_url + "/") if ctx.base_url else "http://127.0.0.1/") -def _job_payload(job: Any) -> dict[str, Any]: - if job is None: - raise ValueError("Job not found.") - return job.to_public_dict() - - def _account_arg(args: dict[str, Any]) -> Optional[str]: return _opt_str(args, "account") @@ -318,18 +247,6 @@ def _get_account_info(args: dict[str, Any], _: McpToolContext) -> dict[str, Any] } -def _wechat_detection(args: dict[str, Any], _: McpToolContext) -> Any: - return _wechat_detection_router().detect_wechat_detailed(_opt_str(args, "data_root_path")) - - -async def _current_wechat_account(args: dict[str, Any], _: McpToolContext) -> Any: - return await _wechat_detection_router().detect_current_account(_opt_str(args, "data_root_path")) - - -async def _wechat_runtime_status(_: dict[str, Any], __: McpToolContext) -> Any: - return await _wechat_detection_router().check_wechat_status() - - def _list_contacts(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: result = _contacts_router().list_chat_contacts( _request(ctx), @@ -367,26 +284,6 @@ def _resolve_contact(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any return {"status": "success", "query": query, "count": len(candidates), "candidates": _clip_deep(candidates, max_items=50)} -def _export_contacts(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - contact_types_raw = args.get("contact_types") - contact_types = contact_types_raw if isinstance(contact_types_raw, dict) else {} - merged_flags = {**contact_types, **args} - contacts_router = _contacts_router() - req = contacts_router.ContactExportRequest( - account=_account_arg(args), - output_dir=_str(args, "output_dir"), - format=_str(args, "format", "json") or "json", - include_avatar_link=_bool(args, "include_avatar_link", True), - contact_types=contacts_router.ContactTypeFilter( - friends=_bool(merged_flags, "friends", True), - groups=_bool(merged_flags, "groups", True), - officials=_bool(merged_flags, "officials", True), - ), - keyword=_opt_str(args, "keyword") or _opt_str(args, "query"), - ) - return contacts_router.export_chat_contacts(_request(ctx), req) - - def _list_sessions(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: result = _chat_router().list_chat_sessions( _request(ctx), @@ -551,90 +448,6 @@ async def _resolve_app_message(args: dict[str, Any], ctx: McpToolContext) -> dic ) -def _search_index_status(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return get_chat_search_index_status(_resolve_account_dir(_account_arg(args))) - - -def _build_search_index(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return start_chat_search_index_build(_resolve_account_dir(_account_arg(args)), rebuild=_bool(args, "rebuild", False)) - - -def _session_last_message_status(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return get_session_last_message_status(_resolve_account_dir(_account_arg(args))) - - -def _build_session_last_message(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return build_session_last_message_table( - _resolve_account_dir(_account_arg(args)), - rebuild=_bool(args, "rebuild", False), - include_hidden=_bool(args, "include_hidden", True), - include_official=_bool(args, "include_official", True), - ) - - -def _chat_realtime_status(args: dict[str, Any], _: McpToolContext) -> Any: - return _chat_router().get_chat_realtime_status(account=_account_arg(args)) - - -def _chat_realtime_sync(args: dict[str, Any], ctx: McpToolContext) -> Any: - username = _str(args, "username") or _str(args, "session_id") - if not username: - raise ValueError("username is required.") - return _chat_router().sync_chat_realtime_messages( - _request(ctx), - account=_account_arg(args), - username=username, - max_scan=_int(args, "max_scan", _int(args, "limit", 600), minimum=50, maximum=5000), - backfill_limit=_int(args, "backfill_limit", 200, minimum=0, maximum=5000), - ) - - -def _chat_realtime_sync_all(args: dict[str, Any], ctx: McpToolContext) -> Any: - return _chat_router().sync_chat_realtime_messages_all( - _request(ctx), - account=_account_arg(args), - max_scan=_int(args, "max_scan", _int(args, "limit_per_session", 200), minimum=50, maximum=5000), - priority_username=_opt_str(args, "priority_username"), - priority_max_scan=_int(args, "priority_max_scan", 600, minimum=50, maximum=5000), - include_hidden=_bool(args, "include_hidden", True), - include_official=_bool(args, "include_official", True), - only_official=_bool(args, "only_official", False), - backfill_limit=_int(args, "backfill_limit", 200, minimum=0, maximum=5000), - ) - - -def _edit_status(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return _chat_router().get_chat_edit_status(account=_account_arg(args), username=_str(args, "username"), message_id=_str(args, "message_id")) - - -def _list_edited_sessions(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _chat_router().list_chat_edited_sessions(_request(ctx), account=_account_arg(args)) - - -def _list_edited_messages(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _chat_router().list_chat_edited_messages(_request(ctx), username=_str(args, "username"), account=_account_arg(args)) - - -async def _edit_message(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return await _chat_router().edit_chat_message(_request(ctx, args)) - - -async def _repair_sender(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return await _chat_router().repair_chat_message_sender(_request(ctx, args)) - - -async def _flip_direction(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return await _chat_router().flip_chat_message_direction(_request(ctx, args)) - - -async def _reset_message_edit(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return await _chat_router().reset_chat_edited_message(_request(ctx, args)) - - -async def _reset_session_edits(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return await _chat_router().reset_chat_edited_session(_request(ctx, args)) - - def _sns_self_info(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: return _sns_router().api_sns_self_info(account=_account_arg(args)) @@ -661,14 +474,6 @@ def _sns_users(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: return _clip_deep(result, max_items=100) -def _sns_sync_latest(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return _sns_router().sync_sns_realtime_timeline_latest( - account=_account_arg(args), - max_scan=_int(args, "max_scan", 200, minimum=1, maximum=2000), - force=_int(args, "force", 0), - ) - - def _sns_media_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: post_id = _opt_str(args, "post_id") or _opt_str(args, "tid") params = { @@ -688,7 +493,6 @@ def _sns_media_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: "token": _opt_str(args, "token"), "url": _opt_str(args, "url"), "key": _opt_str(args, "key"), - "use_cache": _opt_int(args, "use_cache"), }.items() if v not in (None, "") } @@ -712,7 +516,6 @@ def _sns_video_remote_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str "url": _opt_str(args, "url"), "token": _opt_str(args, "token"), "key": _opt_str(args, "key"), - "use_cache": _opt_int(args, "use_cache"), }.items() if v not in (None, "") } @@ -762,165 +565,146 @@ def _pay_records(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: def _wrapped_meta(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return _wrapped_service().build_wrapped_annual_meta(account=_account_arg(args), year=_opt_int(args, "year"), refresh=_bool(args, "refresh", False)) + svc = _wrapped_service() + account_dir = _resolve_account_dir(_account_arg(args)) + year = _wrapped_cache_year(account_dir, _opt_int(args, "year"), svc) + return { + "status": "success", + "account": account_dir.name, + "year": year, + "scope": "global", + "cacheOnly": True, + "availableYears": _wrapped_available_cache_years(account_dir, svc), + "cards": [dict(c) for c in getattr(svc, "_WRAPPED_CARD_MANIFEST", ())], + } def _wrapped_card(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return _clip_deep( - _wrapped_service().build_wrapped_annual_card( - account=_account_arg(args), - year=_opt_int(args, "year"), - card_id=_int(args, "card_id", minimum=0), - refresh=_bool(args, "refresh", False), - ), - max_items=80, - ) + svc = _wrapped_service() + account_dir = _resolve_account_dir(_account_arg(args)) + year = _wrapped_cache_year(account_dir, _opt_int(args, "year"), svc) + card_id = _int(args, "card_id", minimum=0) + card = _read_wrapped_card_cache(account_dir, year, card_id, svc) + if card is None: + return _wrapped_cache_missing(account_dir, year, f"card {card_id}", svc) + return _clip_deep(card, max_items=80) def _wrapped_annual(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return _clip_deep(_wrapped_service().build_wrapped_annual_response(account=_account_arg(args), year=_opt_int(args, "year"), refresh=_bool(args, "refresh", False)), max_items=80) + svc = _wrapped_service() + account_dir = _resolve_account_dir(_account_arg(args)) + year = _wrapped_cache_year(account_dir, _opt_int(args, "year"), svc) + payload = _read_json_file(_wrapped_full_cache_path(account_dir, year, svc)) + if not isinstance(payload, dict): + return _wrapped_cache_missing(account_dir, year, "annual wrapped data", svc) + payload = {**payload, "cached": True, "cacheOnly": True, "availableYears": _wrapped_available_cache_years(account_dir, svc)} + return _clip_deep(payload, max_items=80) -def _chat_export_targets(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _clip_deep( - get_chat_export_targets_preview( - account=_account_arg(args), - include_hidden=_bool(args, "include_hidden", True), - include_official=_bool(args, "include_official", False), - base_url=ctx.base_url, - ), - max_items=120, - ) +def _wrapped_cache_dir_readonly(account_dir: Any) -> Any: + return account_dir / "_wrapped" / "cache" -def _create_chat_export(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - job = CHAT_EXPORT_MANAGER.create_job( - account=_account_arg(args), - scope=_str(args, "scope", "selected") or "selected", - usernames=_list_str(args, "usernames"), - export_format=_str(args, "format", "json") or "json", - start_time=_opt_int(args, "start_time"), - end_time=_opt_int(args, "end_time"), - include_hidden=_bool(args, "include_hidden", False), - include_official=_bool(args, "include_official", False), - include_media=_bool(args, "include_media", True), - media_kinds=_list_str(args, "media_kinds") or ["image", "emoji", "video", "video_thumb", "voice", "file"], - message_types=_list_str(args, "message_types"), - output_dir=_opt_str(args, "output_dir"), - allow_process_key_extract=_bool(args, "allow_process_key_extract", False), - download_remote_media=_bool(args, "download_remote_media", False), - html_page_size=_int(args, "html_page_size", 1000, minimum=0, maximum=10000), - privacy_mode=_bool(args, "privacy_mode", False), - file_name=_opt_str(args, "file_name"), - ) - return {"status": "success", "job": _job_payload(job)} +def _wrapped_cache_version(svc: Any) -> int: + return int(getattr(svc, "_CACHE_VERSION", 0)) -def _list_chat_exports(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - jobs = [j.to_public_dict() for j in CHAT_EXPORT_MANAGER.list_jobs()] - jobs.sort(key=lambda x: int(x.get("createdAt") or 0), reverse=True) - return {"status": "success", "jobs": jobs} +def _wrapped_implemented_upto(svc: Any) -> int: + return int(getattr(svc, "_IMPLEMENTED_UPTO_ID", 0)) -def _get_chat_export(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return {"status": "success", "job": _job_payload(CHAT_EXPORT_MANAGER.get_job(_str(args, "export_id")))} +def _wrapped_default_year(svc: Any) -> int: + try: + return int(svc._default_year()) + except Exception: + from datetime import datetime + + return int(datetime.now().year) -def _cancel_chat_export(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - ok = CHAT_EXPORT_MANAGER.cancel_job(_str(args, "export_id")) - if not ok: - raise ValueError("Export not found.") - return {"status": "success"} +def _wrapped_full_cache_path(account_dir: Any, year: int, svc: Any) -> Any: + version = _wrapped_cache_version(svc) + upto = _wrapped_implemented_upto(svc) + return _wrapped_cache_dir_readonly(account_dir) / f"global_{int(year)}_upto_{upto}_v{version}.json" -def _download_chat_export(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - export_id = _str(args, "export_id") - job = CHAT_EXPORT_MANAGER.get_job(export_id) - if not job: - raise ValueError("Export not found.") +def _wrapped_card_cache_path(account_dir: Any, year: int, card_id: int, svc: Any) -> Any: + version = _wrapped_cache_version(svc) + return _wrapped_cache_dir_readonly(account_dir) / f"global_{int(year)}_card_{int(card_id)}_v{version}.json" + + +def _read_json_file(path: Any) -> Any: + try: + if not path.exists() or not path.is_file(): + return None + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return None + + +def _wrapped_available_cache_years(account_dir: Any, svc: Any) -> list[int]: + cache_dir = _wrapped_cache_dir_readonly(account_dir) + if not cache_dir.exists() or not cache_dir.is_dir(): + return [] + version = _wrapped_cache_version(svc) + upto = _wrapped_implemented_upto(svc) + years: set[int] = set() + patterns = [ + f"global_*_upto_{upto}_v{version}.json", + f"global_*_card_*_v{version}.json", + ] + for pattern in patterns: + try: + paths = list(cache_dir.glob(pattern)) + except Exception: + paths = [] + for path in paths: + parts = str(path.stem or "").split("_") + if len(parts) < 2: + continue + try: + year = int(parts[1]) + except Exception: + continue + if year > 0: + years.add(year) + return sorted(years, reverse=True) + + +def _wrapped_cache_year(account_dir: Any, requested_year: Optional[int], svc: Any) -> int: + years = _wrapped_available_cache_years(account_dir, svc) + year = int(requested_year or _wrapped_default_year(svc)) + if years and year not in years: + return int(years[0]) + return year + + +def _read_wrapped_card_cache(account_dir: Any, year: int, card_id: int, svc: Any) -> dict[str, Any] | None: + card_payload = _read_json_file(_wrapped_card_cache_path(account_dir, year, card_id, svc)) + if isinstance(card_payload, dict): + return card_payload + annual_payload = _read_json_file(_wrapped_full_cache_path(account_dir, year, svc)) + if isinstance(annual_payload, dict): + for card in annual_payload.get("cards") or []: + if not isinstance(card, dict): + continue + try: + if int(card.get("id") or -1) == int(card_id): + return card + except Exception: + continue + return None + + +def _wrapped_cache_missing(account_dir: Any, year: int, target: str, svc: Any) -> dict[str, Any]: return { - "status": "success", - "ready": bool(job.zip_path and job.zip_path.exists()), - "job": job.to_public_dict(), - "downloadUrl": _download_url(ctx, f"/api/chat/exports/{export_id}/download"), - } - - -def _create_sns_export(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - job = SNS_EXPORT_MANAGER.create_job( - account=_account_arg(args), - scope=_str(args, "scope", "selected") or "selected", - usernames=_list_str(args, "usernames"), - export_format=_str(args, "format", "html") or "html", - use_cache=_bool(args, "use_cache", True), - output_dir=_opt_str(args, "output_dir"), - file_name=_opt_str(args, "file_name"), - ) - return {"status": "success", "job": _job_payload(job)} - - -def _list_sns_exports(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - jobs = [j.to_public_dict() for j in SNS_EXPORT_MANAGER.list_jobs()] - jobs.sort(key=lambda x: int(x.get("createdAt") or 0), reverse=True) - return {"status": "success", "jobs": jobs} - - -def _get_sns_export(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return {"status": "success", "job": _job_payload(SNS_EXPORT_MANAGER.get_job(_str(args, "export_id")))} - - -def _cancel_sns_export(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - ok = SNS_EXPORT_MANAGER.cancel_job(_str(args, "export_id")) - if not ok: - raise ValueError("Export not found.") - return {"status": "success"} - - -def _download_sns_export(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - export_id = _str(args, "export_id") - job = SNS_EXPORT_MANAGER.get_job(export_id) - if not job: - raise ValueError("Export not found.") - return { - "status": "success", - "ready": bool(job.zip_path and job.zip_path.exists()), - "job": job.to_public_dict(), - "downloadUrl": _download_url(ctx, f"/api/sns/exports/{export_id}/download"), - } - - -async def _create_account_archive(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - account_archive_router = _account_archive_router() - req = account_archive_router.AccountArchiveExportRequest( - account=_account_arg(args), - output_dir=_opt_str(args, "output_dir"), - include_databases=_bool(args, "include_databases", True), - include_resources=_bool(args, "include_resources", True), - file_name=_opt_str(args, "file_name"), - ) - return await account_archive_router.export_account_archive(req) - - -async def _get_account_archive(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _account_archive_router().get_account_archive_export(_str(args, "export_id")) - - -async def _cancel_account_archive(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _account_archive_router().cancel_account_archive_export(_str(args, "export_id")) - - -async def _download_account_archive(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - result = await _account_archive_router().get_account_archive_export(_str(args, "export_id")) - job = dict(result.get("job") or {}) - zip_path = str(job.get("zipPath") or "").strip() - download_url = "" - if zip_path: - download_url = _download_url(ctx, f"/api/account/archive_export/download?{urlencode({'path': zip_path})}") - return { - "status": "success", - "ready": bool(zip_path and Path(zip_path).exists()), - "job": job, - "downloadUrl": download_url, + "status": "error", + "message": "Wrapped cache not found. Open the app to generate it first.", + "account": account_dir.name, + "year": int(year), + "target": target, + "cacheOnly": True, + "availableYears": _wrapped_available_cache_years(account_dir, svc), } @@ -939,99 +723,6 @@ def _url_result(ctx: McpToolContext, path: str, params: dict[str, Any] | None = return {"status": "success", kind: _download_url(ctx, f"{path}{suffix}"), "params": clean} -async def _api_root(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - return await _health_router().api_root() - - -async def _health_check(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - return await _health_router().health_check() - - -async def _decrypt_databases(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - decrypt_router = _decrypt_router() - req = decrypt_router.DecryptRequest( - key=_str(args, "key"), - db_storage_path=_str(args, "db_storage_path"), - ) - return await decrypt_router.decrypt_databases(req) - - -def _decrypt_stream_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _url_result( - ctx, - "/api/decrypt_stream", - {"key": _str(args, "key"), "db_storage_path": _str(args, "db_storage_path")}, - kind="streamUrl", - ) - - -async def _get_saved_keys(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _keys_router().get_saved_keys( - account=_account_arg(args), - db_storage_path=_opt_str(args, "db_storage_path"), - wxid_dir=_opt_str(args, "wxid_dir"), - ) - - -async def _get_wechat_db_key(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _keys_router().get_wechat_db_key(wechat_install_path=_opt_str(args, "wechat_install_path")) - - -async def _get_image_key(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _keys_router().get_image_key( - account=_account_arg(args), - db_storage_path=_opt_str(args, "db_storage_path"), - wxid_dir=_opt_str(args, "wxid_dir"), - ) - - -async def _save_media_keys(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - media_router = _media_router() - req = media_router.MediaKeysSaveRequest( - account=_account_arg(args), - xor_key=_str(args, "xor_key"), - aes_key=_opt_str(args, "aes_key"), - ) - return await media_router.save_media_keys_api(req) - - -async def _decrypt_all_media(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - media_router = _media_router() - req = media_router.MediaDecryptRequest( - account=_account_arg(args), - xor_key=_opt_str(args, "xor_key"), - aes_key=_opt_str(args, "aes_key"), - ) - return await media_router.decrypt_all_media(req) - - -def _decrypt_all_media_stream_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _url_result( - ctx, - "/api/media/decrypt_all_stream", - { - "account": _account_arg(args), - "xor_key": _opt_str(args, "xor_key"), - "aes_key": _opt_str(args, "aes_key"), - "concurrency": _int(args, "concurrency", 10, minimum=1, maximum=64), - }, - kind="streamUrl", - ) - - -def _download_all_emojis_stream_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _url_result( - ctx, - "/api/media/emoji/download_all_stream", - { - "account": _account_arg(args), - "force": _bool(args, "force", False), - "concurrency": _int(args, "concurrency", 20, minimum=1, maximum=100), - }, - kind="streamUrl", - ) - - def _decrypted_media_resource_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: md5 = _str(args, "md5").lower() if len(md5) != 32: @@ -1039,109 +730,6 @@ def _decrypted_media_resource_url(args: dict[str, Any], ctx: McpToolContext) -> return _url_result(ctx, f"/api/media/resource/{md5}", {"account": _account_arg(args)}, kind="url") -async def _import_preview(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - import_router = _import_decrypted_router() - return await import_router.preview_import(import_router.ImportRequest(import_path=_str(args, "import_path"))) - - -async def _import_cancel(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _import_decrypted_router().cancel_import_decrypted(job_id=_str(args, "job_id")) - - -def _import_stream_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _url_result( - ctx, - "/api/import_decrypted", - {"import_path": _str(args, "import_path"), "job_id": _opt_str(args, "job_id")}, - kind="streamUrl", - ) - - -async def _admin_log_file(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - return await _admin_router().get_backend_log_file() - - -def _admin_open_log_file(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - admin_router = _admin_router() - log_file = admin_router._get_current_log_file_path() - admin_router._open_path_with_default_app(log_file) - return {"status": "success", "path": str(log_file), "exists": log_file.exists()} - - -async def _admin_log_frontend_error(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _admin_router().log_frontend_server_error(args) - - -async def _admin_port(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - return await _admin_router().get_backend_port() - - -async def _admin_mcp_access(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - return await _admin_router().get_mcp_access() - - -async def _admin_set_port_setting(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - admin_router = _admin_router() - port = _int(args, "port", 0, minimum=1, maximum=65535) - current_port, source = admin_router.read_effective_backend_port(default=admin_router.DEFAULT_BACKEND_PORT) - admin_router.write_backend_port_setting(port) - env_file = admin_router.write_backend_port_env_file(port) - return { - "status": "success", - "changed": int(current_port) != int(port), - "port": port, - "previousPort": int(current_port), - "previousSource": source, - "defaultPort": admin_router.DEFAULT_BACKEND_PORT, - "restartRequired": int(current_port) != int(port), - "envFile": str(env_file) if env_file else None, - } - - -async def _admin_set_port_and_restart(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - admin_router = _admin_router() - port = _int(args, "port", 0, minimum=1, maximum=65535) - background_tasks = BackgroundTasks() - result = await admin_router.set_backend_port({"port": port}, _LoopbackRequest(), background_tasks) - try: - if bool(result.get("changed")): - asyncio.create_task(background_tasks()) - result["restartScheduled"] = True - except Exception: - result["restartScheduled"] = False - return result - - -async def _admin_set_mcp_access(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - admin_router = _admin_router() - enabled = _bool(args, "enabled", False) - background_tasks = BackgroundTasks() - result = await admin_router.set_mcp_access({"enabled": enabled}, _LoopbackRequest(), background_tasks) - try: - if bool(result.get("changed")): - asyncio.create_task(background_tasks()) - result["restartScheduled"] = True - except Exception: - result["restartScheduled"] = False - return result - - -async def _img_helper_status(_: dict[str, Any], __: McpToolContext) -> dict[str, Any]: - return await _system_router().get_img_helper_status() - - -async def _img_helper_toggle(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - system_router = _system_router() - return await system_router.toggle_img_helper(system_router.ImgHelperToggleRequest(enabled=_bool(args, "enabled", False))) - - -async def _pick_directory(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _system_router().pick_directory( - title=_str(args, "title", "请选择目录") or "请选择目录", - initial_dir=_str(args, "initial_dir"), - ) - - def _chat_proxy_image_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: url = _str(args, "url") if not url: @@ -1163,47 +751,6 @@ def _biz_proxy_image_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, return _url_result(ctx, "/api/biz/proxy_image", {"url": url}, kind="url") -def _chat_export_events_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - export_id = _str(args, "export_id") - if not export_id: - raise ValueError("export_id is required.") - return {"status": "success", "streamUrl": _download_url(ctx, f"/api/chat/exports/{export_id}/events"), "exportId": export_id} - - -def _moments_export_events_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - export_id = _str(args, "export_id") - if not export_id: - raise ValueError("export_id is required.") - return {"status": "success", "streamUrl": _download_url(ctx, f"/api/sns/exports/{export_id}/events"), "exportId": export_id} - - -def _chat_realtime_events_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - return _url_result( - ctx, - "/api/chat/realtime/stream", - { - "account": _account_arg(args), - "interval_ms": _int(args, "interval_ms", 500, minimum=100, maximum=5000), - }, - kind="streamUrl", - ) - - -def _delete_account_data(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return _chat_router().delete_chat_account(account=_str(args, "account")) - - -async def _open_chat_media_folder(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: - return await _chat_media_router().open_chat_media_folder( - kind=_str(args, "kind"), - md5=_opt_str(args, "md5"), - file_id=_opt_str(args, "file_id"), - server_id=_opt_int(args, "server_id") or _opt_int(args, "msg_svr_id"), - account=_account_arg(args), - username=_opt_str(args, "username") or _opt_str(args, "session_id"), - ) - - def _safe_call(label: str, func: Callable[[], Any]) -> dict[str, Any]: try: return {"ok": True, "data": func()} @@ -1211,7 +758,7 @@ def _safe_call(label: str, func: Callable[[], Any]) -> dict[str, Any]: return {"ok": False, "error": str(exc), "section": label} -def _mobile_home_snapshot(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: +async def _mobile_home_snapshot(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: account = _account_arg(args) session_limit = _int(args, "session_limit", 20, minimum=1, maximum=80) moments_limit = _int(args, "moments_limit", 6, minimum=0, maximum=30) @@ -1227,8 +774,6 @@ def _mobile_home_snapshot(args: dict[str, Any], ctx: McpToolContext) -> dict[str "accountInfo": None, "sessions": None, "moments": None, - "realtime": None, - "indexes": None, "warnings": [], } @@ -1264,24 +809,6 @@ def _mobile_home_snapshot(args: dict[str, Any], ctx: McpToolContext) -> dict[str else: payload["warnings"].append(moments) - realtime = _safe_call("realtime", lambda: _chat_realtime_status({"account": account}, ctx)) - if realtime["ok"]: - payload["realtime"] = realtime["data"] - else: - payload["warnings"].append(realtime) - - indexes = _safe_call( - "indexes", - lambda: { - "searchIndex": _search_index_status({"account": account}, ctx), - "sessionLastMessage": _session_last_message_status({"account": account}, ctx), - }, - ) - if indexes["ok"]: - payload["indexes"] = indexes["data"] - else: - payload["warnings"].append(indexes) - return _clip_deep(payload, max_items=120) @@ -1338,7 +865,7 @@ async def _mobile_search_context(args: dict[str, Any], ctx: McpToolContext) -> d return _clip_deep(payload, max_items=120) -def _mobile_session_bundle(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: +async def _mobile_session_bundle(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: username = _str(args, "username") or _str(args, "session_id") if not username: raise ValueError("username is required.") @@ -1353,7 +880,6 @@ def _mobile_session_bundle(args: dict[str, Any], ctx: McpToolContext) -> dict[st "session": None, "messages": None, "dailyCounts": None, - "realtime": None, "warnings": [], } @@ -1395,12 +921,6 @@ def _mobile_session_bundle(args: dict[str, Any], ctx: McpToolContext) -> dict[st else: payload["warnings"].append(daily) - realtime = _safe_call("realtime", lambda: _chat_realtime_status({"account": account}, ctx)) - if realtime["ok"]: - payload["realtime"] = realtime["data"] - else: - payload["warnings"].append(realtime) - return _clip_deep(payload, max_items=140) @@ -1474,8 +994,8 @@ def _candidate_display(item: dict[str, Any]) -> str: return "" -def _mobile_overview(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - snapshot = _mobile_home_snapshot( +async def _mobile_overview(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: + snapshot = await _mobile_home_snapshot( { **args, "session_limit": _int(args, "session_limit", 8, minimum=1, maximum=30), @@ -1496,15 +1016,12 @@ def _mobile_overview(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any "health": { "service": snapshot.get("service"), "accountInfo": snapshot.get("accountInfo"), - "indexes": snapshot.get("indexes"), - "realtime": snapshot.get("realtime"), }, "suggestedTools": [ "wechat.mobile.resolve_target", "wechat.mobile.search_chat", "wechat.mobile.get_chat_context", "wechat.mobile.get_media_links", - "wechat.mobile.export_job", ], "warnings": snapshot.get("warnings") or [], }, @@ -1779,44 +1296,10 @@ def _mobile_get_analytics(args: dict[str, Any], ctx: McpToolContext) -> dict[str elif metric == "pay": data = _pay_records({"account": account, "limit": _int(args, "limit", 20, minimum=1, maximum=100), "offset": _int(args, "offset", 0, minimum=0)}, ctx) else: - data = _wrapped_meta({"account": account, "year": _opt_int(args, "year"), "refresh": _bool(args, "refresh", False)}, ctx) + data = _wrapped_meta({"account": account, "year": _opt_int(args, "year")}, ctx) return _clip_deep({"status": "success", "ok": True, "account": account, "metric": metric, "basis": {"year": _opt_int(args, "year"), "username": _opt_str(args, "username"), "month": _opt_int(args, "month")}, "data": data}, max_items=100) -async def _mobile_export_job(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: - action = (_str(args, "action", "status") or "status").lower() - kind = (_str(args, "kind", "chat") or "chat").lower() - if action == "preview" and kind == "chat": - data = _chat_export_targets(args, ctx) - elif action == "create" and kind == "chat": - data = _create_chat_export(args, ctx) - elif action == "create" and kind == "moments": - data = _create_sns_export(args, ctx) - elif action == "create" and kind == "archive": - data = await _create_account_archive(args, ctx) - elif action == "status" and kind == "chat": - data = _get_chat_export(args, ctx) - elif action == "status" and kind == "moments": - data = _get_sns_export(args, ctx) - elif action == "status" and kind == "archive": - data = await _get_account_archive(args, ctx) - elif action == "download" and kind == "chat": - data = _download_chat_export(args, ctx) - elif action == "download" and kind == "moments": - data = _download_sns_export(args, ctx) - elif action == "download" and kind == "archive": - data = await _download_account_archive(args, ctx) - elif action == "cancel" and kind == "chat": - data = _cancel_chat_export(args, ctx) - elif action == "cancel" and kind == "moments": - data = _cancel_sns_export(args, ctx) - elif action == "cancel" and kind == "archive": - data = await _cancel_account_archive(args, ctx) - else: - raise ValueError("Unsupported export action/kind.") - return _clip_deep({"status": "success", "ok": True, "action": action, "kind": kind, "nextPollAfterMs": 1000 if action in {"create", "status"} else None, "data": data}, max_items=100) - - def _avatar_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any]: return _media_url("/api/chat/avatar", args, ctx, ["username", "account"]) @@ -1841,12 +1324,6 @@ def _chat_voice_url(args: dict[str, Any], ctx: McpToolContext) -> dict[str, Any] return _media_url("/api/chat/media/voice", args, ctx, ["server_id", "account"]) -def _download_emoji(args: dict[str, Any], ctx: McpToolContext) -> Any: - chat_media_router = _chat_media_router() - req = chat_media_router.EmojiDownloadRequest(**args) - return chat_media_router.download_chat_emoji(req) - - def _tools_catalog(args: dict[str, Any], _: McpToolContext) -> dict[str, Any]: package = _opt_str(args, "package") tools = MCP_REGISTRY.list_tools()["tools"] @@ -1874,39 +1351,9 @@ def _install_tools() -> None: _register("wechat.core.list_tools", "List WeChat MCP tools, optionally filtered by package.", object_schema({"package": string_schema("Optional package name."), "cursor": string_schema("Optional numeric cursor."), "limit": int_schema("Maximum tools to return.", minimum=1, maximum=100)}), _tools_catalog, package="wechat.core") _register("wechat.core.list_accounts", "List decrypted WeChat accounts available to WeChatDataAnalysis.", object_schema(), _list_accounts, package="wechat.core") _register("wechat.core.get_account_info", "Return database and account metadata for one decrypted account.", object_schema(COMMON_ACCOUNT), _get_account_info, package="wechat.core") - _register("wechat.admin.detect_wechat_installation", "Detect local WeChat installation and data directories.", object_schema({"data_root_path": string_schema("Optional WeChat data root path.")}), _wechat_detection, package="wechat.admin") - _register("wechat.admin.get_current_wechat_account", "Detect the currently logged-in WeChat account.", object_schema({"data_root_path": string_schema("Optional WeChat data root path.")}), _current_wechat_account, package="wechat.admin") - _register("wechat.admin.get_wechat_runtime_status", "Return whether the WeChat process is running.", object_schema(), _wechat_runtime_status, package="wechat.admin") - _register("wechat.system.api_root", "Return the API root metadata.", object_schema(), _api_root, package="wechat.system") - _register("wechat.system.health_check", "Return backend health status.", object_schema(), _health_check, package="wechat.system") - _register("wechat.system.get_backend_log_file", "Return the current backend log file path.", object_schema(), _admin_log_file, package="wechat.system") - _register("wechat.system.open_backend_log_file", "Open the current backend log file on the desktop host.", object_schema(), _admin_open_log_file, package="wechat.system", read_only=False) - _register("wechat.system.log_frontend_server_error", "Append a frontend-observed server error to the backend log.", object_schema(additional_properties=True), _admin_log_frontend_error, package="wechat.system", read_only=False) - _register("wechat.system.get_backend_port", "Return the configured backend port.", object_schema(), _admin_port, package="wechat.system") - _register("wechat.system.set_backend_port_setting", "Persist a backend port setting without restarting the current MCP response.", object_schema({"port": int_schema("Backend port.", minimum=1, maximum=65535)}, required=["port"]), _admin_set_port_setting, package="wechat.system", read_only=False) - _register("wechat.system.set_backend_port_and_restart", "Change the backend port using the desktop router flow and restart the backend process.", object_schema({"port": int_schema("Backend port.", minimum=1, maximum=65535)}, required=["port"]), _admin_set_port_and_restart, package="wechat.system", read_only=False, destructive=True) - _register("wechat.system.get_mcp_lan_access", "Return whether phone/LAN clients can reach the MCP endpoint.", object_schema(), _admin_mcp_access, package="wechat.system") - _register("wechat.system.set_mcp_lan_access", "Enable or disable LAN access for the MCP endpoint and restart the backend if needed.", object_schema({"enabled": bool_schema("Whether LAN MCP access should be enabled.", default=False)}, required=["enabled"]), _admin_set_mcp_access, package="wechat.system", read_only=False, destructive=True) - _register("wechat.system.get_img_helper_status", "Return the large-image helper plugin status.", object_schema(), _img_helper_status, package="wechat.system") - _register("wechat.system.toggle_img_helper", "Enable or disable the local large-image helper plugin.", object_schema({"enabled": bool_schema("Whether the helper should be enabled.", default=False)}, required=["enabled"]), _img_helper_toggle, package="wechat.system", read_only=False) - _register("wechat.system.pick_directory", "Open a native directory picker on the desktop host.", object_schema({"title": string_schema("Dialog title."), "initial_dir": string_schema("Initial directory.")}), _pick_directory, package="wechat.system", read_only=False) - - _register("wechat.setup.get_saved_keys", "Return saved database and media keys for an account or wxid directory.", object_schema({**COMMON_ACCOUNT, "db_storage_path": string_schema("Optional WeChat db_storage path."), "wxid_dir": string_schema("Optional WeChat wxid directory.")}), _get_saved_keys, package="wechat.setup") - _register("wechat.setup.get_database_key", "Run the local desktop workflow that extracts the WeChat database key.", object_schema({"wechat_install_path": string_schema("Optional WeChat install path.")}), _get_wechat_db_key, package="wechat.setup", read_only=False) - _register("wechat.setup.get_image_key", "Fetch and save WeChat image AES/XOR keys for an account or wxid directory.", object_schema({**COMMON_ACCOUNT, "db_storage_path": string_schema("Optional WeChat db_storage path."), "wxid_dir": string_schema("Optional WeChat wxid directory.")}), _get_image_key, package="wechat.setup", read_only=False) - _register("wechat.setup.decrypt_databases", "Decrypt WeChat databases from a db_storage path with a 64-character database key.", object_schema({"key": string_schema("64-character hex database key."), "db_storage_path": string_schema("Absolute db_storage path.")}, required=["key", "db_storage_path"]), _decrypt_databases, package="wechat.setup", read_only=False) - _register("wechat.setup.get_decrypt_stream_url", "Build an SSE URL for database decryption progress.", object_schema({"key": string_schema("64-character hex database key."), "db_storage_path": string_schema("Absolute db_storage path.")}, required=["key", "db_storage_path"]), _decrypt_stream_url, package="wechat.setup", read_only=False) - _register("wechat.setup.preview_import_decrypted", "Preview an already-decrypted account directory before import.", object_schema({"import_path": string_schema("Absolute decrypted account/export directory.")}, required=["import_path"]), _import_preview, package="wechat.setup") - _register("wechat.setup.get_import_decrypted_stream_url", "Build an SSE URL that imports an already-decrypted account directory.", object_schema({"import_path": string_schema("Absolute decrypted account/export directory."), "job_id": string_schema("Optional cancellation job id.")}, required=["import_path"]), _import_stream_url, package="wechat.setup", read_only=False) - _register("wechat.setup.cancel_import_decrypted", "Cancel an in-memory decrypted-account import job.", object_schema({"job_id": string_schema("Import job id.")}, required=["job_id"]), _import_cancel, package="wechat.setup", read_only=False) - _register("wechat.setup.save_media_keys", "Save image XOR/AES media keys for an account.", object_schema({**COMMON_ACCOUNT, "xor_key": string_schema("XOR key such as 0xA5."), "aes_key": string_schema("Optional AES key.")}, required=["xor_key"]), _save_media_keys, package="wechat.setup", read_only=False) - _register("wechat.setup.decrypt_all_media", "Decrypt all local .dat image resources for an account.", object_schema({**COMMON_ACCOUNT, "xor_key": string_schema("Optional XOR key."), "aes_key": string_schema("Optional AES key.")}), _decrypt_all_media, package="wechat.setup", read_only=False) - _register("wechat.setup.get_decrypt_all_media_stream_url", "Build an SSE URL for bulk media decryption progress.", object_schema({**COMMON_ACCOUNT, "xor_key": string_schema("Optional XOR key."), "aes_key": string_schema("Optional AES key."), "concurrency": int_schema("Worker count.", minimum=1, maximum=64)}), _decrypt_all_media_stream_url, package="wechat.setup", read_only=False) - _register("wechat.setup.get_download_all_emojis_stream_url", "Build an SSE URL for bulk emoji download progress.", object_schema({**COMMON_ACCOUNT, "force": bool_schema("Download even when cached.", default=False), "concurrency": int_schema("Worker count.", minimum=1, maximum=100)}), _download_all_emojis_stream_url, package="wechat.setup", read_only=False) _register("wechat.contacts.list_contacts", "List contacts, groups, and official accounts with optional fuzzy keyword filtering.", object_schema({**COMMON_ACCOUNT, **PAGING, "keyword": string_schema("Optional fuzzy keyword."), "include_friends": bool_schema("Include friends.", default=True), "include_groups": bool_schema("Include groups.", default=True), "include_officials": bool_schema("Include official accounts.", default=True)}), _list_contacts, package="wechat.contacts") _register("wechat.contacts.resolve_contact", "Resolve a fuzzy person/group/official-account clue to contact candidates.", object_schema({**COMMON_ACCOUNT, "query": string_schema("Fuzzy contact clue."), "limit": int_schema("Maximum candidates.", minimum=1, maximum=50)}, required=["query"]), _resolve_contact, package="wechat.contacts") - _register("wechat.contacts.export_contacts", "Export contacts to a local JSON or CSV file.", object_schema({**COMMON_ACCOUNT, "output_dir": string_schema("Absolute output directory."), "format": string_schema("json or csv."), "include_avatar_link": bool_schema("Include avatar links.", default=True), "friends": bool_schema("Include friends.", default=True), "groups": bool_schema("Include groups.", default=True), "officials": bool_schema("Include official accounts.", default=True), "keyword": string_schema("Optional fuzzy keyword.")}, required=["output_dir"]), _export_contacts, package="wechat.contacts", read_only=False) _register("wechat.chat.list_sessions", "List chat sessions with preview and optional fuzzy filtering.", object_schema({**COMMON_ACCOUNT, **PAGING, "query": string_schema("Optional fuzzy session keyword."), "include_hidden": bool_schema("Include hidden sessions.", default=False), "include_official": bool_schema("Include official account sessions.", default=False), "preview": string_schema("Preview mode.")}), _list_sessions, package="wechat.chat") _register("wechat.chat.resolve_session", "Resolve a fuzzy clue to chat session candidates.", object_schema({**COMMON_ACCOUNT, "query": string_schema("Fuzzy session clue."), "limit": int_schema("Maximum candidates.", minimum=1, maximum=50)}, required=["query"]), _resolve_session, package="wechat.chat") @@ -1919,30 +1366,11 @@ def _install_tools() -> None: _register("wechat.chat.get_message_raw", "Return raw decrypted fields for one message. Use only for debugging or missing structured fields.", object_schema({**COMMON_ACCOUNT, "username": string_schema("Session username."), "message_id": string_schema("Message id.")}, required=["username", "message_id"]), _message_raw, package="wechat.chat") _register("wechat.chat.resolve_chat_history", "Resolve a merged-forward chat history AppMsg by server_id.", object_schema({**COMMON_ACCOUNT, "server_id": int_schema("Message server id.", minimum=1)}, required=["server_id"]), _resolve_chat_history, package="wechat.chat") _register("wechat.chat.resolve_app_message", "Resolve an AppMsg/card/miniprogram message by server_id.", object_schema({**COMMON_ACCOUNT, "server_id": int_schema("Message server id.", minimum=1)}, required=["server_id"]), _resolve_app_message, package="wechat.chat") - _register("wechat.chat.get_search_index_status", "Return chat search index status.", object_schema(COMMON_ACCOUNT), _search_index_status, package="wechat.admin") - _register("wechat.chat.build_search_index", "Build or rebuild chat search index.", object_schema({**COMMON_ACCOUNT, "rebuild": bool_schema("Rebuild even if index exists.", default=False)}), _build_search_index, package="wechat.admin", read_only=False) - _register("wechat.chat.get_session_last_message_cache_status", "Return session last-message cache status.", object_schema(COMMON_ACCOUNT), _session_last_message_status, package="wechat.admin") - _register("wechat.chat.build_session_last_message_cache", "Build session last-message cache.", object_schema({**COMMON_ACCOUNT, "rebuild": bool_schema("Rebuild cache.", default=False), "include_hidden": bool_schema("Include hidden sessions.", default=True), "include_official": bool_schema("Include official sessions.", default=True)}), _build_session_last_message, package="wechat.admin", read_only=False) - _register("wechat.chat.get_realtime_status", "Return chat realtime sync status.", object_schema(COMMON_ACCOUNT), _chat_realtime_status, package="wechat.admin") - _register("wechat.chat.sync_realtime_session", "Sync realtime messages for one session.", object_schema({**COMMON_ACCOUNT, "username": string_schema("Session username."), "max_scan": int_schema("Maximum realtime rows to scan.", minimum=50, maximum=5000), "backfill_limit": int_schema("Maximum old rows to backfill.", minimum=0, maximum=5000)}, required=["username"]), _chat_realtime_sync, package="wechat.admin", read_only=False) - _register("wechat.chat.sync_realtime_all_sessions", "Sync realtime messages for all sessions.", object_schema({**COMMON_ACCOUNT, "max_scan": int_schema("Maximum realtime rows per session.", minimum=50, maximum=5000), "priority_username": string_schema("Optional username to sync first."), "priority_max_scan": int_schema("Priority session max scan.", minimum=50, maximum=5000), "include_hidden": bool_schema("Include hidden sessions.", default=True), "include_official": bool_schema("Include official sessions.", default=True), "only_official": bool_schema("Only sync official sessions.", default=False), "backfill_limit": int_schema("Maximum old rows to backfill.", minimum=0, maximum=5000)}), _chat_realtime_sync_all, package="wechat.admin", read_only=False) - _register("wechat.chat.get_realtime_events_url", "Build an SSE URL for realtime db_storage change events.", object_schema({**COMMON_ACCOUNT, "interval_ms": int_schema("Polling interval in milliseconds.", minimum=100, maximum=5000)}), _chat_realtime_events_url, package="wechat.admin") - _register("wechat.admin.delete_account_data", "Delete one account's local WeChatDataAnalysis data from this project.", object_schema({"account": string_schema("Account directory name.")}, required=["account"]), _delete_account_data, package="wechat.admin", read_only=False, destructive=True) - - _register("wechat.editing.list_edited_sessions", "List sessions with local message edits.", object_schema(COMMON_ACCOUNT), _list_edited_sessions, package="wechat.editing") - _register("wechat.editing.list_edited_messages", "List edited messages for one session.", object_schema({**COMMON_ACCOUNT, "username": string_schema("Session username.")}, required=["username"]), _list_edited_messages, package="wechat.editing") - _register("wechat.editing.get_message_edit_status", "Return whether one message has been edited.", object_schema({**COMMON_ACCOUNT, "username": string_schema("Session username."), "message_id": string_schema("Message id.")}, required=["username", "message_id"]), _edit_status, package="wechat.editing") - _register("wechat.editing.edit_message", "Edit one message in the real WeChat database and output cache.", object_schema(additional_properties=True), _edit_message, package="wechat.editing", read_only=False) - _register("wechat.editing.repair_message_sender", "Repair one message sender metadata.", object_schema(additional_properties=True), _repair_sender, package="wechat.editing", read_only=False) - _register("wechat.editing.flip_message_direction", "Flip one message display direction.", object_schema(additional_properties=True), _flip_direction, package="wechat.editing", read_only=False) - _register("wechat.editing.reset_message_edit", "Restore one edited message from its first snapshot.", object_schema(additional_properties=True), _reset_message_edit, package="wechat.editing", read_only=False) - _register("wechat.editing.reset_session_edits", "Restore all edited messages in one session.", object_schema(additional_properties=True), _reset_session_edits, package="wechat.editing", read_only=False) _register("wechat.moments.get_self_info", "Return Moments self wxid and display name.", object_schema(COMMON_ACCOUNT), _sns_self_info, package="wechat.moments") _register("wechat.moments.list_timeline", "List Moments timeline by users, keyword, and pagination.", object_schema({**COMMON_ACCOUNT, **PAGING, "usernames": array_schema("Optional poster usernames.", string_schema("Username.")), "keyword": string_schema("Optional content keyword.")}), _sns_timeline, package="wechat.moments") _register("wechat.moments.search_moments", "Alias for timeline keyword/user search.", object_schema({**COMMON_ACCOUNT, **PAGING, "usernames": array_schema("Optional poster usernames.", string_schema("Username.")), "query": string_schema("Content keyword.")}), _sns_timeline, package="wechat.moments") _register("wechat.moments.list_users", "List Moments posters with post counts.", object_schema({**COMMON_ACCOUNT, "keyword": string_schema("Optional poster keyword."), "limit": int_schema("Maximum users.", minimum=1, maximum=500)}), _sns_users, package="wechat.moments") - _register("wechat.moments.sync_latest", "Sync latest visible Moments into decrypted sns.db.", object_schema({**COMMON_ACCOUNT, "max_scan": int_schema("Maximum rows to scan.", minimum=1, maximum=2000), "force": int_schema("Force flag 0/1.", minimum=0, maximum=1)}), _sns_sync_latest, package="wechat.admin", read_only=False) _register("wechat.moments.get_media_url", "Build a URL for a Moments image resource.", object_schema(additional_properties=True), _sns_media_url, package="wechat.media") _register("wechat.moments.get_article_thumb_url", "Build a URL for an official-article thumbnail image.", object_schema({"url": string_schema("Article URL.")}, required=["url"]), _sns_article_thumb_url, package="wechat.media") _register("wechat.moments.get_remote_video_url", "Build a URL for a remote Moments video/live-photo resource.", object_schema(additional_properties=True), _sns_video_remote_url, package="wechat.media") @@ -1952,9 +1380,9 @@ def _install_tools() -> None: _register("wechat.biz.get_messages", "Get official account messages.", object_schema({**COMMON_ACCOUNT, **PAGING, "username": string_schema("Official account username.")}, required=["username"]), _biz_messages, package="wechat.biz") _register("wechat.biz.get_pay_records", "Get WeChat Pay records from the pay official account.", object_schema({**COMMON_ACCOUNT, **PAGING}), _pay_records, package="wechat.biz") - _register("wechat.analytics.get_wrapped_meta", "Return annual wrapped manifest.", object_schema({**COMMON_ACCOUNT, "year": int_schema("Optional year."), "refresh": bool_schema("Refresh cache.", default=False)}), _wrapped_meta, package="wechat.analytics") - _register("wechat.analytics.get_wrapped_card", "Return one annual wrapped card.", object_schema({**COMMON_ACCOUNT, "year": int_schema("Optional year."), "card_id": int_schema("Card id.", minimum=0), "refresh": bool_schema("Refresh cache.", default=False)}, required=["card_id"]), _wrapped_card, package="wechat.analytics") - _register("wechat.analytics.get_wrapped_annual", "Return full annual wrapped data. Prefer meta/card for mobile clients.", object_schema({**COMMON_ACCOUNT, "year": int_schema("Optional year."), "refresh": bool_schema("Refresh cache.", default=False)}), _wrapped_annual, package="wechat.analytics") + _register("wechat.analytics.get_wrapped_meta", "Return annual wrapped manifest.", object_schema({**COMMON_ACCOUNT, "year": int_schema("Optional year.")}), _wrapped_meta, package="wechat.analytics") + _register("wechat.analytics.get_wrapped_card", "Return one annual wrapped card.", object_schema({**COMMON_ACCOUNT, "year": int_schema("Optional year."), "card_id": int_schema("Card id.", minimum=0)}, required=["card_id"]), _wrapped_card, package="wechat.analytics") + _register("wechat.analytics.get_wrapped_annual", "Return full annual wrapped data. Prefer meta/card for mobile clients.", object_schema({**COMMON_ACCOUNT, "year": int_schema("Optional year.")}), _wrapped_annual, package="wechat.analytics") _register("wechat.media.get_avatar_url", "Build a URL for a contact avatar.", object_schema({**COMMON_ACCOUNT, "username": string_schema("Contact username.")}, required=["username"]), _avatar_url, package="wechat.media") _register("wechat.media.get_chat_image_url", "Build a URL for a chat image message resource.", object_schema(additional_properties=True), _chat_image_url, package="wechat.media") @@ -1962,31 +1390,11 @@ def _install_tools() -> None: _register("wechat.media.get_chat_video_thumb_url", "Build a URL for a chat video thumbnail.", object_schema(additional_properties=True), _chat_video_thumb_url, package="wechat.media") _register("wechat.media.get_chat_video_url", "Build a URL for a chat video resource.", object_schema(additional_properties=True), _chat_video_url, package="wechat.media") _register("wechat.media.get_chat_voice_url", "Build a URL for a chat voice file. This does not transcribe audio.", object_schema(additional_properties=True), _chat_voice_url, package="wechat.media") - _register("wechat.media.download_chat_emoji", "Download one emoji resource into local cache.", object_schema(additional_properties=True), _download_emoji, package="wechat.media", read_only=False) _register("wechat.media.get_decrypted_resource_url", "Build a URL for a previously decrypted resource by MD5.", object_schema({**COMMON_ACCOUNT, "md5": string_schema("32-character resource md5.")}, required=["md5"]), _decrypted_media_resource_url, package="wechat.media") _register("wechat.media.get_proxy_image_url", "Build a backend proxy URL for a remote chat image.", object_schema({"url": string_schema("Remote image URL.")}, required=["url"]), _chat_proxy_image_url, package="wechat.media") _register("wechat.media.get_favicon_url", "Build a backend URL for a web page favicon.", object_schema({"url": string_schema("Page URL.")}, required=["url"]), _chat_favicon_url, package="wechat.media") - _register("wechat.media.open_chat_media_folder", "Open a chat media file or folder on the desktop host.", object_schema(additional_properties=True), _open_chat_media_folder, package="wechat.media", read_only=False) _register("wechat.biz.get_proxy_image_url", "Build a backend proxy URL for an official-account image.", object_schema({"url": string_schema("Remote image URL.")}, required=["url"]), _biz_proxy_image_url, package="wechat.biz") - _register("wechat.export.preview_chat_targets", "Preview chat export targets.", object_schema({**COMMON_ACCOUNT, "include_hidden": bool_schema("Include hidden sessions.", default=True), "include_official": bool_schema("Include official sessions.", default=False)}), _chat_export_targets, package="wechat.export") - _register("wechat.export.create_chat_export", "Create a chat export job.", object_schema(additional_properties=True), _create_chat_export, package="wechat.export", read_only=False) - _register("wechat.export.list_chat_exports", "List chat export jobs.", object_schema(), _list_chat_exports, package="wechat.export") - _register("wechat.export.get_chat_export", "Get one chat export job.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _get_chat_export, package="wechat.export") - _register("wechat.export.cancel_chat_export", "Cancel one chat export job.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _cancel_chat_export, package="wechat.export", read_only=False) - _register("wechat.export.get_chat_export_download", "Return chat export download URL when ready.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _download_chat_export, package="wechat.export") - _register("wechat.export.get_chat_export_events_url", "Build an SSE URL for chat export progress events.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _chat_export_events_url, package="wechat.export") - _register("wechat.export.create_moments_export", "Create a Moments export job.", object_schema(additional_properties=True), _create_sns_export, package="wechat.export", read_only=False) - _register("wechat.export.list_moments_exports", "List Moments export jobs.", object_schema(), _list_sns_exports, package="wechat.export") - _register("wechat.export.get_moments_export", "Get one Moments export job.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _get_sns_export, package="wechat.export") - _register("wechat.export.cancel_moments_export", "Cancel one Moments export job.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _cancel_sns_export, package="wechat.export", read_only=False) - _register("wechat.export.get_moments_export_download", "Return Moments export download URL when ready.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _download_sns_export, package="wechat.export") - _register("wechat.export.get_moments_export_events_url", "Build an SSE URL for Moments export progress events.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _moments_export_events_url, package="wechat.export") - _register("wechat.export.create_account_archive", "Create a full account archive export job.", object_schema(additional_properties=True), _create_account_archive, package="wechat.export", read_only=False) - _register("wechat.export.get_account_archive", "Get account archive export job.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _get_account_archive, package="wechat.export") - _register("wechat.export.cancel_account_archive", "Cancel account archive export job.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _cancel_account_archive, package="wechat.export", read_only=False) - _register("wechat.export.get_account_archive_download", "Return account archive download URL when ready.", object_schema({"export_id": string_schema("Export id.")}, required=["export_id"]), _download_account_archive, package="wechat.export") - _register("wechat.mobile.get_overview", "Return a compact mobile overview and suggested next tools.", object_schema({**COMMON_ACCOUNT, "session_limit": int_schema("Session count.", minimum=1, maximum=30), "moments_limit": int_schema("Moments count.", minimum=0, maximum=10), "include_moments": bool_schema("Include Moments preview.", default=False)}), _mobile_overview, package="wechat.mobile") _register("wechat.mobile.get_home_snapshot", "Return a mobile-friendly account/session/Moments readiness snapshot.", object_schema({**COMMON_ACCOUNT, "session_limit": int_schema("Session count.", minimum=1, maximum=80), "moments_limit": int_schema("Moments count.", minimum=0, maximum=30), "include_moments": bool_schema("Include Moments preview.", default=True), "include_hidden": bool_schema("Include hidden sessions.", default=False), "include_official": bool_schema("Include official sessions.", default=False), "preview": string_schema("Session preview mode.")}), _mobile_home_snapshot, package="wechat.mobile") _register("wechat.mobile.resolve_target", "Resolve a fuzzy target to contacts, sessions, Moments users, or official accounts.", object_schema({**COMMON_ACCOUNT, "query": string_schema("Target clue."), "target_type": string_schema("auto, contact, session, moments_user, or biz."), "limit": int_schema("Maximum candidates.", minimum=1, maximum=20)}, required=["query"]), _mobile_resolve_target, package="wechat.mobile") @@ -1998,7 +1406,6 @@ def _install_tools() -> None: _register("wechat.mobile.get_media_links", "Return URL resources for chat, Moments, avatar, link, or emoji media.", object_schema(additional_properties=True), _mobile_get_media_links, package="wechat.mobile") _register("wechat.mobile.get_message_media_bundle", "Return likely media URLs for a message or link without fetching binary content.", object_schema(additional_properties=True), _mobile_message_media_bundle, package="wechat.mobile") _register("wechat.mobile.get_analytics", "Return compact analytics data by metric without loading full annual payloads.", object_schema(additional_properties=True), _mobile_get_analytics, package="wechat.mobile") - _register("wechat.mobile.export_job", "Preview, create, poll, download, or cancel chat/Moments/archive export jobs.", object_schema(additional_properties=True), _mobile_export_job, package="wechat.mobile", read_only=False) _install_tools() diff --git a/tests/test_mcp_router.py b/tests/test_mcp_router.py index 9dca452..7227fd9 100644 --- a/tests/test_mcp_router.py +++ b/tests/test_mcp_router.py @@ -1,8 +1,9 @@ import os import sys +import tempfile import unittest from pathlib import Path -from unittest.mock import AsyncMock, Mock, patch +from unittest.mock import patch from fastapi import FastAPI from fastapi.testclient import TestClient @@ -14,6 +15,75 @@ sys.path.insert(0, str(ROOT / "src")) class TestMcpRouter(unittest.TestCase): TEST_TOKEN = "test-mcp-token-1234567890" + REMOVED_MCP_TOOLS = { + "wechat.setup.get_saved_keys", + "wechat.setup.get_database_key", + "wechat.setup.get_image_key", + "wechat.setup.decrypt_databases", + "wechat.setup.get_decrypt_stream_url", + "wechat.setup.preview_import_decrypted", + "wechat.setup.get_import_decrypted_stream_url", + "wechat.setup.cancel_import_decrypted", + "wechat.setup.save_media_keys", + "wechat.setup.decrypt_all_media", + "wechat.setup.get_decrypt_all_media_stream_url", + "wechat.setup.get_download_all_emojis_stream_url", + "wechat.contacts.export_contacts", + "wechat.chat.get_realtime_status", + "wechat.chat.sync_realtime_session", + "wechat.chat.sync_realtime_all_sessions", + "wechat.chat.get_realtime_events_url", + "wechat.moments.sync_latest", + "wechat.editing.list_edited_sessions", + "wechat.editing.list_edited_messages", + "wechat.editing.get_message_edit_status", + "wechat.editing.edit_message", + "wechat.editing.repair_message_sender", + "wechat.editing.flip_message_direction", + "wechat.editing.reset_message_edit", + "wechat.editing.reset_session_edits", + "wechat.export.preview_chat_targets", + "wechat.export.create_chat_export", + "wechat.export.list_chat_exports", + "wechat.export.get_chat_export", + "wechat.export.cancel_chat_export", + "wechat.export.get_chat_export_download", + "wechat.export.get_chat_export_events_url", + "wechat.export.create_moments_export", + "wechat.export.list_moments_exports", + "wechat.export.get_moments_export", + "wechat.export.cancel_moments_export", + "wechat.export.get_moments_export_download", + "wechat.export.get_moments_export_events_url", + "wechat.export.create_account_archive", + "wechat.export.get_account_archive", + "wechat.export.cancel_account_archive", + "wechat.export.get_account_archive_download", + "wechat.mobile.export_job", + "wechat.admin.detect_wechat_installation", + "wechat.admin.get_current_wechat_account", + "wechat.admin.get_wechat_runtime_status", + "wechat.admin.delete_account_data", + "wechat.system.api_root", + "wechat.system.health_check", + "wechat.system.get_backend_log_file", + "wechat.system.open_backend_log_file", + "wechat.system.log_frontend_server_error", + "wechat.system.get_backend_port", + "wechat.system.set_backend_port_setting", + "wechat.system.set_backend_port_and_restart", + "wechat.system.get_mcp_lan_access", + "wechat.system.set_mcp_lan_access", + "wechat.system.get_img_helper_status", + "wechat.system.toggle_img_helper", + "wechat.system.pick_directory", + "wechat.chat.get_search_index_status", + "wechat.chat.build_search_index", + "wechat.chat.get_session_last_message_cache_status", + "wechat.chat.build_session_last_message_cache", + "wechat.media.download_chat_emoji", + "wechat.media.open_chat_media_folder", + } def setUp(self): self._old_mcp_token = os.environ.get("WECHAT_TOOL_MCP_TOKEN") @@ -59,13 +129,11 @@ class TestMcpRouter(unittest.TestCase): self.assertIn("wechat.chat.list_search_senders", names) self.assertIn("wechat.chat.resolve_chat_history", names) self.assertIn("wechat.chat.resolve_app_message", names) - self.assertIn("wechat.contacts.export_contacts", names) - self.assertIn("wechat.export.create_chat_export", names) - self.assertIn("wechat.export.get_account_archive_download", names) self.assertIn("wechat.moments.get_remote_video_url", names) self.assertNotIn("search_memory", names) self.assertNotIn("transcribe_voice_message", names) self.assertNotIn("transcribe_audio_file", names) + self.assertFalse(self.REMOVED_MCP_TOOLS & names) def test_mcp_requires_token(self): client = self._client(auth=False) @@ -122,6 +190,11 @@ class TestMcpRouter(unittest.TestCase): self.assertIn("bundleText", payload) self.assertIn("WeChat MCP Copilot", payload["bundleText"]) self.assertTrue(any(ref["path"] == "references/mobile.md" for ref in payload["references"])) + self.assertFalse(any(ref["path"] == "references/system.md" for ref in payload["references"])) + self.assertFalse(any(ref["path"] == "references/setup-system.md" for ref in payload["references"])) + self.assertFalse(any(ref["path"] == "references/export.md" for ref in payload["references"])) + for tool_name in self.REMOVED_MCP_TOOLS: + self.assertNotIn(tool_name, payload["bundleText"]) def test_skill_text_can_be_loaded_over_http(self): client = self._client() @@ -292,6 +365,26 @@ class TestMcpRouter(unittest.TestCase): payload = resp.json() self.assertEqual(payload["error"]["code"], -32601) + def test_removed_mcp_tools_are_not_listed_or_callable(self): + client = self._client() + + tools_resp = client.post("/mcp", json=self._rpc("tools/list")) + self.assertEqual(tools_resp.status_code, 200) + names = {tool["name"] for tool in tools_resp.json()["result"]["tools"]} + self.assertFalse(self.REMOVED_MCP_TOOLS & names) + + for tool_name in sorted(self.REMOVED_MCP_TOOLS): + with self.subTest(tool_name=tool_name): + direct_resp = client.post("/mcp", json=self._rpc(tool_name, {})) + call_resp = client.post( + "/mcp", + json=self._rpc("tools/call", {"name": tool_name, "arguments": {}}), + ) + self.assertEqual(direct_resp.status_code, 200) + self.assertEqual(call_resp.status_code, 200) + self.assertEqual(direct_resp.json()["error"]["code"], -32601) + self.assertEqual(call_resp.json()["error"]["code"], -32601) + def test_missing_tool_name_returns_invalid_params(self): client = self._client() @@ -347,6 +440,8 @@ class TestMcpRouter(unittest.TestCase): self.assertIn("/api/sns/media?", moments["url"]) self.assertEqual(moments["params"]["post_id"], "post-a") self.assertEqual(moments["params"]["media_id"], "media-a") + self.assertNotIn("use_cache", moments["params"]) + self.assertNotIn("use_cache", moments["url"]) def test_completed_mcp_packages_and_mobile_facade_are_listed(self): client = self._client() @@ -357,95 +452,36 @@ class TestMcpRouter(unittest.TestCase): tools = resp.json()["result"]["tools"] names = {tool["name"] for tool in tools} expected = { - "wechat.setup.get_saved_keys", - "wechat.setup.decrypt_databases", - "wechat.setup.get_decrypt_stream_url", - "wechat.setup.preview_import_decrypted", - "wechat.setup.get_decrypt_all_media_stream_url", - "wechat.system.health_check", - "wechat.system.get_backend_port", - "wechat.system.get_mcp_lan_access", - "wechat.system.set_mcp_lan_access", - "wechat.system.get_img_helper_status", - "wechat.system.open_backend_log_file", - "wechat.system.pick_directory", - "wechat.system.set_backend_port_and_restart", "wechat.media.get_decrypted_resource_url", "wechat.media.get_proxy_image_url", "wechat.media.get_favicon_url", - "wechat.media.open_chat_media_folder", - "wechat.export.get_chat_export_events_url", - "wechat.export.get_moments_export_events_url", - "wechat.chat.get_realtime_events_url", - "wechat.admin.delete_account_data", "wechat.mobile.get_overview", "wechat.mobile.resolve_target", "wechat.mobile.search_chat", "wechat.mobile.get_chat_context", "wechat.mobile.search_moments", "wechat.mobile.get_media_links", - "wechat.mobile.export_job", } self.assertTrue(expected.issubset(names)) + self.assertFalse(self.REMOVED_MCP_TOOLS & names) self.assertNotIn("search_memory", names) self.assertNotIn("transcribe_voice_message", names) self.assertNotIn("transcribe_audio_file", names) packages = {tool["annotations"]["package"] for tool in tools} - self.assertTrue({"wechat.setup", "wechat.system", "wechat.mobile"}.issubset(packages)) + self.assertTrue({"wechat.core", "wechat.mobile", "wechat.media"}.issubset(packages)) + self.assertFalse({"wechat.setup", "wechat.export", "wechat.editing", "wechat.system", "wechat.admin"} & packages) def test_new_url_helpers_return_urls_and_params(self): client = self._client() checks = [ - ( - "wechat.setup.get_decrypt_stream_url", - {"key": "a" * 64, "db_storage_path": r"D:\WeChat\db_storage"}, - "streamUrl", - "/api/decrypt_stream?", - ), - ( - "wechat.setup.get_import_decrypted_stream_url", - {"import_path": r"D:\backup\wxid_a", "job_id": "job-1"}, - "streamUrl", - "/api/import_decrypted?", - ), - ( - "wechat.setup.get_decrypt_all_media_stream_url", - {"account": "wxid_a", "xor_key": "0xA5", "concurrency": 3}, - "streamUrl", - "/api/media/decrypt_all_stream?", - ), - ( - "wechat.setup.get_download_all_emojis_stream_url", - {"account": "wxid_a", "force": True, "concurrency": 4}, - "streamUrl", - "/api/media/emoji/download_all_stream?", - ), ( "wechat.media.get_decrypted_resource_url", {"account": "wxid_a", "md5": "a" * 32}, "url", "/api/media/resource/", ), - ( - "wechat.chat.get_realtime_events_url", - {"account": "wxid_a", "interval_ms": 300}, - "streamUrl", - "/api/chat/realtime/stream?", - ), - ( - "wechat.export.get_chat_export_events_url", - {"export_id": "exp-1"}, - "streamUrl", - "/api/chat/exports/exp-1/events", - ), - ( - "wechat.export.get_moments_export_events_url", - {"export_id": "sns-1"}, - "streamUrl", - "/api/sns/exports/sns-1/events", - ), ] for tool_name, args, url_key, path_part in checks: @@ -456,26 +492,82 @@ class TestMcpRouter(unittest.TestCase): self.assertEqual(structured["status"], "success") self.assertIn(path_part, structured[url_key]) - def test_setup_and_system_wrappers_call_underlying_router(self): + def test_exposed_mcp_tools_are_read_only(self): client = self._client() - keys_router = Mock() - keys_router.get_saved_keys = AsyncMock(return_value={"status": "success", "keys": {"db_key": "k"}}) - system_router = Mock() - system_router.get_img_helper_status = AsyncMock(return_value={"enabled": False}) - with patch("wechat_decrypt_tool.mcp.tools._keys_router", return_value=keys_router), patch( - "wechat_decrypt_tool.mcp.tools._system_router", return_value=system_router - ): - keys_resp = client.post( - "/mcp", - json=self._rpc("wechat.setup.get_saved_keys", {"account": "wxid_a", "wxid_dir": r"D:\WeChat\wxid_a"}), - ) - helper_resp = client.post("/mcp", json=self._rpc("wechat.system.get_img_helper_status")) + resp = client.post("/mcp", json=self._rpc("tools/list")) - self.assertEqual(keys_resp.status_code, 200) - self.assertEqual(keys_resp.json()["result"]["structuredContent"]["keys"]["db_key"], "k") - keys_router.get_saved_keys.assert_awaited_once_with(account="wxid_a", db_storage_path=None, wxid_dir=r"D:\WeChat\wxid_a") - self.assertEqual(helper_resp.json()["result"]["structuredContent"], {"enabled": False}) + self.assertEqual(resp.status_code, 200) + tools = resp.json()["result"]["tools"] + self.assertTrue(tools) + for tool in tools: + with self.subTest(tool_name=tool["name"]): + annotations = tool.get("annotations") or {} + self.assertTrue(annotations.get("readOnlyHint")) + self.assertFalse(annotations.get("destructiveHint")) + + def test_analytics_schema_does_not_expose_refresh(self): + client = self._client() + + resp = client.post("/mcp", json=self._rpc("tools/list")) + + self.assertEqual(resp.status_code, 200) + tools = {tool["name"]: tool for tool in resp.json()["result"]["tools"]} + for tool_name in [ + "wechat.analytics.get_wrapped_meta", + "wechat.analytics.get_wrapped_card", + "wechat.analytics.get_wrapped_annual", + ]: + with self.subTest(tool_name=tool_name): + properties = tools[tool_name]["inputSchema"].get("properties") or {} + self.assertNotIn("refresh", properties) + + def test_analytics_tools_are_cache_only(self): + client = self._client() + + class FakeWrappedService: + _CACHE_VERSION = 26 + _IMPLEMENTED_UPTO_ID = 7 + _WRAPPED_CARD_MANIFEST = ({"id": 0, "title": "Overview"},) + + @staticmethod + def _default_year(): + return 2025 + + def build_wrapped_annual_meta(self, **_kwargs): + raise AssertionError("MCP analytics must not build wrapped meta.") + + def build_wrapped_annual_card(self, **_kwargs): + raise AssertionError("MCP analytics must not build wrapped card.") + + def build_wrapped_annual_response(self, **_kwargs): + raise AssertionError("MCP analytics must not build wrapped annual data.") + + with tempfile.TemporaryDirectory() as tmp: + account_dir = Path(tmp) / "wxid_a" + account_dir.mkdir() + with patch("wechat_decrypt_tool.mcp.tools._resolve_account_dir", return_value=account_dir), patch( + "wechat_decrypt_tool.mcp.tools._wrapped_service", return_value=FakeWrappedService() + ): + card_resp = client.post( + "/mcp", + json=self._rpc("wechat.analytics.get_wrapped_card", {"account": "wxid_a", "year": 2025, "card_id": 0}), + ) + annual_resp = client.post( + "/mcp", + json=self._rpc("wechat.analytics.get_wrapped_annual", {"account": "wxid_a", "year": 2025}), + ) + + self.assertFalse((account_dir / "_wrapped" / "cache").exists()) + + for resp in [card_resp, annual_resp]: + self.assertEqual(resp.status_code, 200) + result = resp.json()["result"] + self.assertTrue(result["isError"]) + structured = result["structuredContent"] + self.assertEqual(structured["status"], "error") + self.assertTrue(structured["cacheOnly"]) + self.assertEqual(structured["message"], "Wrapped cache not found. Open the app to generate it first.") def test_mobile_overview_uses_compact_facade(self): client = self._client() @@ -486,12 +578,6 @@ class TestMcpRouter(unittest.TestCase): ), patch( "wechat_decrypt_tool.mcp.tools._list_sessions", return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]}, - ), patch( - "wechat_decrypt_tool.mcp.tools._chat_realtime_status", return_value={"status": "success", "available": True} - ), patch( - "wechat_decrypt_tool.mcp.tools._search_index_status", return_value={"status": "ready"} - ), patch( - "wechat_decrypt_tool.mcp.tools._session_last_message_status", return_value={"status": "ready"} ): resp = client.post( "/mcp", @@ -504,6 +590,31 @@ class TestMcpRouter(unittest.TestCase): self.assertTrue(structured["ready"]) self.assertEqual(structured["defaultAccount"], "wxid_a") self.assertIn("wechat.mobile.search_chat", structured["suggestedTools"]) + self.assertNotIn("wechat.mobile.export_job", structured["suggestedTools"]) + self.assertNotIn("realtime", structured["health"]) + self.assertNotIn("indexes", structured["health"]) + + def test_mobile_overview_does_not_expose_realtime_status(self): + client = self._client() + + with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=["wxid_a"]), patch( + "wechat_decrypt_tool.mcp.tools._get_account_info", + return_value={"status": "success", "account": "wxid_a", "databaseCount": 3}, + ), patch( + "wechat_decrypt_tool.mcp.tools._list_sessions", + return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]}, + ): + resp = client.post( + "/mcp", + json=self._rpc("wechat.mobile.get_overview", {"account": "wxid_a", "session_limit": 5}), + ) + + self.assertEqual(resp.status_code, 200) + payload = resp.json() + self.assertNotIn("error", payload) + structured = payload["result"]["structuredContent"] + self.assertNotIn("realtime", structured["health"]) + self.assertNotIn("indexes", structured["health"]) def test_mobile_resolve_target_normalizes_candidates(self): client = self._client()