mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
Compare commits
5 Commits
@@ -161,7 +161,7 @@ npm run dev
|
||||
|
||||
## MCP 服务
|
||||
|
||||
后端提供 MCP JSON-RPC over HTTP 服务,默认只监听 `127.0.0.1`。手机接入局域网时,在应用内打开 **设置 -> MCP 接入 -> 允许手机局域网接入 MCP**,后端会切换为监听 `0.0.0.0` 并重启。
|
||||
后端提供 MCP JSON-RPC over HTTP 服务,默认只监听 `127.0.0.1`。手机接入局域网时,在应用内打开 **设置 -> MCP 接入 -> 允许手机局域网接入 MCP**,后端会切换为监听 `0.0.0.0` 并重启;设置页展示和复制的接入地址会使用电脑实际局域网 IP,例如 `http://192.168.x.x:10392/mcp`,而不是不可被其他设备访问的 `127.0.0.1`。
|
||||
|
||||
MCP 入口需要 token 鉴权。设置页提供 **MCP Token**、**AI 接入提示词** 和 **Skill Markdown** 三个独立复制区;token 可一键复制或重置,重置后旧 token 立即失效。手机端或外部 AI 客户端访问 `/mcp`、`/mcp/skill/bundle`、`/mcp/skill` 时,都应带上 `Authorization: Bearer <MCP_TOKEN>`。兼容客户端也可以使用 `X-MCP-Token` 请求头或 `?token=` 查询参数,但推荐使用 Bearer token。
|
||||
|
||||
@@ -169,23 +169,22 @@ MCP 入口需要 token 鉴权。设置页提供 **MCP Token**、**AI 接入提
|
||||
|
||||
工具调用成功时,客户端优先读取 `result.structuredContent`,`content[0].text` 只是给通用 MCP 客户端展示的 JSON 文本副本。业务未就绪时仍可能返回 JSON-RPC success,但 `result.isError=true`;协议错误或参数错误则返回 JSON-RPC `error`。
|
||||
|
||||
MCP 仅暴露读取数据与获取媒体资源 URL/参数的能力;系统设置、索引与缓存构建、数据准备、导出、实时同步、本地修订、数据删除等操作类能力不通过 MCP 暴露,请在桌面/网页应用内使用。
|
||||
|
||||
工具按包分层:
|
||||
|
||||
- `wechat.core`: 状态、工具目录、账号列表、账号信息
|
||||
- `wechat.setup`: 密钥读取、数据库解密、已解密目录导入、媒体密钥保存、批量媒体处理入口
|
||||
- `wechat.system`: 健康检查、后端端口、日志路径、大图辅助插件状态等系统能力
|
||||
- `wechat.mobile`: 面向手机和外部代理的聚合入口,默认返回小结果和下一步建议
|
||||
- `wechat.contacts`: 联系人列表、模糊解析、联系人导出
|
||||
- `wechat.contacts`: 联系人列表、模糊解析
|
||||
- `wechat.chat`: 会话、消息、搜索、发送者筛选、上下文、锚点、合并转发/AppMsg 解析、统计
|
||||
- `wechat.moments`: 朋友圈时间线、用户、同步、图片/视频/文章封面 URL
|
||||
- `wechat.media`: 聊天/朋友圈图片、视频、表情、头像、语音文件 URL、远程图片代理与资源辅助
|
||||
- `wechat.moments`: 朋友圈时间线、用户、图片/视频/文章封面 URL
|
||||
- `wechat.media`: 聊天/朋友圈图片、视频、表情、头像、语音文件 URL、远程图片代理与资源辅助;只返回 URL 或资源参数,不提供下载缓存或打开本机目录操作
|
||||
- `wechat.biz`: 公众号/服务号与微信支付记录
|
||||
- `wechat.analytics`: 年度总结与卡片懒加载
|
||||
- `wechat.export`: 聊天、朋友圈、账号归档导出任务、下载 URL 与进度 SSE URL
|
||||
- `wechat.admin`: 微信检测、索引、实时同步等管理能力
|
||||
- `wechat.editing`: 消息编辑、修复、恢复与审计
|
||||
- `wechat.analytics`: 年度总结与聚合分析读取;年度总结只读取应用内已生成的缓存,未生成时请先在应用内打开年度总结
|
||||
|
||||
媒体、视频、SSE 进度和 ZIP 导出不会直接塞进 MCP JSON 响应;相关工具返回可访问 URL、`streamUrl`、任务状态或资源参数。
|
||||
会话列表、联系人和头像相关接口均采用 best-effort 读取策略。即使 `contact.db` 中某些头像字段损坏或无法按 UTF-8 解码,也会继续返回昵称、会话摘要和其他可用内容,头像则自动降级为空或占位,不会阻塞整页数据加载。
|
||||
|
||||
媒体和视频不会直接塞进 MCP JSON 响应;相关工具返回可访问 URL 或资源参数。
|
||||
|
||||
配套 skill 可通过 HTTP 加载,访问时需要带 MCP token:
|
||||
|
||||
|
||||
Generated
+2
-2
@@ -1,12 +1,12 @@
|
||||
{
|
||||
"name": "wechat-data-analysis-desktop",
|
||||
"version": "1.9.0",
|
||||
"version": "1.9.2",
|
||||
"lockfileVersion": 3,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "wechat-data-analysis-desktop",
|
||||
"version": "1.9.0",
|
||||
"version": "1.9.2",
|
||||
"dependencies": {
|
||||
"electron-updater": "^6.7.3"
|
||||
},
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{
|
||||
"name": "wechat-data-analysis-desktop",
|
||||
"private": true,
|
||||
"version": "1.9.0",
|
||||
"version": "1.9.2",
|
||||
"main": "src/main.cjs",
|
||||
"scripts": {
|
||||
"dev": "node scripts/dev.cjs",
|
||||
|
||||
+93
-5
@@ -21,6 +21,7 @@ const crypto = require("crypto");
|
||||
const fs = require("fs");
|
||||
const http = require("http");
|
||||
const net = require("net");
|
||||
const os = require("os");
|
||||
const path = require("path");
|
||||
const { Worker } = require("worker_threads");
|
||||
const {
|
||||
@@ -101,6 +102,84 @@ function getBackendAccessHost() {
|
||||
return host || "127.0.0.1";
|
||||
}
|
||||
|
||||
function getInterfacePenalty(name) {
|
||||
const lower = String(name || "").toLowerCase();
|
||||
if (/(docker|hyper-v|loopback|npcap|tailscale|virtual|virtualbox|vmware|vethernet|wsl|zerotier)/i.test(lower)) {
|
||||
return 30;
|
||||
}
|
||||
if (/(ethernet|wi-fi|wifi|wireless|wlan|以太|无线)/i.test(lower)) {
|
||||
return 0;
|
||||
}
|
||||
return 10;
|
||||
}
|
||||
|
||||
function isReachableClientIpv4(address) {
|
||||
const text = String(address || "").trim();
|
||||
const parts = text.split(".");
|
||||
if (parts.length !== 4) return false;
|
||||
const nums = parts.map((part) => Number(part));
|
||||
if (!nums.every((n) => Number.isInteger(n) && n >= 0 && n <= 255)) return false;
|
||||
if (nums[0] === 0 || nums[0] === 127 || nums[0] >= 224) return false;
|
||||
if (nums[0] === 169 && nums[1] === 254) return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
function isPrivateIpv4(address) {
|
||||
const nums = String(address || "").trim().split(".").map((part) => Number(part));
|
||||
if (nums.length !== 4 || !nums.every((n) => Number.isInteger(n))) return false;
|
||||
return (
|
||||
nums[0] === 10 ||
|
||||
(nums[0] === 172 && nums[1] >= 16 && nums[1] <= 31) ||
|
||||
(nums[0] === 192 && nums[1] === 168)
|
||||
);
|
||||
}
|
||||
|
||||
function getLanAccessHost(defaultHost = DEFAULT_BACKEND_HOST) {
|
||||
const candidates = [];
|
||||
const seen = new Set();
|
||||
const addCandidate = (address, interfaceName = "", sourceOrder = 0) => {
|
||||
const value = String(address || "").trim();
|
||||
if (!isReachableClientIpv4(value) || seen.has(value)) return;
|
||||
seen.add(value);
|
||||
candidates.push([
|
||||
isPrivateIpv4(value) ? 0 : 1,
|
||||
getInterfacePenalty(interfaceName),
|
||||
sourceOrder,
|
||||
value,
|
||||
]);
|
||||
};
|
||||
|
||||
try {
|
||||
const interfaces = os.networkInterfaces();
|
||||
for (const [name, addresses] of Object.entries(interfaces || {})) {
|
||||
for (const item of addresses || []) {
|
||||
if (!item || (item.family !== "IPv4" && item.family !== 4) || item.internal) continue;
|
||||
addCandidate(item.address, name, 0);
|
||||
}
|
||||
}
|
||||
} catch {}
|
||||
|
||||
candidates.sort((a, b) => a[0] - b[0] || a[1] - b[1] || a[2] - b[2]);
|
||||
return candidates[0]?.[3] || defaultHost;
|
||||
}
|
||||
|
||||
function getMcpAccessHost(bindHost = getBackendBindHost()) {
|
||||
const host = String(bindHost || "").trim();
|
||||
if (host === LAN_BACKEND_HOST || host === "::") return getLanAccessHost(DEFAULT_BACKEND_HOST);
|
||||
return host || DEFAULT_BACKEND_HOST;
|
||||
}
|
||||
|
||||
function getMcpAccessInfo(bindHost = getBackendBindHost(), port = getBackendPort()) {
|
||||
const accessHost = getMcpAccessHost(bindHost);
|
||||
const origin = `http://${formatHostForUrl(accessHost)}:${port}`;
|
||||
return {
|
||||
accessHost,
|
||||
mcpEndpoint: `${origin}/mcp`,
|
||||
skillBundleUrl: `${origin}/mcp/skill/bundle`,
|
||||
skillMarkdownUrl: `${origin}/mcp/skill`,
|
||||
};
|
||||
}
|
||||
|
||||
function getBackendPort() {
|
||||
const envPort = parsePort(process.env.WECHAT_TOOL_PORT);
|
||||
if (envPort != null) return envPort;
|
||||
@@ -2340,19 +2419,24 @@ function registerWindowIpc() {
|
||||
|
||||
ipcMain.handle("backend:getMcpLanAccess", () => {
|
||||
try {
|
||||
const host = getBackendBindHost();
|
||||
const port = getBackendPort();
|
||||
return {
|
||||
enabled: getMcpLanAccessEnabled(),
|
||||
host: getBackendBindHost(),
|
||||
port: getBackendPort(),
|
||||
host,
|
||||
port,
|
||||
uiUrl: getDesktopUiUrl(),
|
||||
...getMcpAccessInfo(host, port),
|
||||
};
|
||||
} catch (err) {
|
||||
logMain(`[main] backend:getMcpLanAccess failed: ${err?.message || err}`);
|
||||
const port = DEFAULT_BACKEND_PORT;
|
||||
return {
|
||||
enabled: false,
|
||||
host: DEFAULT_BACKEND_HOST,
|
||||
port: DEFAULT_BACKEND_PORT,
|
||||
port,
|
||||
uiUrl: getDesktopUiUrl(),
|
||||
...getMcpAccessInfo(DEFAULT_BACKEND_HOST, port),
|
||||
};
|
||||
}
|
||||
});
|
||||
@@ -2363,13 +2447,16 @@ function registerWindowIpc() {
|
||||
const nextEnabled = !!enabled;
|
||||
const prevEnabled = getMcpLanAccessEnabled();
|
||||
if (nextEnabled === prevEnabled) {
|
||||
const host = getBackendBindHost();
|
||||
const port = getBackendPort();
|
||||
return {
|
||||
success: true,
|
||||
changed: false,
|
||||
enabled: prevEnabled,
|
||||
host: getBackendBindHost(),
|
||||
port: getBackendPort(),
|
||||
host,
|
||||
port,
|
||||
uiUrl: getDesktopUiUrl(),
|
||||
...getMcpAccessInfo(host, port),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -2396,6 +2483,7 @@ function registerWindowIpc() {
|
||||
host: getBackendBindHost(),
|
||||
port: getBackendPort(),
|
||||
uiUrl,
|
||||
...getMcpAccessInfo(),
|
||||
};
|
||||
} finally {
|
||||
backendPortChangeInProgress = false;
|
||||
|
||||
@@ -269,6 +269,7 @@
|
||||
<div class="min-w-0 flex-1">
|
||||
<div class="text-[13px] font-medium text-[#222]">允许手机局域网接入 MCP</div>
|
||||
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090]">开启后后端监听 0.0.0.0,手机可通过接入提示词中的地址接入。</div>
|
||||
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090] break-all">当前地址:{{ mcpEndpoint }}</div>
|
||||
<div v-if="mcpLanAccessMessage" class="mt-1 text-[11px] leading-relaxed text-[#1b6b43]">{{ mcpLanAccessMessage }}</div>
|
||||
<div v-if="mcpLanAccessError" class="mt-1 text-[11px] leading-relaxed text-red-600">{{ mcpLanAccessError }}</div>
|
||||
</div>
|
||||
@@ -608,6 +609,8 @@ const mcpSkillBundleText = ref('')
|
||||
const mcpSkillBundleLoading = ref(false)
|
||||
const mcpSkillBundleError = ref('')
|
||||
const mcpCopiedKey = ref('')
|
||||
const mcpAccessHost = ref('')
|
||||
const mcpAccessEndpoint = ref('')
|
||||
let mcpCopiedTimer = null
|
||||
|
||||
const mcpPortText = computed(() => {
|
||||
@@ -617,6 +620,10 @@ const mcpPortText = computed(() => {
|
||||
})
|
||||
|
||||
const mcpEndpoint = computed(() => {
|
||||
const reported = String(mcpAccessEndpoint.value || '').trim()
|
||||
if (/^https?:\/\//i.test(reported)) return reported
|
||||
const reportedHost = String(mcpAccessHost.value || '').trim()
|
||||
if (reportedHost) return `http://${reportedHost}:${mcpPortText.value}/mcp`
|
||||
if (!process.client || typeof window === 'undefined') return `http://127.0.0.1:${mcpPortText.value}/mcp`
|
||||
const apiBase = useApiBase()
|
||||
if (/^https?:\/\//i.test(apiBase)) {
|
||||
@@ -630,6 +637,14 @@ const mcpEndpoint = computed(() => {
|
||||
return `${protocol}//${host}:${mcpPortText.value}/mcp`
|
||||
})
|
||||
|
||||
const applyMcpAccessInfo = (resp) => {
|
||||
if (!resp || typeof resp !== 'object') return
|
||||
const accessHost = String(resp.accessHost || resp.access_host || '').trim()
|
||||
const endpoint = String(resp.mcpEndpoint || resp.mcp_endpoint || '').trim()
|
||||
if (accessHost) mcpAccessHost.value = accessHost
|
||||
if (/^https?:\/\//i.test(endpoint)) mcpAccessEndpoint.value = endpoint
|
||||
}
|
||||
|
||||
const mcpSkillFallback = [
|
||||
'# WeChat MCP Copilot',
|
||||
'',
|
||||
@@ -800,10 +815,12 @@ const refreshMcpLanAccess = async () => {
|
||||
if (window.wechatDesktop?.getMcpLanAccess) {
|
||||
const resp = await window.wechatDesktop.getMcpLanAccess()
|
||||
mcpLanAccessEnabled.value = !!resp?.enabled
|
||||
applyMcpAccessInfo(resp)
|
||||
return
|
||||
}
|
||||
const resp = await fetchAdminEndpoint('/admin/mcp-access')
|
||||
mcpLanAccessEnabled.value = !!resp?.enabled
|
||||
applyMcpAccessInfo(resp)
|
||||
} catch (e) {
|
||||
mcpLanAccessError.value = e?.message || '读取 MCP 接入状态失败'
|
||||
} finally {
|
||||
@@ -881,7 +898,9 @@ const setMcpLanAccess = async (enabled) => {
|
||||
if (window.wechatDesktop?.setMcpLanAccess) {
|
||||
const resp = await window.wechatDesktop.setMcpLanAccess(!!enabled)
|
||||
mcpLanAccessEnabled.value = !!resp?.enabled
|
||||
applyMcpAccessInfo(resp)
|
||||
mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,后端已重启。' : 'MCP 局域网接入状态未变化。'
|
||||
await refreshMcpSkillBundle()
|
||||
return
|
||||
}
|
||||
|
||||
@@ -890,11 +909,14 @@ const setMcpLanAccess = async (enabled) => {
|
||||
body: { enabled: !!enabled },
|
||||
})
|
||||
mcpLanAccessEnabled.value = !!resp?.enabled
|
||||
applyMcpAccessInfo(resp)
|
||||
mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,正在等待后端重启。' : 'MCP 局域网接入状态未变化。'
|
||||
if (resp?.changed) {
|
||||
await waitForBackendHealth(30_000)
|
||||
await refreshMcpLanAccess()
|
||||
mcpLanAccessMessage.value = 'MCP 局域网接入已更新,后端已恢复。'
|
||||
}
|
||||
await refreshMcpSkillBundle()
|
||||
} catch (e) {
|
||||
mcpLanAccessEnabled.value = previous
|
||||
mcpLanAccessError.value = e?.message || '设置 MCP 接入状态失败'
|
||||
|
||||
@@ -11,6 +11,7 @@
|
||||
import uvicorn
|
||||
import os
|
||||
from pathlib import Path
|
||||
from wechat_decrypt_tool.network_access import get_lan_access_host
|
||||
from wechat_decrypt_tool.runtime_settings import read_effective_backend_host, read_effective_backend_port
|
||||
|
||||
def main():
|
||||
@@ -18,6 +19,7 @@ def main():
|
||||
host, host_source = read_effective_backend_host(default="127.0.0.1")
|
||||
port, port_source = read_effective_backend_port(default=10392)
|
||||
access_host = "127.0.0.1" if host in {"0.0.0.0", "::"} else host
|
||||
lan_access_host = get_lan_access_host(default="127.0.0.1") if host in {"0.0.0.0", "::"} else access_host
|
||||
|
||||
print("=" * 60)
|
||||
print("微信解密工具 API 服务")
|
||||
@@ -38,6 +40,8 @@ def main():
|
||||
print(f"监听地址: {host}")
|
||||
print(f"API文档: http://{access_host}:{port}/docs")
|
||||
print(f"健康检查: http://{access_host}:{port}/api/health")
|
||||
if lan_access_host != access_host:
|
||||
print(f"局域网 MCP: http://{lan_access_host}:{port}/mcp")
|
||||
print("按 Ctrl+C 停止服务")
|
||||
print("=" * 60)
|
||||
|
||||
|
||||
+1
-1
@@ -1,6 +1,6 @@
|
||||
[project]
|
||||
name = "wechat-decrypt-tool"
|
||||
version = "1.9.0"
|
||||
version = "1.9.2"
|
||||
description = "Modern WeChat database decryption tool with React frontend"
|
||||
readme = "README.md"
|
||||
requires-python = ">=3.11"
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
---
|
||||
name: wechat-mcp-copilot
|
||||
version: "1.0.0"
|
||||
description: Use WeChatDataAnalysis MCP to inspect local WeChat accounts, contacts, sessions, messages, Moments, media, exports, and analytics through a small routed playbook. Trigger when the user asks to search, summarize, export, diagnose, or reason over local WeChat data.
|
||||
description: Use WeChatDataAnalysis MCP to inspect local WeChat accounts, contacts, sessions, messages, Moments, media, and analytics through a small routed playbook. Trigger when the user asks to search, summarize, diagnose, or reason over local WeChat data.
|
||||
---
|
||||
|
||||
# WeChat MCP Copilot
|
||||
@@ -12,7 +12,7 @@ Use WeChatDataAnalysis MCP like an investigator: start broad, resolve fuzzy targ
|
||||
|
||||
1. Start with `references/routing.md`.
|
||||
2. Load only one domain reference after routing.
|
||||
3. Load `references/pagination-budget.md` before broad searches, exports, or multi-page scans.
|
||||
3. Load `references/pagination-budget.md` before broad searches or multi-page scans.
|
||||
4. Use `references/failure-recovery.md` when MCP, database readiness, or empty results are unclear.
|
||||
5. For phone, ScreenMemo, or external MCP clients, prefer `wechat.mobile.*` facade tools before low-level tools.
|
||||
6. Do not load the full tool catalog unless the user asks about available tools.
|
||||
@@ -21,12 +21,10 @@ Use WeChatDataAnalysis MCP like an investigator: start broad, resolve fuzzy targ
|
||||
|
||||
- `references/routing.md`: first-hop intent routing.
|
||||
- `references/mobile.md`: phone-friendly facade tools and compact response rules.
|
||||
- `references/setup-system.md`: setup, keys, decrypt, import, health, and system operations.
|
||||
- `references/target-resolution.md`: fuzzy contact/session resolution.
|
||||
- `references/chats.md`: chat sessions, messages, search, and context.
|
||||
- `references/moments.md`: Moments timeline, posters, likes, comments, media.
|
||||
- `references/media.md`: images, videos, emoji, files, voice resources without transcription.
|
||||
- `references/export.md`: chat, Moments, and account archive export jobs.
|
||||
- `references/analytics.md`: wrapped cards, counts, rankings, and aggregate analysis.
|
||||
- `references/pagination-budget.md`: limits, cursors, result clipping, stopping rules.
|
||||
- `references/failure-recovery.md`: empty result, not-ready database, ambiguous targets, retries.
|
||||
|
||||
@@ -13,7 +13,7 @@ Use this for annual summaries, rankings, counts, and aggregate questions.
|
||||
## Rules
|
||||
|
||||
- Prefer `get_wrapped_meta` then `get_wrapped_card` for mobile or constrained contexts.
|
||||
- Wrapped annual tools read existing generated cache only; if a cache is missing, ask the user to open Wrapped in the desktop/web app first.
|
||||
- Use `get_wrapped_annual` only when the user needs the whole annual dataset.
|
||||
- For broad statistics, prefer aggregate tools or targeted searches over full message pagination.
|
||||
- Always state the account, time range, and metric basis when answering.
|
||||
|
||||
|
||||
@@ -1,29 +0,0 @@
|
||||
# Export
|
||||
|
||||
Use export only when the user asks for an artifact.
|
||||
|
||||
## Chat Export
|
||||
|
||||
1. Resolve target session if `scope=selected`.
|
||||
2. Confirm time range, format, media options, and output directory when needed.
|
||||
3. Preview targets with `wechat.export.preview_chat_targets`.
|
||||
4. Create job with `wechat.export.create_chat_export`.
|
||||
5. Poll `wechat.export.get_chat_export`.
|
||||
6. Return `wechat.export.get_chat_export_download` when ready.
|
||||
7. Use `wechat.export.get_chat_export_events_url` when the client can consume SSE progress.
|
||||
|
||||
## Moments Export
|
||||
|
||||
Use `wechat.export.create_moments_export`, `wechat.export.get_moments_export`, `wechat.export.get_moments_export_download`, and `wechat.export.get_moments_export_events_url`.
|
||||
|
||||
## Account Archive
|
||||
|
||||
Use `wechat.export.create_account_archive`, `wechat.export.get_account_archive`, `wechat.export.get_account_archive_download`, and `wechat.export.cancel_account_archive`.
|
||||
|
||||
## Contacts Export
|
||||
|
||||
Use `wechat.contacts.export_contacts` only when the user asks for a contacts file. It writes JSON or CSV to a local output directory.
|
||||
|
||||
Do not silently export all history and all media unless the user explicitly asked for that scope.
|
||||
|
||||
For phone clients, prefer `wechat.mobile.export_job` unless exact low-level export options are required.
|
||||
@@ -8,9 +8,8 @@ Use this when MCP status, DB readiness, or results are suspicious.
|
||||
2. `wechat.core.get_status`
|
||||
3. `wechat.core.list_accounts`
|
||||
4. `wechat.core.get_account_info`
|
||||
5. Search index status with `wechat.chat.get_search_index_status` when message search fails.
|
||||
6. Moments availability by checking account info and `wechat.moments.list_users`.
|
||||
7. Setup readiness: load `setup-system.md` for keys, decrypt, import, health, or media-key problems.
|
||||
5. Moments availability by checking account info and `wechat.moments.list_users`.
|
||||
6. For backend diagnostics, MCP LAN access, data preparation, index/cache build, export, realtime sync, local editing, or system settings, direct the user to the desktop/web app.
|
||||
|
||||
## Empty Results
|
||||
|
||||
@@ -18,4 +17,4 @@ Use this when MCP status, DB readiness, or results are suspicious.
|
||||
- Try contact/session resolution with a simpler keyword.
|
||||
- Try session search before global message search when a target is known.
|
||||
- For Moments, resolve poster identity before timeline filtering.
|
||||
- If setup is not ready, stop content tools and explain the missing readiness condition.
|
||||
- If data is not ready, stop content tools and explain that data preparation is handled in the desktop app, not through MCP.
|
||||
|
||||
@@ -13,7 +13,6 @@ Use this for image, video, emoji, file, link, and voice resources.
|
||||
- `wechat.media.get_decrypted_resource_url`
|
||||
- `wechat.media.get_proxy_image_url`
|
||||
- `wechat.media.get_favicon_url`
|
||||
- `wechat.media.open_chat_media_folder`
|
||||
- `wechat.biz.get_proxy_image_url`
|
||||
- `wechat.moments.get_media_url`
|
||||
- `wechat.moments.get_article_thumb_url`
|
||||
@@ -25,6 +24,6 @@ Use this for image, video, emoji, file, link, and voice resources.
|
||||
- Media tools return URLs or resource metadata; they do not inline large binary payloads.
|
||||
- Voice resources are files only. Do not transcribe voice messages.
|
||||
- For phone clients, prefer `wechat.mobile.get_media_links` first.
|
||||
- `open_chat_media_folder` is a desktop-host action; do not use it for phone-only flows.
|
||||
- MCP does not open local folders or download media into cache; use returned URLs in the client.
|
||||
- Locate the message first, then fetch media URL by message fields such as `server_id`, `username`, `md5`, or returned media references.
|
||||
- For Moments, prefer local media URL fields from timeline records. Use remote video/article helpers only when the timeline record has a remote URL or article URL.
|
||||
|
||||
@@ -13,18 +13,18 @@ Use this for phone, ScreenMemo, and external MCP clients unless the user needs a
|
||||
- `wechat.mobile.search_moments`: compact Moments search.
|
||||
- `wechat.mobile.get_media_links`: URL-only media lookup.
|
||||
- `wechat.mobile.get_analytics`: compact analytics by metric.
|
||||
- `wechat.mobile.export_job`: preview/create/status/download/cancel export jobs.
|
||||
|
||||
## Budget Rules
|
||||
|
||||
- Keep `limit` at 10-20 for first calls.
|
||||
- Use `offset` or returned cursor fields for paging.
|
||||
- Do not call full annual analytics by default; use `metric=digest` or a single card.
|
||||
- Do not call full annual analytics by default; use `metric=digest` or a single card. Wrapped annual data is cache-only through MCP.
|
||||
- Do not fetch binary media through MCP. Use returned URLs in the app.
|
||||
- Use low-level tools only for debugging, editing, raw fields, unusual media, or exact export control.
|
||||
- Use low-level tools only for debugging, raw fields, or unusual media URL construction.
|
||||
- Data preparation, index/cache build, export, realtime sync, local editing, system settings, and data deletion tools are not exposed through MCP.
|
||||
|
||||
## Recovery
|
||||
|
||||
- If `ready=false`, load `setup-system.md`.
|
||||
- If `ready=false`, stop content tools and direct the user to the desktop/web app for data preparation or backend diagnostics.
|
||||
- If target resolution is ambiguous, ask for one clarifying clue or show top candidates.
|
||||
- If search returns nothing, try `resolve_target` and then `get_chat_context` before declaring no data.
|
||||
|
||||
@@ -14,5 +14,4 @@ Use this for 朋友圈, posts, likes, comments, shared links, and Moments media.
|
||||
- Person names must be resolved to username before filtering timeline by `usernames`.
|
||||
- Keyword search is for post content/topic, not poster identity.
|
||||
- Do not request raw XML by default.
|
||||
- Use `wechat.moments.sync_latest` only when the user explicitly wants fresh local sync or status indicates data is stale.
|
||||
|
||||
- Realtime/local sync tools are not exposed through MCP; ask the user to refresh data in the app when Moments data looks stale.
|
||||
|
||||
@@ -8,17 +8,16 @@ Use this first for every WeChatDataAnalysis MCP task.
|
||||
- Status, readiness, "why can't I find anything": `wechat.core.get_status`, or `wechat.mobile.get_overview` for phone clients.
|
||||
- Available tools or packages: `wechat.core.list_tools`.
|
||||
- Account selection: `wechat.core.list_accounts`, then `wechat.core.get_account_info`.
|
||||
- Key/decrypt/import/backend health problems: load `setup-system.md`.
|
||||
- Backend health, logs, MCP LAN access, port, system settings, key/decrypt/import/data preparation, index/cache build, export, realtime sync, local editing, or data deletion requests: explain that these operations are not exposed through MCP and should be handled in the desktop/web app.
|
||||
- Fuzzy person/group/official account: load `target-resolution.md`.
|
||||
- Chat content, recent messages, keyword search: load `chats.md`.
|
||||
- Moments / 朋友圈 / likes / comments / post media: load `moments.md`.
|
||||
- Images, videos, emoji, files, voice resources: load `media.md`.
|
||||
- Export requests: load `export.md`.
|
||||
- Rankings, yearly summary, activity stats: load `analytics.md`.
|
||||
- Empty results or setup errors: load `failure-recovery.md`.
|
||||
- Rankings, yearly summary, activity stats: load `analytics.md`; if Wrapped cache is missing, ask the user to generate it in the app.
|
||||
- Empty results or readiness errors: load `failure-recovery.md`.
|
||||
|
||||
## Mixed Intent
|
||||
|
||||
Resolve the target first, then load only the main domain reference. Do not load chats, moments, media, export, and analytics together unless the user explicitly asks for a broad audit.
|
||||
Resolve the target first, then load only the main domain reference. Do not load chats, moments, media, and analytics together unless the user explicitly asks for a broad audit.
|
||||
|
||||
For phone clients, keep using `mobile.md` until the user needs a low-level fallback such as editing, raw fields, special media URL construction, or exact export options.
|
||||
For phone clients, keep using `mobile.md` until the user needs a low-level fallback such as raw fields or special media URL construction.
|
||||
|
||||
@@ -1,41 +0,0 @@
|
||||
# Setup And System
|
||||
|
||||
Use this when the database is not ready, keys are needed, decrypted data must be imported, or the backend itself needs inspection.
|
||||
|
||||
## Setup Tools
|
||||
|
||||
- `wechat.setup.get_saved_keys`: read saved DB/media keys for an account or wxid directory.
|
||||
- `wechat.setup.get_database_key`: desktop workflow to extract the DB key from local WeChat.
|
||||
- `wechat.setup.get_image_key`: fetch and save image AES/XOR keys.
|
||||
- `wechat.setup.decrypt_databases`: decrypt databases from `db_storage_path` and a DB key.
|
||||
- `wechat.setup.get_decrypt_stream_url`: SSE URL for decrypt progress.
|
||||
- `wechat.setup.preview_import_decrypted`: validate an already-decrypted account directory.
|
||||
- `wechat.setup.get_import_decrypted_stream_url`: SSE URL for import progress.
|
||||
- `wechat.setup.cancel_import_decrypted`: cancel an import job by `job_id`.
|
||||
- `wechat.setup.save_media_keys`: save media XOR/AES keys.
|
||||
- `wechat.setup.decrypt_all_media`: decrypt all local `.dat` image resources.
|
||||
- `wechat.setup.get_decrypt_all_media_stream_url`: SSE URL for bulk media decrypt progress.
|
||||
- `wechat.setup.get_download_all_emojis_stream_url`: SSE URL for bulk emoji download progress.
|
||||
|
||||
## System Tools
|
||||
|
||||
- `wechat.system.health_check`
|
||||
- `wechat.system.api_root`
|
||||
- `wechat.system.get_backend_log_file`
|
||||
- `wechat.system.open_backend_log_file`
|
||||
- `wechat.system.get_backend_port`
|
||||
- `wechat.system.set_backend_port_setting`
|
||||
- `wechat.system.set_backend_port_and_restart`
|
||||
- `wechat.system.get_img_helper_status`
|
||||
- `wechat.system.toggle_img_helper`
|
||||
- `wechat.system.pick_directory`
|
||||
- `wechat.system.log_frontend_server_error`
|
||||
|
||||
## Rules
|
||||
|
||||
- Stream tools return `streamUrl`; the client consumes SSE outside the MCP JSON response.
|
||||
- `set_backend_port_setting` persists the setting and may require backend restart by the user/client flow.
|
||||
- `set_backend_port_and_restart` changes the port through the desktop restart flow and will disrupt the current backend connection.
|
||||
- `open_backend_log_file` and `pick_directory` are desktop-host GUI actions; do not use them for phone-only flows.
|
||||
- DB key extraction and image helper toggling depend on the local desktop WeChat state.
|
||||
- Import/decrypt/media bulk operations write local files; summarize expected impact before running them.
|
||||
@@ -1,5 +1,5 @@
|
||||
"""微信数据库解密工具
|
||||
"""
|
||||
|
||||
__version__ = "1.9.0"
|
||||
__version__ = "1.9.2"
|
||||
__author__ = "WeChat Decrypt Tool"
|
||||
|
||||
@@ -1992,45 +1992,93 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
|
||||
return previews
|
||||
|
||||
|
||||
def _pick_display_name(contact_row: Optional[sqlite3.Row], fallback_username: str) -> str:
|
||||
def _row_get_value(row: Any, key: str, default: Any = None) -> Any:
|
||||
if row is None:
|
||||
return default
|
||||
try:
|
||||
return row[key]
|
||||
except Exception:
|
||||
pass
|
||||
if isinstance(row, dict):
|
||||
return row.get(key, default)
|
||||
return default
|
||||
|
||||
|
||||
def _normalize_contact_text(value: Any) -> str:
|
||||
return _decode_sqlite_text(value).strip()
|
||||
|
||||
|
||||
def _normalize_avatar_url(value: Any) -> str:
|
||||
if value is None:
|
||||
return ""
|
||||
if isinstance(value, memoryview):
|
||||
value = value.tobytes()
|
||||
if isinstance(value, (bytes, bytearray)):
|
||||
raw = bytes(value)
|
||||
if not raw:
|
||||
return ""
|
||||
# Avatar URLs should be ASCII/UTF-8 HTTP(S) URLs. If invalid bytes were
|
||||
# stored in the TEXT column, ignore that avatar instead of failing the
|
||||
# surrounding chat/contact response.
|
||||
try:
|
||||
text = raw.decode("utf-8")
|
||||
except UnicodeDecodeError:
|
||||
return ""
|
||||
else:
|
||||
text = str(value or "")
|
||||
|
||||
text = text.strip()
|
||||
if text.lower().startswith(("http://", "https://")):
|
||||
return text
|
||||
return ""
|
||||
|
||||
|
||||
def _contact_row_to_dict(row: Any) -> dict[str, Any]:
|
||||
username = _normalize_contact_text(_row_get_value(row, "username", ""))
|
||||
return {
|
||||
"username": username,
|
||||
"remark": _normalize_contact_text(_row_get_value(row, "remark", "")),
|
||||
"nick_name": _normalize_contact_text(_row_get_value(row, "nick_name", "")),
|
||||
"alias": _normalize_contact_text(_row_get_value(row, "alias", "")),
|
||||
"big_head_url": _normalize_avatar_url(_row_get_value(row, "big_head_url", "")),
|
||||
"small_head_url": _normalize_avatar_url(_row_get_value(row, "small_head_url", "")),
|
||||
}
|
||||
|
||||
|
||||
def _pick_display_name(contact_row: Optional[Any], fallback_username: str) -> str:
|
||||
if contact_row is None:
|
||||
return fallback_username
|
||||
|
||||
for key in ("remark", "nick_name", "alias"):
|
||||
try:
|
||||
v = contact_row[key]
|
||||
except Exception:
|
||||
v = None
|
||||
if isinstance(v, str) and v.strip():
|
||||
return v.strip()
|
||||
v = _normalize_contact_text(_row_get_value(contact_row, key, ""))
|
||||
if v:
|
||||
return v
|
||||
|
||||
return fallback_username
|
||||
|
||||
|
||||
def _pick_avatar_url(contact_row: Optional[sqlite3.Row]) -> Optional[str]:
|
||||
def _pick_avatar_url(contact_row: Optional[Any]) -> Optional[str]:
|
||||
if contact_row is None:
|
||||
return None
|
||||
|
||||
for key in ("big_head_url", "small_head_url"):
|
||||
try:
|
||||
v = contact_row[key]
|
||||
except Exception:
|
||||
v = None
|
||||
if isinstance(v, str) and v.strip():
|
||||
return v.strip()
|
||||
v = _normalize_avatar_url(_row_get_value(contact_row, key, ""))
|
||||
if v:
|
||||
return v
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str, sqlite3.Row]:
|
||||
def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str, dict[str, Any]]:
|
||||
uniq = list(dict.fromkeys([u for u in usernames if u]))
|
||||
if not uniq:
|
||||
return {}
|
||||
|
||||
result: dict[str, sqlite3.Row] = {}
|
||||
result: dict[str, dict[str, Any]] = {}
|
||||
|
||||
conn = sqlite3.connect(str(contact_db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
conn.text_factory = bytes
|
||||
try:
|
||||
def query_table(table: str, targets: list[str]) -> None:
|
||||
if not targets:
|
||||
@@ -2043,7 +2091,10 @@ def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str,
|
||||
"""
|
||||
rows = conn.execute(sql, targets).fetchall()
|
||||
for r in rows:
|
||||
result[r["username"]] = r
|
||||
item = _contact_row_to_dict(r)
|
||||
username = str(item.get("username") or "").strip()
|
||||
if username:
|
||||
result[username] = item
|
||||
|
||||
query_table("contact", uniq)
|
||||
missing = [u for u in uniq if u not in result]
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,153 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import ipaddress
|
||||
import socket
|
||||
|
||||
|
||||
_VIRTUAL_INTERFACE_MARKERS = (
|
||||
"docker",
|
||||
"hyper-v",
|
||||
"loopback",
|
||||
"npcap",
|
||||
"tailscale",
|
||||
"virtual",
|
||||
"virtualbox",
|
||||
"vmware",
|
||||
"vethernet",
|
||||
"wsl",
|
||||
"zerotier",
|
||||
)
|
||||
|
||||
_PREFERRED_INTERFACE_MARKERS = (
|
||||
"ethernet",
|
||||
"wi-fi",
|
||||
"wifi",
|
||||
"wireless",
|
||||
"wlan",
|
||||
"以太",
|
||||
"无线",
|
||||
)
|
||||
|
||||
|
||||
def _parse_ipv4(value: object) -> ipaddress.IPv4Address | None:
|
||||
try:
|
||||
ip = ipaddress.ip_address(str(value or "").strip())
|
||||
except ValueError:
|
||||
return None
|
||||
return ip if isinstance(ip, ipaddress.IPv4Address) else None
|
||||
|
||||
|
||||
def _is_reachable_client_ipv4(ip: ipaddress.IPv4Address) -> bool:
|
||||
return not (
|
||||
ip.is_loopback
|
||||
or ip.is_unspecified
|
||||
or ip.is_link_local
|
||||
or ip.is_multicast
|
||||
or ip.is_reserved
|
||||
)
|
||||
|
||||
|
||||
def _interface_penalty(name: str) -> int:
|
||||
lower = str(name or "").lower()
|
||||
if any(marker in lower for marker in _VIRTUAL_INTERFACE_MARKERS):
|
||||
return 30
|
||||
if any(marker in lower for marker in _PREFERRED_INTERFACE_MARKERS):
|
||||
return 0
|
||||
return 10
|
||||
|
||||
|
||||
def _add_candidate(
|
||||
candidates: list[tuple[int, int, int, str]],
|
||||
seen: set[str],
|
||||
value: object,
|
||||
*,
|
||||
interface_name: str = "",
|
||||
source_order: int = 0,
|
||||
) -> None:
|
||||
ip = _parse_ipv4(value)
|
||||
if not ip or not _is_reachable_client_ipv4(ip):
|
||||
return
|
||||
|
||||
text = str(ip)
|
||||
if text in seen:
|
||||
return
|
||||
seen.add(text)
|
||||
|
||||
private_rank = 0 if ip.is_private else 1
|
||||
candidates.append((private_rank, _interface_penalty(interface_name), source_order, text))
|
||||
|
||||
|
||||
def _add_psutil_candidates(candidates: list[tuple[int, int, int, str]], seen: set[str]) -> None:
|
||||
try:
|
||||
import psutil # type: ignore
|
||||
except Exception:
|
||||
return
|
||||
|
||||
try:
|
||||
stats_by_name = psutil.net_if_stats()
|
||||
interfaces = psutil.net_if_addrs()
|
||||
except Exception:
|
||||
return
|
||||
|
||||
for interface_name, addresses in interfaces.items():
|
||||
try:
|
||||
stats = stats_by_name.get(interface_name)
|
||||
if stats is not None and not bool(getattr(stats, "isup", False)):
|
||||
continue
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for addr in addresses:
|
||||
try:
|
||||
if getattr(addr, "family", None) != socket.AF_INET:
|
||||
continue
|
||||
_add_candidate(
|
||||
candidates,
|
||||
seen,
|
||||
getattr(addr, "address", ""),
|
||||
interface_name=interface_name,
|
||||
source_order=0,
|
||||
)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
|
||||
def _add_route_candidates(candidates: list[tuple[int, int, int, str]], seen: set[str]) -> None:
|
||||
# UDP connect 不会实际发包,只用于询问系统默认出站路由会使用哪个本机地址。
|
||||
for target in ("223.5.5.5", "8.8.8.8", "1.1.1.1"):
|
||||
try:
|
||||
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
|
||||
sock.settimeout(0.2)
|
||||
sock.connect((target, 80))
|
||||
local_ip = sock.getsockname()[0]
|
||||
except Exception:
|
||||
continue
|
||||
_add_candidate(candidates, seen, local_ip, interface_name="", source_order=1)
|
||||
|
||||
|
||||
def _add_hostname_candidates(candidates: list[tuple[int, int, int, str]], seen: set[str]) -> None:
|
||||
try:
|
||||
hostname = socket.gethostname()
|
||||
_, _, addresses = socket.gethostbyname_ex(hostname)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
for address in addresses:
|
||||
_add_candidate(candidates, seen, address, interface_name="", source_order=2)
|
||||
|
||||
|
||||
def get_lan_access_host(default: str = "127.0.0.1") -> str:
|
||||
"""返回同网段设备可访问的本机 IPv4 地址。"""
|
||||
|
||||
candidates: list[tuple[int, int, int, str]] = []
|
||||
seen: set[str] = set()
|
||||
|
||||
_add_psutil_candidates(candidates, seen)
|
||||
_add_route_candidates(candidates, seen)
|
||||
_add_hostname_candidates(candidates, seen)
|
||||
|
||||
if not candidates:
|
||||
return default
|
||||
|
||||
candidates.sort()
|
||||
return candidates[0][3]
|
||||
@@ -14,6 +14,7 @@ from fastapi import APIRouter, BackgroundTasks, HTTPException
|
||||
from starlette.requests import Request
|
||||
|
||||
from ..logging_config import get_log_file_path, get_logger
|
||||
from ..network_access import get_lan_access_host
|
||||
from ..path_fix import PathFixRoute
|
||||
from ..runtime_settings import (
|
||||
LAN_BACKEND_HOST,
|
||||
@@ -56,6 +57,28 @@ def _get_backend_access_host() -> str:
|
||||
return host
|
||||
|
||||
|
||||
def _get_mcp_access_host(bind_host: str | None = None) -> str:
|
||||
host = str(bind_host or _get_backend_bind_host() or "").strip()
|
||||
if host in {LAN_BACKEND_HOST, "::"}:
|
||||
return get_lan_access_host(default=LOOPBACK_BACKEND_HOST)
|
||||
return host or LOOPBACK_BACKEND_HOST
|
||||
|
||||
|
||||
def _get_mcp_access_urls(port: int, bind_host: str | None = None) -> dict:
|
||||
access_host = _get_mcp_access_host(bind_host)
|
||||
origin = f"http://{_format_host_for_url(access_host)}:{int(port)}"
|
||||
return {
|
||||
"access_host": access_host,
|
||||
"accessHost": access_host,
|
||||
"mcp_endpoint": f"{origin}/mcp",
|
||||
"mcpEndpoint": f"{origin}/mcp",
|
||||
"skill_bundle_url": f"{origin}/mcp/skill/bundle",
|
||||
"skillBundleUrl": f"{origin}/mcp/skill/bundle",
|
||||
"skill_markdown_url": f"{origin}/mcp/skill",
|
||||
"skillMarkdownUrl": f"{origin}/mcp/skill",
|
||||
}
|
||||
|
||||
|
||||
def _is_loopback_client(request: Request) -> bool:
|
||||
client = request.client
|
||||
host = str(getattr(client, "host", "") or "").strip()
|
||||
@@ -272,6 +295,7 @@ async def get_mcp_access() -> dict:
|
||||
"default_host": LOOPBACK_BACKEND_HOST,
|
||||
"lan_host": LAN_BACKEND_HOST,
|
||||
"restart_required": False,
|
||||
**_get_mcp_access_urls(port, host),
|
||||
}
|
||||
|
||||
|
||||
@@ -407,6 +431,7 @@ async def set_mcp_access(payload: dict, request: Request, background_tasks: Back
|
||||
"port": int(current_port),
|
||||
"ui_url": f"http://{_format_host_for_url(_get_backend_access_host())}:{int(current_port)}/",
|
||||
"env_file": str(env_file) if env_file else None,
|
||||
**_get_mcp_access_urls(int(current_port), next_host),
|
||||
}
|
||||
|
||||
_PORT_CHANGE_IN_PROGRESS = True
|
||||
@@ -428,6 +453,7 @@ async def set_mcp_access(payload: dict, request: Request, background_tasks: Back
|
||||
"ui_url": f"http://{_format_host_for_url(LOOPBACK_BACKEND_HOST)}:{int(current_port)}/",
|
||||
"env_file": str(env_file) if env_file else None,
|
||||
"restart_scheduled": True,
|
||||
**_get_mcp_access_urls(int(current_port), next_host),
|
||||
}
|
||||
finally:
|
||||
_PORT_CHANGE_IN_PROGRESS = False
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import sys
|
||||
import threading
|
||||
import sqlite3
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
@@ -97,7 +98,101 @@ class TestChatSessionsRealtimeSenderPreview(unittest.TestCase):
|
||||
self.assertEqual(len(sessions), 1)
|
||||
self.assertEqual(sessions[0].get("lastMessage"), "群名片B: https://example.com/x")
|
||||
|
||||
def test_sessions_ignore_invalid_utf8_avatar_url(self):
|
||||
with TemporaryDirectory() as td:
|
||||
account_dir = Path(td) / "acc"
|
||||
account_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
session_conn = sqlite3.connect(str(account_dir / "session.db"))
|
||||
try:
|
||||
session_conn.execute(
|
||||
"""
|
||||
CREATE TABLE SessionTable (
|
||||
username TEXT,
|
||||
unread_count INTEGER,
|
||||
is_hidden INTEGER,
|
||||
summary TEXT,
|
||||
draft TEXT,
|
||||
last_timestamp INTEGER,
|
||||
sort_timestamp INTEGER,
|
||||
last_msg_locald_id INTEGER,
|
||||
last_msg_type INTEGER,
|
||||
last_msg_sub_type INTEGER,
|
||||
last_msg_sender TEXT,
|
||||
last_sender_display_name TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
session_conn.execute(
|
||||
"INSERT INTO SessionTable VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
("wxid_bad_avatar", 0, 0, "hello", "", 100, 100, 1, 1, 0, "", ""),
|
||||
)
|
||||
session_conn.commit()
|
||||
finally:
|
||||
session_conn.close()
|
||||
|
||||
contact_conn = sqlite3.connect(str(account_dir / "contact.db"))
|
||||
try:
|
||||
contact_conn.execute(
|
||||
"""
|
||||
CREATE TABLE contact (
|
||||
username TEXT,
|
||||
remark TEXT,
|
||||
nick_name TEXT,
|
||||
alias TEXT,
|
||||
flag INTEGER,
|
||||
big_head_url TEXT,
|
||||
small_head_url TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
contact_conn.execute(
|
||||
"""
|
||||
CREATE TABLE stranger (
|
||||
username TEXT,
|
||||
remark TEXT,
|
||||
nick_name TEXT,
|
||||
alias TEXT,
|
||||
flag INTEGER,
|
||||
big_head_url TEXT,
|
||||
small_head_url TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
contact_conn.execute(
|
||||
"""
|
||||
INSERT INTO contact
|
||||
(username, remark, nick_name, alias, flag, big_head_url, small_head_url)
|
||||
VALUES (?, ?, ?, ?, ?, CAST(x'fffe687474703a2f2f6578616d706c652e746573742f612e706e67' AS TEXT), ?)
|
||||
""",
|
||||
("wxid_bad_avatar", "", "坏头像好友", "", 0, ""),
|
||||
)
|
||||
contact_conn.commit()
|
||||
finally:
|
||||
contact_conn.close()
|
||||
|
||||
with (
|
||||
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
|
||||
patch.object(chat_router.WCDB_REALTIME, "get_status", return_value={}),
|
||||
patch.object(chat_router, "load_session_last_messages", return_value={}),
|
||||
patch.object(chat_router, "_load_latest_message_previews", return_value={}),
|
||||
):
|
||||
resp = chat_router.list_chat_sessions(
|
||||
_DummyRequest(),
|
||||
account="acc",
|
||||
limit=50,
|
||||
include_hidden=True,
|
||||
include_official=True,
|
||||
preview="session",
|
||||
)
|
||||
|
||||
self.assertEqual(resp.get("status"), "success")
|
||||
sessions = resp.get("sessions") or []
|
||||
self.assertEqual(len(sessions), 1)
|
||||
self.assertEqual(sessions[0].get("name"), "坏头像好友")
|
||||
self.assertEqual(sessions[0].get("lastMessage"), "hello")
|
||||
self.assertIn("/api/chat/avatar", sessions[0].get("avatar") or "")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
|
||||
@@ -0,0 +1,143 @@
|
||||
import importlib
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from unittest.mock import patch
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
|
||||
def _close_logging_handlers() -> None:
|
||||
for logger_name in ("", "uvicorn", "uvicorn.access", "uvicorn.error", "fastapi"):
|
||||
lg = logging.getLogger(logger_name)
|
||||
for handler in lg.handlers[:]:
|
||||
try:
|
||||
handler.close()
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
lg.removeHandler(handler)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
class TestMcpAccessHost(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self._prev_data_dir = os.environ.get("WECHAT_TOOL_DATA_DIR")
|
||||
self._prev_host = os.environ.get("WECHAT_TOOL_HOST")
|
||||
self._prev_port = os.environ.get("WECHAT_TOOL_PORT")
|
||||
self._td = TemporaryDirectory()
|
||||
os.environ["WECHAT_TOOL_DATA_DIR"] = self._td.name
|
||||
os.environ.pop("WECHAT_TOOL_HOST", None)
|
||||
os.environ.pop("WECHAT_TOOL_PORT", None)
|
||||
|
||||
import wechat_decrypt_tool.app_paths as app_paths
|
||||
import wechat_decrypt_tool.runtime_settings as runtime_settings
|
||||
import wechat_decrypt_tool.routers.admin as admin_router
|
||||
|
||||
importlib.reload(app_paths)
|
||||
importlib.reload(runtime_settings)
|
||||
importlib.reload(admin_router)
|
||||
|
||||
self.runtime_settings = runtime_settings
|
||||
self.admin_router = admin_router
|
||||
|
||||
def tearDown(self) -> None:
|
||||
_close_logging_handlers()
|
||||
|
||||
if self._prev_data_dir is None:
|
||||
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
|
||||
else:
|
||||
os.environ["WECHAT_TOOL_DATA_DIR"] = self._prev_data_dir
|
||||
|
||||
if self._prev_host is None:
|
||||
os.environ.pop("WECHAT_TOOL_HOST", None)
|
||||
else:
|
||||
os.environ["WECHAT_TOOL_HOST"] = self._prev_host
|
||||
|
||||
if self._prev_port is None:
|
||||
os.environ.pop("WECHAT_TOOL_PORT", None)
|
||||
else:
|
||||
os.environ["WECHAT_TOOL_PORT"] = self._prev_port
|
||||
|
||||
self._td.cleanup()
|
||||
|
||||
def _client(self) -> TestClient:
|
||||
app = FastAPI()
|
||||
app.include_router(self.admin_router.router)
|
||||
return TestClient(app, client=("127.0.0.1", 52010))
|
||||
|
||||
def test_mcp_access_reports_lan_endpoint_when_lan_enabled(self) -> None:
|
||||
self.runtime_settings.write_backend_host_setting(self.runtime_settings.LAN_BACKEND_HOST)
|
||||
self.runtime_settings.write_backend_port_setting(12092)
|
||||
client = self._client()
|
||||
|
||||
with patch.object(self.admin_router, "get_lan_access_host", return_value="192.168.1.23"):
|
||||
resp = client.get("/api/admin/mcp-access")
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
payload = resp.json()
|
||||
self.assertTrue(payload["enabled"])
|
||||
self.assertEqual(payload["host"], "0.0.0.0")
|
||||
self.assertEqual(payload["access_host"], "192.168.1.23")
|
||||
self.assertEqual(payload["accessHost"], "192.168.1.23")
|
||||
self.assertEqual(payload["mcp_endpoint"], "http://192.168.1.23:12092/mcp")
|
||||
self.assertEqual(payload["mcpEndpoint"], "http://192.168.1.23:12092/mcp")
|
||||
self.assertEqual(payload["skill_bundle_url"], "http://192.168.1.23:12092/mcp/skill/bundle")
|
||||
self.assertEqual(payload["skill_markdown_url"], "http://192.168.1.23:12092/mcp/skill")
|
||||
|
||||
def test_mcp_access_keeps_loopback_endpoint_when_lan_disabled(self) -> None:
|
||||
self.runtime_settings.write_backend_host_setting(self.runtime_settings.LOOPBACK_BACKEND_HOST)
|
||||
self.runtime_settings.write_backend_port_setting(12092)
|
||||
client = self._client()
|
||||
|
||||
with patch.object(self.admin_router, "get_lan_access_host", return_value="192.168.1.23"):
|
||||
resp = client.get("/api/admin/mcp-access")
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
payload = resp.json()
|
||||
self.assertFalse(payload["enabled"])
|
||||
self.assertEqual(payload["host"], "127.0.0.1")
|
||||
self.assertEqual(payload["access_host"], "127.0.0.1")
|
||||
self.assertEqual(payload["mcp_endpoint"], "http://127.0.0.1:12092/mcp")
|
||||
|
||||
|
||||
class TestNetworkAccessHost(unittest.TestCase):
|
||||
def test_get_lan_access_host_prefers_physical_private_ipv4(self) -> None:
|
||||
import wechat_decrypt_tool.network_access as network_access
|
||||
|
||||
with patch.object(network_access, "_add_psutil_candidates") as mocked_psutil, patch.object(
|
||||
network_access, "_add_route_candidates"
|
||||
) as mocked_route, patch.object(network_access, "_add_hostname_candidates") as mocked_hostname:
|
||||
|
||||
def add_psutil(candidates, seen):
|
||||
network_access._add_candidate(candidates, seen, "172.18.0.2", interface_name="Docker", source_order=0)
|
||||
network_access._add_candidate(candidates, seen, "192.168.1.23", interface_name="Wi-Fi", source_order=0)
|
||||
|
||||
mocked_psutil.side_effect = add_psutil
|
||||
mocked_route.side_effect = lambda candidates, seen: network_access._add_candidate(
|
||||
candidates, seen, "10.0.0.9", source_order=1
|
||||
)
|
||||
mocked_hostname.side_effect = lambda candidates, seen: None
|
||||
|
||||
self.assertEqual(network_access.get_lan_access_host(), "192.168.1.23")
|
||||
|
||||
def test_get_lan_access_host_falls_back_when_no_candidate(self) -> None:
|
||||
import wechat_decrypt_tool.network_access as network_access
|
||||
|
||||
with patch.object(network_access, "_add_psutil_candidates", return_value=None), patch.object(
|
||||
network_access, "_add_route_candidates", return_value=None
|
||||
), patch.object(network_access, "_add_hostname_candidates", return_value=None):
|
||||
self.assertEqual(network_access.get_lan_access_host(default="127.0.0.1"), "127.0.0.1")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
+200
-89
@@ -1,8 +1,9 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from unittest.mock import AsyncMock, Mock, patch
|
||||
from unittest.mock import patch
|
||||
|
||||
from fastapi import FastAPI
|
||||
from fastapi.testclient import TestClient
|
||||
@@ -14,6 +15,75 @@ sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
class TestMcpRouter(unittest.TestCase):
|
||||
TEST_TOKEN = "test-mcp-token-1234567890"
|
||||
REMOVED_MCP_TOOLS = {
|
||||
"wechat.setup.get_saved_keys",
|
||||
"wechat.setup.get_database_key",
|
||||
"wechat.setup.get_image_key",
|
||||
"wechat.setup.decrypt_databases",
|
||||
"wechat.setup.get_decrypt_stream_url",
|
||||
"wechat.setup.preview_import_decrypted",
|
||||
"wechat.setup.get_import_decrypted_stream_url",
|
||||
"wechat.setup.cancel_import_decrypted",
|
||||
"wechat.setup.save_media_keys",
|
||||
"wechat.setup.decrypt_all_media",
|
||||
"wechat.setup.get_decrypt_all_media_stream_url",
|
||||
"wechat.setup.get_download_all_emojis_stream_url",
|
||||
"wechat.contacts.export_contacts",
|
||||
"wechat.chat.get_realtime_status",
|
||||
"wechat.chat.sync_realtime_session",
|
||||
"wechat.chat.sync_realtime_all_sessions",
|
||||
"wechat.chat.get_realtime_events_url",
|
||||
"wechat.moments.sync_latest",
|
||||
"wechat.editing.list_edited_sessions",
|
||||
"wechat.editing.list_edited_messages",
|
||||
"wechat.editing.get_message_edit_status",
|
||||
"wechat.editing.edit_message",
|
||||
"wechat.editing.repair_message_sender",
|
||||
"wechat.editing.flip_message_direction",
|
||||
"wechat.editing.reset_message_edit",
|
||||
"wechat.editing.reset_session_edits",
|
||||
"wechat.export.preview_chat_targets",
|
||||
"wechat.export.create_chat_export",
|
||||
"wechat.export.list_chat_exports",
|
||||
"wechat.export.get_chat_export",
|
||||
"wechat.export.cancel_chat_export",
|
||||
"wechat.export.get_chat_export_download",
|
||||
"wechat.export.get_chat_export_events_url",
|
||||
"wechat.export.create_moments_export",
|
||||
"wechat.export.list_moments_exports",
|
||||
"wechat.export.get_moments_export",
|
||||
"wechat.export.cancel_moments_export",
|
||||
"wechat.export.get_moments_export_download",
|
||||
"wechat.export.get_moments_export_events_url",
|
||||
"wechat.export.create_account_archive",
|
||||
"wechat.export.get_account_archive",
|
||||
"wechat.export.cancel_account_archive",
|
||||
"wechat.export.get_account_archive_download",
|
||||
"wechat.mobile.export_job",
|
||||
"wechat.admin.detect_wechat_installation",
|
||||
"wechat.admin.get_current_wechat_account",
|
||||
"wechat.admin.get_wechat_runtime_status",
|
||||
"wechat.admin.delete_account_data",
|
||||
"wechat.system.api_root",
|
||||
"wechat.system.health_check",
|
||||
"wechat.system.get_backend_log_file",
|
||||
"wechat.system.open_backend_log_file",
|
||||
"wechat.system.log_frontend_server_error",
|
||||
"wechat.system.get_backend_port",
|
||||
"wechat.system.set_backend_port_setting",
|
||||
"wechat.system.set_backend_port_and_restart",
|
||||
"wechat.system.get_mcp_lan_access",
|
||||
"wechat.system.set_mcp_lan_access",
|
||||
"wechat.system.get_img_helper_status",
|
||||
"wechat.system.toggle_img_helper",
|
||||
"wechat.system.pick_directory",
|
||||
"wechat.chat.get_search_index_status",
|
||||
"wechat.chat.build_search_index",
|
||||
"wechat.chat.get_session_last_message_cache_status",
|
||||
"wechat.chat.build_session_last_message_cache",
|
||||
"wechat.media.download_chat_emoji",
|
||||
"wechat.media.open_chat_media_folder",
|
||||
}
|
||||
|
||||
def setUp(self):
|
||||
self._old_mcp_token = os.environ.get("WECHAT_TOOL_MCP_TOKEN")
|
||||
@@ -59,13 +129,11 @@ class TestMcpRouter(unittest.TestCase):
|
||||
self.assertIn("wechat.chat.list_search_senders", names)
|
||||
self.assertIn("wechat.chat.resolve_chat_history", names)
|
||||
self.assertIn("wechat.chat.resolve_app_message", names)
|
||||
self.assertIn("wechat.contacts.export_contacts", names)
|
||||
self.assertIn("wechat.export.create_chat_export", names)
|
||||
self.assertIn("wechat.export.get_account_archive_download", names)
|
||||
self.assertIn("wechat.moments.get_remote_video_url", names)
|
||||
self.assertNotIn("search_memory", names)
|
||||
self.assertNotIn("transcribe_voice_message", names)
|
||||
self.assertNotIn("transcribe_audio_file", names)
|
||||
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
|
||||
|
||||
def test_mcp_requires_token(self):
|
||||
client = self._client(auth=False)
|
||||
@@ -122,6 +190,11 @@ class TestMcpRouter(unittest.TestCase):
|
||||
self.assertIn("bundleText", payload)
|
||||
self.assertIn("WeChat MCP Copilot", payload["bundleText"])
|
||||
self.assertTrue(any(ref["path"] == "references/mobile.md" for ref in payload["references"]))
|
||||
self.assertFalse(any(ref["path"] == "references/system.md" for ref in payload["references"]))
|
||||
self.assertFalse(any(ref["path"] == "references/setup-system.md" for ref in payload["references"]))
|
||||
self.assertFalse(any(ref["path"] == "references/export.md" for ref in payload["references"]))
|
||||
for tool_name in self.REMOVED_MCP_TOOLS:
|
||||
self.assertNotIn(tool_name, payload["bundleText"])
|
||||
|
||||
def test_skill_text_can_be_loaded_over_http(self):
|
||||
client = self._client()
|
||||
@@ -292,6 +365,26 @@ class TestMcpRouter(unittest.TestCase):
|
||||
payload = resp.json()
|
||||
self.assertEqual(payload["error"]["code"], -32601)
|
||||
|
||||
def test_removed_mcp_tools_are_not_listed_or_callable(self):
|
||||
client = self._client()
|
||||
|
||||
tools_resp = client.post("/mcp", json=self._rpc("tools/list"))
|
||||
self.assertEqual(tools_resp.status_code, 200)
|
||||
names = {tool["name"] for tool in tools_resp.json()["result"]["tools"]}
|
||||
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
|
||||
|
||||
for tool_name in sorted(self.REMOVED_MCP_TOOLS):
|
||||
with self.subTest(tool_name=tool_name):
|
||||
direct_resp = client.post("/mcp", json=self._rpc(tool_name, {}))
|
||||
call_resp = client.post(
|
||||
"/mcp",
|
||||
json=self._rpc("tools/call", {"name": tool_name, "arguments": {}}),
|
||||
)
|
||||
self.assertEqual(direct_resp.status_code, 200)
|
||||
self.assertEqual(call_resp.status_code, 200)
|
||||
self.assertEqual(direct_resp.json()["error"]["code"], -32601)
|
||||
self.assertEqual(call_resp.json()["error"]["code"], -32601)
|
||||
|
||||
def test_missing_tool_name_returns_invalid_params(self):
|
||||
client = self._client()
|
||||
|
||||
@@ -347,6 +440,8 @@ class TestMcpRouter(unittest.TestCase):
|
||||
self.assertIn("/api/sns/media?", moments["url"])
|
||||
self.assertEqual(moments["params"]["post_id"], "post-a")
|
||||
self.assertEqual(moments["params"]["media_id"], "media-a")
|
||||
self.assertNotIn("use_cache", moments["params"])
|
||||
self.assertNotIn("use_cache", moments["url"])
|
||||
|
||||
def test_completed_mcp_packages_and_mobile_facade_are_listed(self):
|
||||
client = self._client()
|
||||
@@ -357,95 +452,36 @@ class TestMcpRouter(unittest.TestCase):
|
||||
tools = resp.json()["result"]["tools"]
|
||||
names = {tool["name"] for tool in tools}
|
||||
expected = {
|
||||
"wechat.setup.get_saved_keys",
|
||||
"wechat.setup.decrypt_databases",
|
||||
"wechat.setup.get_decrypt_stream_url",
|
||||
"wechat.setup.preview_import_decrypted",
|
||||
"wechat.setup.get_decrypt_all_media_stream_url",
|
||||
"wechat.system.health_check",
|
||||
"wechat.system.get_backend_port",
|
||||
"wechat.system.get_mcp_lan_access",
|
||||
"wechat.system.set_mcp_lan_access",
|
||||
"wechat.system.get_img_helper_status",
|
||||
"wechat.system.open_backend_log_file",
|
||||
"wechat.system.pick_directory",
|
||||
"wechat.system.set_backend_port_and_restart",
|
||||
"wechat.media.get_decrypted_resource_url",
|
||||
"wechat.media.get_proxy_image_url",
|
||||
"wechat.media.get_favicon_url",
|
||||
"wechat.media.open_chat_media_folder",
|
||||
"wechat.export.get_chat_export_events_url",
|
||||
"wechat.export.get_moments_export_events_url",
|
||||
"wechat.chat.get_realtime_events_url",
|
||||
"wechat.admin.delete_account_data",
|
||||
"wechat.mobile.get_overview",
|
||||
"wechat.mobile.resolve_target",
|
||||
"wechat.mobile.search_chat",
|
||||
"wechat.mobile.get_chat_context",
|
||||
"wechat.mobile.search_moments",
|
||||
"wechat.mobile.get_media_links",
|
||||
"wechat.mobile.export_job",
|
||||
}
|
||||
self.assertTrue(expected.issubset(names))
|
||||
self.assertFalse(self.REMOVED_MCP_TOOLS & names)
|
||||
self.assertNotIn("search_memory", names)
|
||||
self.assertNotIn("transcribe_voice_message", names)
|
||||
self.assertNotIn("transcribe_audio_file", names)
|
||||
|
||||
packages = {tool["annotations"]["package"] for tool in tools}
|
||||
self.assertTrue({"wechat.setup", "wechat.system", "wechat.mobile"}.issubset(packages))
|
||||
self.assertTrue({"wechat.core", "wechat.mobile", "wechat.media"}.issubset(packages))
|
||||
self.assertFalse({"wechat.setup", "wechat.export", "wechat.editing", "wechat.system", "wechat.admin"} & packages)
|
||||
|
||||
def test_new_url_helpers_return_urls_and_params(self):
|
||||
client = self._client()
|
||||
|
||||
checks = [
|
||||
(
|
||||
"wechat.setup.get_decrypt_stream_url",
|
||||
{"key": "a" * 64, "db_storage_path": r"D:\WeChat\db_storage"},
|
||||
"streamUrl",
|
||||
"/api/decrypt_stream?",
|
||||
),
|
||||
(
|
||||
"wechat.setup.get_import_decrypted_stream_url",
|
||||
{"import_path": r"D:\backup\wxid_a", "job_id": "job-1"},
|
||||
"streamUrl",
|
||||
"/api/import_decrypted?",
|
||||
),
|
||||
(
|
||||
"wechat.setup.get_decrypt_all_media_stream_url",
|
||||
{"account": "wxid_a", "xor_key": "0xA5", "concurrency": 3},
|
||||
"streamUrl",
|
||||
"/api/media/decrypt_all_stream?",
|
||||
),
|
||||
(
|
||||
"wechat.setup.get_download_all_emojis_stream_url",
|
||||
{"account": "wxid_a", "force": True, "concurrency": 4},
|
||||
"streamUrl",
|
||||
"/api/media/emoji/download_all_stream?",
|
||||
),
|
||||
(
|
||||
"wechat.media.get_decrypted_resource_url",
|
||||
{"account": "wxid_a", "md5": "a" * 32},
|
||||
"url",
|
||||
"/api/media/resource/",
|
||||
),
|
||||
(
|
||||
"wechat.chat.get_realtime_events_url",
|
||||
{"account": "wxid_a", "interval_ms": 300},
|
||||
"streamUrl",
|
||||
"/api/chat/realtime/stream?",
|
||||
),
|
||||
(
|
||||
"wechat.export.get_chat_export_events_url",
|
||||
{"export_id": "exp-1"},
|
||||
"streamUrl",
|
||||
"/api/chat/exports/exp-1/events",
|
||||
),
|
||||
(
|
||||
"wechat.export.get_moments_export_events_url",
|
||||
{"export_id": "sns-1"},
|
||||
"streamUrl",
|
||||
"/api/sns/exports/sns-1/events",
|
||||
),
|
||||
]
|
||||
|
||||
for tool_name, args, url_key, path_part in checks:
|
||||
@@ -456,26 +492,82 @@ class TestMcpRouter(unittest.TestCase):
|
||||
self.assertEqual(structured["status"], "success")
|
||||
self.assertIn(path_part, structured[url_key])
|
||||
|
||||
def test_setup_and_system_wrappers_call_underlying_router(self):
|
||||
def test_exposed_mcp_tools_are_read_only(self):
|
||||
client = self._client()
|
||||
keys_router = Mock()
|
||||
keys_router.get_saved_keys = AsyncMock(return_value={"status": "success", "keys": {"db_key": "k"}})
|
||||
system_router = Mock()
|
||||
system_router.get_img_helper_status = AsyncMock(return_value={"enabled": False})
|
||||
|
||||
with patch("wechat_decrypt_tool.mcp.tools._keys_router", return_value=keys_router), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._system_router", return_value=system_router
|
||||
):
|
||||
keys_resp = client.post(
|
||||
"/mcp",
|
||||
json=self._rpc("wechat.setup.get_saved_keys", {"account": "wxid_a", "wxid_dir": r"D:\WeChat\wxid_a"}),
|
||||
)
|
||||
helper_resp = client.post("/mcp", json=self._rpc("wechat.system.get_img_helper_status"))
|
||||
resp = client.post("/mcp", json=self._rpc("tools/list"))
|
||||
|
||||
self.assertEqual(keys_resp.status_code, 200)
|
||||
self.assertEqual(keys_resp.json()["result"]["structuredContent"]["keys"]["db_key"], "k")
|
||||
keys_router.get_saved_keys.assert_awaited_once_with(account="wxid_a", db_storage_path=None, wxid_dir=r"D:\WeChat\wxid_a")
|
||||
self.assertEqual(helper_resp.json()["result"]["structuredContent"], {"enabled": False})
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
tools = resp.json()["result"]["tools"]
|
||||
self.assertTrue(tools)
|
||||
for tool in tools:
|
||||
with self.subTest(tool_name=tool["name"]):
|
||||
annotations = tool.get("annotations") or {}
|
||||
self.assertTrue(annotations.get("readOnlyHint"))
|
||||
self.assertFalse(annotations.get("destructiveHint"))
|
||||
|
||||
def test_analytics_schema_does_not_expose_refresh(self):
|
||||
client = self._client()
|
||||
|
||||
resp = client.post("/mcp", json=self._rpc("tools/list"))
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
tools = {tool["name"]: tool for tool in resp.json()["result"]["tools"]}
|
||||
for tool_name in [
|
||||
"wechat.analytics.get_wrapped_meta",
|
||||
"wechat.analytics.get_wrapped_card",
|
||||
"wechat.analytics.get_wrapped_annual",
|
||||
]:
|
||||
with self.subTest(tool_name=tool_name):
|
||||
properties = tools[tool_name]["inputSchema"].get("properties") or {}
|
||||
self.assertNotIn("refresh", properties)
|
||||
|
||||
def test_analytics_tools_are_cache_only(self):
|
||||
client = self._client()
|
||||
|
||||
class FakeWrappedService:
|
||||
_CACHE_VERSION = 26
|
||||
_IMPLEMENTED_UPTO_ID = 7
|
||||
_WRAPPED_CARD_MANIFEST = ({"id": 0, "title": "Overview"},)
|
||||
|
||||
@staticmethod
|
||||
def _default_year():
|
||||
return 2025
|
||||
|
||||
def build_wrapped_annual_meta(self, **_kwargs):
|
||||
raise AssertionError("MCP analytics must not build wrapped meta.")
|
||||
|
||||
def build_wrapped_annual_card(self, **_kwargs):
|
||||
raise AssertionError("MCP analytics must not build wrapped card.")
|
||||
|
||||
def build_wrapped_annual_response(self, **_kwargs):
|
||||
raise AssertionError("MCP analytics must not build wrapped annual data.")
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
account_dir = Path(tmp) / "wxid_a"
|
||||
account_dir.mkdir()
|
||||
with patch("wechat_decrypt_tool.mcp.tools._resolve_account_dir", return_value=account_dir), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._wrapped_service", return_value=FakeWrappedService()
|
||||
):
|
||||
card_resp = client.post(
|
||||
"/mcp",
|
||||
json=self._rpc("wechat.analytics.get_wrapped_card", {"account": "wxid_a", "year": 2025, "card_id": 0}),
|
||||
)
|
||||
annual_resp = client.post(
|
||||
"/mcp",
|
||||
json=self._rpc("wechat.analytics.get_wrapped_annual", {"account": "wxid_a", "year": 2025}),
|
||||
)
|
||||
|
||||
self.assertFalse((account_dir / "_wrapped" / "cache").exists())
|
||||
|
||||
for resp in [card_resp, annual_resp]:
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
result = resp.json()["result"]
|
||||
self.assertTrue(result["isError"])
|
||||
structured = result["structuredContent"]
|
||||
self.assertEqual(structured["status"], "error")
|
||||
self.assertTrue(structured["cacheOnly"])
|
||||
self.assertEqual(structured["message"], "Wrapped cache not found. Open the app to generate it first.")
|
||||
|
||||
def test_mobile_overview_uses_compact_facade(self):
|
||||
client = self._client()
|
||||
@@ -486,12 +578,6 @@ class TestMcpRouter(unittest.TestCase):
|
||||
), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._list_sessions",
|
||||
return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]},
|
||||
), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._chat_realtime_status", return_value={"status": "success", "available": True}
|
||||
), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._search_index_status", return_value={"status": "ready"}
|
||||
), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._session_last_message_status", return_value={"status": "ready"}
|
||||
):
|
||||
resp = client.post(
|
||||
"/mcp",
|
||||
@@ -504,6 +590,31 @@ class TestMcpRouter(unittest.TestCase):
|
||||
self.assertTrue(structured["ready"])
|
||||
self.assertEqual(structured["defaultAccount"], "wxid_a")
|
||||
self.assertIn("wechat.mobile.search_chat", structured["suggestedTools"])
|
||||
self.assertNotIn("wechat.mobile.export_job", structured["suggestedTools"])
|
||||
self.assertNotIn("realtime", structured["health"])
|
||||
self.assertNotIn("indexes", structured["health"])
|
||||
|
||||
def test_mobile_overview_does_not_expose_realtime_status(self):
|
||||
client = self._client()
|
||||
|
||||
with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=["wxid_a"]), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._get_account_info",
|
||||
return_value={"status": "success", "account": "wxid_a", "databaseCount": 3},
|
||||
), patch(
|
||||
"wechat_decrypt_tool.mcp.tools._list_sessions",
|
||||
return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]},
|
||||
):
|
||||
resp = client.post(
|
||||
"/mcp",
|
||||
json=self._rpc("wechat.mobile.get_overview", {"account": "wxid_a", "session_limit": 5}),
|
||||
)
|
||||
|
||||
self.assertEqual(resp.status_code, 200)
|
||||
payload = resp.json()
|
||||
self.assertNotIn("error", payload)
|
||||
structured = payload["result"]["structuredContent"]
|
||||
self.assertNotIn("realtime", structured["health"])
|
||||
self.assertNotIn("indexes", structured["health"])
|
||||
|
||||
def test_mobile_resolve_target_normalizes_candidates(self):
|
||||
client = self._client()
|
||||
|
||||
Reference in New Issue
Block a user