Compare commits

..

8 Commits

  • test(mcp): 补充 MCP 路由回归测试
    覆盖 MCP token 鉴权、skill bundle 读取、JSON-RPC 初始化、工具列表分页、直接工具调用、通知处理、错误响应和移动端 facade 工具。
    
    验证媒体与导出类工具只返回 URL 和参数,不内联二进制内容。
  • docs(mcp): 补充 MCP 接入说明与 Copilot Skill
    在 README 中补充 MCP 服务、鉴权方式、工具分层、Skill 获取地址和手机端接入流程。
    
    新增 wechat-mcp-copilot skill 及领域参考文档,约束外部 AI 按路由、分页和媒体 URL 规则访问微信数据。
  • feat(settings): 增加 MCP 局域网接入配置
    在设置面板新增 MCP 接入区,支持开启手机局域网访问、复制 MCP Token、重置 Token、复制 AI 接入提示词和 Skill Markdown。
    
    桌面端增加 MCP 局域网访问 IPC,并在切换监听地址后重启后端。
    
    后端运行时设置支持保存 backend_host 与 mcp_token,并同步写入 .env 供开发模式重启使用。
  • feat(mcp): 新增微信数据 MCP 服务
    新增 MCP JSON-RPC over HTTP 入口,支持 initialize、tools/list、tools/call、批量请求与通知响应。
    
    注册微信账号、联系人、聊天、朋友圈、媒体、导出、年度总结、系统管理等 MCP 工具,并为结果提供 structuredContent。
    
    增加 MCP token 鉴权,支持 Bearer、X-MCP-Token 和 query token。
  • fix(chat-search): 将会话列表外聊天纳入搜索索引
    构建聊天搜索索引时,合并 SessionTable、contact/stranger 与消息库 Name2Id 中存在消息表的联系人或群聊。
    
    这样左侧会话列表中不存在、但数据库里仍有消息记录的聊天也可以被全局搜索命中。
    
    提升搜索索引 schema 版本,触发旧索引自动重建,并补充默认搜索过滤下的回归测试。
  • fix(chat-export): 补充导出会话列表外的有消息聊天
    批量导出全部、群聊、单聊时,不再只依赖 SessionTable。
    
    导出目标会补充 contact/stranger 与消息库 Name2Id 中存在消息表的联系人或群聊,避免微信不显示会话从左侧列表消失后漏导。
    
    同时新增自定义范围,保留当前会话列表手动勾选导出的语义,并补充对应回归测试。
41 changed files with 5039 additions and 79 deletions
+35
View File
@@ -159,6 +159,41 @@ npm run dev
- API文档(默认): http://localhost:10392/docs
- 也可在应用内“设置 -> 后端端口”修改(支持“恢复默认”一键回到 10392):网页端会尝试重启本机后端到新端口并刷新(并写入 `output/runtime_settings.json`,开发模式下也会写入项目根目录 `.env``uv run` 下次启动使用);桌面端会重启内置后端并刷新
## MCP 服务
后端提供 MCP JSON-RPC over HTTP 服务,默认只监听 `127.0.0.1`。手机接入局域网时,在应用内打开 **设置 -> MCP 接入 -> 允许手机局域网接入 MCP**,后端会切换为监听 `0.0.0.0` 并重启。
MCP 入口需要 token 鉴权。设置页提供 **MCP Token**、**AI 接入提示词** 和 **Skill Markdown** 三个独立复制区;token 可一键复制或重置,重置后旧 token 立即失效。手机端或外部 AI 客户端访问 `/mcp``/mcp/skill/bundle``/mcp/skill` 时,都应带上 `Authorization: Bearer <MCP_TOKEN>`。兼容客户端也可以使用 `X-MCP-Token` 请求头或 `?token=` 查询参数,但推荐使用 Bearer token。
通用客户端可以通过 `GET /mcp/skill/bundle` 读取同一份 bundle,通过 `GET /mcp/skill` 读取 markdown 版本,这两个 skill 入口同样需要 MCP token。
工具调用成功时,客户端优先读取 `result.structuredContent``content[0].text` 只是给通用 MCP 客户端展示的 JSON 文本副本。业务未就绪时仍可能返回 JSON-RPC success,但 `result.isError=true`;协议错误或参数错误则返回 JSON-RPC `error`
工具按包分层:
- `wechat.core`: 状态、工具目录、账号列表、账号信息
- `wechat.setup`: 密钥读取、数据库解密、已解密目录导入、媒体密钥保存、批量媒体处理入口
- `wechat.system`: 健康检查、后端端口、日志路径、大图辅助插件状态等系统能力
- `wechat.mobile`: 面向手机和外部代理的聚合入口,默认返回小结果和下一步建议
- `wechat.contacts`: 联系人列表、模糊解析、联系人导出
- `wechat.chat`: 会话、消息、搜索、发送者筛选、上下文、锚点、合并转发/AppMsg 解析、统计
- `wechat.moments`: 朋友圈时间线、用户、同步、图片/视频/文章封面 URL
- `wechat.media`: 聊天/朋友圈图片、视频、表情、头像、语音文件 URL、远程图片代理与资源辅助
- `wechat.biz`: 公众号/服务号与微信支付记录
- `wechat.analytics`: 年度总结与卡片懒加载
- `wechat.export`: 聊天、朋友圈、账号归档导出任务、下载 URL 与进度 SSE URL
- `wechat.admin`: 微信检测、索引、实时同步等管理能力
- `wechat.editing`: 消息编辑、修复、恢复与审计
媒体、视频、SSE 进度和 ZIP 导出不会直接塞进 MCP JSON 响应;相关工具返回可访问 URL、`streamUrl`、任务状态或资源参数。
配套 skill 可通过 HTTP 加载,访问时需要带 MCP token:
- JSON bundle: `http://<电脑局域网IP>:10392/mcp/skill/bundle`
- Markdown bundle: `http://<电脑局域网IP>:10392/mcp/skill`
手机端或外部 AI 客户端应先拉取 skill bundle,将 `bundleText` 注入模型上下文,再按 `initialize``tools/list``tools/call` 使用 MCP。设置页中的“AI 接入提示词”会包含 endpoint 和 Bearer token,可直接复制给客户端作为接入指令。
## 打包为 EXEWindows 桌面端)
本项目提供基于 Electron 的桌面端安装包(NSIS `Setup.exe`)。
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "wechat-data-analysis-desktop",
"version": "1.8.1",
"version": "1.9.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "wechat-data-analysis-desktop",
"version": "1.8.1",
"version": "1.9.0",
"dependencies": {
"electron-updater": "^6.7.3"
},
+1 -1
View File
@@ -1,7 +1,7 @@
{
"name": "wechat-data-analysis-desktop",
"private": true,
"version": "1.8.1",
"version": "1.9.0",
"main": "src/main.cjs",
"scripts": {
"dev": "node scripts/dev.cjs",
+95 -7
View File
@@ -30,7 +30,8 @@ const {
normalizeDirectoryPath,
} = require("./output-dir.cjs");
const DEFAULT_BACKEND_HOST = String(process.env.WECHAT_TOOL_HOST || "127.0.0.1").trim() || "127.0.0.1";
const DEFAULT_BACKEND_HOST = "127.0.0.1";
const LAN_BACKEND_HOST = "0.0.0.0";
const DEFAULT_BACKEND_PORT = parsePort(process.env.WECHAT_TOOL_PORT) ?? 10392;
let backendProc = null;
@@ -86,7 +87,11 @@ function formatHostForUrl(host) {
}
function getBackendBindHost() {
return DEFAULT_BACKEND_HOST;
const envHost = String(process.env.WECHAT_TOOL_HOST || "").trim();
if (envHost === LAN_BACKEND_HOST || envHost === "::") return LAN_BACKEND_HOST;
if (envHost === DEFAULT_BACKEND_HOST || envHost === "localhost" || envHost === "::1") return DEFAULT_BACKEND_HOST;
if (!app.isPackaged) return DEFAULT_BACKEND_HOST;
return loadDesktopSettings()?.mcpLanAccessEnabled ? LAN_BACKEND_HOST : DEFAULT_BACKEND_HOST;
}
function getBackendAccessHost() {
@@ -116,6 +121,18 @@ function setBackendPortSetting(nextPort) {
return p;
}
function getMcpLanAccessEnabled() {
return getBackendBindHost() === LAN_BACKEND_HOST;
}
function setMcpLanAccessSetting(enabled) {
loadDesktopSettings();
desktopSettings.mcpLanAccessEnabled = !!enabled;
persistDesktopSettings();
process.env.WECHAT_TOOL_HOST = desktopSettings.mcpLanAccessEnabled ? LAN_BACKEND_HOST : DEFAULT_BACKEND_HOST;
return desktopSettings.mcpLanAccessEnabled;
}
function getBackendHealthUrl() {
const host = formatHostForUrl(getBackendAccessHost());
const port = getBackendPort();
@@ -128,6 +145,12 @@ function getBackendUiUrl() {
return `http://${host}:${port}/`;
}
function getDesktopUiUrl() {
const explicit = String(process.env.ELECTRON_START_URL || "").trim();
if (explicit) return explicit;
return app.isPackaged ? getBackendUiUrl() : "http://localhost:3000";
}
function isPortAvailable(port, host) {
return new Promise((resolve) => {
try {
@@ -597,6 +620,8 @@ function loadDesktopSettings() {
ignoredUpdateVersion: "",
// Backend (FastAPI) listens on this port. Used in packaged builds.
backendPort: DEFAULT_BACKEND_PORT,
// When enabled, the backend binds to 0.0.0.0 so phone clients can reach /mcp.
mcpLanAccessEnabled: false,
// Custom output dir; empty string means use the default dataDir/output.
outputDir: "",
// Pending output dir written by the installer before the next app startup.
@@ -623,6 +648,7 @@ function loadDesktopSettings() {
const parsed = JSON.parse(raw || "{}");
desktopSettings = { ...defaults, ...(parsed && typeof parsed === "object" ? parsed : {}) };
desktopSettings.backendPort = parsePort(desktopSettings.backendPort) ?? defaults.backendPort;
desktopSettings.mcpLanAccessEnabled = !!desktopSettings.mcpLanAccessEnabled;
desktopSettings.outputDir = safeNormalizeDirectory(desktopSettings.outputDir || "");
desktopSettings.pendingOutputDir =
parsed && typeof parsed === "object" && Object.prototype.hasOwnProperty.call(parsed, "pendingOutputDir")
@@ -2275,7 +2301,7 @@ function registerWindowIpc() {
const prevPort = getBackendPort();
if (nextPort === prevPort) {
return { success: true, changed: false, port: prevPort, uiUrl: getBackendUiUrl() };
return { success: true, changed: false, port: prevPort, uiUrl: getDesktopUiUrl() };
}
const bindHost = getBackendBindHost();
@@ -2296,7 +2322,7 @@ function registerWindowIpc() {
throw err;
}
const uiUrl = getBackendUiUrl();
const uiUrl = getDesktopUiUrl();
setTimeout(() => {
try {
if (!mainWindow || mainWindow.isDestroyed()) return;
@@ -2312,6 +2338,70 @@ function registerWindowIpc() {
}
});
ipcMain.handle("backend:getMcpLanAccess", () => {
try {
return {
enabled: getMcpLanAccessEnabled(),
host: getBackendBindHost(),
port: getBackendPort(),
uiUrl: getDesktopUiUrl(),
};
} catch (err) {
logMain(`[main] backend:getMcpLanAccess failed: ${err?.message || err}`);
return {
enabled: false,
host: DEFAULT_BACKEND_HOST,
port: DEFAULT_BACKEND_PORT,
uiUrl: getDesktopUiUrl(),
};
}
});
ipcMain.handle("backend:setMcpLanAccess", async (_event, enabled) => {
if (backendPortChangeInProgress) throw new Error("后端切换中,请稍后重试");
const nextEnabled = !!enabled;
const prevEnabled = getMcpLanAccessEnabled();
if (nextEnabled === prevEnabled) {
return {
success: true,
changed: false,
enabled: prevEnabled,
host: getBackendBindHost(),
port: getBackendPort(),
uiUrl: getDesktopUiUrl(),
};
}
backendPortChangeInProgress = true;
try {
setMcpLanAccessSetting(nextEnabled);
try {
await restartBackend({ timeoutMs: 30_000 });
} catch (err) {
setMcpLanAccessSetting(prevEnabled);
try {
await restartBackend({ timeoutMs: 30_000 });
} catch {}
throw err;
}
const uiUrl = getDesktopUiUrl();
logMain(`[main] MCP access changed enabled=${nextEnabled}; backend restarted without UI reload`);
return {
success: true,
changed: true,
enabled: nextEnabled,
host: getBackendBindHost(),
port: getBackendPort(),
uiUrl,
};
} finally {
backendPortChangeInProgress = false;
}
});
ipcMain.handle("app:getVersion", () => {
try {
return app.getVersion();
@@ -2510,9 +2600,7 @@ async function main() {
mainWindow = win;
ensureTrayForCloseBehavior();
const startUrl =
process.env.ELECTRON_START_URL ||
(app.isPackaged ? getBackendUiUrl() : "http://localhost:3000");
const startUrl = getDesktopUiUrl();
logMain(`[main] debugEnabled=${debugEnabled()} startUrl=${startUrl}`);
await loadWithRetry(win, startUrl);
+2
View File
@@ -78,6 +78,8 @@ contextBridge.exposeInMainWorld("wechatDesktop", {
getBackendPort: () => ipcRenderer.invoke("backend:getPort"),
setBackendPort: (port) => ipcRenderer.invoke("backend:setPort", Number(port)),
getMcpLanAccess: () => ipcRenderer.invoke("backend:getMcpLanAccess"),
setMcpLanAccess: (enabled) => ipcRenderer.invoke("backend:setMcpLanAccess", !!enabled),
chooseDirectory: (options = {}) => ipcRenderer.invoke("dialog:chooseDirectory", options),
+328
View File
@@ -261,6 +261,104 @@
</div>
</section>
<section ref="mcpSectionRef">
<div class="mb-2.5 text-[12px] font-bold text-[#999] tracking-widest">MCP 接入</div>
<div class="overflow-hidden rounded-[10px] border border-[#e7e7e7] bg-white divide-y divide-[#ececec]">
<div class="px-3.5 py-3">
<div class="flex items-center justify-between gap-3">
<div class="min-w-0 flex-1">
<div class="text-[13px] font-medium text-[#222]">允许手机局域网接入 MCP</div>
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090]">开启后后端监听 0.0.0.0手机可通过接入提示词中的地址接入</div>
<div v-if="mcpLanAccessMessage" class="mt-1 text-[11px] leading-relaxed text-[#1b6b43]">{{ mcpLanAccessMessage }}</div>
<div v-if="mcpLanAccessError" class="mt-1 text-[11px] leading-relaxed text-red-600">{{ mcpLanAccessError }}</div>
</div>
<button
type="button"
role="switch"
:aria-checked="mcpLanAccessEnabled"
class="settings-switch shrink-0"
:class="switchTrackClass(mcpLanAccessEnabled, mcpLanAccessLoading)"
:disabled="mcpLanAccessLoading"
@click="toggleMcpLanAccess"
>
<span class="settings-switch-thumb" :class="mcpLanAccessEnabled ? 'translate-x-[20px]' : 'translate-x-0'" />
</button>
</div>
</div>
<div class="px-3.5 py-3">
<div class="flex flex-col gap-2">
<div class="flex flex-col gap-2 sm:flex-row sm:items-start sm:justify-between">
<div class="min-w-0 flex-1">
<div class="text-[13px] font-medium text-[#222]">MCP Token</div>
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090]">手机端请求 MCP 时使用 Bearer token</div>
<div v-if="mcpTokenError" class="mt-1 text-[11px] leading-relaxed text-red-600">{{ mcpTokenError }}</div>
</div>
<div class="flex shrink-0 gap-1.5">
<button
type="button"
class="rounded-[6px] border border-[#e2e2e2] bg-white px-2 py-1 text-[12px] text-[#222] transition hover:bg-[#f9f9f9]"
:disabled="mcpTokenLoading || !mcpToken"
@click="copyMcpText('token', mcpToken)"
>
{{ mcpCopiedKey === 'token' ? '已复制' : (mcpTokenLoading ? '加载中...' : '复制 Token') }}
</button>
<button
type="button"
class="rounded-[6px] border border-[#e2e2e2] bg-white px-2 py-1 text-[12px] text-[#222] transition hover:bg-[#f9f9f9]"
:disabled="mcpTokenLoading"
@click="resetMcpToken"
>
{{ mcpCopiedKey === 'token-reset' ? '已重置' : '重置' }}
</button>
</div>
</div>
<pre class="max-h-[92px] overflow-auto rounded-[6px] bg-[#f7f7f7] px-2.5 py-2 text-[11px] leading-relaxed text-[#333] scrollbar-custom whitespace-pre-wrap">{{ mcpTokenText }}</pre>
</div>
</div>
<div class="px-3.5 py-3">
<div class="flex flex-col gap-2">
<div class="flex flex-col gap-2 sm:flex-row sm:items-start sm:justify-between">
<div class="min-w-0 flex-1">
<div class="text-[13px] font-medium text-[#222]">AI 接入提示词</div>
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090]">复制到手机端 AI 的系统提示词或连接说明里</div>
</div>
<button
type="button"
class="shrink-0 rounded-[6px] border border-[#e2e2e2] bg-white px-2 py-1 text-[12px] text-[#222] transition hover:bg-[#f9f9f9]"
@click="copyMcpText('ai-prompt', mcpAiPrompt)"
>
{{ mcpCopiedKey === 'ai-prompt' ? '已复制' : '复制提示词' }}
</button>
</div>
<pre class="max-h-[220px] overflow-auto rounded-[6px] bg-[#f7f7f7] px-2.5 py-2 text-[11px] leading-relaxed text-[#333] scrollbar-custom whitespace-pre-wrap">{{ mcpAiPrompt }}</pre>
</div>
</div>
<div class="px-3.5 py-3">
<div class="flex flex-col gap-2">
<div class="flex flex-col gap-2 sm:flex-row sm:items-start sm:justify-between">
<div class="min-w-0 flex-1">
<div class="text-[13px] font-medium text-[#222]">Skill Markdown</div>
<div class="mt-0.5 text-[11px] leading-relaxed text-[#909090]">单独复制到手机端 AI skill 或知识配置</div>
<div v-if="mcpSkillBundleError" class="mt-1 text-[11px] leading-relaxed text-red-600">{{ mcpSkillBundleError }}</div>
</div>
<button
type="button"
class="shrink-0 rounded-[6px] border border-[#e2e2e2] bg-white px-2 py-1 text-[12px] text-[#222] transition hover:bg-[#f9f9f9]"
:disabled="mcpSkillBundleLoading"
@click="copyMcpText('skill', mcpSkillText)"
>
{{ mcpCopiedKey === 'skill' ? '已复制' : (mcpSkillBundleLoading ? '加载中...' : '复制 Skill') }}
</button>
</div>
<pre class="max-h-[420px] overflow-auto rounded-[6px] bg-[#f7f7f7] px-2.5 py-2 text-[11px] leading-relaxed text-[#333] scrollbar-custom whitespace-pre-wrap">{{ mcpSkillText }}</pre>
</div>
</div>
</div>
</section>
<section ref="startupSectionRef">
<div class="mb-2.5 text-[12px] font-bold text-[#999] tracking-widest">启动偏好</div>
<div class="overflow-hidden rounded-[10px] border border-[#e7e7e7] bg-white divide-y divide-[#ececec]">
@@ -376,6 +474,7 @@ const emit = defineEmits(['close'])
const settingNavItems = [
{ key: 'desktop', label: '桌面行为', hint: '启动 / 关闭 / 端口' },
{ key: 'mcp', label: 'MCP 接入', hint: '手机 / Skill / 工具' },
{ key: 'startup', label: '启动偏好', hint: '自动实时 / 默认页面' },
{ key: 'updates', label: '更新', hint: '版本信息 / 检查更新' },
{ key: 'sns', label: '朋友圈', hint: '图片缓存策略' },
@@ -384,6 +483,7 @@ const settingNavItems = [
const activeSection = ref(settingNavItems[0].key)
const contentScrollRef = ref(null)
const desktopSectionRef = ref(null)
const mcpSectionRef = ref(null)
const startupSectionRef = ref(null)
const updatesSectionRef = ref(null)
const snsSectionRef = ref(null)
@@ -497,6 +597,65 @@ const desktopLogFileText = computed(() => {
return v || '—'
})
const mcpLanAccessEnabled = ref(false)
const mcpLanAccessLoading = ref(false)
const mcpLanAccessError = ref('')
const mcpLanAccessMessage = ref('')
const mcpToken = ref('')
const mcpTokenLoading = ref(false)
const mcpTokenError = ref('')
const mcpSkillBundleText = ref('')
const mcpSkillBundleLoading = ref(false)
const mcpSkillBundleError = ref('')
const mcpCopiedKey = ref('')
let mcpCopiedTimer = null
const mcpPortText = computed(() => {
const n = Number(String(desktopBackendPortInput.value || '').trim())
if (Number.isInteger(n) && n >= 1 && n <= 65535) return String(n)
return '10392'
})
const mcpEndpoint = computed(() => {
if (!process.client || typeof window === 'undefined') return `http://127.0.0.1:${mcpPortText.value}/mcp`
const apiBase = useApiBase()
if (/^https?:\/\//i.test(apiBase)) {
try {
const u = new URL(apiBase)
return `${u.origin}/mcp`
} catch {}
}
const protocol = window.location?.protocol === 'https:' ? 'https:' : 'http:'
const host = String(window.location?.hostname || '127.0.0.1').trim() || '127.0.0.1'
return `${protocol}//${host}:${mcpPortText.value}/mcp`
})
const mcpSkillFallback = [
'# WeChat MCP Copilot',
'',
'Use WeChatDataAnalysis MCP like an investigator: start broad, resolve fuzzy targets, then fetch only the context needed to answer.',
'',
'Core rules:',
'1. Start with initialize and tools/list.',
'2. Prefer compact mobile facade tools before low-level tools.',
'3. Keep limits small, page results, and expand only when needed.',
'4. Use returned URLs for media and exports instead of inlining binary content.',
].join('\n')
const mcpAiPrompt = computed(() => [
'你现在可以通过 WeChatDataAnalysis MCP 访问本机微信数据。',
`MCP endpoint: ${mcpEndpoint.value}`,
`Authorization: Bearer ${mcpToken.value || '<MCP_TOKEN>'}`,
'',
'接入要求:',
'1. 使用 JSON-RPC 2.0 POST 到 MCP endpointContent-Type 为 application/json,并带上 Authorization Bearer token。',
'2. 先调用 initialize,再用 tools/list 分页读取工具 schema。',
'3. 工具调用使用 tools/call,优先读取 result.structuredContent。',
'4. 不要一次性请求大结果;按下方 skill 的分页和上下文预算逐步扩展。',
'5. 媒体、导出和 SSE 进度按返回 URL 在 App 侧加载,不要让模型内联二进制内容。',
].join('\n'))
const mcpSkillText = computed(() => mcpSkillBundleText.value || mcpSkillFallback)
const mcpTokenText = computed(() => mcpToken.value || '加载中...')
const switchTrackClass = (enabled, disabled = false) => {
if (disabled) return enabled ? 'bg-[#07b75b] opacity-50 cursor-not-allowed' : 'bg-[#d0d0d0] opacity-50 cursor-not-allowed'
return enabled ? 'bg-[#07b75b] hover:brightness-95' : 'bg-[#d0d0d0] hover:brightness-95'
@@ -535,6 +694,7 @@ const refreshDesktopOutputDirProgress = async () => {
const sectionElements = computed(() => [
{ key: 'desktop', el: desktopSectionRef.value },
{ key: 'mcp', el: mcpSectionRef.value },
{ key: 'startup', el: startupSectionRef.value },
{ key: 'updates', el: updatesSectionRef.value },
{ key: 'sns', el: snsSectionRef.value },
@@ -591,6 +751,164 @@ const fetchAdminEndpoint = async (url, options = {}) => {
}
}
const waitForBackendHealth = async (timeoutMs = 30_000) => {
if (!process.client || typeof window === 'undefined') return
const apiBase = useApiBase()
const healthUrl = `${String(apiBase || '').replace(/\/api\/?$/, '')}/api/health`
const startedAt = Date.now()
while (true) {
try {
const r = await fetch(healthUrl, { method: 'GET' })
if (r && r.status < 500) return
} catch {}
if (Date.now() - startedAt > timeoutMs) throw new Error(`后端启动超时:${healthUrl}`)
await new Promise((resolve) => setTimeout(resolve, 400))
}
}
const copyMcpText = async (key, text) => {
if (!process.client || typeof window === 'undefined') return
const value = String(text || '').trim()
if (!value) return
try {
if (navigator?.clipboard?.writeText) {
await navigator.clipboard.writeText(value)
} else {
const el = document.createElement('textarea')
el.value = value
el.setAttribute('readonly', '')
el.style.position = 'fixed'
el.style.left = '-9999px'
document.body.appendChild(el)
el.select()
document.execCommand('copy')
document.body.removeChild(el)
}
mcpCopiedKey.value = key
if (mcpCopiedTimer) clearTimeout(mcpCopiedTimer)
mcpCopiedTimer = setTimeout(() => {
if (mcpCopiedKey.value === key) mcpCopiedKey.value = ''
}, 1600)
} catch {}
}
const refreshMcpLanAccess = async () => {
if (!process.client || typeof window === 'undefined') return
mcpLanAccessLoading.value = true
mcpLanAccessError.value = ''
try {
if (window.wechatDesktop?.getMcpLanAccess) {
const resp = await window.wechatDesktop.getMcpLanAccess()
mcpLanAccessEnabled.value = !!resp?.enabled
return
}
const resp = await fetchAdminEndpoint('/admin/mcp-access')
mcpLanAccessEnabled.value = !!resp?.enabled
} catch (e) {
mcpLanAccessError.value = e?.message || '读取 MCP 接入状态失败'
} finally {
mcpLanAccessLoading.value = false
}
}
const refreshMcpToken = async () => {
if (!process.client || typeof window === 'undefined') return
mcpTokenLoading.value = true
mcpTokenError.value = ''
try {
const resp = await fetchAdminEndpoint('/admin/mcp-token')
const token = String(resp?.token || '').trim()
if (!token) throw new Error('MCP token is empty')
mcpToken.value = token
} catch (e) {
mcpTokenError.value = e?.message || '读取 MCP Token 失败'
} finally {
mcpTokenLoading.value = false
}
}
const resetMcpToken = async () => {
if (!process.client || typeof window === 'undefined') return
mcpTokenLoading.value = true
mcpTokenError.value = ''
try {
const resp = await fetchAdminEndpoint('/admin/mcp-token/reset', { method: 'POST' })
const token = String(resp?.token || '').trim()
if (!token) throw new Error('MCP token is empty')
mcpToken.value = token
mcpCopiedKey.value = 'token-reset'
if (mcpCopiedTimer) clearTimeout(mcpCopiedTimer)
mcpCopiedTimer = setTimeout(() => {
if (mcpCopiedKey.value === 'token-reset') mcpCopiedKey.value = ''
}, 1600)
await refreshMcpSkillBundle()
} catch (e) {
mcpTokenError.value = e?.message || '重置 MCP Token 失败'
} finally {
mcpTokenLoading.value = false
}
}
const refreshMcpSkillBundle = async () => {
if (!process.client || typeof window === 'undefined') return
mcpSkillBundleLoading.value = true
mcpSkillBundleError.value = ''
try {
if (!mcpToken.value) await refreshMcpToken()
const resp = await $fetch('/mcp/skill/bundle', {
baseURL: mcpEndpoint.value.replace(/\/mcp$/, ''),
headers: mcpToken.value ? { Authorization: `Bearer ${mcpToken.value}` } : {},
})
const bundleText = String(resp?.bundleText || '').trim()
if (!bundleText) throw new Error('Skill bundle is empty')
mcpSkillBundleText.value = bundleText
} catch (e) {
mcpSkillBundleError.value = e?.message || '读取 skill 失败,当前显示内置精简版'
if (!mcpSkillBundleText.value) mcpSkillBundleText.value = ''
} finally {
mcpSkillBundleLoading.value = false
}
}
const setMcpLanAccess = async (enabled) => {
if (!process.client || typeof window === 'undefined') return
mcpLanAccessLoading.value = true
mcpLanAccessError.value = ''
mcpLanAccessMessage.value = ''
const previous = mcpLanAccessEnabled.value
mcpLanAccessEnabled.value = !!enabled
try {
if (window.wechatDesktop?.setMcpLanAccess) {
const resp = await window.wechatDesktop.setMcpLanAccess(!!enabled)
mcpLanAccessEnabled.value = !!resp?.enabled
mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,后端已重启。' : 'MCP 局域网接入状态未变化。'
return
}
const resp = await fetchAdminEndpoint('/admin/mcp-access', {
method: 'POST',
body: { enabled: !!enabled },
})
mcpLanAccessEnabled.value = !!resp?.enabled
mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,正在等待后端重启。' : 'MCP 局域网接入状态未变化。'
if (resp?.changed) {
await waitForBackendHealth(30_000)
mcpLanAccessMessage.value = 'MCP 局域网接入已更新,后端已恢复。'
}
} catch (e) {
mcpLanAccessEnabled.value = previous
mcpLanAccessError.value = e?.message || '设置 MCP 接入状态失败'
await refreshMcpLanAccess()
} finally {
mcpLanAccessLoading.value = false
}
}
const toggleMcpLanAccess = async () => {
if (mcpLanAccessLoading.value) return
await setMcpLanAccess(!mcpLanAccessEnabled.value)
}
const refreshDesktopAutoLaunch = async () => {
if (!process.client || typeof window === 'undefined') return
if (!window.wechatDesktop?.getAutoLaunch) return
@@ -945,6 +1263,9 @@ const onDesktopCheckUpdates = async () => {
watch(() => props.open, async (isOpen) => {
if (!isOpen) return
await refreshMcpLanAccess()
await refreshMcpToken()
await refreshMcpSkillBundle()
await refreshBackendLogFileInfo()
if (isDesktopEnv.value) {
await refreshDesktopOutputDir()
@@ -969,6 +1290,9 @@ onMounted(async () => {
snsUseCache.value = readLocalBoolSetting(SNS_SETTING_USE_CACHE_KEY, true)
await refreshDesktopBackendPort()
await refreshMcpLanAccess()
await refreshMcpToken()
await refreshMcpSkillBundle()
if (isDesktopEnv.value) {
void desktopUpdate.initListeners()
await refreshDesktopAutoLaunch()
@@ -984,6 +1308,10 @@ onMounted(async () => {
onBeforeUnmount(() => {
if (!process.client || typeof window === 'undefined') return
window.removeEventListener('keydown', onEscKeydown)
if (mcpCopiedTimer) {
clearTimeout(mcpCopiedTimer)
mcpCopiedTimer = null
}
if (typeof removeDesktopOutputDirProgressListener === 'function') {
removeDesktopOutputDirProgressListener()
removeDesktopOutputDirProgressListener = null
+18 -7
View File
@@ -1308,28 +1308,38 @@
<button
type="button"
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
:class="exportScope === 'selected' && exportListTab === 'all' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
:class="exportScope === 'all' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
@click="onExportBatchScopeClick('all')"
>
全部 {{ exportContactCounts.total }}
全部 {{ exportTargetsLoading ? '...' : exportContactCounts.total }}
</button>
<button
type="button"
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
:class="exportScope === 'selected' && exportListTab === 'groups' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
:class="exportScope === 'groups' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
@click="onExportBatchScopeClick('groups')"
>
群聊 {{ exportContactCounts.groups }}
群聊 {{ exportTargetsLoading ? '...' : exportContactCounts.groups }}
</button>
<button
type="button"
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
:class="exportScope === 'selected' && exportListTab === 'singles' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
:class="exportScope === 'singles' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
@click="onExportBatchScopeClick('singles')"
>
单聊 {{ exportContactCounts.singles }}
单聊 {{ exportTargetsLoading ? '...' : exportContactCounts.singles }}
</button>
<button
type="button"
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
:class="exportScope === 'selected' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
@click="onExportCustomScopeClick"
>
自定义
</button>
</div>
<div v-if="exportTargetsLoading" class="mt-1 text-[11px] text-gray-400">正在同步导出范围...</div>
<div v-else-if="exportTargetsError" class="mt-1 text-[11px] text-red-500">{{ exportTargetsError }}</div>
</div>
<div>
@@ -1395,7 +1405,7 @@
<div v-if="exportScope === 'selected'" class="mt-3">
<div class="mb-2 text-xs text-gray-500">
点击上方范围可筛选并默认全选当前结果再次点击可取消全选下方整行可点选会话
下方整行可点选会话搜索只影响当前自定义列表
</div>
<div class="flex items-center gap-2 mb-2">
<input
@@ -1424,6 +1434,7 @@
<div class="text-sm text-gray-800 truncate">
{{ c.name }}
<span class="text-xs text-gray-500">{{ c.isGroup ? '(群)' : '' }}</span>
<span v-if="!c.inSessionList" class="ml-1 text-[10px] text-[#03C160]">补充</span>
</div>
<div class="text-xs text-gray-500 truncate">{{ c.username }}</div>
</div>
+100 -7
View File
@@ -45,6 +45,10 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
const exportSearchQuery = ref('')
const exportListTab = ref('all')
const exportSelectedUsernames = ref([])
const exportTargetContacts = ref([])
const exportTargetsLoading = ref(false)
const exportTargetsLoaded = ref(false)
const exportTargetsError = ref('')
const exportJob = ref(null)
let exportPollTimer = null
@@ -121,9 +125,35 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
}, [])
}
const normalizeExportTargetContact = (item) => {
const username = String(item?.username || '').trim()
const name = String(item?.name || item?.displayName || username).trim() || username
return {
...item,
username,
name,
displayName: name,
avatar: String(item?.avatar || '').trim(),
isGroup: item?.isGroup != null ? !!item.isGroup : username.endsWith('@chatroom'),
isHidden: !!item?.isHidden,
inSessionList: item?.inSessionList == null ? true : !!item.inSessionList
}
}
const getLocalExportContacts = () => {
return (Array.isArray(contacts.value) ? contacts.value : [])
.map(normalizeExportTargetContact)
.filter((contact) => !!contact.username)
}
const getExportBaseContacts = () => {
if (exportTargetsLoaded.value) return exportTargetContacts.value
return getLocalExportContacts()
}
const getExportFilteredContacts = ({ tab = exportListTab.value, query = exportSearchQuery.value } = {}) => {
const normalizedQuery = String(query || '').trim().toLowerCase()
let list = Array.isArray(contacts.value) ? contacts.value : []
let list = getExportBaseContacts()
const normalizedTab = String(tab || 'all')
if (normalizedTab === 'groups') list = list.filter((contact) => !!contact?.isGroup)
@@ -142,7 +172,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
})
const exportContactCounts = computed(() => {
const list = Array.isArray(contacts.value) ? contacts.value : []
const list = getExportBaseContacts()
const total = list.length
const groups = list.filter((contact) => !!contact?.isGroup).length
return { total, groups, singles: total - groups }
@@ -198,8 +228,63 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
}
const onExportBatchScopeClick = (tab) => {
const nextTab = String(tab || 'all')
exportListTab.value = nextTab
exportScope.value = nextTab === 'groups' || nextTab === 'singles' ? nextTab : 'all'
selectExportFilteredContacts(nextTab)
}
const onExportCustomScopeClick = () => {
exportScope.value = 'selected'
onExportListTabClick(tab)
if (exportSelectedUsernames.value.length === 0) {
selectExportFilteredContacts(exportListTab.value)
}
}
const loadExportTargets = async ({ selectFiltered = false } = {}) => {
if (!selectedAccount.value) return
const selectedBeforeLoad = normalizeExportSelectedUsernames(exportSelectedUsernames.value)
const filteredBeforeLoad = getExportFilteredUsernames(exportListTab.value)
const selectedWasCurrentFiltered =
filteredBeforeLoad.length > 0 &&
filteredBeforeLoad.length === selectedBeforeLoad.length &&
filteredBeforeLoad.every((username) => selectedBeforeLoad.includes(username))
exportTargetsLoading.value = true
exportTargetsError.value = ''
try {
const response = await api.getChatExportTargets({
account: selectedAccount.value,
include_hidden: true,
include_official: false
})
const selectedNow = normalizeExportSelectedUsernames(exportSelectedUsernames.value)
const filteredNowBeforeLoad = getExportFilteredUsernames(exportListTab.value)
const selectedNowMatchesPreloadFilter =
filteredBeforeLoad.length > 0 &&
filteredBeforeLoad.length === selectedNow.length &&
filteredBeforeLoad.every((username) => selectedNow.includes(username))
const selectedNowMatchesCurrentFilter =
filteredNowBeforeLoad.length > 0 &&
filteredNowBeforeLoad.length === selectedNow.length &&
filteredNowBeforeLoad.every((username) => selectedNow.includes(username))
const targets = Array.isArray(response?.targets) ? response.targets : []
exportTargetContacts.value = targets
.map(normalizeExportTargetContact)
.filter((contact) => !!contact.username)
exportTargetsLoaded.value = true
if (selectFiltered || selectedWasCurrentFiltered || selectedNowMatchesPreloadFilter || selectedNowMatchesCurrentFilter) {
selectExportFilteredContacts(exportListTab.value)
}
} catch (error) {
exportTargetsLoaded.value = false
exportTargetContacts.value = []
exportTargetsError.value = error?.message || '加载导出范围失败'
if (!exportError.value) {
exportError.value = `加载导出范围失败:${exportTargetsError.value}`
}
} finally {
exportTargetsLoading.value = false
}
}
const isDesktopExportRuntime = () => {
@@ -407,6 +492,9 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
const openExportModal = () => {
exportModalOpen.value = true
exportError.value = ''
exportTargetsError.value = ''
exportTargetsLoaded.value = false
exportTargetContacts.value = []
resetExportSaveFeedback({ resetAutoSavedFor: true })
exportCancelRequested.value = false
exportSearchQuery.value = ''
@@ -417,9 +505,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
exportMessageTypes.value = exportMessageTypeOptions.map((item) => item.value)
exportAutoSavedFor.value = ''
exportScope.value = selectedContact.value?.username ? 'current' : 'selected'
if (!selectedContact.value?.username) {
selectExportFilteredContacts('all')
}
loadExportTargets({ selectFiltered: !selectedContact.value?.username })
}
const closeExportModal = () => {
@@ -488,6 +574,9 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
}
} else if (scope === 'selected') {
usernames = Array.isArray(exportSelectedUsernames.value) ? exportSelectedUsernames.value.filter(Boolean) : []
} else if (scope !== 'all' && scope !== 'groups' && scope !== 'singles') {
scope = 'selected'
usernames = Array.isArray(exportSelectedUsernames.value) ? exportSelectedUsernames.value.filter(Boolean) : []
}
if (scope === 'selected' && (!usernames || usernames.length === 0)) {
@@ -547,7 +636,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
format: exportFormat.value,
start_time: startTime,
end_time: endTime,
include_hidden: false,
include_hidden: scope === 'all' || scope === 'groups' || scope === 'singles',
include_official: false,
message_types: messageTypes,
include_media: includeMedia,
@@ -615,9 +704,13 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
exportJob,
exportOverallPercent,
exportCurrentPercent,
exportTargetsLoading,
exportTargetsLoaded,
exportTargetsError,
exportFilteredContacts,
exportContactCounts,
onExportBatchScopeClick,
onExportCustomScopeClick,
onExportListTabClick,
isExportContactSelected,
hasWebExportFolder,
+10
View File
@@ -449,6 +449,15 @@ export const useApi = () => {
return await request('/chat/exports')
}
const getChatExportTargets = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
if (params && params.include_hidden != null) query.set('include_hidden', String(!!params.include_hidden))
if (params && params.include_official != null) query.set('include_official', String(!!params.include_official))
const url = '/chat/exports/targets' + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
}
const cancelChatExport = async (exportId) => {
if (!exportId) throw new Error('Missing exportId')
return await request(`/chat/exports/${encodeURIComponent(String(exportId))}`, { method: 'DELETE' })
@@ -693,6 +702,7 @@ export const useApi = () => {
createChatExport,
getChatExport,
listChatExports,
getChatExportTargets,
cancelChatExport,
createSnsExport,
getSnsExport,
+9 -2
View File
@@ -11,11 +11,11 @@
import uvicorn
import os
from pathlib import Path
from wechat_decrypt_tool.runtime_settings import read_effective_backend_port
from wechat_decrypt_tool.runtime_settings import read_effective_backend_host, read_effective_backend_port
def main():
"""启动微信解密工具API服务"""
host = os.environ.get("WECHAT_TOOL_HOST", "127.0.0.1")
host, host_source = read_effective_backend_host(default="127.0.0.1")
port, port_source = read_effective_backend_port(default=10392)
access_host = "127.0.0.1" if host in {"0.0.0.0", "::"} else host
@@ -29,6 +29,13 @@ def main():
print("端口来源: 配置文件 output/runtime_settings.json(由网页/桌面设置写入)")
else:
print("端口来源: 默认值")
if host_source == "env":
print("监听地址来源: 环境变量 WECHAT_TOOL_HOST")
elif host_source == "settings":
print("监听地址来源: 配置文件 output/runtime_settings.json(由网页/桌面设置写入)")
else:
print("监听地址来源: 默认值")
print(f"监听地址: {host}")
print(f"API文档: http://{access_host}:{port}/docs")
print(f"健康检查: http://{access_host}:{port}/api/health")
print("按 Ctrl+C 停止服务")
+1 -1
View File
@@ -1,6 +1,6 @@
[project]
name = "wechat-decrypt-tool"
version = "1.8.1"
version = "1.9.0"
description = "Modern WeChat database decryption tool with React frontend"
readme = "README.md"
requires-python = ">=3.11"
+32
View File
@@ -0,0 +1,32 @@
---
name: wechat-mcp-copilot
version: "1.0.0"
description: Use WeChatDataAnalysis MCP to inspect local WeChat accounts, contacts, sessions, messages, Moments, media, exports, and analytics through a small routed playbook. Trigger when the user asks to search, summarize, export, diagnose, or reason over local WeChat data.
---
# WeChat MCP Copilot
Use WeChatDataAnalysis MCP like an investigator: start broad, resolve fuzzy targets, then fetch only the context needed to answer.
## Core Rules
1. Start with `references/routing.md`.
2. Load only one domain reference after routing.
3. Load `references/pagination-budget.md` before broad searches, exports, or multi-page scans.
4. Use `references/failure-recovery.md` when MCP, database readiness, or empty results are unclear.
5. For phone, ScreenMemo, or external MCP clients, prefer `wechat.mobile.*` facade tools before low-level tools.
6. Do not load the full tool catalog unless the user asks about available tools.
## References
- `references/routing.md`: first-hop intent routing.
- `references/mobile.md`: phone-friendly facade tools and compact response rules.
- `references/setup-system.md`: setup, keys, decrypt, import, health, and system operations.
- `references/target-resolution.md`: fuzzy contact/session resolution.
- `references/chats.md`: chat sessions, messages, search, and context.
- `references/moments.md`: Moments timeline, posters, likes, comments, media.
- `references/media.md`: images, videos, emoji, files, voice resources without transcription.
- `references/export.md`: chat, Moments, and account archive export jobs.
- `references/analytics.md`: wrapped cards, counts, rankings, and aggregate analysis.
- `references/pagination-budget.md`: limits, cursors, result clipping, stopping rules.
- `references/failure-recovery.md`: empty result, not-ready database, ambiguous targets, retries.
@@ -0,0 +1,19 @@
# Analytics
Use this for annual summaries, rankings, counts, and aggregate questions.
## Tools
- `wechat.analytics.get_wrapped_meta`
- `wechat.analytics.get_wrapped_card`
- `wechat.analytics.get_wrapped_annual`
- `wechat.chat.get_daily_message_counts`
- `wechat.biz.get_pay_records`
## Rules
- Prefer `get_wrapped_meta` then `get_wrapped_card` for mobile or constrained contexts.
- Use `get_wrapped_annual` only when the user needs the whole annual dataset.
- For broad statistics, prefer aggregate tools or targeted searches over full message pagination.
- Always state the account, time range, and metric basis when answering.
@@ -0,0 +1,28 @@
# Chats
Use this for chat sessions, message search, and message context.
## Flow
1. Resolve fuzzy target with `wechat.chat.resolve_session`.
2. For recent messages, call `wechat.chat.get_messages` with a small `limit`.
3. For keywords, call `wechat.chat.search_messages`.
4. Use `wechat.chat.list_search_senders` when the user needs sender facets for a broad search.
5. For a hit that needs context, call `wechat.chat.get_message_around`.
6. For merged-forward chat history or AppMsg cards that only expose `server_id`, call `wechat.chat.resolve_chat_history` or `wechat.chat.resolve_app_message`.
7. Use `wechat.chat.get_message_raw` only for debugging or missing structured fields.
## Useful Tools
- `wechat.chat.list_sessions`
- `wechat.chat.resolve_session`
- `wechat.chat.get_messages`
- `wechat.chat.search_messages`
- `wechat.chat.list_search_senders`
- `wechat.chat.get_message_around`
- `wechat.chat.get_message_anchor`
- `wechat.chat.get_daily_message_counts`
- `wechat.chat.resolve_chat_history`
- `wechat.chat.resolve_app_message`
Do not scan full histories by pagination when an aggregate or search tool can answer.
@@ -0,0 +1,29 @@
# Export
Use export only when the user asks for an artifact.
## Chat Export
1. Resolve target session if `scope=selected`.
2. Confirm time range, format, media options, and output directory when needed.
3. Preview targets with `wechat.export.preview_chat_targets`.
4. Create job with `wechat.export.create_chat_export`.
5. Poll `wechat.export.get_chat_export`.
6. Return `wechat.export.get_chat_export_download` when ready.
7. Use `wechat.export.get_chat_export_events_url` when the client can consume SSE progress.
## Moments Export
Use `wechat.export.create_moments_export`, `wechat.export.get_moments_export`, `wechat.export.get_moments_export_download`, and `wechat.export.get_moments_export_events_url`.
## Account Archive
Use `wechat.export.create_account_archive`, `wechat.export.get_account_archive`, `wechat.export.get_account_archive_download`, and `wechat.export.cancel_account_archive`.
## Contacts Export
Use `wechat.contacts.export_contacts` only when the user asks for a contacts file. It writes JSON or CSV to a local output directory.
Do not silently export all history and all media unless the user explicitly asked for that scope.
For phone clients, prefer `wechat.mobile.export_job` unless exact low-level export options are required.
@@ -0,0 +1,21 @@
# Failure Recovery
Use this when MCP status, DB readiness, or results are suspicious.
## Checks
1. Phone clients: `wechat.mobile.get_overview`
2. `wechat.core.get_status`
3. `wechat.core.list_accounts`
4. `wechat.core.get_account_info`
5. Search index status with `wechat.chat.get_search_index_status` when message search fails.
6. Moments availability by checking account info and `wechat.moments.list_users`.
7. Setup readiness: load `setup-system.md` for keys, decrypt, import, health, or media-key problems.
## Empty Results
- Do not conclude "no data" after one failed query.
- Try contact/session resolution with a simpler keyword.
- Try session search before global message search when a target is known.
- For Moments, resolve poster identity before timeline filtering.
- If setup is not ready, stop content tools and explain the missing readiness condition.
@@ -0,0 +1,30 @@
# Media
Use this for image, video, emoji, file, link, and voice resources.
## Tools
- `wechat.media.get_avatar_url`
- `wechat.media.get_chat_image_url`
- `wechat.media.get_chat_emoji_url`
- `wechat.media.get_chat_video_thumb_url`
- `wechat.media.get_chat_video_url`
- `wechat.media.get_chat_voice_url`
- `wechat.media.get_decrypted_resource_url`
- `wechat.media.get_proxy_image_url`
- `wechat.media.get_favicon_url`
- `wechat.media.open_chat_media_folder`
- `wechat.biz.get_proxy_image_url`
- `wechat.moments.get_media_url`
- `wechat.moments.get_article_thumb_url`
- `wechat.moments.get_remote_video_url`
- `wechat.moments.get_local_video_url`
## Rules
- Media tools return URLs or resource metadata; they do not inline large binary payloads.
- Voice resources are files only. Do not transcribe voice messages.
- For phone clients, prefer `wechat.mobile.get_media_links` first.
- `open_chat_media_folder` is a desktop-host action; do not use it for phone-only flows.
- Locate the message first, then fetch media URL by message fields such as `server_id`, `username`, `md5`, or returned media references.
- For Moments, prefer local media URL fields from timeline records. Use remote video/article helpers only when the timeline record has a remote URL or article URL.
@@ -0,0 +1,30 @@
# Mobile Facade
Use this for phone, ScreenMemo, and external MCP clients unless the user needs a low-level operation.
## Default Tools
- `wechat.mobile.get_overview`: first call after initialize. Returns readiness, accounts, health, and suggested next tools.
- `wechat.mobile.get_home_snapshot`: small account/session/Moments snapshot for a home screen.
- `wechat.mobile.resolve_target`: resolve fuzzy people, groups, sessions, Moments users, or official accounts.
- `wechat.mobile.search_chat`: message search with optional tiny context windows.
- `wechat.mobile.get_chat_context`: recent, day, or around-anchor chat context.
- `wechat.mobile.get_session_bundle`: session metadata plus a page of messages.
- `wechat.mobile.search_moments`: compact Moments search.
- `wechat.mobile.get_media_links`: URL-only media lookup.
- `wechat.mobile.get_analytics`: compact analytics by metric.
- `wechat.mobile.export_job`: preview/create/status/download/cancel export jobs.
## Budget Rules
- Keep `limit` at 10-20 for first calls.
- Use `offset` or returned cursor fields for paging.
- Do not call full annual analytics by default; use `metric=digest` or a single card.
- Do not fetch binary media through MCP. Use returned URLs in the app.
- Use low-level tools only for debugging, editing, raw fields, unusual media, or exact export control.
## Recovery
- If `ready=false`, load `setup-system.md`.
- If target resolution is ambiguous, ask for one clarifying clue or show top candidates.
- If search returns nothing, try `resolve_target` and then `get_chat_context` before declaring no data.
@@ -0,0 +1,18 @@
# Moments
Use this for 朋友圈, posts, likes, comments, shared links, and Moments media.
## Flow
1. If the clue is a person, resolve with `wechat.contacts.resolve_contact`.
2. Use `wechat.moments.list_users` when you need poster candidates.
3. Use `wechat.moments.list_timeline` or `wechat.moments.search_moments`.
4. Use `wechat.moments.get_media_url` only when the user needs a media resource.
## Rules
- Person names must be resolved to username before filtering timeline by `usernames`.
- Keyword search is for post content/topic, not poster identity.
- Do not request raw XML by default.
- Use `wechat.moments.sync_latest` only when the user explicitly wants fresh local sync or status indicates data is stale.
@@ -0,0 +1,20 @@
# Pagination And Budget
Default limits:
- Contact/session candidates: 10
- Recent messages: 20
- Message search: 20
- Moments timeline: 10
- Media references: 20
- Ranking/analytics rows: 20
Hard rules:
- Keep single message/Moments pages at or below 50 unless user asks for more.
- Stop paging when enough evidence exists.
- Stop after two empty or low-value pages.
- Do not cross 500 raw messages without user confirmation.
- List responses should use ids, names, timestamps, preview, and evidence.
- Fetch full details only after a candidate or hit is selected.
@@ -0,0 +1,24 @@
# Routing
Use this first for every WeChatDataAnalysis MCP task.
## First Calls
- Phone, ScreenMemo, mobile app, or compact external client: load `mobile.md` and start with `wechat.mobile.get_overview`.
- Status, readiness, "why can't I find anything": `wechat.core.get_status`, or `wechat.mobile.get_overview` for phone clients.
- Available tools or packages: `wechat.core.list_tools`.
- Account selection: `wechat.core.list_accounts`, then `wechat.core.get_account_info`.
- Key/decrypt/import/backend health problems: load `setup-system.md`.
- Fuzzy person/group/official account: load `target-resolution.md`.
- Chat content, recent messages, keyword search: load `chats.md`.
- Moments / 朋友圈 / likes / comments / post media: load `moments.md`.
- Images, videos, emoji, files, voice resources: load `media.md`.
- Export requests: load `export.md`.
- Rankings, yearly summary, activity stats: load `analytics.md`.
- Empty results or setup errors: load `failure-recovery.md`.
## Mixed Intent
Resolve the target first, then load only the main domain reference. Do not load chats, moments, media, export, and analytics together unless the user explicitly asks for a broad audit.
For phone clients, keep using `mobile.md` until the user needs a low-level fallback such as editing, raw fields, special media URL construction, or exact export options.
@@ -0,0 +1,41 @@
# Setup And System
Use this when the database is not ready, keys are needed, decrypted data must be imported, or the backend itself needs inspection.
## Setup Tools
- `wechat.setup.get_saved_keys`: read saved DB/media keys for an account or wxid directory.
- `wechat.setup.get_database_key`: desktop workflow to extract the DB key from local WeChat.
- `wechat.setup.get_image_key`: fetch and save image AES/XOR keys.
- `wechat.setup.decrypt_databases`: decrypt databases from `db_storage_path` and a DB key.
- `wechat.setup.get_decrypt_stream_url`: SSE URL for decrypt progress.
- `wechat.setup.preview_import_decrypted`: validate an already-decrypted account directory.
- `wechat.setup.get_import_decrypted_stream_url`: SSE URL for import progress.
- `wechat.setup.cancel_import_decrypted`: cancel an import job by `job_id`.
- `wechat.setup.save_media_keys`: save media XOR/AES keys.
- `wechat.setup.decrypt_all_media`: decrypt all local `.dat` image resources.
- `wechat.setup.get_decrypt_all_media_stream_url`: SSE URL for bulk media decrypt progress.
- `wechat.setup.get_download_all_emojis_stream_url`: SSE URL for bulk emoji download progress.
## System Tools
- `wechat.system.health_check`
- `wechat.system.api_root`
- `wechat.system.get_backend_log_file`
- `wechat.system.open_backend_log_file`
- `wechat.system.get_backend_port`
- `wechat.system.set_backend_port_setting`
- `wechat.system.set_backend_port_and_restart`
- `wechat.system.get_img_helper_status`
- `wechat.system.toggle_img_helper`
- `wechat.system.pick_directory`
- `wechat.system.log_frontend_server_error`
## Rules
- Stream tools return `streamUrl`; the client consumes SSE outside the MCP JSON response.
- `set_backend_port_setting` persists the setting and may require backend restart by the user/client flow.
- `set_backend_port_and_restart` changes the port through the desktop restart flow and will disrupt the current backend connection.
- `open_backend_log_file` and `pick_directory` are desktop-host GUI actions; do not use them for phone-only flows.
- DB key extraction and image helper toggling depend on the local desktop WeChat state.
- Import/decrypt/media bulk operations write local files; summarize expected impact before running them.
@@ -0,0 +1,21 @@
# Target Resolution
Use target resolution whenever the user gives a loose name, nickname, group name, remark, official account name, or "the person/group who...".
## Tools
- Contacts: `wechat.contacts.resolve_contact`
- Sessions: `wechat.chat.resolve_session`
- Fallback list: `wechat.contacts.list_contacts`, `wechat.chat.list_sessions`
## Rules
- Prefer exact username/wxid match, then remark, nickname, display name, alias, recent session evidence.
- If a chat task mentions a person, resolve both contact and session when needed.
- If several candidates remain plausible, inspect recent session previews before choosing.
- If ambiguity survives after reasonable evidence, ask a short clarification.
## Evidence Fields
Use username/session id, display name, remark/nickname/alias, session type, recent timestamp, recent preview, and confidence.
+1 -1
View File
@@ -1,5 +1,5 @@
"""微信数据库解密工具
"""
__version__ = "1.8.1"
__version__ = "1.9.0"
__author__ = "WeChat Decrypt Tool"
+2
View File
@@ -31,6 +31,7 @@ from .routers.admin import router as _admin_router
from .routers.account_archive_export import router as _account_archive_export_router
from .routers.keys import router as _keys_router
from .routers.media import router as _media_router
from .routers.mcp import router as _mcp_router
from .routers.sns import router as _sns_router
from .routers.sns_export import router as _sns_export_router
from .routers.wechat_detection import router as _wechat_detection_router
@@ -73,6 +74,7 @@ app.include_router(_import_decrypted_router)
app.include_router(_decrypt_router)
app.include_router(_keys_router)
app.include_router(_media_router)
app.include_router(_mcp_router)
app.include_router(_chat_router)
app.include_router(_chat_contacts_router)
app.include_router(_chat_export_router)
+2 -2
View File
@@ -9,11 +9,11 @@ import os
import uvicorn
from wechat_decrypt_tool.api import app
from wechat_decrypt_tool.runtime_settings import read_effective_backend_port
from wechat_decrypt_tool.runtime_settings import read_effective_backend_host, read_effective_backend_port
def main() -> None:
host = os.environ.get("WECHAT_TOOL_HOST", "127.0.0.1")
host, _ = read_effective_backend_host(default="127.0.0.1")
port, _ = read_effective_backend_port(default=10392)
uvicorn.run(app, host=host, port=port, log_level="info")
+266 -11
View File
@@ -43,6 +43,7 @@ from .chat_helpers import (
_parse_location_message,
_parse_system_message_content,
_parse_pat_message,
_build_avatar_url,
_pick_display_name,
_quote_ident,
_resolve_account_dir,
@@ -50,6 +51,7 @@ from .chat_helpers import (
_resource_lookup_chat_id,
_should_keep_session,
_split_group_sender_prefix,
_resolve_msg_table_name_by_map,
)
from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC
from .logging_config import get_logger
@@ -3527,13 +3529,66 @@ def _resolve_export_targets(
uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()]))
return uniq
session_rows, session_hidden_by_username = _load_export_session_targets(account_dir)
contact_usernames = _load_export_contact_usernames(account_dir)
discovered_message_targets = _load_message_backed_export_targets(
account_dir=account_dir,
seed_usernames=contact_usernames,
)
def should_include(u: str) -> bool:
if not u or u == account_dir.name:
return False
if not include_hidden and int(session_hidden_by_username.get(u) or 0) == 1:
return False
if not _should_keep_session(u, include_official=include_official):
return False
if scope == "groups" and (not u.endswith("@chatroom")):
return False
if scope == "singles" and u.endswith("@chatroom"):
return False
return True
out: list[str] = []
seen: set[str] = set()
for u, _sort_ts in session_rows:
if u in seen or (not should_include(u)):
continue
seen.add(u)
out.append(u)
for u, _sort_ts in sorted(discovered_message_targets.items(), key=lambda item: (-int(item[1] or 0), item[0])):
if u in seen or (not should_include(u)):
continue
seen.add(u)
out.append(u)
return out
def _load_export_session_targets(account_dir: Path) -> tuple[list[tuple[str, int]], dict[str, int]]:
session_db_path = account_dir / "session.db"
if not session_db_path.exists():
return [], {}
conn = sqlite3.connect(str(session_db_path))
conn.row_factory = sqlite3.Row
try:
columns = _sqlite_table_columns(conn, "SessionTable")
if "username" not in columns:
return [], {}
hidden_expr = "is_hidden" if "is_hidden" in columns else "0"
if "sort_timestamp" in columns:
sort_expr = "sort_timestamp"
elif "last_timestamp" in columns:
sort_expr = "last_timestamp"
else:
sort_expr = "0"
rows = conn.execute(
"""
SELECT username, is_hidden
f"""
SELECT username, {hidden_expr} AS is_hidden, {sort_expr} AS sort_timestamp
FROM SessionTable
ORDER BY sort_timestamp DESC
""",
@@ -3541,23 +3596,223 @@ def _resolve_export_targets(
finally:
conn.close()
out: list[str] = []
out: list[tuple[str, int]] = []
hidden_by_username: dict[str, int] = {}
seen: set[str] = set()
for r in rows:
u = str(r["username"] or "").strip()
if not u:
continue
if not include_hidden and int(r["is_hidden"] or 0) == 1:
try:
hidden = int(r["is_hidden"] or 0)
except Exception:
hidden = 0
if hidden:
hidden_by_username[u] = 1
else:
hidden_by_username.setdefault(u, 0)
if u in seen:
continue
if not _should_keep_session(u, include_official=include_official):
continue
if scope == "groups" and (not u.endswith("@chatroom")):
continue
if scope == "singles" and u.endswith("@chatroom"):
continue
out.append(u)
seen.add(u)
try:
sort_ts = int(r["sort_timestamp"] or 0)
except Exception:
sort_ts = 0
out.append((u, sort_ts))
return out, hidden_by_username
def _sqlite_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
try:
rows = conn.execute(f"PRAGMA table_info({_quote_ident(table_name)})").fetchall()
except Exception:
return set()
columns: set[str] = set()
for row in rows:
try:
name = str(row["name"] if isinstance(row, sqlite3.Row) else row[1] or "").strip().lower()
except Exception:
name = ""
if name:
columns.add(name)
return columns
def _load_export_contact_usernames(account_dir: Path) -> set[str]:
contact_db_path = account_dir / "contact.db"
if not contact_db_path.exists():
return set()
out: set[str] = set()
conn = sqlite3.connect(str(contact_db_path))
conn.row_factory = sqlite3.Row
try:
for table in ("contact", "stranger"):
columns = _sqlite_table_columns(conn, table)
if "username" not in columns:
continue
try:
rows = conn.execute(f"SELECT username FROM {_quote_ident(table)}").fetchall()
except Exception:
continue
for row in rows:
try:
username = str(row["username"] or "").strip()
except Exception:
username = ""
if username:
out.add(username)
finally:
conn.close()
return out
def _load_name2id_usernames(conn: sqlite3.Connection) -> set[str]:
columns = _sqlite_table_columns(conn, "Name2Id")
username_col = "user_name" if "user_name" in columns else ("username" if "username" in columns else "")
if not username_col:
return set()
out: set[str] = set()
try:
rows = conn.execute(f"SELECT {_quote_ident(username_col)} AS username FROM Name2Id").fetchall()
except Exception:
return out
for row in rows:
try:
username = str(row["username"] if isinstance(row, sqlite3.Row) else row[0] or "").strip()
except Exception:
username = ""
if username:
out.add(username)
return out
def _message_table_latest_timestamp(conn: sqlite3.Connection, table_name: str) -> Optional[int]:
quoted = _quote_ident(table_name)
try:
row = conn.execute(f"SELECT MAX(create_time) FROM {quoted}").fetchone()
if row is not None and row[0] is not None:
return int(row[0] or 0)
except Exception:
pass
try:
row = conn.execute(f"SELECT 1 FROM {quoted} LIMIT 1").fetchone()
if row is not None:
return 0
except Exception:
pass
return None
def _load_message_backed_export_targets(*, account_dir: Path, seed_usernames: set[str]) -> dict[str, int]:
out: dict[str, int] = {}
for db_path in _iter_message_db_paths(account_dir):
conn: Optional[sqlite3.Connection] = None
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
table_names = [str(r["name"] if isinstance(r, sqlite3.Row) else r[0] or "") for r in rows]
lower_to_actual = {name.lower(): name for name in table_names if name}
if not lower_to_actual:
continue
candidates = set(seed_usernames)
candidates.update(_load_name2id_usernames(conn))
for username in candidates:
u = str(username or "").strip()
if not u or u == account_dir.name:
continue
table_name = _resolve_msg_table_name_by_map(lower_to_actual, u)
if not table_name:
continue
latest_ts = _message_table_latest_timestamp(conn, table_name)
if latest_ts is None:
continue
previous_ts = out.get(u)
if previous_ts is None or int(latest_ts or 0) > int(previous_ts or 0):
out[u] = int(latest_ts or 0)
except Exception:
continue
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
return out
def build_chat_export_targets_preview(
*,
account_dir: Path,
include_hidden: bool = True,
include_official: bool = False,
base_url: str = "",
) -> dict[str, Any]:
targets = _resolve_export_targets(
account_dir=account_dir,
scope="all",
usernames=[],
include_hidden=bool(include_hidden),
include_official=bool(include_official),
)
session_rows, session_hidden_by_username = _load_export_session_targets(account_dir)
session_usernames = {u for u, _sort_ts in session_rows}
contact_rows = _load_contact_rows(account_dir / "contact.db", targets)
base = str(base_url or "").rstrip("/")
conversations: list[dict[str, Any]] = []
for u in targets:
row = contact_rows.get(u)
display_name = _pick_display_name(row, u) if row is not None else u
avatar_path = _build_avatar_url(account_dir.name, u)
conversations.append(
{
"username": u,
"name": display_name,
"displayName": display_name,
"isGroup": bool(u.endswith("@chatroom")),
"isHidden": bool(int(session_hidden_by_username.get(u) or 0) == 1),
"inSessionList": bool(u in session_usernames),
"avatar": f"{base}{avatar_path}" if base else avatar_path,
}
)
group_count = sum(1 for item in conversations if bool(item.get("isGroup")))
return {
"status": "success",
"account": account_dir.name,
"includeHidden": bool(include_hidden),
"includeOfficial": bool(include_official),
"targets": conversations,
"counts": {
"total": len(conversations),
"groups": group_count,
"singles": len(conversations) - group_count,
},
}
def get_chat_export_targets_preview(
*,
account: Optional[str],
include_hidden: bool = True,
include_official: bool = False,
base_url: str = "",
) -> dict[str, Any]:
account_dir = _resolve_account_dir(account)
return build_chat_export_targets_preview(
account_dir=account_dir,
include_hidden=include_hidden,
include_official=include_official,
base_url=base_url,
)
def _conversation_dir_name(
idx: int,
display_name: str,
+126 -3
View File
@@ -18,7 +18,7 @@ from .logging_config import get_logger
logger = get_logger(__name__)
_SCHEMA_VERSION = 1
_SCHEMA_VERSION = 2
_INDEX_DB_NAME = "chat_search_index.db"
_INDEX_DB_TMP_NAME = "chat_search_index.tmp.db"
_LEGACY_INDEX_DB_NAME = "message_fts.db"
@@ -188,7 +188,24 @@ def _update_build_state(account_key: str, **kwargs: Any) -> None:
st.update(kwargs)
def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
def _sqlite_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
try:
rows = conn.execute(f"PRAGMA table_info({_quote_ident(table_name)})").fetchall()
except Exception:
return set()
columns: set[str] = set()
for row in rows:
try:
name = str(row["name"] if isinstance(row, sqlite3.Row) else row[1] or "").strip().lower()
except Exception:
name = ""
if name:
columns.add(name)
return columns
def _load_session_table_targets(account_dir: Path) -> dict[str, dict[str, Any]]:
session_db_path = account_dir / "session.db"
if not session_db_path.exists():
return {}
@@ -196,7 +213,11 @@ def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
conn = sqlite3.connect(str(session_db_path))
conn.row_factory = sqlite3.Row
try:
rows = conn.execute("SELECT username, is_hidden FROM SessionTable").fetchall()
columns = _sqlite_table_columns(conn, "SessionTable")
if "username" not in columns:
return {}
hidden_expr = "is_hidden" if "is_hidden" in columns else "0"
rows = conn.execute(f"SELECT username, {hidden_expr} AS is_hidden FROM SessionTable").fetchall()
finally:
conn.close()
@@ -214,6 +235,108 @@ def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
return out
def _load_contact_usernames_for_index(account_dir: Path) -> set[str]:
contact_db_path = account_dir / "contact.db"
if not contact_db_path.exists():
return set()
out: set[str] = set()
conn = sqlite3.connect(str(contact_db_path))
conn.row_factory = sqlite3.Row
try:
for table in ("contact", "stranger"):
columns = _sqlite_table_columns(conn, table)
if "username" not in columns:
continue
try:
rows = conn.execute(f"SELECT username FROM {_quote_ident(table)}").fetchall()
except Exception:
continue
for row in rows:
username = _decode_sqlite_text(row["username"]).strip()
if username:
out.add(username)
finally:
conn.close()
return out
def _load_name2id_usernames_for_index(conn: sqlite3.Connection) -> set[str]:
columns = _sqlite_table_columns(conn, "Name2Id")
username_col = "user_name" if "user_name" in columns else ("username" if "username" in columns else "")
if not username_col:
return set()
out: set[str] = set()
try:
rows = conn.execute(f"SELECT {_quote_ident(username_col)} AS username FROM Name2Id").fetchall()
except Exception:
return out
for row in rows:
try:
raw = row["username"] if isinstance(row, sqlite3.Row) else row[0]
except Exception:
raw = ""
username = _decode_sqlite_text(raw).strip()
if username:
out.add(username)
return out
def _load_message_backed_index_targets(*, account_dir: Path, seed_usernames: set[str]) -> set[str]:
out: set[str] = set()
for db_path in _iter_message_db_paths(account_dir):
conn: Optional[sqlite3.Connection] = None
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
table_names = [_decode_sqlite_text(r["name"] if isinstance(r, sqlite3.Row) else r[0]).strip() for r in rows]
lower_to_actual = {name.lower(): name for name in table_names if name}
if not lower_to_actual:
continue
candidates = set(seed_usernames)
candidates.update(_load_name2id_usernames_for_index(conn))
for username in candidates:
u = str(username or "").strip()
if not u or u == account_dir.name:
continue
if not _should_keep_session(u, include_official=True):
continue
if _resolve_msg_table_name_by_map(lower_to_actual, u):
out.add(u)
except Exception:
continue
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
return out
def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
sessions = _load_session_table_targets(account_dir)
contact_usernames = _load_contact_usernames_for_index(account_dir)
message_backed_usernames = _load_message_backed_index_targets(
account_dir=account_dir,
seed_usernames=contact_usernames,
)
for u in sorted(message_backed_usernames):
if u in sessions:
continue
sessions[u] = {
"is_hidden": 0,
"is_official": 1 if u.startswith("gh_") else 0,
}
return sessions
def _init_index_db(conn: sqlite3.Connection) -> None:
# NOTE: This index DB is built as a temporary file and then atomically swapped in.
# Using WAL here would create `-wal/-shm` side files that are *not* swapped together,
+2
View File
@@ -0,0 +1,2 @@
"""MCP integration for WeChatDataAnalysis."""
+35
View File
@@ -0,0 +1,35 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from fastapi import HTTPException
JSONRPC_PARSE_ERROR = -32700
JSONRPC_INVALID_REQUEST = -32600
JSONRPC_METHOD_NOT_FOUND = -32601
JSONRPC_INVALID_PARAMS = -32602
JSONRPC_INTERNAL_ERROR = -32603
@dataclass
class McpError(Exception):
code: int
message: str
data: Any = None
def error_from_exception(exc: Exception) -> McpError:
if isinstance(exc, McpError):
return exc
if isinstance(exc, HTTPException):
return McpError(
JSONRPC_INVALID_PARAMS if int(exc.status_code or 500) < 500 else JSONRPC_INTERNAL_ERROR,
str(exc.detail or "HTTP error"),
{"httpStatus": int(exc.status_code or 500)},
)
if isinstance(exc, (ValueError, TypeError, KeyError)):
return McpError(JSONRPC_INVALID_PARAMS, str(exc) or "Invalid params")
return McpError(JSONRPC_INTERNAL_ERROR, "Internal error", {"detail": str(exc)})
+102
View File
@@ -0,0 +1,102 @@
from __future__ import annotations
from typing import Any
from .errors import (
JSONRPC_INVALID_REQUEST,
JSONRPC_METHOD_NOT_FOUND,
JSONRPC_PARSE_ERROR,
McpError,
error_from_exception,
)
from .registry import McpToolContext, McpToolRegistry
PROTOCOL_VERSION = "2025-06-18"
def jsonrpc_result(request_id: Any, result: Any) -> dict[str, Any]:
return {"jsonrpc": "2.0", "id": request_id, "result": result}
def jsonrpc_error(request_id: Any, error: McpError) -> dict[str, Any]:
payload: dict[str, Any] = {"code": error.code, "message": error.message}
if error.data is not None:
payload["data"] = error.data
return {"jsonrpc": "2.0", "id": request_id, "error": payload}
def initialize_result() -> dict[str, Any]:
return {
"protocolVersion": PROTOCOL_VERSION,
"capabilities": {"tools": {"listChanged": False}},
"serverInfo": {"name": "wechat-data-analysis-mcp", "version": "1.0.0"},
"instructions": (
"Use this MCP server to inspect local WeChatDataAnalysis data. "
"Prefer resolve tools before broad message queries. Keep list limits small and expand details only when needed."
),
}
async def handle_jsonrpc_payload(payload: Any, registry: McpToolRegistry, context: McpToolContext) -> Any:
if isinstance(payload, list):
if not payload:
return jsonrpc_error(None, McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
responses = []
for item in payload:
response = await handle_jsonrpc_request(item, registry, context)
if response is not None:
responses.append(response)
return responses or None
return await handle_jsonrpc_request(payload, registry, context)
async def handle_jsonrpc_request(request_obj: Any, registry: McpToolRegistry, context: McpToolContext) -> dict[str, Any] | None:
if not isinstance(request_obj, dict):
return jsonrpc_error(None, McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
if "method" not in request_obj:
if request_obj.get("jsonrpc") == "2.0" and ("result" in request_obj or "error" in request_obj):
return None
return jsonrpc_error(request_obj.get("id"), McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
request_id = request_obj.get("id")
is_notification = "id" not in request_obj
method_value = request_obj.get("method")
method = method_value.strip() if isinstance(method_value, str) else ""
if request_obj.get("jsonrpc") != "2.0" or not method:
return None if is_notification else jsonrpc_error(request_id, McpError(JSONRPC_INVALID_REQUEST, "Invalid Request"))
if is_notification and method == "notifications/initialized":
return None
try:
params = request_obj.get("params")
if method == "initialize":
result = initialize_result()
elif method == "ping":
result = {}
elif method == "tools/list":
params_dict = params if isinstance(params, dict) else {}
result = registry.list_tools(
cursor=params_dict.get("cursor"),
limit=params_dict.get("limit"),
)
elif method == "tools/call":
if not isinstance(params, dict):
raise ValueError("params is required.")
name = str(params.get("name") or "").strip()
if not name:
raise ValueError("Tool name is required.")
arguments = params["arguments"] if "arguments" in params else {}
result = await registry.call_tool(name, arguments, context)
elif registry.has_tool(method):
result = await registry.call_tool(method, {} if params is None else params, context)
else:
raise McpError(JSONRPC_METHOD_NOT_FOUND, "Method not found")
return None if is_notification else jsonrpc_result(request_id, result)
except Exception as exc:
return None if is_notification else jsonrpc_error(request_id, error_from_exception(exc))
def parse_error_response() -> dict[str, Any]:
return jsonrpc_error(None, McpError(JSONRPC_PARSE_ERROR, "Parse error"))
+158
View File
@@ -0,0 +1,158 @@
from __future__ import annotations
import inspect
import json
from dataclasses import dataclass
from typing import Any, Awaitable, Callable
from fastapi.encoders import jsonable_encoder
from .errors import JSONRPC_METHOD_NOT_FOUND, McpError
ToolHandler = Callable[[dict[str, Any], "McpToolContext"], Any | Awaitable[Any]]
@dataclass(frozen=True)
class McpToolContext:
request: Any
@property
def base_url(self) -> str:
try:
return str(self.request.base_url).rstrip("/")
except Exception:
return ""
@dataclass(frozen=True)
class McpTool:
name: str
description: str
input_schema: dict[str, Any]
handler: ToolHandler
package: str = "wechat"
annotations: dict[str, Any] | None = None
def to_public_dict(self) -> dict[str, Any]:
payload: dict[str, Any] = {
"name": self.name,
"description": self.description,
"inputSchema": self.input_schema,
}
if self.annotations:
payload["annotations"] = self.annotations
return payload
class McpToolRegistry:
def __init__(self) -> None:
self._tools: dict[str, McpTool] = {}
def register(self, tool: McpTool) -> None:
if not tool.name:
raise ValueError("Tool name is required.")
if tool.name in self._tools:
raise ValueError(f"Duplicate MCP tool: {tool.name}")
self._tools[tool.name] = tool
def tool_names(self) -> list[str]:
return sorted(self._tools)
def list_tools(self, *, cursor: str | None = None, limit: int | None = None) -> dict[str, Any]:
names = sorted(self._tools)
start = 0
if cursor:
try:
start = int(cursor)
except Exception as exc:
raise ValueError("Invalid cursor.") from exc
if start < 0 or start > len(names):
raise ValueError("Invalid cursor.")
if limit is None:
page_names = names[start:]
next_cursor = None
else:
page_size = max(1, min(100, int(limit)))
page_names = names[start : start + page_size]
next_index = start + page_size
next_cursor = str(next_index) if next_index < len(names) else None
payload: dict[str, Any] = {
"tools": [self._tools[name].to_public_dict() for name in page_names],
"count": len(page_names),
"total": len(names),
}
if next_cursor is not None:
payload["nextCursor"] = next_cursor
return payload
def has_tool(self, name: str) -> bool:
return name in self._tools
async def call_tool(self, name: str, arguments: Any, context: McpToolContext) -> dict[str, Any]:
tool = self._tools.get(str(name or "").strip())
if tool is None:
raise McpError(JSONRPC_METHOD_NOT_FOUND, f"Unknown tool: {name}")
if arguments is None:
args: dict[str, Any] = {}
elif isinstance(arguments, dict):
args = dict(arguments)
else:
raise ValueError("Tool arguments must be an object.")
result = tool.handler(args, context)
if inspect.isawaitable(result):
result = await result
encoded = jsonable_encoder(result)
text = json.dumps(encoded, ensure_ascii=False, indent=2)
is_error = isinstance(encoded, dict) and str(encoded.get("status") or "").lower() == "error"
return {
"content": [{"type": "text", "text": text}],
"structuredContent": encoded,
"isError": is_error,
}
def object_schema(
properties: dict[str, Any] | None = None,
*,
required: list[str] | None = None,
additional_properties: bool = False,
) -> dict[str, Any]:
schema: dict[str, Any] = {
"type": "object",
"properties": properties or {},
"additionalProperties": additional_properties,
}
if required:
schema["required"] = required
return schema
def string_schema(description: str, *, enum: list[str] | None = None) -> dict[str, Any]:
out: dict[str, Any] = {"type": "string", "description": description}
if enum:
out["enum"] = enum
return out
def int_schema(description: str, *, minimum: int | None = None, maximum: int | None = None) -> dict[str, Any]:
out: dict[str, Any] = {"type": "integer", "description": description}
if minimum is not None:
out["minimum"] = minimum
if maximum is not None:
out["maximum"] = maximum
return out
def bool_schema(description: str, *, default: bool | None = None) -> dict[str, Any]:
out: dict[str, Any] = {"type": "boolean", "description": description}
if default is not None:
out["default"] = default
return out
def array_schema(description: str, items: dict[str, Any]) -> dict[str, Any]:
return {"type": "array", "description": description, "items": items}
File diff suppressed because it is too large Load Diff
+154 -4
View File
@@ -15,7 +15,19 @@ from starlette.requests import Request
from ..logging_config import get_log_file_path, get_logger
from ..path_fix import PathFixRoute
from ..runtime_settings import read_effective_backend_port, write_backend_port_env_file, write_backend_port_setting
from ..runtime_settings import (
LAN_BACKEND_HOST,
LOOPBACK_BACKEND_HOST,
read_effective_backend_host,
read_effective_mcp_token,
read_effective_backend_port,
reset_mcp_token,
write_backend_host_env_file,
write_backend_host_setting,
write_backend_port_env_file,
write_backend_port_setting,
write_mcp_token_env_file,
)
router = APIRouter(route_class=PathFixRoute)
@@ -33,7 +45,8 @@ def _format_host_for_url(host: str) -> str:
def _get_backend_bind_host() -> str:
return str(os.environ.get("WECHAT_TOOL_HOST", "127.0.0.1") or "").strip() or "127.0.0.1"
host, _ = read_effective_backend_host(default=LOOPBACK_BACKEND_HOST)
return host
def _get_backend_access_host() -> str:
@@ -116,10 +129,10 @@ async def _wait_for_backend_ready(health_url: str, timeout_s: float = 30.0) -> b
return False
def _spawn_backend_process(next_port: int) -> subprocess.Popen:
def _spawn_backend_process(next_port: int, next_host: str | None = None) -> subprocess.Popen:
env = os.environ.copy()
env["WECHAT_TOOL_PORT"] = str(int(next_port))
env.setdefault("WECHAT_TOOL_HOST", _get_backend_bind_host())
env["WECHAT_TOOL_HOST"] = str(next_host or _get_backend_bind_host())
# Keep the same working directory so output paths remain consistent.
# (When `WECHAT_TOOL_DATA_DIR` is not set, the app uses `Path.cwd()`.)
@@ -150,6 +163,40 @@ def _spawn_backend_process(next_port: int) -> subprocess.Popen:
return subprocess.Popen(cmd, cwd=spawn_cwd, env=env)
def _spawn_backend_process_after_delay(next_port: int, next_host: str, delay_s: float = 0.8) -> subprocess.Popen:
env = os.environ.copy()
env["WECHAT_TOOL_PORT"] = str(int(next_port))
env["WECHAT_TOOL_HOST"] = str(next_host or _get_backend_bind_host())
cwd = os.getcwd()
cwd_path = Path(cwd)
src_dir = cwd_path / "src"
try:
existing_pp = str(env.get("PYTHONPATH", "") or "").strip()
if src_dir.is_dir():
env["PYTHONPATH"] = str(src_dir) if not existing_pp else f"{src_dir}{os.pathsep}{existing_pp}"
except Exception:
pass
if getattr(sys, "frozen", False):
target = [sys.executable]
else:
main_py = cwd_path / "main.py"
if main_py.is_file():
target = [sys.executable, str(main_py)]
else:
target = [sys.executable, "-m", "wechat_decrypt_tool.backend_entry"]
# Keep the launcher independent from this process; it starts the backend after
# the current process has released its listening socket.
launcher_code = (
"import os,subprocess,sys,time;"
f"time.sleep({max(0.0, float(delay_s))!r});"
"subprocess.Popen(sys.argv[1:], cwd=os.getcwd(), env=os.environ)"
)
return subprocess.Popen([sys.executable, "-c", launcher_code, *target], cwd=cwd, env=env)
async def _exit_process_after(delay_s: float) -> None:
try:
await asyncio.sleep(max(0.0, float(delay_s)))
@@ -212,6 +259,58 @@ async def get_backend_port() -> dict:
return {"port": port, "source": source, "default_port": DEFAULT_BACKEND_PORT}
@router.get("/api/admin/mcp-access", summary="获取 MCP 局域网接入状态")
async def get_mcp_access() -> dict:
host, source = read_effective_backend_host(default=LOOPBACK_BACKEND_HOST)
port, port_source = read_effective_backend_port(default=DEFAULT_BACKEND_PORT)
return {
"enabled": host == LAN_BACKEND_HOST,
"host": host,
"source": source,
"port": port,
"port_source": port_source,
"default_host": LOOPBACK_BACKEND_HOST,
"lan_host": LAN_BACKEND_HOST,
"restart_required": False,
}
@router.get("/api/admin/mcp-token", summary="获取 MCP token(仅允许本机访问)")
async def get_mcp_token(request: Request) -> dict:
if not _is_loopback_client(request):
raise HTTPException(status_code=403, detail="仅允许本机访问该接口")
from ..runtime_settings import ensure_mcp_token
token, source = ensure_mcp_token()
env_file = write_mcp_token_env_file(token)
return {
"success": True,
"token": token,
"source": source,
"env_file": str(env_file) if env_file else None,
}
@router.post("/api/admin/mcp-token/reset", summary="重置 MCP token(仅允许本机访问)")
async def reset_mcp_token_endpoint(request: Request) -> dict:
if not _is_loopback_client(request):
raise HTTPException(status_code=403, detail="仅允许本机访问该接口")
previous, previous_source = read_effective_mcp_token()
token = reset_mcp_token()
os.environ["WECHAT_TOOL_MCP_TOKEN"] = token
env_file = write_mcp_token_env_file(token)
return {
"success": True,
"changed": token != previous,
"token": token,
"previous_source": previous_source,
"source": "reset",
"env_file": str(env_file) if env_file else None,
}
@router.post("/api/admin/port", summary="修改后端端口并重启(仅允许本机访问)")
async def set_backend_port(payload: dict, request: Request, background_tasks: BackgroundTasks) -> dict:
if not _is_loopback_client(request):
@@ -281,3 +380,54 @@ async def set_backend_port(payload: dict, request: Request, background_tasks: Ba
}
finally:
_PORT_CHANGE_IN_PROGRESS = False
@router.post("/api/admin/mcp-access", summary="开启或关闭 MCP 局域网接入并重启后端(仅允许本机访问)")
async def set_mcp_access(payload: dict, request: Request, background_tasks: BackgroundTasks) -> dict:
if not _is_loopback_client(request):
raise HTTPException(status_code=403, detail="仅允许本机访问该接口")
global _PORT_CHANGE_IN_PROGRESS
if _PORT_CHANGE_IN_PROGRESS:
raise HTTPException(status_code=409, detail="后端切换中,请稍后重试")
enabled = bool(payload.get("enabled")) if isinstance(payload, dict) else False
next_host = LAN_BACKEND_HOST if enabled else LOOPBACK_BACKEND_HOST
current_host = _get_backend_bind_host()
current_port, _ = read_effective_backend_port(default=DEFAULT_BACKEND_PORT)
if next_host == current_host:
write_backend_host_setting(next_host)
env_file = write_backend_host_env_file(next_host)
return {
"success": True,
"changed": False,
"enabled": enabled,
"host": next_host,
"port": int(current_port),
"ui_url": f"http://{_format_host_for_url(_get_backend_access_host())}:{int(current_port)}/",
"env_file": str(env_file) if env_file else None,
}
_PORT_CHANGE_IN_PROGRESS = True
try:
write_backend_host_setting(next_host)
env_file = write_backend_host_env_file(next_host)
# Host changes keep the same port. The old socket must close before the
# new process can bind, so start a detached launcher and then exit.
background_tasks.add_task(_spawn_backend_process_after_delay, int(current_port), next_host, 0.8)
background_tasks.add_task(_exit_process_after, 0.2)
return {
"success": True,
"changed": True,
"enabled": enabled,
"host": next_host,
"port": int(current_port),
"ui_url": f"http://{_format_host_for_url(LOOPBACK_BACKEND_HOST)}:{int(current_port)}/",
"env_file": str(env_file) if env_file else None,
"restart_scheduled": True,
}
finally:
_PORT_CHANGE_IN_PROGRESS = False
+17 -1
View File
@@ -7,7 +7,7 @@ from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import FileResponse, StreamingResponse
from pydantic import BaseModel, Field
from ..chat_export_service import CHAT_EXPORT_MANAGER
from ..chat_export_service import CHAT_EXPORT_MANAGER, get_chat_export_targets_preview
from ..path_fix import PathFixRoute
router = APIRouter(route_class=PathFixRoute)
@@ -101,6 +101,22 @@ async def list_chat_exports():
return {"status": "success", "jobs": jobs}
@router.get("/api/chat/exports/targets", summary="获取聊天记录导出目标预览")
async def preview_chat_export_targets(
request: Request,
account: Optional[str] = None,
include_hidden: bool = True,
include_official: bool = False,
):
base_url = str(request.base_url).rstrip("/")
return get_chat_export_targets_preview(
account=account,
include_hidden=bool(include_hidden),
include_official=bool(include_official),
base_url=base_url,
)
@router.get("/api/chat/exports/{export_id}", summary="获取导出任务状态")
async def get_chat_export(export_id: str):
job = CHAT_EXPORT_MANAGER.get_job(str(export_id or "").strip())
+119
View File
@@ -0,0 +1,119 @@
from __future__ import annotations
import hmac
from pathlib import Path
from typing import Any
from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse, PlainTextResponse
from ..mcp.protocol import handle_jsonrpc_payload, parse_error_response
from ..mcp.registry import McpToolContext
from ..mcp.tools import MCP_REGISTRY
from ..path_fix import PathFixRoute
from ..runtime_settings import ensure_mcp_token
router = APIRouter(route_class=PathFixRoute)
PROJECT_ROOT = Path(__file__).resolve().parents[3]
SKILL_ROOT = PROJECT_ROOT / "skills" / "wechat-mcp-copilot"
def _extract_mcp_token(request: Request) -> str:
auth = str(request.headers.get("authorization") or "").strip()
if auth.lower().startswith("bearer "):
return auth[7:].strip()
header_token = str(request.headers.get("x-mcp-token") or "").strip()
if header_token:
return header_token
return str(request.query_params.get("token") or "").strip()
def _mcp_unauthorized() -> JSONResponse:
return JSONResponse(
{"status": "error", "message": "Invalid or missing MCP token."},
status_code=401,
headers={"WWW-Authenticate": "Bearer"},
)
def _verify_mcp_token(request: Request) -> bool:
expected, _ = ensure_mcp_token()
provided = _extract_mcp_token(request)
if not expected or not provided:
return False
return hmac.compare_digest(provided, expected)
def _read_skill_bundle() -> dict[str, Any]:
entry_path = SKILL_ROOT / "SKILL.md"
if not entry_path.is_file():
return {
"status": "error",
"message": "Skill not found.",
"path": str(entry_path),
}
references = []
bundle_parts = [entry_path.read_text(encoding="utf-8")]
references_dir = SKILL_ROOT / "references"
if references_dir.is_dir():
for ref_path in sorted(references_dir.glob("*.md")):
content = ref_path.read_text(encoding="utf-8")
rel_path = ref_path.relative_to(SKILL_ROOT).as_posix()
references.append({"path": rel_path, "content": content})
bundle_parts.append(f"\n\n---\n# {rel_path}\n\n{content}")
return {
"status": "success",
"name": "wechat-mcp-copilot",
"version": "1.0.0",
"entry": "SKILL.md",
"entryContent": bundle_parts[0],
"references": references,
"bundleText": "".join(bundle_parts),
}
@router.get("/mcp", summary="MCP endpoint")
async def mcp_get(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
return PlainTextResponse("Use POST with JSON-RPC 2.0.", status_code=405, headers={"Allow": "POST"})
@router.get("/mcp/skill/bundle", summary="MCP skill bundle")
async def mcp_skill_bundle(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
payload = _read_skill_bundle()
status_code = 200 if payload.get("status") == "success" else 404
return JSONResponse(payload, status_code=status_code)
@router.get("/mcp/skill", summary="MCP skill text")
async def mcp_skill_text(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
payload = _read_skill_bundle()
if payload.get("status") != "success":
return PlainTextResponse(str(payload.get("message") or "Skill not found."), status_code=404)
return PlainTextResponse(str(payload.get("bundleText") or ""), media_type="text/markdown; charset=utf-8")
@router.post("/mcp", summary="MCP JSON-RPC endpoint")
async def mcp_post(request: Request):
if not _verify_mcp_token(request):
return _mcp_unauthorized()
try:
payload: Any = await request.json()
except Exception:
return JSONResponse(parse_error_response(), status_code=400)
result = await handle_jsonrpc_payload(payload, MCP_REGISTRY, McpToolContext(request=request))
if result is None:
return PlainTextResponse("", status_code=202)
return JSONResponse(result)
+180 -29
View File
@@ -3,14 +3,21 @@ from __future__ import annotations
import json
import os
import re
import secrets
from pathlib import Path
RUNTIME_SETTINGS_FILENAME = "runtime_settings.json"
BACKEND_PORT_KEY = "backend_port"
BACKEND_HOST_KEY = "backend_host"
MCP_TOKEN_KEY = "mcp_token"
ENV_PORT_KEY = "WECHAT_TOOL_PORT"
ENV_HOST_KEY = "WECHAT_TOOL_HOST"
ENV_MCP_TOKEN_KEY = "WECHAT_TOOL_MCP_TOKEN"
ENV_FILE_KEY = "WECHAT_TOOL_ENV_FILE"
DEFAULT_ENV_FILENAME = ".env"
LOOPBACK_BACKEND_HOST = "127.0.0.1"
LAN_BACKEND_HOST = "0.0.0.0"
def _parse_port(value: object) -> int | None:
@@ -31,6 +38,66 @@ def _parse_port(value: object) -> int | None:
return port
def _normalize_host(value: object) -> str | None:
try:
raw = str(value or "").strip()
except Exception:
return None
if raw in {LOOPBACK_BACKEND_HOST, "localhost", "::1"}:
return LOOPBACK_BACKEND_HOST
if raw in {LAN_BACKEND_HOST, "::"}:
return LAN_BACKEND_HOST
return None
def _normalize_mcp_token(value: object) -> str | None:
try:
raw = str(value or "").strip()
except Exception:
return None
if len(raw) < 16 or len(raw) > 512:
return None
if any(ch.isspace() for ch in raw):
return None
return raw
def generate_mcp_token() -> str:
return secrets.token_urlsafe(32)
def _read_runtime_settings() -> dict:
path = get_runtime_settings_path()
try:
if not path.is_file():
return {}
data = json.loads(path.read_text(encoding="utf-8") or "{}")
return data if isinstance(data, dict) else {}
except Exception:
return {}
def _write_runtime_settings(data: dict) -> None:
path = get_runtime_settings_path()
try:
path.parent.mkdir(parents=True, exist_ok=True)
except Exception:
return
try:
cleaned = data if isinstance(data, dict) else {}
if not cleaned:
try:
path.unlink(missing_ok=True)
except Exception:
pass
return
path.write_text(json.dumps(cleaned, ensure_ascii=False, indent=2), encoding="utf-8")
except Exception:
return
def get_runtime_settings_path() -> Path:
from .app_paths import get_output_dir
@@ -38,50 +105,24 @@ def get_runtime_settings_path() -> Path:
def read_backend_port_setting() -> int | None:
path = get_runtime_settings_path()
try:
if not path.is_file():
return None
data = json.loads(path.read_text(encoding="utf-8") or "{}")
if not isinstance(data, dict):
return None
data = _read_runtime_settings()
return _parse_port(data.get(BACKEND_PORT_KEY))
except Exception:
return None
def write_backend_port_setting(port: int | None) -> None:
path = get_runtime_settings_path()
safe_port = _parse_port(port)
try:
path.parent.mkdir(parents=True, exist_ok=True)
except Exception:
return
try:
data: dict = {}
if path.is_file():
try:
existing = json.loads(path.read_text(encoding="utf-8") or "{}")
if isinstance(existing, dict):
data = existing
except Exception:
data = {}
data = _read_runtime_settings()
if safe_port is None:
data.pop(BACKEND_PORT_KEY, None)
else:
data[BACKEND_PORT_KEY] = safe_port
# Keep the file small and stable; remove if empty.
if not data:
try:
path.unlink(missing_ok=True)
except Exception:
pass
return
path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
_write_runtime_settings(data)
except Exception:
return
@@ -101,6 +142,92 @@ def read_effective_backend_port(default: int) -> tuple[int, str]:
return int(default), "default"
def read_backend_host_setting() -> str | None:
try:
data = _read_runtime_settings()
return _normalize_host(data.get(BACKEND_HOST_KEY))
except Exception:
return None
def write_backend_host_setting(host: str | None) -> None:
safe_host = _normalize_host(host)
try:
data = _read_runtime_settings()
if safe_host is None:
data.pop(BACKEND_HOST_KEY, None)
else:
data[BACKEND_HOST_KEY] = safe_host
_write_runtime_settings(data)
except Exception:
return
def read_effective_backend_host(default: str = LOOPBACK_BACKEND_HOST) -> tuple[str, str]:
"""Return (host, source) where source is one of: env | settings | default."""
env_host = _normalize_host(os.environ.get(ENV_HOST_KEY, ""))
if env_host is not None:
return env_host, "env"
settings_host = read_backend_host_setting()
if settings_host is not None:
return settings_host, "settings"
return _normalize_host(default) or LOOPBACK_BACKEND_HOST, "default"
def read_mcp_token_setting() -> str | None:
try:
data = _read_runtime_settings()
return _normalize_mcp_token(data.get(MCP_TOKEN_KEY))
except Exception:
return None
def write_mcp_token_setting(token: str | None) -> None:
safe_token = _normalize_mcp_token(token)
try:
data = _read_runtime_settings()
if safe_token is None:
data.pop(MCP_TOKEN_KEY, None)
else:
data[MCP_TOKEN_KEY] = safe_token
_write_runtime_settings(data)
except Exception:
return
def read_effective_mcp_token() -> tuple[str | None, str]:
"""Return (token, source) where source is one of: env | settings | missing."""
env_token = _normalize_mcp_token(os.environ.get(ENV_MCP_TOKEN_KEY, ""))
if env_token is not None:
return env_token, "env"
settings_token = read_mcp_token_setting()
if settings_token is not None:
return settings_token, "settings"
return None, "missing"
def ensure_mcp_token() -> tuple[str, str]:
token, source = read_effective_mcp_token()
if token:
return token, source
token = generate_mcp_token()
write_mcp_token_setting(token)
return token, "generated"
def reset_mcp_token() -> str:
token = generate_mcp_token()
write_mcp_token_setting(token)
return token
def get_env_file_path() -> Path | None:
"""Best-effort env file path for `uv run` (defaults to repo root `.env`)."""
@@ -173,3 +300,27 @@ def write_backend_port_env_file(port: int | None) -> Path | None:
safe_port = _parse_port(port)
ok = _set_env_var_in_file(env_file, ENV_PORT_KEY, str(safe_port) if safe_port is not None else None)
return env_file if ok else None
def write_backend_host_env_file(host: str | None) -> Path | None:
"""Write `WECHAT_TOOL_HOST` into a `.env` file so `uv run main.py` picks it up on restart."""
env_file = get_env_file_path()
if not env_file:
return None
safe_host = _normalize_host(host)
ok = _set_env_var_in_file(env_file, ENV_HOST_KEY, safe_host)
return env_file if ok else None
def write_mcp_token_env_file(token: str | None) -> Path | None:
"""Write `WECHAT_TOOL_MCP_TOKEN` into a `.env` file so `uv run main.py` picks it up."""
env_file = get_env_file_path()
if not env_file:
return None
safe_token = _normalize_mcp_token(token)
ok = _set_env_var_in_file(env_file, ENV_MCP_TOKEN_KEY, safe_token)
return env_file if ok else None
+225
View File
@@ -0,0 +1,225 @@
import hashlib
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportTargets(unittest.TestCase):
def _seed_contact_db(self, path: Path, *, account: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
rows = [
(account, "", "Me", "", 1, 0, "", ""),
("wxid_visible", "", "Visible friend", "", 1, 0, "", ""),
("wxid_no_session", "", "No session friend", "", 1, 0, "", ""),
("wxid_session_hidden", "", "Hidden session friend", "", 1, 0, "", ""),
("room_no_session@chatroom", "", "No session group", "", 1, 0, "", ""),
("room_hidden@chatroom", "", "Hidden session group", "", 1, 0, "", ""),
("gh_official_no_session", "", "Official account", "", 1, 24, "", ""),
("wxid_no_messages", "", "No messages friend", "", 1, 0, "", ""),
]
conn.executemany("INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_visible", 0, 100))
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_session_hidden", 1, 200))
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("room_hidden@chatroom", 1, 250))
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
usernames = [
account,
"wxid_visible",
"wxid_no_session",
"wxid_session_hidden",
"room_no_session@chatroom",
"room_hidden@chatroom",
"gh_official_no_session",
"wxid_no_messages",
]
for idx, username in enumerate(usernames, start=1):
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (idx, username))
message_usernames = {
"wxid_visible": 100,
"wxid_no_session": 300,
"wxid_session_hidden": 400,
"room_no_session@chatroom": 350,
"room_hidden@chatroom": 450,
"gh_official_no_session": 360,
}
for username, create_time in message_usernames.items():
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
conn.execute(
f"INSERT INTO {table_name} VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 1, 1, 2, create_time, f"message for {username}", None),
)
conn.commit()
finally:
conn.close()
def _prepare_account(self, root: Path) -> Path:
account = "wxid_account"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account)
self._seed_session_db(account_dir / "session.db")
self._seed_message_db(account_dir / "message_0.db", account=account)
return account_dir
def test_all_scope_includes_contacts_with_messages_missing_from_session_list(self):
import wechat_decrypt_tool.chat_export_service as svc
with TemporaryDirectory() as td:
account_dir = self._prepare_account(Path(td))
targets = svc._resolve_export_targets(
account_dir=account_dir,
scope="all",
usernames=[],
include_hidden=False,
include_official=False,
)
self.assertIn("wxid_visible", targets)
self.assertIn("wxid_no_session", targets)
self.assertIn("room_no_session@chatroom", targets)
self.assertNotIn("wxid_session_hidden", targets)
self.assertNotIn("room_hidden@chatroom", targets)
self.assertNotIn("gh_official_no_session", targets)
self.assertNotIn("wxid_no_messages", targets)
def test_group_single_and_official_filters_apply_to_message_discovered_targets(self):
import wechat_decrypt_tool.chat_export_service as svc
with TemporaryDirectory() as td:
account_dir = self._prepare_account(Path(td))
groups = svc._resolve_export_targets(
account_dir=account_dir,
scope="groups",
usernames=[],
include_hidden=False,
include_official=False,
)
singles = svc._resolve_export_targets(
account_dir=account_dir,
scope="singles",
usernames=[],
include_hidden=False,
include_official=False,
)
with_official = svc._resolve_export_targets(
account_dir=account_dir,
scope="all",
usernames=[],
include_hidden=False,
include_official=True,
)
self.assertEqual(groups, ["room_no_session@chatroom"])
self.assertIn("wxid_no_session", singles)
self.assertNotIn("room_no_session@chatroom", singles)
self.assertIn("gh_official_no_session", with_official)
def test_preview_counts_match_bulk_export_targets_including_hidden_sessions(self):
import wechat_decrypt_tool.chat_export_service as svc
with TemporaryDirectory() as td:
account_dir = self._prepare_account(Path(td))
preview = svc.build_chat_export_targets_preview(
account_dir=account_dir,
include_hidden=True,
include_official=False,
base_url="http://example.test",
)
actual_targets = svc._resolve_export_targets(
account_dir=account_dir,
scope="all",
usernames=[],
include_hidden=True,
include_official=False,
)
preview_targets = preview["targets"]
preview_usernames = [item["username"] for item in preview_targets]
by_username = {item["username"]: item for item in preview_targets}
self.assertEqual(preview_usernames, actual_targets)
self.assertEqual(preview["counts"], {"total": 5, "groups": 2, "singles": 3})
self.assertTrue(by_username["room_hidden@chatroom"]["isHidden"])
self.assertTrue(by_username["room_hidden@chatroom"]["inSessionList"])
self.assertFalse(by_username["room_no_session@chatroom"]["inSessionList"])
self.assertTrue(by_username["room_no_session@chatroom"]["avatar"].startswith("http://example.test/api/chat/avatar?"))
if __name__ == "__main__":
unittest.main()
+174
View File
@@ -0,0 +1,174 @@
import hashlib
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatSearchIndexTargets(unittest.TestCase):
def _seed_contact_db(self, path: Path, *, account: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
rows = [
(account, "", "Me", "", 1, 0, "", ""),
("wxid_visible", "", "Visible friend", "", 1, 0, "", ""),
("wxid_no_session", "", "No session friend", "", 1, 0, "", ""),
("wxid_session_hidden", "", "Hidden session friend", "", 1, 0, "", ""),
("gh_official_no_session", "", "Official account", "", 1, 24, "", ""),
]
conn.executemany("INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_visible", 0, 100))
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_session_hidden", 1, 200))
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
usernames = [
account,
"wxid_visible",
"wxid_no_session",
"wxid_session_hidden",
"gh_official_no_session",
]
for idx, username in enumerate(usernames, start=1):
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (idx, username))
message_usernames = {
"wxid_visible": "visible searchable text",
"wxid_no_session": "missing session searchable text",
"wxid_session_hidden": "hidden searchable text",
"gh_official_no_session": "official searchable text",
}
for username, content in message_usernames.items():
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
conn.execute(
f"INSERT INTO {table_name} VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 1, 1, 2, 300, content, None),
)
conn.commit()
finally:
conn.close()
def _prepare_account(self, root: Path) -> Path:
account = "wxid_account"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account)
self._seed_session_db(account_dir / "session.db")
self._seed_message_db(account_dir / "message_0.db", account=account)
return account_dir
def test_index_includes_message_backed_contacts_missing_from_session_list(self):
import wechat_decrypt_tool.chat_search_index as idx
from wechat_decrypt_tool.chat_helpers import _build_fts_query
with TemporaryDirectory() as td:
account_dir = self._prepare_account(Path(td))
idx._build_worker(account_dir, rebuild=True)
index_path = idx.get_chat_search_index_db_path(account_dir)
conn = sqlite3.connect(str(index_path))
try:
rows = conn.execute(
"""
SELECT username, is_hidden, is_official
FROM message_fts
ORDER BY username
"""
).fetchall()
fts_query = _build_fts_query("missing session")
default_search_rows = conn.execute(
"""
SELECT username
FROM message_fts
WHERE message_fts MATCH ?
AND CAST(is_hidden AS INTEGER) = 0
AND CAST(is_official AS INTEGER) = 0
""",
(fts_query,),
).fetchall()
finally:
conn.close()
by_username = {str(r[0]): (int(r[1] or 0), int(r[2] or 0)) for r in rows}
default_search_usernames = [str(r[0]) for r in default_search_rows]
self.assertIn("wxid_visible", by_username)
self.assertIn("wxid_no_session", by_username)
self.assertIn("wxid_session_hidden", by_username)
self.assertIn("gh_official_no_session", by_username)
self.assertEqual(by_username["wxid_no_session"], (0, 0))
self.assertEqual(by_username["wxid_session_hidden"], (1, 0))
self.assertEqual(by_username["gh_official_no_session"], (0, 1))
self.assertEqual(default_search_usernames, ["wxid_no_session"])
if __name__ == "__main__":
unittest.main()
+557
View File
@@ -0,0 +1,557 @@
import os
import sys
import unittest
from pathlib import Path
from unittest.mock import AsyncMock, Mock, patch
from fastapi import FastAPI
from fastapi.testclient import TestClient
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestMcpRouter(unittest.TestCase):
TEST_TOKEN = "test-mcp-token-1234567890"
def setUp(self):
self._old_mcp_token = os.environ.get("WECHAT_TOOL_MCP_TOKEN")
os.environ["WECHAT_TOOL_MCP_TOKEN"] = self.TEST_TOKEN
def tearDown(self):
if self._old_mcp_token is None:
os.environ.pop("WECHAT_TOOL_MCP_TOKEN", None)
else:
os.environ["WECHAT_TOOL_MCP_TOKEN"] = self._old_mcp_token
def _client(self, auth: bool = True) -> TestClient:
from wechat_decrypt_tool.routers.mcp import router
app = FastAPI()
app.include_router(router)
client = TestClient(app)
if auth:
client.headers.update({"Authorization": f"Bearer {self.TEST_TOKEN}"})
return client
def _rpc(self, method, params=None, request_id=1):
payload = {"jsonrpc": "2.0", "id": request_id, "method": method}
if params is not None:
payload["params"] = params
return payload
def test_initialize_and_tools_list(self):
client = self._client()
init_resp = client.post("/mcp", json=self._rpc("initialize"))
self.assertEqual(init_resp.status_code, 200)
init_payload = init_resp.json()["result"]
self.assertEqual(init_payload["protocolVersion"], "2025-06-18")
self.assertEqual(init_payload["serverInfo"]["name"], "wechat-data-analysis-mcp")
tools_resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(tools_resp.status_code, 200)
tools = tools_resp.json()["result"]["tools"]
names = {tool["name"] for tool in tools}
self.assertIn("wechat.core.get_status", names)
self.assertIn("wechat.chat.search_messages", names)
self.assertIn("wechat.chat.list_search_senders", names)
self.assertIn("wechat.chat.resolve_chat_history", names)
self.assertIn("wechat.chat.resolve_app_message", names)
self.assertIn("wechat.contacts.export_contacts", names)
self.assertIn("wechat.export.create_chat_export", names)
self.assertIn("wechat.export.get_account_archive_download", names)
self.assertIn("wechat.moments.get_remote_video_url", names)
self.assertNotIn("search_memory", names)
self.assertNotIn("transcribe_voice_message", names)
self.assertNotIn("transcribe_audio_file", names)
def test_mcp_requires_token(self):
client = self._client(auth=False)
resp = client.post("/mcp", json=self._rpc("ping"))
self.assertEqual(resp.status_code, 401)
self.assertEqual(resp.headers.get("www-authenticate"), "Bearer")
def test_skill_endpoints_require_token(self):
client = self._client(auth=False)
for path in ("/mcp/skill/bundle", "/mcp/skill"):
with self.subTest(path=path):
resp = client.get(path)
self.assertEqual(resp.status_code, 401)
self.assertEqual(resp.headers.get("www-authenticate"), "Bearer")
def test_mcp_rejects_invalid_token(self):
client = self._client(auth=False)
resp = client.post("/mcp", json=self._rpc("ping"), headers={"Authorization": "Bearer wrong-token"})
self.assertEqual(resp.status_code, 401)
def test_mcp_accepts_x_mcp_token_and_query_token(self):
client = self._client(auth=False)
header_resp = client.post("/mcp", json=self._rpc("ping"), headers={"X-MCP-Token": self.TEST_TOKEN})
query_resp = client.post(f"/mcp?token={self.TEST_TOKEN}", json=self._rpc("ping"))
self.assertEqual(header_resp.status_code, 200)
self.assertEqual(header_resp.json()["result"], {})
self.assertEqual(query_resp.status_code, 200)
self.assertEqual(query_resp.json()["result"], {})
def test_get_mcp_returns_method_not_allowed(self):
client = self._client()
resp = client.get("/mcp")
self.assertEqual(resp.status_code, 405)
self.assertEqual(resp.headers.get("allow"), "POST")
def test_skill_bundle_can_be_loaded_over_http(self):
client = self._client()
resp = client.get("/mcp/skill/bundle")
self.assertEqual(resp.status_code, 200)
payload = resp.json()
self.assertEqual(payload["status"], "success")
self.assertEqual(payload["name"], "wechat-mcp-copilot")
self.assertIn("bundleText", payload)
self.assertIn("WeChat MCP Copilot", payload["bundleText"])
self.assertTrue(any(ref["path"] == "references/mobile.md" for ref in payload["references"]))
def test_skill_text_can_be_loaded_over_http(self):
client = self._client()
resp = client.get("/mcp/skill")
self.assertEqual(resp.status_code, 200)
self.assertIn("WeChat MCP Copilot", resp.text)
def test_ping_returns_empty_result(self):
client = self._client()
resp = client.post("/mcp", json=self._rpc("ping"))
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["result"], {})
def test_tools_list_supports_cursor_pagination(self):
client = self._client()
first_resp = client.post("/mcp", json=self._rpc("tools/list", {"limit": 3}))
self.assertEqual(first_resp.status_code, 200)
first = first_resp.json()["result"]
self.assertEqual(first["count"], 3)
self.assertEqual(len(first["tools"]), 3)
self.assertIn("nextCursor", first)
self.assertGreater(first["total"], 3)
second_resp = client.post(
"/mcp",
json=self._rpc("tools/list", {"limit": 3, "cursor": first["nextCursor"]}),
)
second = second_resp.json()["result"]
self.assertEqual(second["count"], 3)
self.assertNotEqual(first["tools"][0]["name"], second["tools"][0]["name"])
def test_core_list_tools_supports_package_filter_and_pagination(self):
client = self._client()
resp = client.post(
"/mcp",
json=self._rpc(
"tools/call",
{"name": "wechat.core.list_tools", "arguments": {"package": "wechat.media", "limit": 2}},
),
)
self.assertEqual(resp.status_code, 200)
structured = resp.json()["result"]["structuredContent"]
self.assertEqual(structured["status"], "success")
self.assertEqual(structured["count"], 2)
self.assertIn("nextCursor", structured)
self.assertTrue(all(t["annotations"]["package"] == "wechat.media" for t in structured["tools"]))
def test_tools_call_status_uses_structured_content(self):
client = self._client()
with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=["wxid_test"]):
resp = client.post(
"/mcp",
json=self._rpc(
"tools/call",
{"name": "wechat.core.get_status", "arguments": {}},
),
)
self.assertEqual(resp.status_code, 200)
result = resp.json()["result"]
self.assertFalse(result["isError"])
self.assertEqual(result["structuredContent"]["status"], "success")
self.assertTrue(result["structuredContent"]["dbReady"])
self.assertEqual(result["structuredContent"]["defaultAccount"], "wxid_test")
self.assertEqual(result["content"][0]["type"], "text")
def test_direct_tool_method_is_supported(self):
client = self._client()
with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=[]):
resp = client.post("/mcp", json=self._rpc("wechat.core.list_accounts"))
self.assertEqual(resp.status_code, 200)
result = resp.json()["result"]
self.assertTrue(result["isError"])
structured = result["structuredContent"]
self.assertEqual(structured["status"], "error")
self.assertEqual(structured["accounts"], [])
def test_notification_returns_accepted_empty_body(self):
client = self._client()
resp = client.post(
"/mcp",
json={"jsonrpc": "2.0", "method": "notifications/initialized"},
)
self.assertEqual(resp.status_code, 202)
self.assertEqual(resp.text, "")
def test_json_rpc_response_input_returns_accepted_empty_body(self):
client = self._client()
resp = client.post("/mcp", json={"jsonrpc": "2.0", "id": 99, "result": {"ok": True}})
self.assertEqual(resp.status_code, 202)
self.assertEqual(resp.text, "")
def test_notification_batch_returns_accepted_empty_body(self):
client = self._client()
resp = client.post(
"/mcp",
json=[
{"jsonrpc": "2.0", "method": "notifications/initialized"},
{"jsonrpc": "2.0", "method": "notifications/initialized"},
],
)
self.assertEqual(resp.status_code, 202)
self.assertEqual(resp.text, "")
def test_empty_batch_returns_invalid_request(self):
client = self._client()
resp = client.post("/mcp", json=[])
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["error"]["code"], -32600)
def test_non_string_method_returns_invalid_request(self):
client = self._client()
resp = client.post("/mcp", json={"jsonrpc": "2.0", "id": 1, "method": 1})
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["error"]["code"], -32600)
def test_batch_mixed_requests_and_notifications(self):
client = self._client()
with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=["wxid_test"]):
resp = client.post(
"/mcp",
json=[
self._rpc("ping", request_id=1),
{"jsonrpc": "2.0", "method": "notifications/initialized"},
self._rpc("wechat.core.get_status", request_id=2),
],
)
self.assertEqual(resp.status_code, 200)
payload = resp.json()
self.assertEqual(len(payload), 2)
self.assertEqual(payload[0]["result"], {})
self.assertEqual(payload[1]["result"]["structuredContent"]["defaultAccount"], "wxid_test")
def test_unknown_tool_returns_json_rpc_error(self):
client = self._client()
resp = client.post(
"/mcp",
json=self._rpc(
"tools/call",
{"name": "wechat.nope", "arguments": {}},
),
)
self.assertEqual(resp.status_code, 200)
payload = resp.json()
self.assertEqual(payload["error"]["code"], -32601)
def test_missing_tool_name_returns_invalid_params(self):
client = self._client()
resp = client.post("/mcp", json=self._rpc("tools/call", {"arguments": {}}))
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["error"]["code"], -32602)
def test_non_object_arguments_returns_invalid_params(self):
client = self._client()
resp = client.post(
"/mcp",
json=self._rpc("tools/call", {"name": "wechat.core.get_status", "arguments": []}),
)
self.assertEqual(resp.status_code, 200)
self.assertEqual(resp.json()["error"]["code"], -32602)
def test_media_url_helpers_pass_supported_parameters(self):
client = self._client()
image_resp = client.post(
"/mcp",
json=self._rpc(
"wechat.media.get_chat_image_url",
{
"md5": "abc",
"file_id": "fid",
"msg_svr_id": 123,
"username": "wxid_a",
"account": "wxid_acc",
"deep_scan": True,
"prefer_live": True,
},
),
)
image = image_resp.json()["result"]["structuredContent"]
self.assertIn("/api/chat/media/image?", image["url"])
self.assertEqual(image["params"]["server_id"], 123)
self.assertEqual(image["params"]["file_id"], "fid")
self.assertTrue(image["params"]["deep_scan"])
self.assertTrue(image["params"]["prefer_live"])
moments_resp = client.post(
"/mcp",
json=self._rpc(
"wechat.moments.get_media_url",
{"tid": "post-a", "media_id": "media-a", "md5": "deadbeef", "use_cache": 1},
),
)
moments = moments_resp.json()["result"]["structuredContent"]
self.assertIn("/api/sns/media?", moments["url"])
self.assertEqual(moments["params"]["post_id"], "post-a")
self.assertEqual(moments["params"]["media_id"], "media-a")
def test_completed_mcp_packages_and_mobile_facade_are_listed(self):
client = self._client()
resp = client.post("/mcp", json=self._rpc("tools/list"))
self.assertEqual(resp.status_code, 200)
tools = resp.json()["result"]["tools"]
names = {tool["name"] for tool in tools}
expected = {
"wechat.setup.get_saved_keys",
"wechat.setup.decrypt_databases",
"wechat.setup.get_decrypt_stream_url",
"wechat.setup.preview_import_decrypted",
"wechat.setup.get_decrypt_all_media_stream_url",
"wechat.system.health_check",
"wechat.system.get_backend_port",
"wechat.system.get_mcp_lan_access",
"wechat.system.set_mcp_lan_access",
"wechat.system.get_img_helper_status",
"wechat.system.open_backend_log_file",
"wechat.system.pick_directory",
"wechat.system.set_backend_port_and_restart",
"wechat.media.get_decrypted_resource_url",
"wechat.media.get_proxy_image_url",
"wechat.media.get_favicon_url",
"wechat.media.open_chat_media_folder",
"wechat.export.get_chat_export_events_url",
"wechat.export.get_moments_export_events_url",
"wechat.chat.get_realtime_events_url",
"wechat.admin.delete_account_data",
"wechat.mobile.get_overview",
"wechat.mobile.resolve_target",
"wechat.mobile.search_chat",
"wechat.mobile.get_chat_context",
"wechat.mobile.search_moments",
"wechat.mobile.get_media_links",
"wechat.mobile.export_job",
}
self.assertTrue(expected.issubset(names))
self.assertNotIn("search_memory", names)
self.assertNotIn("transcribe_voice_message", names)
self.assertNotIn("transcribe_audio_file", names)
packages = {tool["annotations"]["package"] for tool in tools}
self.assertTrue({"wechat.setup", "wechat.system", "wechat.mobile"}.issubset(packages))
def test_new_url_helpers_return_urls_and_params(self):
client = self._client()
checks = [
(
"wechat.setup.get_decrypt_stream_url",
{"key": "a" * 64, "db_storage_path": r"D:\WeChat\db_storage"},
"streamUrl",
"/api/decrypt_stream?",
),
(
"wechat.setup.get_import_decrypted_stream_url",
{"import_path": r"D:\backup\wxid_a", "job_id": "job-1"},
"streamUrl",
"/api/import_decrypted?",
),
(
"wechat.setup.get_decrypt_all_media_stream_url",
{"account": "wxid_a", "xor_key": "0xA5", "concurrency": 3},
"streamUrl",
"/api/media/decrypt_all_stream?",
),
(
"wechat.setup.get_download_all_emojis_stream_url",
{"account": "wxid_a", "force": True, "concurrency": 4},
"streamUrl",
"/api/media/emoji/download_all_stream?",
),
(
"wechat.media.get_decrypted_resource_url",
{"account": "wxid_a", "md5": "a" * 32},
"url",
"/api/media/resource/",
),
(
"wechat.chat.get_realtime_events_url",
{"account": "wxid_a", "interval_ms": 300},
"streamUrl",
"/api/chat/realtime/stream?",
),
(
"wechat.export.get_chat_export_events_url",
{"export_id": "exp-1"},
"streamUrl",
"/api/chat/exports/exp-1/events",
),
(
"wechat.export.get_moments_export_events_url",
{"export_id": "sns-1"},
"streamUrl",
"/api/sns/exports/sns-1/events",
),
]
for tool_name, args, url_key, path_part in checks:
with self.subTest(tool_name=tool_name):
resp = client.post("/mcp", json=self._rpc(tool_name, args))
self.assertEqual(resp.status_code, 200)
structured = resp.json()["result"]["structuredContent"]
self.assertEqual(structured["status"], "success")
self.assertIn(path_part, structured[url_key])
def test_setup_and_system_wrappers_call_underlying_router(self):
client = self._client()
keys_router = Mock()
keys_router.get_saved_keys = AsyncMock(return_value={"status": "success", "keys": {"db_key": "k"}})
system_router = Mock()
system_router.get_img_helper_status = AsyncMock(return_value={"enabled": False})
with patch("wechat_decrypt_tool.mcp.tools._keys_router", return_value=keys_router), patch(
"wechat_decrypt_tool.mcp.tools._system_router", return_value=system_router
):
keys_resp = client.post(
"/mcp",
json=self._rpc("wechat.setup.get_saved_keys", {"account": "wxid_a", "wxid_dir": r"D:\WeChat\wxid_a"}),
)
helper_resp = client.post("/mcp", json=self._rpc("wechat.system.get_img_helper_status"))
self.assertEqual(keys_resp.status_code, 200)
self.assertEqual(keys_resp.json()["result"]["structuredContent"]["keys"]["db_key"], "k")
keys_router.get_saved_keys.assert_awaited_once_with(account="wxid_a", db_storage_path=None, wxid_dir=r"D:\WeChat\wxid_a")
self.assertEqual(helper_resp.json()["result"]["structuredContent"], {"enabled": False})
def test_mobile_overview_uses_compact_facade(self):
client = self._client()
with patch("wechat_decrypt_tool.mcp.tools._list_decrypted_accounts", return_value=["wxid_a"]), patch(
"wechat_decrypt_tool.mcp.tools._get_account_info",
return_value={"status": "success", "account": "wxid_a", "databaseCount": 3},
), patch(
"wechat_decrypt_tool.mcp.tools._list_sessions",
return_value={"status": "success", "sessions": [{"username": "friend", "displayName": "Friend"}]},
), patch(
"wechat_decrypt_tool.mcp.tools._chat_realtime_status", return_value={"status": "success", "available": True}
), patch(
"wechat_decrypt_tool.mcp.tools._search_index_status", return_value={"status": "ready"}
), patch(
"wechat_decrypt_tool.mcp.tools._session_last_message_status", return_value={"status": "ready"}
):
resp = client.post(
"/mcp",
json=self._rpc("wechat.mobile.get_overview", {"account": "wxid_a", "session_limit": 5}),
)
self.assertEqual(resp.status_code, 200)
structured = resp.json()["result"]["structuredContent"]
self.assertTrue(structured["ok"])
self.assertTrue(structured["ready"])
self.assertEqual(structured["defaultAccount"], "wxid_a")
self.assertIn("wechat.mobile.search_chat", structured["suggestedTools"])
def test_mobile_resolve_target_normalizes_candidates(self):
client = self._client()
with patch(
"wechat_decrypt_tool.mcp.tools._resolve_contact",
return_value={"status": "success", "candidates": [{"username": "wxid_friend", "remark": "Alice", "confidence": 92}]},
), patch(
"wechat_decrypt_tool.mcp.tools._resolve_session",
return_value={"status": "success", "candidates": [{"username": "chatroom", "displayName": "Alice Group", "confidence": 75}]},
), patch(
"wechat_decrypt_tool.mcp.tools._sns_users", return_value={"status": "success", "users": []}
), patch(
"wechat_decrypt_tool.mcp.tools._biz_accounts", return_value={"status": "success", "accounts": []}
):
resp = client.post("/mcp", json=self._rpc("wechat.mobile.resolve_target", {"query": "Alice", "limit": 5}))
self.assertEqual(resp.status_code, 200)
structured = resp.json()["result"]["structuredContent"]
self.assertEqual(structured["status"], "success")
self.assertEqual(structured["best"]["username"], "wxid_friend")
self.assertEqual(structured["best"]["kind"], "contact")
def test_mobile_media_links_does_not_fetch_binary_content(self):
client = self._client()
resp = client.post(
"/mcp",
json=self._rpc(
"wechat.mobile.get_media_links",
{"kind": "favicon", "url": "https://example.com/article", "max_items": 5},
),
)
self.assertEqual(resp.status_code, 200)
structured = resp.json()["result"]["structuredContent"]
self.assertEqual(structured["status"], "success")
self.assertEqual(structured["resources"][0]["kind"], "favicon")
self.assertIn("/api/chat/media/favicon?", structured["resources"][0]["url"])
def test_invalid_json_returns_parse_error(self):
client = self._client()
resp = client.post("/mcp", content="{not-json", headers={"Content-Type": "application/json"})
self.assertEqual(resp.status_code, 400)
self.assertEqual(resp.json()["error"]["code"], -32700)
if __name__ == "__main__":
unittest.main()
Generated
+1 -1
View File
@@ -872,7 +872,7 @@ wheels = [
[[package]]
name = "wechat-decrypt-tool"
version = "1.8.1"
version = "1.9.0"
source = { editable = "." }
dependencies = [
{ name = "aiofiles" },