Compare commits

..

5 Commits

25 changed files with 968 additions and 945 deletions
+10 -11
View File
@@ -161,7 +161,7 @@ npm run dev
## MCP 服务
后端提供 MCP JSON-RPC over HTTP 服务,默认只监听 `127.0.0.1`。手机接入局域网时,在应用内打开 **设置 -> MCP 接入 -> 允许手机局域网接入 MCP**,后端会切换为监听 `0.0.0.0` 并重启。
后端提供 MCP JSON-RPC over HTTP 服务,默认只监听 `127.0.0.1`。手机接入局域网时,在应用内打开 **设置 -> MCP 接入 -> 允许手机局域网接入 MCP**,后端会切换为监听 `0.0.0.0` 并重启;设置页展示和复制的接入地址会使用电脑实际局域网 IP,例如 `http://192.168.x.x:10392/mcp`,而不是不可被其他设备访问的 `127.0.0.1`
MCP 入口需要 token 鉴权。设置页提供 **MCP Token**、**AI 接入提示词** 和 **Skill Markdown** 三个独立复制区;token 可一键复制或重置,重置后旧 token 立即失效。手机端或外部 AI 客户端访问 `/mcp``/mcp/skill/bundle``/mcp/skill` 时,都应带上 `Authorization: Bearer <MCP_TOKEN>`。兼容客户端也可以使用 `X-MCP-Token` 请求头或 `?token=` 查询参数,但推荐使用 Bearer token。
@@ -169,23 +169,22 @@ MCP 入口需要 token 鉴权。设置页提供 **MCP Token**、**AI 接入提
工具调用成功时,客户端优先读取 `result.structuredContent``content[0].text` 只是给通用 MCP 客户端展示的 JSON 文本副本。业务未就绪时仍可能返回 JSON-RPC success,但 `result.isError=true`;协议错误或参数错误则返回 JSON-RPC `error`
MCP 仅暴露读取数据与获取媒体资源 URL/参数的能力;系统设置、索引与缓存构建、数据准备、导出、实时同步、本地修订、数据删除等操作类能力不通过 MCP 暴露,请在桌面/网页应用内使用。
工具按包分层:
- `wechat.core`: 状态、工具目录、账号列表、账号信息
- `wechat.setup`: 密钥读取、数据库解密、已解密目录导入、媒体密钥保存、批量媒体处理入口
- `wechat.system`: 健康检查、后端端口、日志路径、大图辅助插件状态等系统能力
- `wechat.mobile`: 面向手机和外部代理的聚合入口,默认返回小结果和下一步建议
- `wechat.contacts`: 联系人列表、模糊解析、联系人导出
- `wechat.contacts`: 联系人列表、模糊解析
- `wechat.chat`: 会话、消息、搜索、发送者筛选、上下文、锚点、合并转发/AppMsg 解析、统计
- `wechat.moments`: 朋友圈时间线、用户、同步、图片/视频/文章封面 URL
- `wechat.media`: 聊天/朋友圈图片、视频、表情、头像、语音文件 URL、远程图片代理与资源辅助
- `wechat.moments`: 朋友圈时间线、用户、图片/视频/文章封面 URL
- `wechat.media`: 聊天/朋友圈图片、视频、表情、头像、语音文件 URL、远程图片代理与资源辅助;只返回 URL 或资源参数,不提供下载缓存或打开本机目录操作
- `wechat.biz`: 公众号/服务号与微信支付记录
- `wechat.analytics`: 年度总结与卡片懒加载
- `wechat.export`: 聊天、朋友圈、账号归档导出任务、下载 URL 与进度 SSE URL
- `wechat.admin`: 微信检测、索引、实时同步等管理能力
- `wechat.editing`: 消息编辑、修复、恢复与审计
- `wechat.analytics`: 年度总结与聚合分析读取;年度总结只读取应用内已生成的缓存,未生成时请先在应用内打开年度总结
媒体、视频、SSE 进度和 ZIP 导出不会直接塞进 MCP JSON 响应;相关工具返回可访问 URL、`streamUrl`、任务状态或资源参数
会话列表、联系人和头像相关接口均采用 best-effort 读取策略。即使 `contact.db` 中某些头像字段损坏或无法按 UTF-8 解码,也会继续返回昵称、会话摘要和其他可用内容,头像则自动降级为空或占位,不会阻塞整页数据加载
媒体和视频不会直接塞进 MCP JSON 响应;相关工具返回可访问 URL 或资源参数。
配套 skill 可通过 HTTP 加载,访问时需要带 MCP token:
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "wechat-data-analysis-desktop",
"version": "1.9.0",
"version": "1.9.2",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "wechat-data-analysis-desktop",
"version": "1.9.0",
"version": "1.9.2",
"dependencies": {
"electron-updater": "^6.7.3"
},
+1 -1
View File
@@ -1,7 +1,7 @@
{
"name": "wechat-data-analysis-desktop",
"private": true,
"version": "1.9.0",
"version": "1.9.2",
"main": "src/main.cjs",
"scripts": {
"dev": "node scripts/dev.cjs",
+93 -5
View File
@@ -21,6 +21,7 @@ const crypto = require("crypto");
const fs = require("fs");
const http = require("http");
const net = require("net");
const os = require("os");
const path = require("path");
const { Worker } = require("worker_threads");
const {
@@ -101,6 +102,84 @@ function getBackendAccessHost() {
return host || "127.0.0.1";
}
function getInterfacePenalty(name) {
const lower = String(name || "").toLowerCase();
if (/(docker|hyper-v|loopback|npcap|tailscale|virtual|virtualbox|vmware|vethernet|wsl|zerotier)/i.test(lower)) {
return 30;
}
if (/(ethernet|wi-fi|wifi|wireless|wlan|以太|无线)/i.test(lower)) {
return 0;
}
return 10;
}
function isReachableClientIpv4(address) {
const text = String(address || "").trim();
const parts = text.split(".");
if (parts.length !== 4) return false;
const nums = parts.map((part) => Number(part));
if (!nums.every((n) => Number.isInteger(n) && n >= 0 && n <= 255)) return false;
if (nums[0] === 0 || nums[0] === 127 || nums[0] >= 224) return false;
if (nums[0] === 169 && nums[1] === 254) return false;
return true;
}
function isPrivateIpv4(address) {
const nums = String(address || "").trim().split(".").map((part) => Number(part));
if (nums.length !== 4 || !nums.every((n) => Number.isInteger(n))) return false;
return (
nums[0] === 10 ||
(nums[0] === 172 && nums[1] >= 16 && nums[1] <= 31) ||
(nums[0] === 192 && nums[1] === 168)
);
}
function getLanAccessHost(defaultHost = DEFAULT_BACKEND_HOST) {
const candidates = [];
const seen = new Set();
const addCandidate = (address, interfaceName = "", sourceOrder = 0) => {
const value = String(address || "").trim();
if (!isReachableClientIpv4(value) || seen.has(value)) return;
seen.add(value);
candidates.push([
isPrivateIpv4(value) ? 0 : 1,
getInterfacePenalty(interfaceName),
sourceOrder,
value,
]);
};
try {
const interfaces = os.networkInterfaces();
for (const [name, addresses] of Object.entries(interfaces || {})) {
for (const item of addresses || []) {
if (!item || (item.family !== "IPv4" && item.family !== 4) || item.internal) continue;
addCandidate(item.address, name, 0);
}
}
} catch {}
candidates.sort((a, b) => a[0] - b[0] || a[1] - b[1] || a[2] - b[2]);
return candidates[0]?.[3] || defaultHost;
}
function getMcpAccessHost(bindHost = getBackendBindHost()) {
const host = String(bindHost || "").trim();
if (host === LAN_BACKEND_HOST || host === "::") return getLanAccessHost(DEFAULT_BACKEND_HOST);
return host || DEFAULT_BACKEND_HOST;
}
function getMcpAccessInfo(bindHost = getBackendBindHost(), port = getBackendPort()) {
const accessHost = getMcpAccessHost(bindHost);
const origin = `http://${formatHostForUrl(accessHost)}:${port}`;
return {
accessHost,
mcpEndpoint: `${origin}/mcp`,
skillBundleUrl: `${origin}/mcp/skill/bundle`,
skillMarkdownUrl: `${origin}/mcp/skill`,
};
}
function getBackendPort() {
const envPort = parsePort(process.env.WECHAT_TOOL_PORT);
if (envPort != null) return envPort;
@@ -2340,19 +2419,24 @@ function registerWindowIpc() {
ipcMain.handle("backend:getMcpLanAccess", () => {
try {
const host = getBackendBindHost();
const port = getBackendPort();
return {
enabled: getMcpLanAccessEnabled(),
host: getBackendBindHost(),
port: getBackendPort(),
host,
port,
uiUrl: getDesktopUiUrl(),
...getMcpAccessInfo(host, port),
};
} catch (err) {
logMain(`[main] backend:getMcpLanAccess failed: ${err?.message || err}`);
const port = DEFAULT_BACKEND_PORT;
return {
enabled: false,
host: DEFAULT_BACKEND_HOST,
port: DEFAULT_BACKEND_PORT,
port,
uiUrl: getDesktopUiUrl(),
...getMcpAccessInfo(DEFAULT_BACKEND_HOST, port),
};
}
});
@@ -2363,13 +2447,16 @@ function registerWindowIpc() {
const nextEnabled = !!enabled;
const prevEnabled = getMcpLanAccessEnabled();
if (nextEnabled === prevEnabled) {
const host = getBackendBindHost();
const port = getBackendPort();
return {
success: true,
changed: false,
enabled: prevEnabled,
host: getBackendBindHost(),
port: getBackendPort(),
host,
port,
uiUrl: getDesktopUiUrl(),
...getMcpAccessInfo(host, port),
};
}
@@ -2396,6 +2483,7 @@ function registerWindowIpc() {
host: getBackendBindHost(),
port: getBackendPort(),
uiUrl,
...getMcpAccessInfo(),
};
} finally {
backendPortChangeInProgress = false;
+22
View File
@@ -269,6 +269,7 @@
<div class="min-w-0 flex-1">
<div class="text-[13px] font-medium text-[#222]">允许手机局域网接入 MCP</div>
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090]">开启后后端监听 0.0.0.0手机可通过接入提示词中的地址接入</div>
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090] break-all">当前地址{{ mcpEndpoint }}</div>
<div v-if="mcpLanAccessMessage" class="mt-1 text-[11px] leading-relaxed text-[#1b6b43]">{{ mcpLanAccessMessage }}</div>
<div v-if="mcpLanAccessError" class="mt-1 text-[11px] leading-relaxed text-red-600">{{ mcpLanAccessError }}</div>
</div>
@@ -608,6 +609,8 @@ const mcpSkillBundleText = ref('')
const mcpSkillBundleLoading = ref(false)
const mcpSkillBundleError = ref('')
const mcpCopiedKey = ref('')
const mcpAccessHost = ref('')
const mcpAccessEndpoint = ref('')
let mcpCopiedTimer = null
const mcpPortText = computed(() => {
@@ -617,6 +620,10 @@ const mcpPortText = computed(() => {
})
const mcpEndpoint = computed(() => {
const reported = String(mcpAccessEndpoint.value || '').trim()
if (/^https?:\/\//i.test(reported)) return reported
const reportedHost = String(mcpAccessHost.value || '').trim()
if (reportedHost) return `http://${reportedHost}:${mcpPortText.value}/mcp`
if (!process.client || typeof window === 'undefined') return `http://127.0.0.1:${mcpPortText.value}/mcp`
const apiBase = useApiBase()
if (/^https?:\/\//i.test(apiBase)) {
@@ -630,6 +637,14 @@ const mcpEndpoint = computed(() => {
return `${protocol}//${host}:${mcpPortText.value}/mcp`
})
const applyMcpAccessInfo = (resp) => {
if (!resp || typeof resp !== 'object') return
const accessHost = String(resp.accessHost || resp.access_host || '').trim()
const endpoint = String(resp.mcpEndpoint || resp.mcp_endpoint || '').trim()
if (accessHost) mcpAccessHost.value = accessHost
if (/^https?:\/\//i.test(endpoint)) mcpAccessEndpoint.value = endpoint
}
const mcpSkillFallback = [
'# WeChat MCP Copilot',
'',
@@ -800,10 +815,12 @@ const refreshMcpLanAccess = async () => {
if (window.wechatDesktop?.getMcpLanAccess) {
const resp = await window.wechatDesktop.getMcpLanAccess()
mcpLanAccessEnabled.value = !!resp?.enabled
applyMcpAccessInfo(resp)
return
}
const resp = await fetchAdminEndpoint('/admin/mcp-access')
mcpLanAccessEnabled.value = !!resp?.enabled
applyMcpAccessInfo(resp)
} catch (e) {
mcpLanAccessError.value = e?.message || '读取 MCP 接入状态失败'
} finally {
@@ -881,7 +898,9 @@ const setMcpLanAccess = async (enabled) => {
if (window.wechatDesktop?.setMcpLanAccess) {
const resp = await window.wechatDesktop.setMcpLanAccess(!!enabled)
mcpLanAccessEnabled.value = !!resp?.enabled
applyMcpAccessInfo(resp)
mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,后端已重启。' : 'MCP 局域网接入状态未变化。'
await refreshMcpSkillBundle()
return
}
@@ -890,11 +909,14 @@ const setMcpLanAccess = async (enabled) => {
body: { enabled: !!enabled },
})
mcpLanAccessEnabled.value = !!resp?.enabled
applyMcpAccessInfo(resp)
mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,正在等待后端重启。' : 'MCP 局域网接入状态未变化。'
if (resp?.changed) {
await waitForBackendHealth(30_000)
await refreshMcpLanAccess()
mcpLanAccessMessage.value = 'MCP 局域网接入已更新,后端已恢复。'
}
await refreshMcpSkillBundle()
} catch (e) {
mcpLanAccessEnabled.value = previous
mcpLanAccessError.value = e?.message || '设置 MCP 接入状态失败'
+4
View File
@@ -11,6 +11,7 @@
import uvicorn
import os
from pathlib import Path
from wechat_decrypt_tool.network_access import get_lan_access_host
from wechat_decrypt_tool.runtime_settings import read_effective_backend_host, read_effective_backend_port
def main():
@@ -18,6 +19,7 @@ def main():
host, host_source = read_effective_backend_host(default="127.0.0.1")
port, port_source = read_effective_backend_port(default=10392)
access_host = "127.0.0.1" if host in {"0.0.0.0", "::"} else host
lan_access_host = get_lan_access_host(default="127.0.0.1") if host in {"0.0.0.0", "::"} else access_host
print("=" * 60)
print("微信解密工具 API 服务")
@@ -38,6 +40,8 @@ def main():
print(f"监听地址: {host}")
print(f"API文档: http://{access_host}:{port}/docs")
print(f"健康检查: http://{access_host}:{port}/api/health")
if lan_access_host != access_host:
print(f"局域网 MCP: http://{lan_access_host}:{port}/mcp")
print("按 Ctrl+C 停止服务")
print("=" * 60)
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "wechat-decrypt-tool"
version = "1.9.0"
version = "1.9.2"
description = "Modern WeChat database decryption tool with React frontend"
readme = "README.md"
requires-python = ">=3.11"
+2 -4
View File
@@ -1,7 +1,7 @@
---
name: wechat-mcp-copilot
version: "1.0.0"
description: Use WeChatDataAnalysis MCP to inspect local WeChat accounts, contacts, sessions, messages, Moments, media, exports, and analytics through a small routed playbook. Trigger when the user asks to search, summarize, export, diagnose, or reason over local WeChat data.
description: Use WeChatDataAnalysis MCP to inspect local WeChat accounts, contacts, sessions, messages, Moments, media, and analytics through a small routed playbook. Trigger when the user asks to search, summarize, diagnose, or reason over local WeChat data.
---
# WeChat MCP Copilot
@@ -12,7 +12,7 @@ Use WeChatDataAnalysis MCP like an investigator: start broad, resolve fuzzy targ
1. Start with `references/routing.md`.
2. Load only one domain reference after routing.
3. Load `references/pagination-budget.md` before broad searches, exports, or multi-page scans.
3. Load `references/pagination-budget.md` before broad searches or multi-page scans.
4. Use `references/failure-recovery.md` when MCP, database readiness, or empty results are unclear.
5. For phone, ScreenMemo, or external MCP clients, prefer `wechat.mobile.*` facade tools before low-level tools.
6. Do not load the full tool catalog unless the user asks about available tools.
@@ -21,12 +21,10 @@ Use WeChatDataAnalysis MCP like an investigator: start broad, resolve fuzzy targ
- `references/routing.md`: first-hop intent routing.
- `references/mobile.md`: phone-friendly facade tools and compact response rules.
- `references/setup-system.md`: setup, keys, decrypt, import, health, and system operations.
- `references/target-resolution.md`: fuzzy contact/session resolution.
- `references/chats.md`: chat sessions, messages, search, and context.
- `references/moments.md`: Moments timeline, posters, likes, comments, media.
- `references/media.md`: images, videos, emoji, files, voice resources without transcription.
- `references/export.md`: chat, Moments, and account archive export jobs.
- `references/analytics.md`: wrapped cards, counts, rankings, and aggregate analysis.
- `references/pagination-budget.md`: limits, cursors, result clipping, stopping rules.
- `references/failure-recovery.md`: empty result, not-ready database, ambiguous targets, retries.
@@ -13,7 +13,7 @@ Use this for annual summaries, rankings, counts, and aggregate questions.
## Rules
- Prefer `get_wrapped_meta` then `get_wrapped_card` for mobile or constrained contexts.
- Wrapped annual tools read existing generated cache only; if a cache is missing, ask the user to open Wrapped in the desktop/web app first.
- Use `get_wrapped_annual` only when the user needs the whole annual dataset.
- For broad statistics, prefer aggregate tools or targeted searches over full message pagination.
- Always state the account, time range, and metric basis when answering.
@@ -1,29 +0,0 @@
# Export
Use export only when the user asks for an artifact.
## Chat Export
1. Resolve target session if `scope=selected`.
2. Confirm time range, format, media options, and output directory when needed.
3. Preview targets with `wechat.export.preview_chat_targets`.
4. Create job with `wechat.export.create_chat_export`.
5. Poll `wechat.export.get_chat_export`.
6. Return `wechat.export.get_chat_export_download` when ready.
7. Use `wechat.export.get_chat_export_events_url` when the client can consume SSE progress.
## Moments Export
Use `wechat.export.create_moments_export`, `wechat.export.get_moments_export`, `wechat.export.get_moments_export_download`, and `wechat.export.get_moments_export_events_url`.
## Account Archive
Use `wechat.export.create_account_archive`, `wechat.export.get_account_archive`, `wechat.export.get_account_archive_download`, and `wechat.export.cancel_account_archive`.
## Contacts Export
Use `wechat.contacts.export_contacts` only when the user asks for a contacts file. It writes JSON or CSV to a local output directory.
Do not silently export all history and all media unless the user explicitly asked for that scope.
For phone clients, prefer `wechat.mobile.export_job` unless exact low-level export options are required.
@@ -8,9 +8,8 @@ Use this when MCP status, DB readiness, or results are suspicious.
2. `wechat.core.get_status`
3. `wechat.core.list_accounts`
4. `wechat.core.get_account_info`
5. Search index status with `wechat.chat.get_search_index_status` when message search fails.
6. Moments availability by checking account info and `wechat.moments.list_users`.
7. Setup readiness: load `setup-system.md` for keys, decrypt, import, health, or media-key problems.
5. Moments availability by checking account info and `wechat.moments.list_users`.
6. For backend diagnostics, MCP LAN access, data preparation, index/cache build, export, realtime sync, local editing, or system settings, direct the user to the desktop/web app.
## Empty Results
@@ -18,4 +17,4 @@ Use this when MCP status, DB readiness, or results are suspicious.
- Try contact/session resolution with a simpler keyword.
- Try session search before global message search when a target is known.
- For Moments, resolve poster identity before timeline filtering.
- If setup is not ready, stop content tools and explain the missing readiness condition.
- If data is not ready, stop content tools and explain that data preparation is handled in the desktop app, not through MCP.
@@ -13,7 +13,6 @@ Use this for image, video, emoji, file, link, and voice resources.
- `wechat.media.get_decrypted_resource_url`
- `wechat.media.get_proxy_image_url`
- `wechat.media.get_favicon_url`
- `wechat.media.open_chat_media_folder`
- `wechat.biz.get_proxy_image_url`
- `wechat.moments.get_media_url`
- `wechat.moments.get_article_thumb_url`
@@ -25,6 +24,6 @@ Use this for image, video, emoji, file, link, and voice resources.
- Media tools return URLs or resource metadata; they do not inline large binary payloads.
- Voice resources are files only. Do not transcribe voice messages.
- For phone clients, prefer `wechat.mobile.get_media_links` first.
- `open_chat_media_folder` is a desktop-host action; do not use it for phone-only flows.
- MCP does not open local folders or download media into cache; use returned URLs in the client.
- Locate the message first, then fetch media URL by message fields such as `server_id`, `username`, `md5`, or returned media references.
- For Moments, prefer local media URL fields from timeline records. Use remote video/article helpers only when the timeline record has a remote URL or article URL.
@@ -13,18 +13,18 @@ Use this for phone, ScreenMemo, and external MCP clients unless the user needs a
- `wechat.mobile.search_moments`: compact Moments search.
- `wechat.mobile.get_media_links`: URL-only media lookup.
- `wechat.mobile.get_analytics`: compact analytics by metric.
- `wechat.mobile.export_job`: preview/create/status/download/cancel export jobs.
## Budget Rules
- Keep `limit` at 10-20 for first calls.
- Use `offset` or returned cursor fields for paging.
- Do not call full annual analytics by default; use `metric=digest` or a single card.
- Do not call full annual analytics by default; use `metric=digest` or a single card. Wrapped annual data is cache-only through MCP.
- Do not fetch binary media through MCP. Use returned URLs in the app.
- Use low-level tools only for debugging, editing, raw fields, unusual media, or exact export control.
- Use low-level tools only for debugging, raw fields, or unusual media URL construction.
- Data preparation, index/cache build, export, realtime sync, local editing, system settings, and data deletion tools are not exposed through MCP.
## Recovery
- If `ready=false`, load `setup-system.md`.
- If `ready=false`, stop content tools and direct the user to the desktop/web app for data preparation or backend diagnostics.
- If target resolution is ambiguous, ask for one clarifying clue or show top candidates.
- If search returns nothing, try `resolve_target` and then `get_chat_context` before declaring no data.
@@ -14,5 +14,4 @@ Use this for 朋友圈, posts, likes, comments, shared links, and Moments media.
- Person names must be resolved to username before filtering timeline by `usernames`.
- Keyword search is for post content/topic, not poster identity.
- Do not request raw XML by default.
- Use `wechat.moments.sync_latest` only when the user explicitly wants fresh local sync or status indicates data is stale.
- Realtime/local sync tools are not exposed through MCP; ask the user to refresh data in the app when Moments data looks stale.
@@ -8,17 +8,16 @@ Use this first for every WeChatDataAnalysis MCP task.
- Status, readiness, "why can't I find anything": `wechat.core.get_status`, or `wechat.mobile.get_overview` for phone clients.
- Available tools or packages: `wechat.core.list_tools`.
- Account selection: `wechat.core.list_accounts`, then `wechat.core.get_account_info`.
- Key/decrypt/import/backend health problems: load `setup-system.md`.
- Backend health, logs, MCP LAN access, port, system settings, key/decrypt/import/data preparation, index/cache build, export, realtime sync, local editing, or data deletion requests: explain that these operations are not exposed through MCP and should be handled in the desktop/web app.
- Fuzzy person/group/official account: load `target-resolution.md`.
- Chat content, recent messages, keyword search: load `chats.md`.
- Moments / 朋友圈 / likes / comments / post media: load `moments.md`.
- Images, videos, emoji, files, voice resources: load `media.md`.
- Export requests: load `export.md`.
- Rankings, yearly summary, activity stats: load `analytics.md`.
- Empty results or setup errors: load `failure-recovery.md`.
- Rankings, yearly summary, activity stats: load `analytics.md`; if Wrapped cache is missing, ask the user to generate it in the app.
- Empty results or readiness errors: load `failure-recovery.md`.
## Mixed Intent
Resolve the target first, then load only the main domain reference. Do not load chats, moments, media, export, and analytics together unless the user explicitly asks for a broad audit.
Resolve the target first, then load only the main domain reference. Do not load chats, moments, media, and analytics together unless the user explicitly asks for a broad audit.
For phone clients, keep using `mobile.md` until the user needs a low-level fallback such as editing, raw fields, special media URL construction, or exact export options.
For phone clients, keep using `mobile.md` until the user needs a low-level fallback such as raw fields or special media URL construction.
@@ -1,41 +0,0 @@
# Setup And System
Use this when the database is not ready, keys are needed, decrypted data must be imported, or the backend itself needs inspection.
## Setup Tools
- `wechat.setup.get_saved_keys`: read saved DB/media keys for an account or wxid directory.
- `wechat.setup.get_database_key`: desktop workflow to extract the DB key from local WeChat.
- `wechat.setup.get_image_key`: fetch and save image AES/XOR keys.
- `wechat.setup.decrypt_databases`: decrypt databases from `db_storage_path` and a DB key.
- `wechat.setup.get_decrypt_stream_url`: SSE URL for decrypt progress.
- `wechat.setup.preview_import_decrypted`: validate an already-decrypted account directory.
- `wechat.setup.get_import_decrypted_stream_url`: SSE URL for import progress.
- `wechat.setup.cancel_import_decrypted`: cancel an import job by `job_id`.
- `wechat.setup.save_media_keys`: save media XOR/AES keys.
- `wechat.setup.decrypt_all_media`: decrypt all local `.dat` image resources.
- `wechat.setup.get_decrypt_all_media_stream_url`: SSE URL for bulk media decrypt progress.
- `wechat.setup.get_download_all_emojis_stream_url`: SSE URL for bulk emoji download progress.
## System Tools
- `wechat.system.health_check`
- `wechat.system.api_root`
- `wechat.system.get_backend_log_file`
- `wechat.system.open_backend_log_file`
- `wechat.system.get_backend_port`
- `wechat.system.set_backend_port_setting`
- `wechat.system.set_backend_port_and_restart`
- `wechat.system.get_img_helper_status`
- `wechat.system.toggle_img_helper`
- `wechat.system.pick_directory`
- `wechat.system.log_frontend_server_error`
## Rules
- Stream tools return `streamUrl`; the client consumes SSE outside the MCP JSON response.
- `set_backend_port_setting` persists the setting and may require backend restart by the user/client flow.
- `set_backend_port_and_restart` changes the port through the desktop restart flow and will disrupt the current backend connection.
- `open_backend_log_file` and `pick_directory` are desktop-host GUI actions; do not use them for phone-only flows.
- DB key extraction and image helper toggling depend on the local desktop WeChat state.
- Import/decrypt/media bulk operations write local files; summarize expected impact before running them.
+1 -1
View File
@@ -1,5 +1,5 @@
"""微信数据库解密工具
"""
__version__ = "1.9.0"
__version__ = "1.9.2"
__author__ = "WeChat Decrypt Tool"
+68 -17
View File
@@ -1992,45 +1992,93 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
return previews
def _pick_display_name(contact_row: Optional[sqlite3.Row], fallback_username: str) -> str:
def _row_get_value(row: Any, key: str, default: Any = None) -> Any:
if row is None:
return default
try:
return row[key]
except Exception:
pass
if isinstance(row, dict):
return row.get(key, default)
return default
def _normalize_contact_text(value: Any) -> str:
return _decode_sqlite_text(value).strip()
def _normalize_avatar_url(value: Any) -> str:
if value is None:
return ""
if isinstance(value, memoryview):
value = value.tobytes()
if isinstance(value, (bytes, bytearray)):
raw = bytes(value)
if not raw:
return ""
# Avatar URLs should be ASCII/UTF-8 HTTP(S) URLs. If invalid bytes were
# stored in the TEXT column, ignore that avatar instead of failing the
# surrounding chat/contact response.
try:
text = raw.decode("utf-8")
except UnicodeDecodeError:
return ""
else:
text = str(value or "")
text = text.strip()
if text.lower().startswith(("http://", "https://")):
return text
return ""
def _contact_row_to_dict(row: Any) -> dict[str, Any]:
username = _normalize_contact_text(_row_get_value(row, "username", ""))
return {
"username": username,
"remark": _normalize_contact_text(_row_get_value(row, "remark", "")),
"nick_name": _normalize_contact_text(_row_get_value(row, "nick_name", "")),
"alias": _normalize_contact_text(_row_get_value(row, "alias", "")),
"big_head_url": _normalize_avatar_url(_row_get_value(row, "big_head_url", "")),
"small_head_url": _normalize_avatar_url(_row_get_value(row, "small_head_url", "")),
}
def _pick_display_name(contact_row: Optional[Any], fallback_username: str) -> str:
if contact_row is None:
return fallback_username
for key in ("remark", "nick_name", "alias"):
try:
v = contact_row[key]
except Exception:
v = None
if isinstance(v, str) and v.strip():
return v.strip()
v = _normalize_contact_text(_row_get_value(contact_row, key, ""))
if v:
return v
return fallback_username
def _pick_avatar_url(contact_row: Optional[sqlite3.Row]) -> Optional[str]:
def _pick_avatar_url(contact_row: Optional[Any]) -> Optional[str]:
if contact_row is None:
return None
for key in ("big_head_url", "small_head_url"):
try:
v = contact_row[key]
except Exception:
v = None
if isinstance(v, str) and v.strip():
return v.strip()
v = _normalize_avatar_url(_row_get_value(contact_row, key, ""))
if v:
return v
return None
def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str, sqlite3.Row]:
def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str, dict[str, Any]]:
uniq = list(dict.fromkeys([u for u in usernames if u]))
if not uniq:
return {}
result: dict[str, sqlite3.Row] = {}
result: dict[str, dict[str, Any]] = {}
conn = sqlite3.connect(str(contact_db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
try:
def query_table(table: str, targets: list[str]) -> None:
if not targets:
@@ -2043,7 +2091,10 @@ def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str,
"""
rows = conn.execute(sql, targets).fetchall()
for r in rows:
result[r["username"]] = r
item = _contact_row_to_dict(r)
username = str(item.get("username") or "").strip()
if username:
result[username] = item
query_table("contact", uniq)
missing = [u for u in uniq if u not in result]
File diff suppressed because it is too large Load Diff
+153
View File
@@ -0,0 +1,153 @@
from __future__ import annotations
import ipaddress
import socket
_VIRTUAL_INTERFACE_MARKERS = (
"docker",
"hyper-v",
"loopback",
"npcap",
"tailscale",
"virtual",
"virtualbox",
"vmware",
"vethernet",
"wsl",
"zerotier",
)
_PREFERRED_INTERFACE_MARKERS = (
"ethernet",
"wi-fi",
"wifi",
"wireless",
"wlan",
"以太",
"无线",
)
def _parse_ipv4(value: object) -> ipaddress.IPv4Address | None:
try:
ip = ipaddress.ip_address(str(value or "").strip())
except ValueError:
return None
return ip if isinstance(ip, ipaddress.IPv4Address) else None
def _is_reachable_client_ipv4(ip: ipaddress.IPv4Address) -> bool:
return not (
ip.is_loopback
or ip.is_unspecified
or ip.is_link_local
or ip.is_multicast
or ip.is_reserved
)
def _interface_penalty(name: str) -> int:
lower = str(name or "").lower()
if any(marker in lower for marker in _VIRTUAL_INTERFACE_MARKERS):
return 30
if any(marker in lower for marker in _PREFERRED_INTERFACE_MARKERS):
return 0
return 10
def _add_candidate(
candidates: list[tuple[int, int, int, str]],
seen: set[str],
value: object,
*,
interface_name: str = "",
source_order: int = 0,
) -> None:
ip = _parse_ipv4(value)
if not ip or not _is_reachable_client_ipv4(ip):
return
text = str(ip)
if text in seen:
return
seen.add(text)
private_rank = 0 if ip.is_private else 1
candidates.append((private_rank, _interface_penalty(interface_name), source_order, text))
def _add_psutil_candidates(candidates: list[tuple[int, int, int, str]], seen: set[str]) -> None:
try:
import psutil # type: ignore
except Exception:
return
try:
stats_by_name = psutil.net_if_stats()
interfaces = psutil.net_if_addrs()
except Exception:
return
for interface_name, addresses in interfaces.items():
try:
stats = stats_by_name.get(interface_name)
if stats is not None and not bool(getattr(stats, "isup", False)):
continue
except Exception:
pass
for addr in addresses:
try:
if getattr(addr, "family", None) != socket.AF_INET:
continue
_add_candidate(
candidates,
seen,
getattr(addr, "address", ""),
interface_name=interface_name,
source_order=0,
)
except Exception:
continue
def _add_route_candidates(candidates: list[tuple[int, int, int, str]], seen: set[str]) -> None:
# UDP connect 不会实际发包,只用于询问系统默认出站路由会使用哪个本机地址。
for target in ("223.5.5.5", "8.8.8.8", "1.1.1.1"):
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
sock.settimeout(0.2)
sock.connect((target, 80))
local_ip = sock.getsockname()[0]
except Exception:
continue
_add_candidate(candidates, seen, local_ip, interface_name="", source_order=1)
def _add_hostname_candidates(candidates: list[tuple[int, int, int, str]], seen: set[str]) -> None:
try:
hostname = socket.gethostname()
_, _, addresses = socket.gethostbyname_ex(hostname)
except Exception:
return
for address in addresses:
_add_candidate(candidates, seen, address, interface_name="", source_order=2)
def get_lan_access_host(default: str = "127.0.0.1") -> str:
"""返回同网段设备可访问的本机 IPv4 地址。"""
candidates: list[tuple[int, int, int, str]] = []
seen: set[str] = set()
_add_psutil_candidates(candidates, seen)
_add_route_candidates(candidates, seen)
_add_hostname_candidates(candidates, seen)
if not candidates:
return default
candidates.sort()
return candidates[0][3]
+26
View File
@@ -14,6 +14,7 @@ from fastapi import APIRouter, BackgroundTasks, HTTPException
from starlette.requests import Request
from ..logging_config import get_log_file_path, get_logger
from ..network_access import get_lan_access_host
from ..path_fix import PathFixRoute
from ..runtime_settings import (
LAN_BACKEND_HOST,
@@ -56,6 +57,28 @@ def _get_backend_access_host() -> str:
return host
def _get_mcp_access_host(bind_host: str | None = None) -> str:
host = str(bind_host or _get_backend_bind_host() or "").strip()
if host in {LAN_BACKEND_HOST, "::"}:
return get_lan_access_host(default=LOOPBACK_BACKEND_HOST)
return host or LOOPBACK_BACKEND_HOST
def _get_mcp_access_urls(port: int, bind_host: str | None = None) -> dict:
access_host = _get_mcp_access_host(bind_host)
origin = f"http://{_format_host_for_url(access_host)}:{int(port)}"
return {
"access_host": access_host,
"accessHost": access_host,
"mcp_endpoint": f"{origin}/mcp",
"mcpEndpoint": f"{origin}/mcp",
"skill_bundle_url": f"{origin}/mcp/skill/bundle",
"skillBundleUrl": f"{origin}/mcp/skill/bundle",
"skill_markdown_url": f"{origin}/mcp/skill",
"skillMarkdownUrl": f"{origin}/mcp/skill",
}
def _is_loopback_client(request: Request) -> bool:
client = request.client
host = str(getattr(client, "host", "") or "").strip()
@@ -272,6 +295,7 @@ async def get_mcp_access() -> dict:
"default_host": LOOPBACK_BACKEND_HOST,
"lan_host": LAN_BACKEND_HOST,
"restart_required": False,
**_get_mcp_access_urls(port, host),
}
@@ -407,6 +431,7 @@ async def set_mcp_access(payload: dict, request: Request, background_tasks: Back
"port": int(current_port),
"ui_url": f"http://{_format_host_for_url(_get_backend_access_host())}:{int(current_port)}/",
"env_file": str(env_file) if env_file else None,
**_get_mcp_access_urls(int(current_port), next_host),
}
_PORT_CHANGE_IN_PROGRESS = True
@@ -428,6 +453,7 @@ async def set_mcp_access(payload: dict, request: Request, background_tasks: Back
"ui_url": f"http://{_format_host_for_url(LOOPBACK_BACKEND_HOST)}:{int(current_port)}/",
"env_file": str(env_file) if env_file else None,
"restart_scheduled": True,
**_get_mcp_access_urls(int(current_port), next_host),
}
finally:
_PORT_CHANGE_IN_PROGRESS = False
@@ -1,5 +1,6 @@
import sys
import threading
import sqlite3
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
@@ -97,7 +98,101 @@ class TestChatSessionsRealtimeSenderPreview(unittest.TestCase):
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0].get("lastMessage"), "群名片B: https://example.com/x")
def test_sessions_ignore_invalid_utf8_avatar_url(self):
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
session_conn = sqlite3.connect(str(account_dir / "session.db"))
try:
session_conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
unread_count INTEGER,
is_hidden INTEGER,
summary TEXT,
draft TEXT,
last_timestamp INTEGER,
sort_timestamp INTEGER,
last_msg_locald_id INTEGER,
last_msg_type INTEGER,
last_msg_sub_type INTEGER,
last_msg_sender TEXT,
last_sender_display_name TEXT
)
"""
)
session_conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
("wxid_bad_avatar", 0, 0, "hello", "", 100, 100, 1, 1, 0, "", ""),
)
session_conn.commit()
finally:
session_conn.close()
contact_conn = sqlite3.connect(str(account_dir / "contact.db"))
try:
contact_conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
contact_conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
contact_conn.execute(
"""
INSERT INTO contact
(username, remark, nick_name, alias, flag, big_head_url, small_head_url)
VALUES (?, ?, ?, ?, ?, CAST(x'fffe687474703a2f2f6578616d706c652e746573742f612e706e67' AS TEXT), ?)
""",
("wxid_bad_avatar", "", "坏头像好友", "", 0, ""),
)
contact_conn.commit()
finally:
contact_conn.close()
with (
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
patch.object(chat_router.WCDB_REALTIME, "get_status", return_value={}),
patch.object(chat_router, "load_session_last_messages", return_value={}),
patch.object(chat_router, "_load_latest_message_previews", return_value={}),
):
resp = chat_router.list_chat_sessions(
_DummyRequest(),
account="acc",
limit=50,
include_hidden=True,
include_official=True,
preview="session",
)
self.assertEqual(resp.get("status"), "success")
sessions = resp.get("sessions") or []
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0].get("name"), "坏头像好友")
self.assertEqual(sessions[0].get("lastMessage"), "hello")
self.assertIn("/api/chat/avatar", sessions[0].get("avatar") or "")
if __name__ == "__main__":
unittest.main()
+143
View File
@@ -0,0 +1,143 @@
import importlib
import logging
import os
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
def _close_logging_handlers() -> None:
for logger_name in ("", "uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"):
lg = logging.getLogger(logger_name)
for handler in lg.handlers[:]:
try:
handler.close()
except Exception:
pass
try:
lg.removeHandler(handler)
except Exception:
pass
class TestMcpAccessHost(unittest.TestCase):
def setUp(self) -> None:
self._prev_data_dir = os.environ.get("WECHAT_TOOL_DATA_DIR")
self._prev_host = os.environ.get("WECHAT_TOOL_HOST")
self._prev_port = os.environ.get("WECHAT_TOOL_PORT")
self._td = TemporaryDirectory()
os.environ["WECHAT_TOOL_DATA_DIR"] = self._td.name
os.environ.pop("WECHAT_TOOL_HOST", None)
os.environ.pop("WECHAT_TOOL_PORT", None)
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.runtime_settings as runtime_settings
import wechat_decrypt_tool.routers.admin as admin_router
importlib.reload(app_paths)
importlib.reload(runtime_settings)
importlib.reload(admin_router)
self.runtime_settings = runtime_settings
self.admin_router = admin_router
def tearDown(self) -> None:
_close_logging_handlers()
if self._prev_data_dir is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = self._prev_data_dir
if self._prev_host is None:
os.environ.pop("WECHAT_TOOL_HOST", None)
else:
os.environ["WECHAT_TOOL_HOST"] = self._prev_host
if self._prev_port is None:
os.environ.pop("WECHAT_TOOL_PORT", None)
else:
os.environ["WECHAT_TOOL_PORT"] = self._prev_port
self._td.cleanup()
def _client(self) -> TestClient:
app = FastAPI()
app.include_router(self.admin_router.router)
return TestClient(app, client=("127.0.0.1", 52010))
def test_mcp_access_reports_lan_endpoint_when_lan_enabled(self) -> None:
self.runtime_settings.write_backend_host_setting(self.runtime_settings.LAN_BACKEND_HOST)
self.runtime_settings.write_backend_port_setting(12092)
client = self._client()
with patch.object(self.admin_router, "get_lan_access_host", return_value="192.168.1.23"):
resp = client.get("/api/admin/mcp-access")
self.assertEqual(resp.status_code, 200)
payload = resp.json()
self.assertTrue(payload["enabled"])
self.assertEqual(payload["host"], "0.0.0.0")
self.assertEqual(payload["access_host"], "192.168.1.23")
self.assertEqual(payload["accessHost"], "192.168.1.23")
self.assertEqual(payload["mcp_endpoint"], "http://192.168.1.23:12092/mcp")
self.assertEqual(payload["mcpEndpoint"], "http://192.168.1.23:12092/mcp")
self.assertEqual(payload["skill_bundle_url"], "http://192.168.1.23:12092/mcp/skill/bundle")
self.assertEqual(payload["skill_markdown_url"], "http://192.168.1.23:12092/mcp/skill")
def test_mcp_access_keeps_loopback_endpoint_when_lan_disabled(self) -> None:
self.runtime_settings.write_backend_host_setting(self.runtime_settings.LOOPBACK_BACKEND_HOST)
self.runtime_settings.write_backend_port_setting(12092)
client = self._client()
with patch.object(self.admin_router, "get_lan_access_host", return_value="192.168.1.23"):
resp = client.get("/api/admin/mcp-access")
self.assertEqual(resp.status_code, 200)
payload = resp.json()
self.assertFalse(payload["enabled"])
self.assertEqual(payload["host"], "127.0.0.1")
self.assertEqual(payload["access_host"], "127.0.0.1")
self.assertEqual(payload["mcp_endpoint"], "http://127.0.0.1:12092/mcp")
class TestNetworkAccessHost(unittest.TestCase):
def test_get_lan_access_host_prefers_physical_private_ipv4(self) -> None:
import wechat_decrypt_tool.network_access as network_access
with patch.object(network_access, "_add_psutil_candidates") as mocked_psutil, patch.object(
network_access, "_add_route_candidates"
) as mocked_route, patch.object(network_access, "_add_hostname_candidates") as mocked_hostname:
def add_psutil(candidates, seen):
network_access._add_candidate(candidates, seen, "172.18.0.2", interface_name="Docker", source_order=0)
network_access._add_candidate(candidates, seen, "192.168.1.23", interface_name="Wi-Fi", source_order=0)
mocked_psutil.side_effect = add_psutil
mocked_route.side_effect = lambda candidates, seen: network_access._add_candidate(
candidates, seen, "10.0.0.9", source_order=1
)
mocked_hostname.side_effect = lambda candidates, seen: None
self.assertEqual(network_access.get_lan_access_host(), "192.168.1.23")
def test_get_lan_access_host_falls_back_when_no_candidate(self) -> None:
import wechat_decrypt_tool.network_access as network_access
with patch.object(network_access, "_add_psutil_candidates", return_value=None), patch.object(
network_access, "_add_route_candidates", return_value=None
), patch.object(network_access, "_add_hostname_candidates", return_value=None):
self.assertEqual(network_access.get_lan_access_host(default="127.0.0.1"), "127.0.0.1")
if __name__ == "__main__":
unittest.main()
+200 -89
View File
@@ -1,8 +1,9 @@
import os
import sys
import tempfile
import unittest
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
from unittest.mock import patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
@@ -14,6 +15,75 @@ sys.path.insert(0, str(ROOT / "src"))
class TestMcpRouter(unittest.TestCase):
TEST_TOKEN = "test-mcp-token-1234567890"
REMOVED_MCP_TOOLS = {
"wechat.setup.get_saved_keys",
"wechat.setup.get_database_key",
"wechat.setup.get_image_key",
"wechat.setup.decrypt_databases",
"wechat.setup.get_decrypt_stream_url",
"wechat.setup.preview_import_decrypted",
"wechat.setup.get_import_decrypted_stream_url",
"wechat.setup.cancel_import_decrypted",
"wechat.setup.save_media_keys",
"wechat.setup.decrypt_all_media",
"wechat.setup.get_decrypt_all_media_stream_url",
"wechat.setup.get_download_all_emojis_stream_url",
"wechat.contacts.export_contacts",
"wechat.chat.get_realtime_status",
"wechat.chat.sync_realtime_session",
"wechat.chat.sync_realtime_all_sessions",
"wechat.chat.get_realtime_events_url",
"wechat.moments.sync_latest",
"wechat.editing.list_edited_sessions",
"wechat.editing.list_edited_messages",
"wechat.editing.get_message_edit_status",
"wechat.editing.edit_message",
"wechat.editing.repair_message_sender",
"wechat.editing.flip_message_direction",
"wechat.editing.reset_message_edit",
"wechat.editing.reset_session_edits",
"wechat.export.preview_chat_targets",
"wechat.export.create_chat_export",
"wechat.export.list_chat_exports",
"wechat.export.get_chat_export",
"wechat.export.cancel_chat_export",
"wechat.export.get_chat_export_download",
"wechat.export.get_chat_export_events_url",
"wechat.export.create_moments_export",
"wechat.export.list_moments_exports",
"wechat.export.get_moments_export",
"wechat.export.cancel_moments_export",
"wechat.export.get_moments_export_download",
"wechat.export.get_moments_export_events_url",
"wechat.export.create_account_archive",
"wechat.export.get_account_archive",
"wechat.export.cancel_account_archive",
"wechat.export.get_account_archive_download",
"wechat.mobile.export_job",
"wechat.admin.detect_wechat_installation",
"wechat.admin.get_current_wechat_account",
"wechat.admin.get_wechat_runtime_status",
"wechat.admin.delete_account_data",
"wechat.system.api_root",
"wechat.system.health_check",
"wechat.system.get_backend_log_file",
"wechat.system.open_backend_log_file",
"wechat.system.log_frontend_server_error",
"wechat.system.get_backend_port",
"wechat.system.set_backend_port_setting",
"wechat.system.set_backend_port_and_restart",
"wechat.system.get_mcp_lan_access",
"wechat.system.set_mcp_lan_access",
"wechat.system.get_img_helper_status",
"wechat.system.toggle_img_helper",
"wechat.system.pick_directory",
"wechat.chat.get_search_index_status",
"wechat.chat.build_search_index",
"wechat.chat.get_session_last_message_cache_status",
"wechat.chat.build_session_last_message_cache",
"wechat.media.download_chat_emoji",
"wechat.media.open_chat_media_folder",
}
def setUp(self):
self._old_mcp_token = os.environ.get("WECHAT_TOOL_MCP_TOKEN")
@@ -59,13 +129,11 @@ class TestMcpRouter(unittest.TestCase):
self.assertIn("wechat.chat.list_search_senders", names)
self.assertIn("wechat.chat.resolve_chat_history", names)
self.assertIn("wechat.chat.resolve_app_message", names)
self.assertIn("wechat.contacts.export_contacts", names)
self.assertIn("wechat.export.create_chat_export", names)
self.assertIn("wechat.export.get_account_archive_download", names)
self.assertIn("wechat.moments.get_remote_video_url", names)
self.assertNotIn("search_memory", names)
self.assertNotIn("transcribe_voice_message", names)
self.assertNotIn("transcribe_audio_file", names)
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
def test_mcp_requires_token(self):
client = self._client(auth=False)
@@ -122,6 +190,11 @@ class TestMcpRouter(unittest.TestCase):
self.assertIn("bundleText", payload)
self.assertIn("WeChat MCP Copilot", payload["bundleText"])
self.assertTrue(any(ref["path"] == "references/mobile.md" for ref in payload["references"]))
self.assertFalse(any(ref["path"] == "references/system.md" for ref in payload["references"]))
self.assertFalse(any(ref["path"] == "references/setup-system.md" for ref in payload["references"]))
self.assertFalse(any(ref["path"] == "references/export.md" for ref in payload["references"]))
for tool_name in self.REMOVED_MCP_TOOLS:
self.assertNotIn(tool_name, payload["bundleText"])
def test_skill_text_can_be_loaded_over_http(self):
client = self._client()
@@ -292,6 +365,26 @@ class TestMcpRouter(unittest.TestCase):
payload = resp.json()
self.assertEqual(payload["error"]["code"], -32601)
def test_removed_mcp_tools_are_not_listed_or_callable(self):
client = self._client()
tools_resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(tools_resp.status_code, 200)
names = {tool["name"] for tool in tools_resp.json()["result"]["tools"]}
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
for tool_name in sorted(self.REMOVED_MCP_TOOLS):
with self.subTest(tool_name=tool_name):
direct_resp = client.post("/mcp", json=self._rpc(tool_name, {}))
call_resp = client.post(
"/mcp",
json=self._rpc("tools/call", {"name": tool_name, "arguments": {}}),
)
self.assertEqual(direct_resp.status_code, 200)
self.assertEqual(call_resp.status_code, 200)
self.assertEqual(direct_resp.json()["error"]["code"], -32601)
self.assertEqual(call_resp.json()["error"]["code"], -32601)
def test_missing_tool_name_returns_invalid_params(self):
client = self._client()
@@ -347,6 +440,8 @@ class TestMcpRouter(unittest.TestCase):
self.assertIn("/api/sns/media?", moments["url"])
self.assertEqual(moments["params"]["post_id"], "post-a")
self.assertEqual(moments["params"]["media_id"], "media-a")
self.assertNotIn("use_cache", moments["params"])
self.assertNotIn("use_cache", moments["url"])
def test_completed_mcp_packages_and_mobile_facade_are_listed(self):
client = self._client()
@@ -357,95 +452,36 @@ class TestMcpRouter(unittest.TestCase):
tools = resp.json()["result"]["tools"]
names = {tool["name"] for tool in tools}
expected = {
"wechat.setup.get_saved_keys",
"wechat.setup.decrypt_databases",
"wechat.setup.get_decrypt_stream_url",
"wechat.setup.preview_import_decrypted",
"wechat.setup.get_decrypt_all_media_stream_url",
"wechat.system.health_check",
"wechat.system.get_backend_port",
"wechat.system.get_mcp_lan_access",
"wechat.system.set_mcp_lan_access",
"wechat.system.get_img_helper_status",
"wechat.system.open_backend_log_file",
"wechat.system.pick_directory",
"wechat.system.set_backend_port_and_restart",
"wechat.media.get_decrypted_resource_url",
"wechat.media.get_proxy_image_url",
"wechat.media.get_favicon_url",
"wechat.media.open_chat_media_folder",
"wechat.export.get_chat_export_events_url",
"wechat.export.get_moments_export_events_url",
"wechat.chat.get_realtime_events_url",
"wechat.admin.delete_account_data",
"wechat.mobile.get_overview",
"wechat.mobile.resolve_target",
"wechat.mobile.search_chat",
"wechat.mobile.get_chat_context",
"wechat.mobile.search_moments",
"wechat.mobile.get_media_links",
"wechat.mobile.export_job",
}
self.assertTrue(expected.issubset(names))
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
self.assertNotIn("search_memory", names)
self.assertNotIn("transcribe_voice_message", names)
self.assertNotIn("transcribe_audio_file", names)
packages = {tool["annotations"]["package"] for tool in tools}
self.assertTrue({"wechat.setup", "wechat.system", "wechat.mobile"}.issubset(packages))
self.assertTrue({"wechat.core", "wechat.mobile", "wechat.media"}.issubset(packages))
self.assertFalse({"wechat.setup", "wechat.export", "wechat.editing", "wechat.system", "wechat.admin"} & packages)
def test_new_url_helpers_return_urls_and_params(self):
client = self._client()
checks = [
(
"wechat.setup.get_decrypt_stream_url",
{"key": "a" * 64, "db_storage_path": r"D:\WeChat\db_storage"},
"streamUrl",
"/api/decrypt_stream?",
),
(
"wechat.setup.get_import_decrypted_stream_url",
{"import_path": r"D:\backup\wxid_a", "job_id": "job-1"},
"streamUrl",
"/api/import_decrypted?",
),
(
"wechat.setup.get_decrypt_all_media_stream_url",
{"account": "wxid_a", "xor_key": "0xA5", "concurrency": 3},
"streamUrl",
"/api/media/decrypt_all_stream?",
),
(
"wechat.setup.get_download_all_emojis_stream_url",
{"account": "wxid_a", "force": True, "concurrency": 4},
"streamUrl",
"/api/media/emoji/download_all_stream?",
),
(
"wechat.media.get_decrypted_resource_url",
{"account": "wxid_a", "md5": "a" * 32},
"url",
"/api/media/resource/",
),
(
"wechat.chat.get_realtime_events_url",
{"account": "wxid_a", "interval_ms": 300},
"streamUrl",
"/api/chat/realtime/stream?",
),
(
"wechat.export.get_chat_export_events_url",
{"export_id": "exp-1"},
"streamUrl",
"/api/chat/exports/exp-1/events",
),
(
"wechat.export.get_moments_export_events_url",
{"export_id": "sns-1"},
"streamUrl",
"/api/sns/exports/sns-1/events",
),
]
for tool_name, args, url_key, path_part in checks:
@@ -456,26 +492,82 @@ class TestMcpRouter(unittest.TestCase):
self.assertEqual(structured["status"], "success")
self.assertIn(path_part, structured[url_key])
def test_setup_and_system_wrappers_call_underlying_router(self):
def test_exposed_mcp_tools_are_read_only(self):
client = self._client()
keys_router = Mock()
keys_router.get_saved_keys = AsyncMock(return_value={"status": "success", "keys": {"db_key": "k"}})
system_router = Mock()
system_router.get_img_helper_status = AsyncMock(return_value={"enabled": False})
with patch("wechat_decrypt_tool.mcp.tools._keys_router", return_value=keys_router), patch(
"wechat_decrypt_tool.mcp.tools._system_router", return_value=system_router
):
keys_resp = client.post(
"/mcp",
json=self._rpc("wechat.setup.get_saved_keys", {"account": "wxid_a", "wxid_dir": r"D:\WeChat\wxid_a"}),
)
helper_resp = client.post("/mcp", json=self._rpc("wechat.system.get_img_helper_status"))
resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(keys_resp.status_code, 200)
self.assertEqual(keys_resp.json()["result"]["structuredContent"]["keys"]["db_key"], "k")
keys_router.get_saved_keys.assert_awaited_once_with(account="wxid_a", db_storage_path=None, wxid_dir=r"D:\WeChat\wxid_a")
self.assertEqual(helper_resp.json()["result"]["structuredContent"], {"enabled": False})
self.assertEqual(resp.status_code, 200)
tools = resp.json()["result"]["tools"]
self.assertTrue(tools)
for tool in tools:
with self.subTest(tool_name=tool["name"]):
annotations = tool.get("annotations") or {}
self.assertTrue(annotations.get("readOnlyHint"))
self.assertFalse(annotations.get("destructiveHint"))
def test_analytics_schema_does_not_expose_refresh(self):
client = self._client()
resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(resp.status_code, 200)
tools = {tool["name"]: tool for tool in resp.json()["result"]["tools"]}
for tool_name in [
"wechat.analytics.get_wrapped_meta",
"wechat.analytics.get_wrapped_card",
"wechat.analytics.get_wrapped_annual",
]:
with self.subTest(tool_name=tool_name):
properties = tools[tool_name]["inputSchema"].get("properties") or {}
self.assertNotIn("refresh", properties)
def test_analytics_tools_are_cache_only(self):
client = self._client()
class FakeWrappedService:
_CACHE_VERSION = 26
_IMPLEMENTED_UPTO_ID = 7
_WRAPPED_CARD_MANIFEST = ({"id": 0, "title": "Overview"},)
@staticmethod
def _default_year():
return 2025
def build_wrapped_annual_meta(self, **_kwargs):
raise AssertionError("MCP analytics must not build wrapped meta.")
def build_wrapped_annual_card(self, **_kwargs):
raise AssertionError("MCP analytics must not build wrapped card.")
def build_wrapped_annual_response(self, **_kwargs):
raise AssertionError("MCP analytics must not build wrapped annual data.")
with tempfile.TemporaryDirectory() as tmp:
account_dir = Path(tmp) / "wxid_a"
account_dir.mkdir()
with patch("wechat_decrypt_tool.mcp.tools._resolve_account_dir", return_value=account_dir), patch(
"wechat_decrypt_tool.mcp.tools._wrapped_service", return_value=FakeWrappedService()
):
card_resp = client.post(
"/mcp",
json=self._rpc("wechat.analytics.get_wrapped_card", {"account": "wxid_a", "year": 2025, "card_id": 0}),
)
annual_resp = client.post(
"/mcp",
json=self._rpc("wechat.analytics.get_wrapped_annual", {"account": "wxid_a", "year": 2025}),
)
self.assertFalse((account_dir / "_wrapped" / "cache").exists())
for resp in [card_resp, annual_resp]:
self.assertEqual(resp.status_code, 200)
result = resp.json()["result"]
self.assertTrue(result["isError"])
structured = result["structuredContent"]
self.assertEqual(structured["status"], "error")
self.assertTrue(structured["cacheOnly"])
self.assertEqual(structured["message"], "Wrapped cache not found. Open the app to generate it first.")
def test_mobile_overview_uses_compact_facade(self):
client = self._client()
@@ -486,12 +578,6 @@ class TestMcpRouter(unittest.TestCase):
), patch(
"wechat_decrypt_tool.mcp.tools._list_sessions",
return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]},
), patch(
"wechat_decrypt_tool.mcp.tools._chat_realtime_status", return_value={"status": "success", "available": True}
), patch(
"wechat_decrypt_tool.mcp.tools._search_index_status", return_value={"status": "ready"}
), patch(
"wechat_decrypt_tool.mcp.tools._session_last_message_status", return_value={"status": "ready"}
):
resp = client.post(
"/mcp",
@@ -504,6 +590,31 @@ class TestMcpRouter(unittest.TestCase):
self.assertTrue(structured["ready"])
self.assertEqual(structured["defaultAccount"], "wxid_a")
self.assertIn("wechat.mobile.search_chat", structured["suggestedTools"])
self.assertNotIn("wechat.mobile.export_job", structured["suggestedTools"])
self.assertNotIn("realtime", structured["health"])
self.assertNotIn("indexes", structured["health"])
def test_mobile_overview_does_not_expose_realtime_status(self):
client = self._client()
with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=["wxid_a"]), patch(
"wechat_decrypt_tool.mcp.tools._get_account_info",
return_value={"status": "success", "account": "wxid_a", "databaseCount": 3},
), patch(
"wechat_decrypt_tool.mcp.tools._list_sessions",
return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]},
):
resp = client.post(
"/mcp",
json=self._rpc("wechat.mobile.get_overview", {"account": "wxid_a", "session_limit": 5}),
)
self.assertEqual(resp.status_code, 200)
payload = resp.json()
self.assertNotIn("error", payload)
structured = payload["result"]["structuredContent"]
self.assertNotIn("realtime", structured["health"])
self.assertNotIn("indexes", structured["health"])
def test_mobile_resolve_target_normalizes_candidates(self):
client = self._client()
Generated
+1 -1
View File
@@ -872,7 +872,7 @@ wheels = [
[[package]]
name = "wechat-decrypt-tool"
version = "1.9.0"
version = "1.9.2"
source = { editable = "." }
dependencies = [
{ name = "aiofiles" },