Compare commits

...

15 Commits

29 changed files with 2100 additions and 134 deletions
+1
View File
@@ -17,6 +17,7 @@ wheels/
# Local config templates
/wechat_db_config_template.json
.ace-tool/
pnpm-lock.yaml
# Local dev repos and data
/WxDatDecrypt/
+9
View File
@@ -212,6 +212,15 @@ npm run dist
6. **[vue3-wechat-tool](https://github.com/Ele-Cat/vue3-wechat-tool)** - 微信聊天记录工具(Vue3)
- 提供了聊天记录展示与交互的实现参考
7. **[wx-dat](https://github.com/waaaaashi/wx-dat)** - 微信图片密钥获取工具
- 实现真正的无头获取图片密钥,不再依赖扫描微信内存与点击朋友圈大图
8. **PR #24 贡献者 [H3CoF6](https://github.com/H3CoF6)** - 微信密钥获取能力增强
- 无第三方工具依赖实现微信密钥获取能力
- 实现数据库密钥获取:实现形式参考 [wx_key](https://github.com/ycccccccy/wx_key) 项目,完成 Python 预编译 wheel 封装,详情见 [py_wx_key](https://github.com/H3CoF6/py_wx_key)
- 特征码不在 C++ 内硬编码,而由 Python 模块传入,减少 wheel 更新次数
- 实现真正的无头获取图片密钥,不再依赖扫描微信内存(以及点击朋友圈大图),感谢项目 [wx-dat](https://github.com/waaaaashi/wx-dat)
## Star History
[![Star History Chart](https://api.star-history.com/svg?repos=LifeArchiveProject/WeChatDataAnalysis&type=Date)](https://www.star-history.com/#LifeArchiveProject/WeChatDataAnalysis&Date)
+20 -2
View File
@@ -375,7 +375,22 @@ export const useApi = () => {
const url = `/wrapped/annual/cards/${safeId}` + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
}
// 获取微信进程状态
const getWxStatus = async () => {
return await request('/wechat/status')
}
// 获取数据库密钥
const getDbKey = async () => {
return await request('/get_db_key')
}
// 获取图片密钥
const getImageKey = async () => {
return await request('/get_image_key')
}
return {
detectWechat,
detectCurrentAccount,
@@ -408,6 +423,9 @@ export const useApi = () => {
exportChatContacts,
getWrappedAnnual,
getWrappedAnnualMeta,
getWrappedAnnualCard
getWrappedAnnualCard,
getDbKey,
getImageKey,
getWxStatus,
}
}
+3
View File
@@ -19,5 +19,8 @@
"ogl": "^1.0.11",
"vue": "^3.5.17",
"vue-router": "^4.5.1"
},
"devDependencies": {
"tailwindcss": "3.4.17"
}
}
+89 -32
View File
@@ -242,14 +242,20 @@
@click="selectContact(contact)">
<div class="flex items-center space-x-3 w-full">
<!-- 联系人头像 -->
<div class="w-[calc(45px/var(--dpr))] h-[calc(45px/var(--dpr))] rounded-md overflow-hidden bg-gray-300 flex-shrink-0" :class="{ 'privacy-blur': privacyMode }">
<div v-if="contact.avatar" class="w-full h-full">
<img :src="contact.avatar" :alt="contact.name" class="w-full h-full object-cover" referrerpolicy="no-referrer" @error="onAvatarError($event, contact)">
</div>
<div v-else class="w-full h-full flex items-center justify-center text-white text-xs font-bold"
:style="{ backgroundColor: contact.avatarColor || '#4B5563' }">
{{ contact.name.charAt(0) }}
<div class="relative flex-shrink-0" :class="{ 'privacy-blur': privacyMode }">
<div class="w-[calc(45px/var(--dpr))] h-[calc(45px/var(--dpr))] rounded-md overflow-hidden bg-gray-300">
<div v-if="contact.avatar" class="w-full h-full">
<img :src="contact.avatar" :alt="contact.name" class="w-full h-full object-cover" referrerpolicy="no-referrer" @error="onAvatarError($event, contact)">
</div>
<div v-else class="w-full h-full flex items-center justify-center text-white text-xs font-bold"
:style="{ backgroundColor: contact.avatarColor || '#4B5563' }">
{{ contact.name.charAt(0) }}
</div>
</div>
<span
v-if="contact.unreadCount > 0"
class="absolute z-10 -top-[calc(4px/var(--dpr))] -right-[calc(4px/var(--dpr))] w-[calc(10px/var(--dpr))] h-[calc(10px/var(--dpr))] bg-[#ed4d4d] rounded-full"
></span>
</div>
<!-- 联系人信息 -->
@@ -257,13 +263,12 @@
<div class="flex items-center justify-between">
<h3 class="text-sm font-medium text-gray-900 truncate" :class="{ 'privacy-blur': privacyMode }">{{ contact.name }}</h3>
<div class="flex items-center flex-shrink-0 ml-2">
<span v-if="contact.unreadCount > 0" class="text-[10px] text-white bg-red-500 rounded-full min-w-[18px] h-[18px] px-1 flex items-center justify-center mr-2">
{{ contact.unreadCount > 99 ? '99+' : contact.unreadCount }}
</span>
<span class="text-xs text-gray-500">{{ contact.lastMessageTime }}</span>
</div>
</div>
<p class="text-xs text-gray-500 truncate mt-0.5 leading-tight" :class="{ 'privacy-blur': privacyMode }">{{ contact.lastMessage }}</p>
<p class="text-xs text-gray-500 truncate mt-0.5 leading-tight" :class="{ 'privacy-blur': privacyMode }">
{{ contact.unreadCount > 0 ? `[${contact.unreadCount > 99 ? '99+' : contact.unreadCount}条] ` : '' }}{{ contact.lastMessage }}
</p>
</div>
</div>
</div>
@@ -616,15 +621,15 @@
<img v-else :src="seg.emojiSrc" :alt="seg.content" class="inline-block w-[1.25em] h-[1.25em] align-text-bottom mx-px">
</span>
</div>
<div
v-if="message.quoteTitle || message.quoteContent"
class="mt-[5px] px-2 text-xs text-neutral-600 rounded max-w-[404px] max-h-[61px] flex items-center bg-[#e1e1e1]">
<div class="py-2 min-w-0 w-full">
<div v-if="isQuotedVoice(message)" class="flex items-center gap-1 min-w-0">
<span v-if="message.quoteTitle" class="truncate flex-shrink-0">{{ message.quoteTitle }}:</span>
<button
type="button"
class="flex items-center gap-1 min-w-0 hover:opacity-80"
<div
v-if="message.quoteTitle || message.quoteContent"
class="mt-[5px] px-2 text-xs text-neutral-600 rounded max-w-[404px] max-h-[65px] overflow-hidden flex items-start bg-[#e1e1e1]">
<div class="py-2 min-w-0 flex-1">
<div v-if="isQuotedVoice(message)" class="flex items-center gap-1 min-w-0">
<span v-if="message.quoteTitle" class="truncate flex-shrink-0">{{ message.quoteTitle }}:</span>
<button
type="button"
class="flex items-center gap-1 min-w-0 hover:opacity-80"
:disabled="!message.quoteVoiceUrl"
:class="!message.quoteVoiceUrl ? 'opacity-60 cursor-not-allowed' : ''"
@click.stop="message.quoteVoiceUrl && playQuoteVoice(message)"
@@ -647,13 +652,35 @@
:ref="el => setVoiceRef(getQuoteVoiceId(message), el)"
:src="message.quoteVoiceUrl"
preload="none"
class="hidden"
></audio>
</div>
<div v-else class="line-clamp-2">{{ message.quoteTitle ? (message.quoteTitle + ': ') : '' }}{{ message.quoteContent }}</div>
</div>
</div>
</template>
class="hidden"
></audio>
</div>
<div v-else class="line-clamp-2">
<span v-if="message.quoteTitle">{{ message.quoteTitle }}:</span>
<span
v-if="message.quoteContent && !(isQuotedImage(message) && message.quoteTitle && message.quoteImageUrl && !message._quoteImageError)"
:class="message.quoteTitle ? 'ml-1' : ''"
>
{{ message.quoteContent }}
</span>
</div>
</div>
<div
v-if="isQuotedImage(message) && message.quoteImageUrl && !message._quoteImageError"
class="ml-2 my-2 flex-shrink-0 w-[45px] h-[45px] rounded overflow-hidden cursor-pointer"
@click.stop="openImagePreview(message.quoteImageUrl)"
>
<img
:src="message.quoteImageUrl"
alt="引用图片"
class="w-full h-full object-contain"
loading="lazy"
decoding="async"
@error="onQuoteImageError(message)"
/>
</div>
</div>
</template>
<!-- 合并转发聊天记录Chat History -->
<div
v-else-if="message.renderType === 'chatHistory'"
@@ -3464,6 +3491,19 @@ const isQuotedVoice = (message) => {
return false
}
const isQuotedImage = (message) => {
const t = String(message?.quoteType || '').trim()
if (t === '3') return true
if (String(message?.quoteContent || '').trim() === '[图片]' && String(message?.quoteServerId || '').trim()) return true
return false
}
const onQuoteImageError = (message) => {
try {
if (message) message._quoteImageError = true
} catch {}
}
const playQuoteVoice = (message) => {
playVoice({ id: getQuoteVoiceId(message) })
}
@@ -4665,6 +4705,23 @@ const normalizeMessage = (msg) => {
}
}
const quoteServerIdStr = String(msg.quoteServerId || '').trim()
const quoteTypeStr = String(msg.quoteType || '').trim()
const quoteVoiceUrl = quoteServerIdStr
? `${mediaBase}/api/chat/media/voice?account=${encodeURIComponent(selectedAccount.value || '')}&server_id=${encodeURIComponent(quoteServerIdStr)}`
: ''
const quoteImageUrl = (() => {
if (!quoteServerIdStr) return ''
if (quoteTypeStr !== '3' && String(msg.quoteContent || '').trim() !== '[图片]') return ''
const convUsername = String(selectedContact.value?.username || '').trim()
const parts = [
`account=${encodeURIComponent(selectedAccount.value || '')}`,
`server_id=${encodeURIComponent(quoteServerIdStr)}`,
convUsername ? `username=${encodeURIComponent(convUsername)}` : ''
].filter(Boolean)
return parts.length ? `${mediaBase}/api/chat/media/image?${parts.join('&')}` : ''
})()
return {
id: msg.id,
serverId: msg.serverId || 0,
@@ -4701,12 +4758,12 @@ const normalizeMessage = (msg) => {
quoteTitle: msg.quoteTitle || '',
quoteContent,
quoteUsername: msg.quoteUsername || '',
quoteServerId: String(msg.quoteServerId || '').trim(),
quoteType: String(msg.quoteType || '').trim(),
quoteServerId: quoteServerIdStr,
quoteType: quoteTypeStr,
quoteVoiceLength: msg.quoteVoiceLength || '',
quoteVoiceUrl: String(msg.quoteServerId || '').trim()
? `${mediaBase}/api/chat/media/voice?account=${encodeURIComponent(selectedAccount.value || '')}&server_id=${encodeURIComponent(String(msg.quoteServerId || '').trim())}`
: '',
quoteVoiceUrl,
quoteImageUrl: quoteImageUrl || '',
_quoteImageError: false,
amount: msg.amount || '',
coverUrl: msg.coverUrl || '',
fileSize: msg.fileSize || '',
+152 -18
View File
@@ -26,24 +26,40 @@
<!-- 密钥输入 -->
<div>
<label for="key" class="block text-sm font-medium text-[#000000e6] mb-2">
<svg class="w-4 h-4 inline mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M15 7a2 2 0 012 2m4 0a6 6 0 01-7.743 5.743L11 17H9v2H7v2H4a1 1 0 01-1-1v-2.586a1 1 0 01.293-.707l5.964-5.964A6 6 0 1121 9z"/>
</svg>
解密密钥 <span class="text-red-500">*</span>
</label>
<div class="relative">
<input
id="key"
v-model="formData.key"
type="text"
placeholder="请输入64位十六进制密钥"
class="w-full px-4 py-3 bg-white border border-[#EDEDED] rounded-lg font-mono text-sm focus:outline-none focus:ring-2 focus:ring-[#07C160] focus:border-transparent transition-all duration-200"
:class="{ 'border-red-500': formErrors.key }"
required
/>
<div v-if="formData.key" class="absolute right-3 top-1/2 transform -translate-y-1/2">
<span class="text-xs text-[#7F7F7F]">{{ formData.key.length }}/64</span>
<div class="flex gap-3">
<div class="relative flex-1">
<input
id="key"
v-model="formData.key"
type="text"
placeholder="请输入64位十六进制密钥"
class="w-full px-4 py-3 bg-white border border-[#EDEDED] rounded-lg font-mono text-sm focus:outline-none focus:ring-2 focus:ring-[#07C160] focus:border-transparent transition-all duration-200"
:class="{ 'border-red-500': formErrors.key }"
required
/>
<div v-if="formData.key" class="absolute right-3 top-1/2 transform -translate-y-1/2">
<span class="text-xs text-[#7F7F7F]">{{ formData.key.length }}/64</span>
</div>
</div>
<button
type="button"
@click="handleGetDbKey"
:disabled="isGettingDbKey"
class="flex-none inline-flex items-center px-4 py-3 bg-[#07C160] text-white rounded-lg text-sm font-medium hover:bg-[#06AD56] transition-all duration-200 disabled:opacity-50 disabled:cursor-wait whitespace-nowrap"
>
<svg v-if="isGettingDbKey" class="animate-spin -ml-1 mr-2 h-4 w-4 text-white" fill="none" viewBox="0 0 24 24">
<circle class="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4"></circle>
<path class="opacity-75" fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4z"></path>
</svg>
<svg v-else class="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
{{ isGettingDbKey ? '获取中...' : '自动获取' }}
</button>
</div>
<p v-if="formErrors.key" class="mt-1 text-sm text-red-600 flex items-center">
<svg class="w-4 h-4 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
@@ -55,7 +71,7 @@
<svg class="w-4 h-4 mr-1 text-[#10AEEF]" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"/>
</svg>
使用 <a href="https://github.com/ycccccccy/wx_key" target="_blank" class="text-[#07C160] hover:text-[#06AD56]">wx_key</a> 等工具获取的64位十六进制字符串
尝试自动获取或者使用 <a href="https://github.com/ycccccccy/wx_key" target="_blank" class="text-[#07C160] hover:text-[#06AD56]">wx_key</a> 等工具获取的64位十六进制字符串
</p>
</div>
@@ -131,6 +147,26 @@
<!-- 填写密钥 -->
<div class="mb-6">
<div class="bg-gray-50 rounded-lg p-4">
<div class="flex justify-between items-center mb-4 pb-3 border-b border-gray-200">
<span class="text-sm font-medium text-gray-500">手动输入或通过微信获取</span>
<button
type="button"
@click="handleGetImageKey"
:disabled="isGettingImageKey"
class="flex-none inline-flex items-center px-4 py-3 bg-[#07C160] text-white rounded-lg text-sm font-medium hover:bg-[#06AD56] transition-all duration-200 disabled:opacity-50 disabled:cursor-wait whitespace-nowrap"
>
<svg v-if="isGettingImageKey" class="animate-spin -ml-1 mr-2 h-4 w-4 text-white" fill="none" viewBox="0 0 24 24">
<circle class="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4"></circle>
<path class="opacity-75" fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4z"></path>
</svg>
<svg v-else class="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
{{ isGettingImageKey ? '正在获取...' : '自动获取' }}
</button>
</div>
<div class="grid grid-cols-1 md:grid-cols-2 gap-4">
<div>
<label class="block text-sm font-medium text-[#000000e6] mb-2">XOR必填</label>
@@ -158,7 +194,7 @@
<svg class="w-4 h-4 mr-1 text-[#10AEEF]" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"/>
</svg>
使用 <a href="https://github.com/ycccccccy/wx_key" target="_blank" class="text-[#07C160] hover:text-[#06AD56]">wx_key</a> 获取图片密钥AES 可选V4-V2 需要
尝试自动获取使用 <a href="https://github.com/ycccccccy/wx_key" target="_blank" class="text-[#07C160] hover:text-[#06AD56]">wx_key</a> 获取图片密钥AES 可选V4-V2 需要
</p>
</div>
</div>
@@ -325,6 +361,19 @@
</div>
</div>
</div>
<!-- 警告渲染 -->
<transition name="fade">
<div v-if="warning" class="bg-amber-50 border border-amber-200 rounded-lg p-4 mt-6 flex items-start">
<svg class="h-5 w-5 mr-2 flex-shrink-0 text-amber-500" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M12 9v2m0 4h.01m-6.938 4h13.856c1.54 0 2.502-1.667 1.732-3L13.732 4c-.77-1.333-2.694-1.333-3.464 0L3.34 16c-.77 1.333.192 3 1.732 3z"/>
</svg>
<div>
<p class="font-semibold text-amber-800">温馨提示</p>
<p class="text-sm mt-1 text-amber-700">{{ warning }}</p>
</div>
</div>
</transition>
<!-- 错误提示 -->
<transition name="fade">
@@ -367,12 +416,15 @@
import { ref, reactive, computed, onMounted } from 'vue'
import { useApi } from '~/composables/useApi'
const { decryptDatabase, saveMediaKeys, getSavedKeys } = useApi()
const { decryptDatabase, saveMediaKeys, getSavedKeys, getDbKey, getImageKey, getWxStatus } = useApi()
const loading = ref(false)
const error = ref('')
const warning = ref('') // 警告,用于密钥提示
const currentStep = ref(0)
const mediaAccount = ref('')
const isGettingDbKey = ref(false)
const isGettingImageKey = ref(false)
// 步骤定义
const steps = [
@@ -453,10 +505,89 @@ const prefillKeysForAccount = async (account) => {
}
}
const handleGetDbKey = async () => {
if (isGettingDbKey.value) return
isGettingDbKey.value = true
error.value = ''
warning.value = ''
formErrors.key = ''
try {
const statusRes = await getWxStatus() // pid不是主进程,但是没关系
const wxStatus = statusRes?.wx_status
if (wxStatus?.is_running) {
warning.value = '检测到微信正在运行,5秒后将终止进程并重启以获取密钥!!'
await new Promise(resolve => setTimeout(resolve, 5000))
} else {
// 没有逻辑
}
warning.value = '正在启动微信以获取密钥,请确保微信未开启“自动登录”,并在启动后 1 分钟内完成登录操作。'
const res = await getDbKey()
if (res && res.status === 0) {
if (res.data?.db_key) {
formData.key = res.data.db_key
warning.value = ''
}
if (res.errmsg && res.errmsg !== 'ok') {
warning.value = res.errmsg
}
} else {
error.value = '获取失败: ' + (res?.errmsg || '未知错误')
}
} catch (e) {
console.error(e)
error.value = '系统错误: ' + e.message
} finally {
isGettingDbKey.value = false
}
}
const handleGetImageKey = async () => {
if (isGettingImageKey.value) return
isGettingImageKey.value = true
manualKeyErrors.xor_key = ''
manualKeyErrors.aes_key = ''
error.value = ''
warning.value = ''
try {
const res = await getImageKey()
if (res && res.status === 0) {
if (res.data?.aes_key) {
manualKeys.aes_key = res.data.aes_key
}
if (res.data?.xor_key) {
// 后端记得处理为16进制再返回!!!
manualKeys.xor_key = res.data.xor_key
}
if (res.errmsg && res.errmsg !== 'ok') {
error.value = res.errmsg
}
} else {
error.value = '获取失败: ' + (res?.errmsg || '未知错误')
}
} catch (e) {
console.error(e)
error.value = '系统错误: ' + e.message
} finally {
isGettingImageKey.value = false
}
}
const applyManualKeys = () => {
manualKeyErrors.xor_key = ''
manualKeyErrors.aes_key = ''
error.value = ''
warning.value = ''
const aes = normalizeAesKey(manualKeys.aes_key)
if (!aes.ok) {
@@ -550,6 +681,7 @@ const handleDecrypt = async () => {
loading.value = true
error.value = ''
warning.value = ''
try {
const result = await decryptDatabase({
@@ -596,6 +728,7 @@ const decryptAllImages = async () => {
mediaDecrypting.value = true
mediaDecryptResult.value = null
error.value = ''
warning.value = ''
// 重置进度
decryptProgress.current = 0
@@ -671,6 +804,7 @@ const decryptAllImages = async () => {
// 从密钥步骤进入图片解密步骤
const goToMediaDecryptStep = async () => {
error.value = ''
warning.value = ''
// 校验并应用(未填写则允许直接进入,后端会使用已保存密钥或报错提示)
const ok = applyManualKeys()
if (!ok || manualKeyErrors.xor_key || manualKeyErrors.aes_key) return
+6
View File
@@ -19,6 +19,9 @@ dependencies = [
"zstandard>=0.23.0",
"pilk>=0.2.4",
"pypinyin>=0.53.0",
"wx_key",
"packaging",
"httpx",
]
[project.optional-dependencies]
@@ -40,3 +43,6 @@ include = [
"src/wechat_decrypt_tool/native/wcdb_api.dll",
"src/wechat_decrypt_tool/native/WCDB.dll",
]
[tool.uv]
find-links = ["./tools/key_wheels/"]
@@ -28,6 +28,7 @@ from .chat_helpers import (
_load_contact_rows,
_lookup_resource_md5,
_parse_app_message,
_parse_system_message_content,
_parse_pat_message,
_pick_display_name,
_quote_ident,
@@ -954,13 +955,7 @@ def _parse_message_for_export(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
import re as _re
content_text = _re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = _re.sub(r"\\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
+337 -11
View File
@@ -645,6 +645,43 @@ def _extract_xml_tag_or_attr(xml_text: str, name: str) -> str:
return _extract_xml_attr(xml_text, name)
def _parse_system_message_content(raw_text: str) -> str:
text = str(raw_text or "").strip()
if not text:
return "[系统消息]"
def _clean_system_text(value: str) -> str:
candidate = str(value or "").strip()
if not candidate:
return ""
nested_content = _extract_xml_tag_text(candidate, "content")
if nested_content:
candidate = nested_content
candidate = re.sub(r"<!\[CDATA\[", "", candidate, flags=re.IGNORECASE)
candidate = re.sub(r"\]\]>", "", candidate)
candidate = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", candidate)
candidate = re.sub(r"\s+", " ", candidate).strip()
return candidate
if "revokemsg" in text.lower():
replace_msg = _extract_xml_tag_text(text, "replacemsg")
cleaned_replace_msg = _clean_system_text(replace_msg)
if cleaned_replace_msg:
return cleaned_replace_msg
revoke_msg = _extract_xml_tag_text(text, "revokemsg")
cleaned_revoke_msg = _clean_system_text(revoke_msg)
if cleaned_revoke_msg:
return cleaned_revoke_msg
return "撤回了一条消息"
content_text = _clean_system_text(text)
return content_text or "[系统消息]"
def _extract_refermsg_block(xml_text: str) -> str:
if not xml_text:
return ""
@@ -1053,11 +1090,7 @@ def _build_latest_message_preview(
content_text = ""
if local_type == 10000:
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 244813135921:
parsed = _parse_app_message(raw_text)
qt = str(parsed.get("quoteTitle") or "").strip()
@@ -1093,7 +1126,7 @@ def _build_latest_message_preview(
elif local_type == 43 or local_type == 62:
content_text = "[视频]"
elif local_type == 47:
content_text = "[表情]"
content_text = "[动画表情]"
else:
if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')):
content_text = raw_text
@@ -1107,6 +1140,101 @@ def _build_latest_message_preview(
return content_text
def _extract_group_preview_sender_username(preview_text: str) -> str:
text = str(preview_text or "").strip()
if not text:
return ""
match = re.match(r"^([^:\s]{1,128}):\s*.+$", text)
if not match:
return ""
sender = str(match.group(1) or "").strip()
if not sender:
return ""
if sender.startswith("wxid_") or sender.endswith("@chatroom") or ("@" in sender):
return sender
if re.fullmatch(r"[A-Za-z][A-Za-z0-9_-]{1,127}", sender):
return sender
return ""
def _normalize_session_preview_text(
preview_text: str,
*,
is_group: bool,
sender_display_names: Optional[dict[str, str]] = None,
) -> str:
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
if not text:
return ""
text = text.replace("[表情]", "[动画表情]")
if (not is_group) or text.startswith("[草稿]"):
return text
match = re.match(r"^([^:\s]{1,128}):\s*(.+)$", text)
if not match:
return text
sender_username = str(match.group(1) or "").strip()
body = str(match.group(2) or "").strip()
if (not sender_username) or (not body):
return text
display_name = str((sender_display_names or {}).get(sender_username) or "").strip()
if display_name and display_name != sender_username:
return f"{display_name}: {body}"
return text
def _replace_preview_sender_prefix(preview_text: str, sender_display_name: str) -> str:
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
if not text:
return ""
display_name = str(sender_display_name or "").strip()
if (not display_name) or text.startswith("[草稿]"):
return text
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
if not match:
return text
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
if not body:
return text
return f"{display_name}: {body}"
def _build_group_sender_display_name_map(
contact_db_path: Path,
previews: dict[str, str],
) -> dict[str, str]:
group_sender_usernames: set[str] = set()
for conv_username, preview_text in previews.items():
if not str(conv_username or "").endswith("@chatroom"):
continue
sender_username = _extract_group_preview_sender_username(preview_text)
if sender_username:
group_sender_usernames.add(sender_username)
if not group_sender_usernames:
return {}
display_names: dict[str, str] = {}
sender_contact_rows = _load_contact_rows(contact_db_path, list(group_sender_usernames))
for sender_username in group_sender_usernames:
row = sender_contact_rows.get(sender_username)
if row is None:
continue
display_name = _pick_display_name(row, sender_username)
if display_name and display_name != sender_username:
display_names[sender_username] = display_name
return display_names
def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]:
if not usernames:
return {}
@@ -1338,6 +1466,208 @@ def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str,
conn.close()
def _load_group_nickname_map_from_contact_db(
contact_db_path: Path,
chatroom_id: str,
sender_usernames: list[str],
) -> dict[str, str]:
"""Best-effort mapping for group member nickname (aka group card) from contact.db.
WeChat stores per-chatroom member nicknames in `contact.db.chat_room.ext_buffer` as a protobuf-like blob.
This helper parses that blob and returns { sender_username -> group_nickname } for the requested senders.
Notes:
- Best-effort: never raises; returns {} on any failure.
- Only resolves usernames included in `sender_usernames` to keep parsing cheap.
"""
chatroom = str(chatroom_id or "").strip()
if not chatroom.endswith("@chatroom"):
return {}
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
if not targets:
return {}
target_set = set(targets)
def decode_varint(raw: bytes, offset: int) -> tuple[Optional[int], int]:
value = 0
shift = 0
pos = int(offset)
n = len(raw)
while pos < n:
byte = raw[pos]
pos += 1
value |= (byte & 0x7F) << shift
if (byte & 0x80) == 0:
return value, pos
shift += 7
if shift > 63:
return None, n
return None, n
def iter_fields(raw: bytes):
idx = 0
n = len(raw)
while idx < n:
tag, idx_next = decode_varint(raw, idx)
if tag is None or idx_next <= idx:
break
idx = idx_next
field_no = int(tag) >> 3
wire_type = int(tag) & 0x7
if wire_type == 0:
_, idx_next = decode_varint(raw, idx)
if idx_next <= idx:
break
idx = idx_next
continue
if wire_type == 2:
size, idx_next = decode_varint(raw, idx)
if size is None or idx_next <= idx:
break
idx = idx_next
end = idx + int(size)
if end > n:
break
chunk = raw[idx:end]
idx = end
yield field_no, wire_type, chunk
continue
if wire_type == 1:
idx += 8
continue
if wire_type == 5:
idx += 4
continue
break
def is_strong_username_hint(s: str) -> bool:
v = str(s or "").strip()
return v.startswith("wxid_") or v.endswith("@chatroom") or v.startswith("gh_") or ("@" in v)
def looks_like_username(s: str) -> bool:
v = str(s or "").strip()
if not v:
return False
if is_strong_username_hint(v):
return True
# Common alias-style WeChat IDs are ASCII-ish and do not contain whitespace.
if len(v) < 6 or len(v) > 32:
return False
if re.search(r"\s", v):
return False
if not re.match(r"^[A-Za-z][A-Za-z0-9_-]+$", v):
return False
if v.isdigit():
return False
return True
def pick_display(strings: list[tuple[int, str]], target: str) -> str:
best_score = -1
best = ""
for i, (fno, value) in enumerate(strings):
v = str(value or "").strip()
if (not v) or v == target:
continue
if is_strong_username_hint(v):
continue
if "\n" in v or "\r" in v:
continue
if len(v) > 64:
continue
score = 0
if int(fno) == 2:
score += 100
if not looks_like_username(v):
score += 20
score += max(0, 32 - len(v))
# Stable tie-breaker: prefer earlier appearance.
score = score * 1000 - i
if score > best_score:
best_score = score
best = v
return best
try:
conn = sqlite3.connect(str(contact_db_path))
except Exception:
return {}
try:
row = conn.execute(
"SELECT ext_buffer FROM chat_room WHERE username = ? LIMIT 1",
(chatroom,),
).fetchone()
if row is None:
return {}
ext = row[0]
if ext is None:
return {}
if isinstance(ext, memoryview):
ext_buf = ext.tobytes()
elif isinstance(ext, (bytes, bytearray)):
ext_buf = bytes(ext)
else:
return {}
if not ext_buf:
return {}
out: dict[str, str] = {}
for _, wire_type, chunk in iter_fields(ext_buf):
if wire_type != 2 or (not chunk):
continue
# Parse submessage and collect UTF-8 strings.
strings: list[tuple[int, str]] = []
try:
for sfno, swire, sval in iter_fields(chunk):
if swire != 2:
continue
if not sval:
continue
if len(sval) > 256:
continue
try:
txt = bytes(sval).decode("utf-8", errors="strict")
except Exception:
continue
txt = txt.strip()
if not txt:
continue
strings.append((int(sfno), txt))
except Exception:
continue
if not strings:
continue
present = [v for _, v in strings if v in target_set and v not in out]
if not present:
continue
for target in present:
disp = pick_display(strings, target)
if disp:
out[target] = disp
if len(out) >= len(target_set):
break
return out
except Exception:
return {}
finally:
try:
conn.close()
except Exception:
pass
def _load_usernames_by_display_names(contact_db_path: Path, names: list[str]) -> dict[str, str]:
"""Best-effort mapping from display name -> username using contact.db.
@@ -1515,11 +1845,7 @@ def _row_to_search_hit(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
+357
View File
@@ -0,0 +1,357 @@
# import sys
# import requests
try:
import wx_key
except ImportError:
print('[!] 环境中未安装wx_key依赖,可能无法自动获取数据库密钥')
wx_key = None
# sys.exit(1)
import time
import psutil
import subprocess
import hashlib
import os
import json
import random
import logging
import httpx
from pathlib import Path
from typing import Optional, List, Dict, Any
from dataclasses import dataclass
from packaging import version as pkg_version # 建议使用 packaging 库处理版本比较
from .wechat_detection import detect_wechat_installation
from .key_store import upsert_account_keys_in_store
from .media_helpers import _resolve_account_dir, _resolve_account_wxid_dir
logger = logging.getLogger(__name__)
# ====================== 以下是hook逻辑 ======================================
@dataclass
class HookConfig:
min_version: str
pattern: str # 用 00 不要用 ? !!!! 否则C++内存会炸
mask: str
offset: int
class WeChatKeyFetcher:
def __init__(self):
self.process_name = "Weixin.exe"
self.timeout_seconds = 60
@staticmethod
def _hex_array_to_str(hex_array: List[int]) -> str:
return " ".join([f"{b:02X}" for b in hex_array])
def _get_hook_config(self, version_str: str) -> Optional[HookConfig]:
"""搬运自wx_key代码,未来用ida脚本直接获取即可"""
try:
v_curr = pkg_version.parse(version_str)
except Exception as e:
logger.error(f"版本号解析失败: {version_str} || {e}")
return None
if v_curr > pkg_version.parse("4.1.6.14"):
return HookConfig(
min_version=">4.1.6.14",
pattern=self._hex_array_to_str([
0x24, 0x50, 0x48, 0xC7, 0x45, 0x00, 0xFE, 0xFF, 0xFF, 0xFF,
0x44, 0x89, 0xCF, 0x44, 0x89, 0xC3, 0x49, 0x89, 0xD6, 0x48,
0x89, 0xCE, 0x48, 0x89
]),
mask="xxxxxxxxxxxxxxxxxxxxxxxx",
offset=-3
)
if pkg_version.parse("4.1.4") <= v_curr <= pkg_version.parse("4.1.6.14"):
return HookConfig(
min_version="4.1.4-4.1.6.14",
pattern=self._hex_array_to_str([
0x24, 0x08, 0x48, 0x89, 0x6c, 0x24, 0x10, 0x48, 0x89, 0x74,
0x00, 0x18, 0x48, 0x89, 0x7c, 0x00, 0x20, 0x41, 0x56, 0x48,
0x83, 0xec, 0x50, 0x41
]),
mask="xxxxxxxxxx?xxxx?xxxxxxxx",
offset=-3
)
if v_curr < pkg_version.parse("4.1.4"):
return HookConfig(
min_version="<4.1.4",
pattern=self._hex_array_to_str([
0x24, 0x50, 0x48, 0xc7, 0x45, 0x00, 0xfe, 0xff, 0xff, 0xff,
0x44, 0x89, 0xcf, 0x44, 0x89, 0xc3, 0x49, 0x89, 0xd6, 0x48,
0x89, 0xce, 0x48, 0x89
]),
mask="xxxxxxxxxxxxxxxxxxxxxxxx",
offset=-15 # -0xf
)
return None
def kill_wechat(self):
"""检测并查杀微信进程"""
killed = False
for proc in psutil.process_iter(['pid', 'name']):
try:
if proc.info['name'] == self.process_name:
logger.info(f"Killing WeChat process: {proc.info['pid']}")
proc.terminate()
killed = True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
pass
if killed:
time.sleep(1) # 等待完全退出
def launch_wechat(self, exe_path: str) -> int:
"""启动微信并返回 PID"""
try:
process = subprocess.Popen(exe_path)
time.sleep(2)
candidates = []
for proc in psutil.process_iter(['pid', 'name', 'create_time']):
if proc.info['name'] == self.process_name:
candidates.append(proc)
if candidates:
candidates.sort(key=lambda x: x.info['create_time'], reverse=True)
target_pid = candidates[0].info['pid']
return target_pid
return process.pid
except Exception as e:
logger.error(f"启动微信失败: {e}")
raise RuntimeError(f"无法启动微信: {e}")
def fetch_key(self) -> str:
"""没有wx_key模块无法自动获取密钥"""
if wx_key is None:
raise RuntimeError("wx_key 模块未安装或加载失败")
install_info = detect_wechat_installation()
exe_path = install_info.get('wechat_exe_path')
version = install_info.get('wechat_version')
if not exe_path or not version:
raise RuntimeError("无法自动定位微信安装路径或版本")
logger.info(f"Detect WeChat: {version} at {exe_path}")
config = self._get_hook_config(version)
if not config:
raise RuntimeError(f"不支持的微信版本: {version}")
self.kill_wechat()
pid = self.launch_wechat(exe_path)
logger.info(f"WeChat launched, PID: {pid}")
logger.info(f"Initializing Hook with pattern: {config.pattern[:20]}... Offset: {config.offset}")
if not wx_key.initialize_hook(pid, "", config.pattern, config.mask, config.offset):
err = wx_key.get_last_error_msg()
raise RuntimeError(f"Hook初始化失败: {err}")
start_time = time.time()
try:
while True:
if time.time() - start_time > self.timeout_seconds:
raise TimeoutError("获取密钥超时 (60s)")
key = wx_key.poll_key_data()
if key:
found_key = key
break
while True:
msg, level = wx_key.get_status_message()
if msg is None:
break
if level == 2:
logger.error(f"[Hook Error] {msg}")
time.sleep(0.1)
finally:
logger.info("Cleaning up hook...")
wx_key.cleanup_hook()
if found_key:
return found_key
else:
raise RuntimeError("未知错误,未获取到密钥")
def get_db_key_workflow():
fetcher = WeChatKeyFetcher()
return fetcher.fetch_key()
# ============================== 以下是图片密钥逻辑 =====================================
# 远程 API 配置
REMOTE_URL = "https://view.free.c3o.re/dashboard"
NEXT_ACTION_ID = "7c8f99280c70626ccf5960cc4a68f368197e15f8e9"
def get_wechat_internal_global_config(wx_dir: Path, file_name1) -> bytes:
"""
读微信目录下的主配置文件
"""
xwechat_files_root = wx_dir.parent
target_path = os.path.join(xwechat_files_root, "all_users", "config", file_name1)
if not os.path.exists(target_path):
logger.error(f"未找到微信内部 global_config: {target_path}")
raise FileNotFoundError(f"找不到文件: {target_path},请确认微信数据目录结构是否完整")
return Path(target_path).read_bytes()
# def get_local_config_sha3_224() -> bytes:
# """
# 不要在意,抽象的实现 哈哈哈
# """
# content = json.dumps({
# "wxfile_dir": "C:\\Users\\17078\\xwechat_files",
# "weixin_id_folder": "wxid_lnyf4hdo9csb12_f1c4",
# "cache_dir": "C:\\Users\\17078\\Desktop\\wxDBHook\\test\\wx-dat\\wx-dat\\.cache",
# "db_key": "",
# "port": 8001
# }, indent=4).encode("utf-8")
#
# # 计算 SHA3-224
# digest = hashlib.sha3_224(content).digest()
# return digest
# async def log_request(request):
# print(f"--- Request Raw ---")
# print(f"{request.method} {request.url} {request.extensions.get('http_version', b'HTTP/1.1').decode()}")
# for name, value in request.headers.items():
# print(f"{name}: {value}")
#
# print()
#
# body = request.read()
# if body:
# print(body.decode(errors='replace'))
# print(f"-------------------\n")
async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str, Any]:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
wxid = wx_id_dir.name
logger.info(f"正在为账号 {wxid} 获取密钥...")
try:
blob1_bytes = get_wechat_internal_global_config(wx_id_dir, file_name1= "global_config") # 估计这是唯一有效的数据!!
logger.info(f"获取微信内部配置成功,大小: {len(blob1_bytes)} bytes")
except Exception as e:
raise RuntimeError(f"读取微信内部文件失败: {e}")
try:
blob2_bytes = get_wechat_internal_global_config(wx_id_dir, file_name1= "global_config.crc")
logger.info(f"获取微信内部配置成功,大小: {len(blob2_bytes)} bytes")
except Exception as e:
raise RuntimeError(f"读取微信内部文件失败: {e}")
blob3_bytes = b""
headers = {
"Accept": "text/x-component",
"Next-Action": NEXT_ACTION_ID,
"Next-Router-State-Tree": "%5B%22%22%2C%7B%22children%22%3A%5B%22dashboard%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D",
"Origin": "https://view.free.c3o.re",
"Referer": "https://view.free.c3o.re/dashboard",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
}
files = {
'1': ('blob', blob1_bytes, 'application/octet-stream'),
'2': ('blob', blob2_bytes, 'application/octet-stream'),
'3': ('blob', blob3_bytes, 'application/octet-stream'),
'0': (None, json.dumps([wxid, "$A1", "$A2", "$A3", 0],separators=(",",":")).encode('utf-8')),
}
async with httpx.AsyncClient(timeout=30) as client:
logger.info("向远程服务器发送请求...")
response = await client.post(REMOTE_URL, headers=headers, files=files)
if response.status_code != 200:
raise RuntimeError(f"远程服务器错误: {response.status_code} - {response.text[:100]}")
result_data = {}
lines = response.text.split('\n')
found_config = False
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith('1:'):
try:
json_part = line[2:] # 去掉 "1:"
data_obj = json.loads(json_part)
if "config" in data_obj:
config = data_obj["config"]
result_data = {
"xor_key": config.get("xor_key", ""),
"aes_key": config.get("aes_key", ""),
"nick_name": config.get("nick_name", ""),
"avatar_url": config.get("avatar_url", "")
}
found_config = True
break
except Exception as e:
logger.warning(f"解析响应行失败: {e}")
continue
if not found_config or not result_data.get("aes_key"):
logger.error(f"响应中未找到密钥信息。Full Response: {response.text[:500]}")
raise RuntimeError("解析失败: 服务器未返回 config 数据")
# 6. 处理并保存密钥
xor_raw = str(result_data["xor_key"])
aes_val = str(result_data["aes_key"])
try:
if xor_raw.startswith("0x"):
xor_int = int(xor_raw, 16)
else:
xor_int = int(xor_raw)
xor_hex_str = f"0x{xor_int:02X}"
except:
xor_hex_str = xor_raw
upsert_account_keys_in_store(
account=wxid,
image_xor_key=xor_hex_str,
image_aes_key=aes_val
)
return {
"wxid": wxid,
"xor_key": xor_hex_str,
"aes_key": aes_val,
"nick_name": result_data["nick_name"]
}
+6 -5
View File
@@ -23,17 +23,17 @@ logger = get_logger(__name__)
# 运行时输出目录(桌面端可通过 WECHAT_TOOL_DATA_DIR 指向可写目录)
_OUTPUT_DATABASES_DIR = get_output_databases_dir()
_PACKAGE_ROOT = Path(__file__).resolve().parent
def _list_decrypted_accounts() -> list[str]:
"""列出已解密输出的账号目录名(仅保留包含 session.db + contact.db 的账号)"""
if not _OUTPUT_DATABASES_DIR.exists():
output_db_dir = get_output_databases_dir()
if not output_db_dir.exists():
return []
accounts: list[str] = []
for p in _OUTPUT_DATABASES_DIR.iterdir():
for p in output_db_dir.iterdir():
if not p.is_dir():
continue
if (p / "session.db").exists() and (p / "contact.db").exists():
@@ -45,6 +45,7 @@ def _list_decrypted_accounts() -> list[str]:
def _resolve_account_dir(account: Optional[str]) -> Path:
"""解析账号目录,并进行路径安全校验(防止路径穿越)"""
output_db_dir = get_output_databases_dir()
accounts = _list_decrypted_accounts()
if not accounts:
raise HTTPException(
@@ -53,8 +54,8 @@ def _resolve_account_dir(account: Optional[str]) -> Path:
)
selected = account or accounts[0]
base = _OUTPUT_DATABASES_DIR.resolve()
candidate = (_OUTPUT_DATABASES_DIR / selected).resolve()
base = output_db_dir.resolve()
candidate = (output_db_dir / selected).resolve()
if candidate != base and base not in candidate.parents:
raise HTTPException(status_code=400, detail="Invalid account path.")
+341 -57
View File
@@ -39,11 +39,17 @@ from ..chat_helpers import (
_make_snippet,
_match_tokens,
_load_contact_rows,
_load_group_nickname_map_from_contact_db,
_load_usernames_by_display_names,
_load_latest_message_previews,
_build_group_sender_display_name_map,
_normalize_session_preview_text,
_extract_group_preview_sender_username,
_replace_preview_sender_prefix,
_lookup_resource_md5,
_normalize_xml_url,
_parse_app_message,
_parse_system_message_content,
_parse_pat_message,
_pick_display_name,
_query_head_image_usernames,
@@ -69,6 +75,8 @@ from ..wcdb_realtime import (
WCDB_REALTIME,
get_avatar_urls as _wcdb_get_avatar_urls,
get_display_names as _wcdb_get_display_names,
get_group_members as _wcdb_get_group_members,
get_group_nicknames as _wcdb_get_group_nicknames,
get_messages as _wcdb_get_messages,
get_sessions as _wcdb_get_sessions,
)
@@ -97,6 +105,142 @@ def _avatar_url_unified(
return _build_avatar_url(str(account_dir.name or ""), u)
def _load_group_nickname_map_from_wcdb(
*,
account_dir: Path,
chatroom_id: str,
sender_usernames: list[str],
rt_conn=None,
) -> dict[str, str]:
chatroom = str(chatroom_id or "").strip()
if not chatroom.endswith("@chatroom"):
return {}
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
if not targets:
return {}
try:
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
except Exception:
return {}
target_set = set(targets)
out: dict[str, str] = {}
try:
with wcdb_conn.lock:
nickname_map = _wcdb_get_group_nicknames(wcdb_conn.handle, chatroom)
for username, nickname in (nickname_map or {}).items():
su = str(username or "").strip()
nn = str(nickname or "").strip()
if su and nn and su in target_set:
out[su] = nn
except Exception:
pass
unresolved = [u for u in targets if u not in out]
if not unresolved:
return out
try:
with wcdb_conn.lock:
members = _wcdb_get_group_members(wcdb_conn.handle, chatroom)
except Exception:
return out
if not members:
return out
unresolved_set = set(unresolved)
for member in members:
try:
username = str(member.get("username") or "").strip()
except Exception:
username = ""
if (not username) or (username not in unresolved_set):
continue
nickname = ""
for key in ("nickname", "displayName", "remark", "originalName"):
try:
candidate = str(member.get(key) or "").strip()
except Exception:
candidate = ""
if candidate:
nickname = candidate
break
if nickname:
out[username] = nickname
return out
def _load_group_nickname_map(
*,
account_dir: Path,
contact_db_path: Path,
chatroom_id: str,
sender_usernames: list[str],
rt_conn=None,
) -> dict[str, str]:
"""Resolve group member nickname (group card) via WCDB and contact.db ext_buffer (best-effort)."""
contact_map: dict[str, str] = {}
try:
contact_map = _load_group_nickname_map_from_contact_db(
contact_db_path,
chatroom_id,
sender_usernames,
)
except Exception:
contact_map = {}
wcdb_map: dict[str, str] = {}
try:
wcdb_map = _load_group_nickname_map_from_wcdb(
account_dir=account_dir,
chatroom_id=chatroom_id,
sender_usernames=sender_usernames,
rt_conn=rt_conn,
)
except Exception:
wcdb_map = {}
if not contact_map and not wcdb_map:
return {}
# Merge: WCDB wins (newer DLLs may provide higher-quality group nicknames).
merged: dict[str, str] = {}
merged.update(contact_map)
merged.update(wcdb_map)
return merged
def _resolve_sender_display_name(
*,
sender_username: str,
sender_contact_rows: dict[str, sqlite3.Row],
wcdb_display_names: dict[str, str],
group_nicknames: Optional[dict[str, str]] = None,
) -> str:
su = str(sender_username or "").strip()
if not su:
return ""
gn = str((group_nicknames or {}).get(su) or "").strip()
if gn:
return gn
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
return display_name
def _realtime_sync_lock(account: str, username: str) -> threading.Lock:
key = (str(account or "").strip(), str(username or "").strip())
with _REALTIME_SYNC_MU:
@@ -557,8 +701,11 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
"draft",
"last_timestamp",
"sort_timestamp",
"last_msg_locald_id",
"last_msg_type",
"last_msg_sub_type",
"last_msg_sender",
"last_sender_display_name",
]
update_cols = [c for c in desired_cols if c in cols]
if not update_cols:
@@ -583,7 +730,15 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
continue
values: list[Any] = []
for c in update_cols:
if c in {"unread_count", "is_hidden", "last_timestamp", "sort_timestamp", "last_msg_type", "last_msg_sub_type"}:
if c in {
"unread_count",
"is_hidden",
"last_timestamp",
"sort_timestamp",
"last_msg_locald_id",
"last_msg_type",
"last_msg_sub_type",
}:
values.append(_int((r or {}).get(c)))
else:
values.append(_text((r or {}).get(c)))
@@ -1510,8 +1665,17 @@ def sync_chat_realtime_messages_all(
"sort_timestamp",
item.get("sortTimestamp", item.get("last_timestamp", item.get("lastTimestamp", 0))),
),
"last_msg_locald_id": item.get(
"last_msg_locald_id",
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
),
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
"last_sender_display_name": item.get(
"last_sender_display_name",
item.get("lastSenderDisplayName", ""),
),
}
# Prefer the row with the newer sort timestamp for the same username.
prev = realtime_rows_by_user.get(uname)
@@ -2137,11 +2301,7 @@ def _append_full_messages_from_rows(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
@@ -2598,6 +2758,13 @@ def _postprocess_full_messages(
wcdb_display_names = {}
wcdb_avatar_urls = {}
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=uniq_senders,
)
for m in merged:
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
@@ -2613,13 +2780,12 @@ def _postprocess_full_messages(
su = str(m.get("senderUsername") or "")
if not su:
continue
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
m["senderDisplayName"] = display_name
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -2836,6 +3002,17 @@ def list_chat_sessions(
"sort_timestamp": item.get("sort_timestamp", item.get("sortTimestamp", item.get("last_timestamp", 0))),
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
# Keep these fields so group session previews can render "sender: content" without
# crashing (realtime rows are dicts, not sqlite Rows).
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
"last_sender_display_name": item.get(
"last_sender_display_name",
item.get("lastSenderDisplayName", ""),
),
"last_msg_locald_id": item.get(
"last_msg_locald_id",
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
),
}
)
@@ -2923,12 +3100,16 @@ def list_chat_sessions(
try:
need_display: list[str] = []
need_avatar: list[str] = []
if source_norm == "realtime":
# In realtime mode, always ask WCDB for display names: decrypted contact.db can be stale.
need_display = [str(u or "").strip() for u in usernames if str(u or "").strip()]
for u in usernames:
if not u:
continue
row = contact_rows.get(u)
if _pick_display_name(row, u) == u:
need_display.append(u)
if source_norm != "realtime":
row = contact_rows.get(u)
if _pick_display_name(row, u) == u:
need_display.append(u)
if source_norm == "realtime":
# In realtime mode, prefer WCDB-resolved avatar URLs (contact.db can be stale).
if u not in local_avatar_usernames:
@@ -2983,14 +3164,40 @@ def list_chat_sessions(
if v:
last_previews[u] = v
group_sender_display_names: dict[str, str] = _build_group_sender_display_name_map(
contact_db_path,
last_previews,
)
unresolved = []
for conv_username, preview_text in last_previews.items():
if not str(conv_username or "").endswith("@chatroom"):
continue
sender_username = _extract_group_preview_sender_username(preview_text)
if sender_username and sender_username not in group_sender_display_names:
unresolved.append(sender_username)
unresolved = list(dict.fromkeys(unresolved))
if unresolved:
try:
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
with wcdb_conn.lock:
wcdb_names = _wcdb_get_display_names(wcdb_conn.handle, unresolved)
for sender_username in unresolved:
wcdb_name = str(wcdb_names.get(sender_username) or "").strip()
if wcdb_name and wcdb_name != sender_username:
group_sender_display_names[sender_username] = wcdb_name
except Exception:
pass
sessions: list[dict[str, Any]] = []
for r in filtered:
username = r["username"]
c_row = contact_rows.get(username)
display_name = _pick_display_name(c_row, username)
if display_name == username:
wd = str(wcdb_display_names.get(username) or "").strip()
wd = str(wcdb_display_names.get(username) or "").strip()
if source_norm == "realtime" and wd and wd != username:
display_name = wd
elif display_name == username:
if wd and wd != username:
display_name = wd
@@ -3046,6 +3253,37 @@ def list_chat_sessions(
if last_msg_type == 81604378673 or (last_msg_type == 49 and last_msg_sub_type == 19):
last_message = "[聊天记录]"
last_message = _normalize_session_preview_text(
last_message,
is_group=bool(str(username or "").endswith("@chatroom")),
sender_display_names=group_sender_display_names,
)
if str(username or "").endswith("@chatroom") and str(last_message or "") and not str(last_message).startswith("[草稿]"):
# Prefer group card nickname when available. In realtime mode, WCDB session rows can provide
# `last_sender_display_name`, but we may still get a summary that doesn't include "sender:".
# Also guard against URL schemes like "https://..." being mis-parsed as "https: //...".
raw_sender_display = ""
try:
raw_sender_display = r["last_sender_display_name"]
except Exception:
try:
raw_sender_display = r.get("last_sender_display_name", "")
except Exception:
raw_sender_display = ""
sender_display = _decode_sqlite_text(raw_sender_display).strip()
if sender_display:
text = re.sub(r"\s+", " ", str(last_message or "").strip()).strip()
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
if match:
prefix = str(match.group(1) or "").strip()
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
if prefix.lower() in {"http", "https"} and body.startswith("//"):
last_message = f"{sender_display}: {text}"
else:
last_message = f"{sender_display}: {body}"
else:
last_message = f"{sender_display}: {text}"
last_time = _format_session_time(r["sort_timestamp"] or r["last_timestamp"])
sessions.append(
@@ -3248,13 +3486,7 @@ def _collect_chat_messages(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
import re
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
@@ -3957,13 +4189,7 @@ def list_chat_messages(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
import re
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
@@ -4412,6 +4638,13 @@ def list_chat_messages(
wcdb_display_names = {}
wcdb_avatar_urls = {}
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=uniq_senders,
)
for m in merged:
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
@@ -4427,13 +4660,12 @@ def list_chat_messages(
su = str(m.get("senderUsername") or "")
if not su:
continue
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
m["senderDisplayName"] = display_name
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -4930,19 +5162,24 @@ async def _search_chat_messages_via_fts(
username=username,
local_avatar_usernames=local_avatar_usernames,
)
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=[str(x.get("senderUsername") or "") for x in hits],
)
for h in hits:
su = str(h.get("senderUsername") or "").strip()
h["conversationName"] = conv_name
h["conversationAvatar"] = conv_avatar
if su:
row = contact_rows.get(su)
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
h["senderDisplayName"] = display_name
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -4986,6 +5223,23 @@ async def _search_chat_messages_via_fts(
wcdb_display_names = {}
wcdb_avatar_urls = {}
group_senders_by_room: dict[str, list[str]] = {}
for h in hits:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
if (not cu.endswith("@chatroom")) or (not su):
continue
group_senders_by_room.setdefault(cu, []).append(su)
group_nickname_cache: dict[str, dict[str, str]] = {}
for cu, senders in group_senders_by_room.items():
group_nickname_cache[cu] = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=cu,
sender_usernames=senders,
)
for h in hits:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
@@ -5003,13 +5257,12 @@ async def _search_chat_messages_via_fts(
)
h["conversationAvatar"] = conv_avatar
if su:
row = contact_rows.get(su)
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
h["senderDisplayName"] = display_name
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nickname_cache.get(cu, {}),
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -5272,13 +5525,23 @@ async def search_chat_messages(
contact_rows = _load_contact_rows(contact_db_path, uniq_usernames)
conv_row = contact_rows.get(username)
conv_name = _pick_display_name(conv_row, username)
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=[str(x.get("senderUsername") or "") for x in page],
)
for h in page:
su = str(h.get("senderUsername") or "").strip()
h["conversationName"] = conv_name
if su:
row = contact_rows.get(su)
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su)
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names={},
group_nicknames=group_nicknames,
)
return {
"status": "success",
@@ -5360,6 +5623,23 @@ async def search_chat_messages(
)
contact_rows = _load_contact_rows(contact_db_path, uniq_contacts)
group_senders_by_room: dict[str, list[str]] = {}
for h in page:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
if (not cu.endswith("@chatroom")) or (not su):
continue
group_senders_by_room.setdefault(cu, []).append(su)
group_nickname_cache: dict[str, dict[str, str]] = {}
for cu, senders in group_senders_by_room.items():
group_nickname_cache[cu] = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=cu,
sender_usernames=senders,
)
for h in page:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
@@ -5367,8 +5647,12 @@ async def search_chat_messages(
conv_name = _pick_display_name(crow, cu) if cu else ""
h["conversationName"] = conv_name or cu
if su:
row = contact_rows.get(su)
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su)
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names={},
group_nicknames=group_nickname_cache.get(cu, {}),
)
return {
"status": "success",
@@ -688,6 +688,83 @@ def _lookup_resource_md5_by_server_id(account_dir_str: str, server_id: int, want
pass
@lru_cache(maxsize=4096)
def _lookup_image_md5_by_server_id_from_messages(account_dir_str: str, server_id: int, username: str) -> str:
account_dir_str = str(account_dir_str or "").strip()
username = str(username or "").strip()
if not account_dir_str or not username:
return ""
try:
sid = int(server_id or 0)
except Exception:
sid = 0
if not sid:
return ""
try:
chat_hash = hashlib.md5(username.encode()).hexdigest()
except Exception:
return ""
if not chat_hash:
return ""
table_name = f"Msg_{chat_hash}"
account_dir = Path(account_dir_str)
db_paths: list[Path] = []
try:
for p in account_dir.glob("message_*.db"):
try:
if p.is_file():
db_paths.append(p)
except Exception:
continue
except Exception:
db_paths = []
if not db_paths:
return ""
db_paths.sort(key=lambda p: p.name)
for db_path in db_paths:
try:
conn = sqlite3.connect(str(db_path))
except Exception:
continue
try:
row = conn.execute(
f"SELECT local_type, packed_info_data FROM {table_name} "
"WHERE server_id = ? ORDER BY create_time DESC LIMIT 1",
(sid,),
).fetchone()
except Exception:
row = None
finally:
try:
conn.close()
except Exception:
pass
if not row:
continue
try:
local_type = int(row[0] or 0)
except Exception:
local_type = 0
if local_type != 3:
continue
md5 = _extract_md5_from_packed_info(row[1])
md5_norm = str(md5 or "").strip().lower()
if _is_valid_md5(md5_norm):
return md5_norm
return ""
def _is_safe_http_url(url: str) -> bool:
u = str(url or "").strip()
if not u:
@@ -1062,6 +1139,12 @@ async def get_chat_image(
resource_md5 = _lookup_resource_md5_by_server_id(str(account_dir), int(server_id), want_local_type=3)
if resource_md5:
md5 = resource_md5
elif username:
md5_from_msg = _lookup_image_md5_by_server_id_from_messages(
str(account_dir), int(server_id), str(username)
)
if md5_from_msg:
md5 = md5_from_msg
# md5 模式:优先从解密资源目录读取(更快)
if md5:
+74
View File
@@ -3,6 +3,7 @@ from typing import Optional
from fastapi import APIRouter
from ..key_store import get_account_keys_from_store
from ..key_service import get_db_key_workflow, fetch_and_save_remote_keys
from ..media_helpers import _load_media_keys, _resolve_account_dir
from ..path_fix import PathFixRoute
@@ -51,3 +52,76 @@ async def get_saved_keys(account: Optional[str] = None):
"keys": result,
}
@router.get("/api/get_db_key", summary="自动获取微信数据库密钥")
async def get_wechat_db_key():
"""
自动流程
1. 结束微信进程
2. 启动微信
3. 根据版本注入 Hook
4. 抓取密钥并返回
"""
try:
# 不需要async吧,我相信fastapi的线程池
db_key = get_db_key_workflow()
return {
"status": 0,
"errmsg": "ok",
"data": {
"db_key": db_key
}
}
except TimeoutError:
return {
"status": -1,
"errmsg": "获取超时,请确保微信没有开启自动登录 或者 加快手速",
"data": {}
}
except Exception as e:
return {
"status": -1,
"errmsg": f"获取失败: {str(e)}",
"data": {}
}
@router.get("/api/get_image_key", summary="获取并保存微信图片密钥")
async def get_image_key(account: Optional[str] = None):
"""
通过模拟 Next.js Server Action 协议利用本地微信配置文件换取 AES/XOR 密钥
1. 读取 [wx_dir]/all_users/config/global_config (Blob 1)
2. 同上目录下的global_config.crc
3. 构造 Multipart 包发送至远程服务器
4. 解析返回流自动存入本地数据库
"""
try:
result = await fetch_and_save_remote_keys(account)
return {
"status": 0,
"errmsg": "ok",
"data": {
"xor_key": result["xor_key"],
"aes_key": result["aes_key"],
"nick_name": result.get("nick_name"),
"account": result["wxid"]
}
}
except FileNotFoundError as e:
return {
"status": -1,
"errmsg": f"文件缺失: {str(e)}",
"data": {}
}
except Exception as e:
import traceback
traceback.print_exc()
return {
"status": -1,
"errmsg": f"获取失败: {str(e)}",
"data": {}
}
@@ -1,5 +1,5 @@
from typing import Optional
import psutil
from fastapi import APIRouter
from ..logging_config import get_logger
@@ -71,3 +71,49 @@ async def detect_current_account(data_root_path: Optional[str] = None):
'error': str(e),
'data': None,
}
@router.get("/api/wechat/status", summary="检查微信运行状态")
async def check_wechat_status():
"""
检查系统中是否有 Weixin.exe WeChat.exe 进程在运行
返回: status=0 成功, wx_status={is_running: bool, pid: int, ...}
"""
process_name_targets = ["Weixin.exe", "WeChat.exe"]
wx_status = {
"is_running": False,
"pid": None,
"exe_path": None,
"memory_usage_mb": 0.0
}
try:
for proc in psutil.process_iter(['pid', 'name', 'exe', 'memory_info']):
try:
if proc.info['name'] and proc.info['name'] in process_name_targets:
wx_status["is_running"] = True
wx_status["pid"] = proc.info['pid']
wx_status["exe_path"] = proc.info['exe']
mem = proc.info['memory_info']
if mem:
wx_status["memory_usage_mb"] = round(mem.rss / (1024 * 1024), 2)
break
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
return {
"status": 0,
"errmsg": "ok",
"wx_status": wx_status
}
except Exception as e:
# 即使出错也返回 JSON,但 status 非 0
return {
"status": -1,
"errmsg": f"检查进程失败: {str(e)}",
"wx_status": wx_status
}
+46
View File
@@ -102,6 +102,17 @@ def _load_wcdb_lib() -> ctypes.CDLL:
lib.wcdb_get_group_members.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_group_members.restype = ctypes.c_int
# Optional (newer DLLs): wcdb_get_group_nicknames(handle, chatroom_id, out_json)
try:
lib.wcdb_get_group_nicknames.argtypes = [
ctypes.c_int64,
ctypes.c_char_p,
ctypes.POINTER(ctypes.c_char_p),
]
lib.wcdb_get_group_nicknames.restype = ctypes.c_int
except Exception:
pass
# Optional: execute arbitrary SQL on a selected database kind/path.
# Signature: wcdb_exec_query(handle, kind, path, sql, out_json)
try:
@@ -355,6 +366,41 @@ def get_avatar_urls(handle: int, usernames: list[str]) -> dict[str, str]:
return {}
def get_group_members(handle: int, chatroom_id: str) -> list[dict[str, Any]]:
_ensure_initialized()
lib = _load_wcdb_lib()
cid = str(chatroom_id or "").strip()
if not cid:
return []
out_json = _call_out_json(lib.wcdb_get_group_members, ctypes.c_int64(int(handle)), cid.encode("utf-8"))
decoded = _safe_load_json(out_json)
if isinstance(decoded, list):
out: list[dict[str, Any]] = []
for x in decoded:
if isinstance(x, dict):
out.append(x)
return out
return []
def get_group_nicknames(handle: int, chatroom_id: str) -> dict[str, str]:
_ensure_initialized()
lib = _load_wcdb_lib()
fn = getattr(lib, "wcdb_get_group_nicknames", None)
if not fn:
return {}
cid = str(chatroom_id or "").strip()
if not cid:
return {}
out_json = _call_out_json(fn, ctypes.c_int64(int(handle)), cid.encode("utf-8"))
decoded = _safe_load_json(out_json)
if isinstance(decoded, dict):
return {str(k): str(v) for k, v in decoded.items()}
return {}
def exec_query(handle: int, *, kind: str, path: Optional[str], sql: str) -> list[dict[str, Any]]:
"""Execute raw SQL on a specific db kind/path via WCDB.
@@ -122,6 +122,16 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
(3, 1003, 49, 3, 2, 1735689603, '<msg><appmsg><type>2000</type><des>收到转账0.01元</des></appmsg></msg>', None),
(4, 1004, 1, 4, 2, 1735689604, '普通文本消息', None),
(5, 1005, 10000, 5, 2, 1735689605, '系统提示消息', None),
(
6,
1006,
10000,
6,
2,
1735689606,
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA[“测试好友”撤回了一条消息]]></replacemsg></revokemsg></sysmsg>',
None,
),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
@@ -413,6 +423,37 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
def test_system_revoke_exports_readable_revoker_content(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
message_types=["system"],
include_media=False,
)
self.assertEqual(job.status, "done", msg=job.error)
payload, _, _ = self._load_export_payload(job.zip_path)
revoke_msg = next((m for m in payload.get("messages", []) if int(m.get("serverId") or 0) == 1006), None)
self.assertIsNotNone(revoke_msg)
self.assertEqual(str(revoke_msg.get("renderType") or ""), "system")
self.assertEqual(str(revoke_msg.get("content") or ""), "“测试好友”撤回了一条消息")
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,111 @@
import sqlite3
import sys
import threading
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class _DummyRequest:
base_url = "http://testserver/"
class _DummyConn:
def __init__(self) -> None:
self.handle = 1
self.lock = threading.Lock()
def _seed_session_db(session_db_path: Path) -> None:
conn = sqlite3.connect(str(session_db_path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT PRIMARY KEY,
unread_count INTEGER DEFAULT 0,
is_hidden INTEGER DEFAULT 0,
summary TEXT DEFAULT '',
draft TEXT DEFAULT '',
last_timestamp INTEGER DEFAULT 0,
sort_timestamp INTEGER DEFAULT 0,
last_msg_locald_id INTEGER DEFAULT 0,
last_msg_type INTEGER DEFAULT 0,
last_msg_sub_type INTEGER DEFAULT 0,
last_msg_sender TEXT DEFAULT '',
last_sender_display_name TEXT DEFAULT ''
)
"""
)
conn.commit()
finally:
conn.close()
class TestChatRealtimeSyncAllUpdatesSenderDisplayName(unittest.TestCase):
def test_sync_all_upserts_last_sender_display_name(self):
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
_seed_session_db(account_dir / "session.db")
conn = _DummyConn()
sessions_rows = [
{
"username": "demo@chatroom",
"unread_count": 0,
"is_hidden": 0,
"summary": "hello",
"draft": "",
"last_timestamp": 123,
"sort_timestamp": 123,
"last_msg_type": 1,
"last_msg_sub_type": 0,
"last_msg_sender": "wxid_demo",
"last_sender_display_name": "群名片A",
"last_msg_locald_id": 777,
}
]
with (
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
patch.object(chat_router, "_ensure_decrypted_message_tables", return_value={}),
patch.object(chat_router, "_should_keep_session", return_value=True),
):
resp = chat_router.sync_chat_realtime_messages_all(
_DummyRequest(),
account="acc",
max_scan=20,
include_hidden=True,
include_official=True,
)
self.assertEqual(resp.get("status"), "success")
db = sqlite3.connect(str(account_dir / "session.db"))
try:
row = db.execute(
"SELECT last_sender_display_name, last_msg_sender, last_msg_locald_id FROM SessionTable WHERE username = ? LIMIT 1",
("demo@chatroom",),
).fetchone()
finally:
db.close()
self.assertIsNotNone(row)
self.assertEqual(str(row[0] or ""), "群名片A")
self.assertEqual(str(row[1] or ""), "wxid_demo")
self.assertEqual(int(row[2] or 0), 777)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,68 @@
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import (
_build_group_sender_display_name_map,
_normalize_session_preview_text,
_replace_preview_sender_prefix,
)
class TestChatSessionPreviewFormatting(unittest.TestCase):
def test_normalize_session_preview_emoji_label(self):
out = _normalize_session_preview_text("[表情]", is_group=False, sender_display_names={})
self.assertEqual(out, "[动画表情]")
def test_normalize_group_preview_sender_display_name(self):
out = _normalize_session_preview_text(
"wxid_u3gwceqvne2m22: [表情]",
is_group=True,
sender_display_names={"wxid_u3gwceqvne2m22": "食神"},
)
self.assertEqual(out, "食神: [动画表情]")
def test_build_group_sender_display_name_map_from_contact_db(self):
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?)",
("wxid_u3gwceqvne2m22", "", "食神", "", "", ""),
)
conn.commit()
finally:
conn.close()
mapping = _build_group_sender_display_name_map(
contact_db_path,
{"demo@chatroom": "wxid_u3gwceqvne2m22: [动画表情]"},
)
self.assertEqual(mapping.get("wxid_u3gwceqvne2m22"), "食神")
def test_replace_preview_sender_prefix_uses_group_nickname(self):
out = _replace_preview_sender_prefix("去码头整点🍟: [动画表情]", "麻辣香锅")
self.assertEqual(out, "麻辣香锅: [动画表情]")
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,103 @@
import sys
import threading
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class _DummyRequest:
base_url = "http://testserver/"
class _DummyConn:
def __init__(self) -> None:
self.handle = 1
self.lock = threading.Lock()
class TestChatSessionsRealtimeSenderPreview(unittest.TestCase):
def _run(self, sessions_rows: list[dict]) -> dict:
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
conn = _DummyConn()
with (
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
patch.object(chat_router, "_wcdb_get_display_names", return_value={}),
patch.object(chat_router, "_wcdb_get_avatar_urls", return_value={}),
patch.object(chat_router, "_load_contact_rows", return_value={}),
patch.object(chat_router, "_query_head_image_usernames", return_value=set()),
patch.object(chat_router, "_should_keep_session", return_value=True),
patch.object(chat_router, "_avatar_url_unified", return_value="/avatar"),
):
return chat_router.list_chat_sessions(
_DummyRequest(),
account="acc",
limit=50,
include_hidden=True,
include_official=True,
preview="latest",
source="realtime",
)
def test_realtime_sessions_group_summary_prefixed_by_sender_display_name(self):
resp = self._run(
[
{
"username": "demo@chatroom",
"summary": "hello",
"draft": "",
"unread_count": 0,
"is_hidden": 0,
"last_timestamp": 123,
"sort_timestamp": 123,
"last_msg_type": 1,
"last_msg_sub_type": 0,
"last_msg_sender": "wxid_demo",
"last_sender_display_name": "群名片A",
}
]
)
self.assertEqual(resp.get("status"), "success")
sessions = resp.get("sessions") or []
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0].get("lastMessage"), "群名片A: hello")
def test_realtime_sessions_group_url_summary_keeps_scheme(self):
resp = self._run(
[
{
"username": "url@chatroom",
"summary": "https://example.com/x",
"draft": "",
"unread_count": 0,
"is_hidden": 0,
"last_timestamp": 123,
"sort_timestamp": 123,
"last_msg_type": 1,
"last_msg_sub_type": 0,
"last_msg_sender": "wxid_demo",
"last_sender_display_name": "群名片B",
}
]
)
self.assertEqual(resp.get("status"), "success")
sessions = resp.get("sessions") or []
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0].get("lastMessage"), "群名片B: https://example.com/x")
if __name__ == "__main__":
unittest.main()
+42
View File
@@ -0,0 +1,42 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _parse_system_message_content
class TestChatSystemMessageParsing(unittest.TestCase):
def test_extract_replacemsg_for_revoke(self):
raw_text = (
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA[“张三”撤回了一条消息]]>'
"</replacemsg></revokemsg></sysmsg>"
)
self.assertEqual(_parse_system_message_content(raw_text), "“张三”撤回了一条消息")
def test_extract_nested_content_in_replacemsg(self):
raw_text = (
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA['
'<content>"黄智欢" 撤回了一条消息</content><revoketime>0</revoketime>'
']]></replacemsg></revokemsg></sysmsg>'
)
self.assertEqual(_parse_system_message_content(raw_text), '"黄智欢" 撤回了一条消息')
def test_extract_revokemsg_text_when_replacemsg_missing(self):
raw_text = "<revokemsg>你撤回了一条消息</revokemsg>"
self.assertEqual(_parse_system_message_content(raw_text), "你撤回了一条消息")
def test_revoke_fallback_when_no_readable_text(self):
raw_text = '<sysmsg type="revokemsg"></sysmsg>'
self.assertEqual(_parse_system_message_content(raw_text), "撤回了一条消息")
def test_normal_system_message_still_cleaned(self):
raw_text = "<sysmsg><template><![CDATA[ 张三 加入了群聊 ]]></template></sysmsg>"
self.assertEqual(_parse_system_message_content(raw_text), "张三 加入了群聊")
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,114 @@
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _load_group_nickname_map_from_contact_db
def _enc_varint(n: int) -> bytes:
v = int(n)
out = bytearray()
while True:
b = v & 0x7F
v >>= 7
if v:
out.append(b | 0x80)
else:
out.append(b)
break
return bytes(out)
def _enc_tag(field_no: int, wire_type: int) -> bytes:
return _enc_varint((int(field_no) << 3) | int(wire_type))
def _enc_len(field_no: int, data: bytes) -> bytes:
b = bytes(data or b"")
return _enc_tag(field_no, 2) + _enc_varint(len(b)) + b
def _member_entry(*, inner: bytes) -> bytes:
# contact.db ext_buffer uses repeated length-delimited submessages; the top-level field number is not important
# for our best-effort parser, so we use field 1.
return _enc_len(1, inner)
class TestGroupNicknameExtBufferParsing(unittest.TestCase):
def test_parse_pattern_a_field1_username_field2_display(self):
chatroom = "demo@chatroom"
username = "wxid_demo_123456"
display = "群名片A"
inner = _enc_len(1, username.encode("utf-8")) + _enc_len(2, display.encode("utf-8"))
ext_buffer = _member_entry(inner=inner)
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
)
conn.execute(
"INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
(1, chatroom, "", ext_buffer),
)
conn.commit()
finally:
conn.close()
out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
self.assertEqual(out.get(username), display)
def test_parse_pattern_b_field4_username_field1_display(self):
chatroom = "demo2@chatroom"
username = "wxid_demo_abcdef"
display = "hjlbingo"
inner = _enc_len(4, username.encode("utf-8")) + _enc_len(1, display.encode("utf-8"))
ext_buffer = _member_entry(inner=inner)
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
)
conn.execute(
"INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
(1, chatroom, "", ext_buffer),
)
conn.commit()
finally:
conn.close()
out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
self.assertEqual(out.get(username), display)
def test_non_chatroom_returns_empty(self):
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
)
conn.commit()
finally:
conn.close()
out = _load_group_nickname_map_from_contact_db(contact_db_path, "wxid_not_chatroom", ["wxid_xxx"])
self.assertEqual(out, {})
if __name__ == "__main__":
unittest.main()
+2
View File
@@ -0,0 +1,2 @@
> 这里放wx_key模块的python预编译wheelhttps://github.com/H3CoF6/py_wx_key/releases/
> 解压放入即可
Generated
+46 -1
View File
@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.11"
[[package]]
@@ -230,6 +230,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
]
[[package]]
name = "httpcore"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" },
]
[[package]]
name = "httptools"
version = "0.6.4"
@@ -259,6 +272,21 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/4d/dc/7decab5c404d1d2cdc1bb330b1bf70e83d6af0396fd4fc76fc60c0d522bf/httptools-0.6.4-cp313-cp313-win_amd64.whl", hash = "sha256:28908df1b9bb8187393d5b5db91435ccc9c8e891657f9cbb42a2541b44c82fc8", size = 87682, upload-time = "2024-10-16T19:44:46.46Z" },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[[package]]
name = "idna"
version = "3.10"
@@ -844,7 +872,9 @@ dependencies = [
{ name = "aiofiles" },
{ name = "cryptography" },
{ name = "fastapi" },
{ name = "httpx" },
{ name = "loguru" },
{ name = "packaging" },
{ name = "pilk" },
{ name = "psutil" },
{ name = "pycryptodome" },
@@ -854,6 +884,7 @@ dependencies = [
{ name = "requests" },
{ name = "typing-extensions" },
{ name = "uvicorn", extra = ["standard"] },
{ name = "wx-key" },
{ name = "zstandard" },
]
@@ -867,7 +898,9 @@ requires-dist = [
{ name = "aiofiles", specifier = ">=23.2.1" },
{ name = "cryptography", specifier = ">=41.0.0" },
{ name = "fastapi", specifier = ">=0.104.0" },
{ name = "httpx" },
{ name = "loguru", specifier = ">=0.7.0" },
{ name = "packaging" },
{ name = "pilk", specifier = ">=0.2.4" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pycryptodome", specifier = ">=3.23.0" },
@@ -878,6 +911,7 @@ requires-dist = [
{ name = "requests", specifier = ">=2.32.4" },
{ name = "typing-extensions", specifier = ">=4.8.0" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.24.0" },
{ name = "wx-key" },
{ name = "zstandard", specifier = ">=0.23.0" },
]
provides-extras = ["build"]
@@ -891,6 +925,17 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/e1/07/c6fe3ad3e685340704d314d765b7912993bcb8dc198f0e7a89382d37974b/win32_setctime-1.2.0-py3-none-any.whl", hash = "sha256:95d644c4e708aba81dc3704a116d8cbc974d70b3bdb8be1d150e36be6e9d1390", size = 4083, upload-time = "2024-12-07T15:28:26.465Z" },
]
[[package]]
name = "wx-key"
version = "1.0.0"
source = { registry = "tools/key_wheels" }
wheels = [
{ path = "wx_key-1.0.0-cp311-cp311-win_amd64.whl" },
{ path = "wx_key-1.0.0-cp312-cp312-win_amd64.whl" },
{ path = "wx_key-1.0.0-cp313-cp313-win_amd64.whl" },
{ path = "wx_key-1.0.0-cp314-cp314-win_amd64.whl" },
]
[[package]]
name = "zstandard"
version = "0.25.0"