feat(chat-export): 支持 HTML 导出(合并消息/远程缩略图可选下载)

- 导出格式新增 html:生成 index.html + 会话 messages.html,离线浏览

- 支持 chatHistory(合并消息)解析/渲染与弹窗查看

- 图片资源解析增强:MessageResourceInfo 优先 + md5/hdmd5 兜底

- HTML 导出可选下载远程缩略图(仅公网主机/图片类型/5MB 限制)

- 修复拍一拍误判、公众号封面样式识别;转账过期状态与前端展示
This commit is contained in:
2977094657
2026-02-13 22:38:28 +08:00
parent 1c94c0e174
commit 2a1ae2150f
16 changed files with 4696 additions and 82 deletions

View File

@@ -294,6 +294,7 @@ export const useApi = () => {
media_kinds: Array.isArray(data.media_kinds) ? data.media_kinds : ['image', 'emoji', 'video', 'video_thumb', 'voice', 'file'], media_kinds: Array.isArray(data.media_kinds) ? data.media_kinds : ['image', 'emoji', 'video', 'video_thumb', 'voice', 'file'],
output_dir: data.output_dir == null ? null : String(data.output_dir || '').trim(), output_dir: data.output_dir == null ? null : String(data.output_dir || '').trim(),
allow_process_key_extract: !!data.allow_process_key_extract, allow_process_key_extract: !!data.allow_process_key_extract,
download_remote_media: !!data.download_remote_media,
privacy_mode: !!data.privacy_mode, privacy_mode: !!data.privacy_mode,
file_name: data.file_name || null file_name: data.file_name || null
} }

View File

@@ -722,7 +722,7 @@
@click.stop="openChatHistoryModal(message)" @click.stop="openChatHistoryModal(message)"
> >
<div class="wechat-chat-history-body"> <div class="wechat-chat-history-body">
<div class="wechat-chat-history-title">{{ message.title || '聊天记录' }}</div> <div class="wechat-chat-history-title">{{ message.title || '合并消息' }}</div>
<div class="wechat-chat-history-preview" v-if="getChatHistoryPreviewLines(message).length"> <div class="wechat-chat-history-preview" v-if="getChatHistoryPreviewLines(message).length">
<div <div
v-for="(line, idx) in getChatHistoryPreviewLines(message)" v-for="(line, idx) in getChatHistoryPreviewLines(message)"
@@ -734,14 +734,15 @@
</div> </div>
</div> </div>
<div class="wechat-chat-history-bottom"> <div class="wechat-chat-history-bottom">
<span>聊天记录</span> <span>合并消息</span>
</div> </div>
</div> </div>
<div v-else-if="message.renderType === 'transfer'" <div v-else-if="message.renderType === 'transfer'"
class="wechat-transfer-card msg-radius" class="wechat-transfer-card msg-radius"
:class="[{ 'wechat-transfer-received': message.transferReceived, 'wechat-transfer-returned': isTransferReturned(message) }, message.isSent ? 'wechat-transfer-sent-side' : 'wechat-transfer-received-side']"> :class="[{ 'wechat-transfer-received': message.transferReceived, 'wechat-transfer-returned': isTransferReturned(message), 'wechat-transfer-overdue': isTransferOverdue(message) }, message.isSent ? 'wechat-transfer-sent-side' : 'wechat-transfer-received-side']">
<div class="wechat-transfer-content"> <div class="wechat-transfer-content">
<img src="/assets/images/wechat/wechat-returned.png" v-if="isTransferReturned(message)" class="wechat-transfer-icon" alt=""> <img src="/assets/images/wechat/wechat-returned.png" v-if="isTransferReturned(message)" class="wechat-transfer-icon" alt="">
<img src="/assets/images/wechat/overdue.png" v-else-if="isTransferOverdue(message)" class="wechat-transfer-icon" alt="">
<img src="/assets/images/wechat/wechat-trans-icon2.png" v-else-if="message.transferReceived" class="wechat-transfer-icon" alt=""> <img src="/assets/images/wechat/wechat-trans-icon2.png" v-else-if="message.transferReceived" class="wechat-transfer-icon" alt="">
<img src="/assets/images/wechat/wechat-trans-icon1.png" v-else class="wechat-transfer-icon" alt=""> <img src="/assets/images/wechat/wechat-trans-icon1.png" v-else class="wechat-transfer-icon" alt="">
<div class="wechat-transfer-info"> <div class="wechat-transfer-info">
@@ -1233,7 +1234,7 @@
@click.stop @click.stop
> >
<div class="px-4 py-3 bg-neutral-100 border-b border-gray-200 flex items-center justify-between"> <div class="px-4 py-3 bg-neutral-100 border-b border-gray-200 flex items-center justify-between">
<div class="text-sm text-[#161616] truncate">{{ chatHistoryModalTitle || '聊天记录' }}</div> <div class="text-sm text-[#161616] truncate">{{ chatHistoryModalTitle || '合并消息' }}</div>
<button <button
type="button" type="button"
class="p-2 rounded hover:bg-black/5" class="p-2 rounded hover:bg-black/5"
@@ -1495,6 +1496,10 @@
<input type="radio" value="txt" v-model="exportFormat" class="hidden" /> <input type="radio" value="txt" v-model="exportFormat" class="hidden" />
<span>TXT</span> <span>TXT</span>
</label> </label>
<label class="flex items-center gap-1.5 px-3 py-1.5 rounded-md border cursor-pointer transition-colors" :class="exportFormat === 'html' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'">
<input type="radio" value="html" v-model="exportFormat" class="hidden" />
<span>HTML</span>
</label>
</div> </div>
</div> </div>
@@ -1516,6 +1521,19 @@
</div> </div>
</div> </div>
<div v-if="exportFormat === 'html'" class="mt-3">
<div class="text-sm font-medium text-gray-800 mb-2">HTML 选项</div>
<div class="p-3 bg-gray-50 rounded-md border border-gray-200">
<label class="flex items-start gap-2 text-sm text-gray-700">
<input type="checkbox" v-model="exportDownloadRemoteMedia" :disabled="privacyMode" />
<span>允许联网下载链接/引用缩略图提高离线完整性</span>
</label>
<div class="mt-1 text-xs text-gray-500">
HTML 生效会在导出时尝试下载远程缩略图并写入 ZIP已做安全限制隐私模式下自动忽略
</div>
</div>
</div>
<div v-if="exportScope === 'selected'" class="mt-3"> <div v-if="exportScope === 'selected'" class="mt-3">
<div class="flex items-center gap-2 mb-2"> <div class="flex items-center gap-2 mb-2">
<button <button
@@ -2010,6 +2028,7 @@ const messageTypeFilterOptions = [
{ value: 'emoji', label: '表情' }, { value: 'emoji', label: '表情' },
{ value: 'video', label: '视频' }, { value: 'video', label: '视频' },
{ value: 'voice', label: '语音' }, { value: 'voice', label: '语音' },
{ value: 'chatHistory', label: '合并消息' },
{ value: 'transfer', label: '转账' }, { value: 'transfer', label: '转账' },
{ value: 'redPacket', label: '红包' }, { value: 'redPacket', label: '红包' },
{ value: 'file', label: '文件' }, { value: 'file', label: '文件' },
@@ -2488,13 +2507,15 @@ const exportError = ref('')
// current: 当前会话(映射为 selected + 单个 username // current: 当前会话(映射为 selected + 单个 username
const exportScope = ref('current') // current | selected | all | groups | singles const exportScope = ref('current') // current | selected | all | groups | singles
const exportFormat = ref('json') // json | txt const exportFormat = ref('json') // json | txt | html
const exportDownloadRemoteMedia = ref(true)
const exportMessageTypeOptions = [ const exportMessageTypeOptions = [
{ value: 'text', label: '文本' }, { value: 'text', label: '文本' },
{ value: 'image', label: '图片' }, { value: 'image', label: '图片' },
{ value: 'emoji', label: '表情' }, { value: 'emoji', label: '表情' },
{ value: 'video', label: '视频' }, { value: 'video', label: '视频' },
{ value: 'voice', label: '语音' }, { value: 'voice', label: '语音' },
{ value: 'chatHistory', label: '合并消息' },
{ value: 'transfer', label: '转账' }, { value: 'transfer', label: '转账' },
{ value: 'redPacket', label: '红包' }, { value: 'redPacket', label: '红包' },
{ value: 'file', label: '文件' }, { value: 'file', label: '文件' },
@@ -3063,6 +3084,15 @@ const startChatExport = async () => {
const selectedTypeSet = new Set(messageTypes.map((t) => String(t || '').trim())) const selectedTypeSet = new Set(messageTypes.map((t) => String(t || '').trim()))
const mediaKindSet = new Set() const mediaKindSet = new Set()
if (selectedTypeSet.has('chatHistory')) {
// 合并消息内部可能包含任意媒体类型;即使只勾选了 chatHistory也需要打包媒体才可离线查看。
mediaKindSet.add('image')
mediaKindSet.add('emoji')
mediaKindSet.add('video')
mediaKindSet.add('video_thumb')
mediaKindSet.add('voice')
mediaKindSet.add('file')
}
if (selectedTypeSet.has('image')) mediaKindSet.add('image') if (selectedTypeSet.has('image')) mediaKindSet.add('image')
if (selectedTypeSet.has('emoji')) mediaKindSet.add('emoji') if (selectedTypeSet.has('emoji')) mediaKindSet.add('emoji')
if (selectedTypeSet.has('video')) { if (selectedTypeSet.has('video')) {
@@ -3091,6 +3121,7 @@ const startChatExport = async () => {
message_types: messageTypes, message_types: messageTypes,
include_media: includeMedia, include_media: includeMedia,
media_kinds: mediaKinds, media_kinds: mediaKinds,
download_remote_media: exportFormat.value === 'html' && !!exportDownloadRemoteMedia.value,
output_dir: isDesktopExportRuntime() ? String(exportFolder.value || '').trim() : null, output_dir: isDesktopExportRuntime() ? String(exportFolder.value || '').trim() : null,
privacy_mode: !!privacyMode.value, privacy_mode: !!privacyMode.value,
file_name: exportFileName.value || null file_name: exportFileName.value || null
@@ -4017,6 +4048,16 @@ const isTransferReturned = (message) => {
return text.includes('退回') || text.includes('退还') return text.includes('退回') || text.includes('退还')
} }
const isTransferOverdue = (message) => {
const paySubType = String(message?.paySubType || '').trim()
if (paySubType === '10') return true
const s = String(message?.transferStatus || '').trim()
const c = String(message?.content || '').trim()
const text = `${s} ${c}`.trim()
if (!text) return false
return text.includes('过期')
}
const getTransferTitle = (message) => { const getTransferTitle = (message) => {
const paySubType = String(message.paySubType || '').trim() const paySubType = String(message.paySubType || '').trim()
// paysubtype 含义: // paysubtype 含义:
@@ -4952,7 +4993,7 @@ const openChatHistoryQuote = (rec) => {
const openChatHistoryModal = (message) => { const openChatHistoryModal = (message) => {
if (!process.client) return if (!process.client) return
chatHistoryModalTitle.value = String(message?.title || '聊天记录') chatHistoryModalTitle.value = String(message?.title || '合并消息')
const recordItem = String(message?.recordItem || '').trim() const recordItem = String(message?.recordItem || '').trim()
const parsed = parseChatHistoryRecord(recordItem) const parsed = parseChatHistoryRecord(recordItem)
@@ -6100,6 +6141,24 @@ const LinkCard = defineComponent({
color: #fff; color: #fff;
} }
/* 过期的转账样式 */
.wechat-transfer-overdue {
background: #E9CFB3;
}
.wechat-transfer-overdue::after {
background: #E9CFB3;
}
.wechat-transfer-overdue .wechat-transfer-amount,
.wechat-transfer-overdue .wechat-transfer-status {
color: #fff;
}
.wechat-transfer-overdue .wechat-transfer-bottom span {
color: #fff;
}
/* 红包消息样式 - 微信风格 */ /* 红包消息样式 - 微信风格 */
.wechat-redpacket-card { .wechat-redpacket-card {
width: 210px; width: 210px;

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -8,7 +8,7 @@ from collections import Counter
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Optional
from urllib.parse import quote, urlparse from urllib.parse import parse_qs, quote, urlparse
from fastapi import HTTPException from fastapi import HTTPException
@@ -634,6 +634,32 @@ def _is_mp_weixin_article_url(url: str) -> bool:
return "mp.weixin.qq.com/" in lu return "mp.weixin.qq.com/" in lu
def _is_mp_weixin_feed_article_url(url: str) -> bool:
"""Detect WeChat's PC feed/recommendation mp.weixin.qq.com share URLs.
These links often carry an `exptype` like:
masonry_feed_brief_content_elite_for_pcfeeds_u2i
WeChat desktop tends to render them in a cover-card style (image + bottom title),
so we use this as a hint to choose the 'cover' linkStyle.
"""
u = str(url or "").strip()
if not u:
return False
try:
parsed = urlparse(u)
q = parse_qs(parsed.query or "")
for v in (q.get("exptype") or []):
if "masonry_feed" in str(v or "").lower():
return True
except Exception:
pass
return "exptype=masonry_feed" in u.lower()
def _classify_link_share(*, app_type: int, url: str, source_username: str, desc: str) -> tuple[str, str]: def _classify_link_share(*, app_type: int, url: str, source_username: str, desc: str) -> tuple[str, str]:
src = str(source_username or "").strip().lower() src = str(source_username or "").strip().lower()
is_official_article = bool( is_official_article = bool(
@@ -647,7 +673,15 @@ def _classify_link_share(*, app_type: int, url: str, source_username: str, desc:
hashtag_count = len(re.findall(r"#[^#\s]+", d)) hashtag_count = len(re.findall(r"#[^#\s]+", d))
# 公众号文章中「封面图 + 底栏标题」卡片特征:摘要以 #话题# 风格为主。 # 公众号文章中「封面图 + 底栏标题」卡片特征:摘要以 #话题# 风格为主。
link_style = "cover" if (is_official_article and (d.startswith("#") or hashtag_count >= 2)) else "default" cover_like = bool(
is_official_article
and (
d.startswith("#")
or hashtag_count >= 2
or _is_mp_weixin_feed_article_url(url)
)
)
link_style = "cover" if cover_like else "default"
return link_type, link_style return link_type, link_style
@@ -948,8 +982,12 @@ def _parse_app_message(text: str) -> dict[str, Any]:
"recordItem": record_item or "", "recordItem": record_item or "",
} }
if app_type in (5, 68) and url: if app_type in (4, 5, 68) and url:
thumb_url = _normalize_xml_url(_extract_xml_tag_text(text, "thumburl")) # Many appmsg link cards (notably Bilibili shares with <type>4</type>) include a <patMsg> metadata block.
# DO NOT treat "<patmsg" presence as a pat message: it would misclassify normal link cards as "[拍一拍]".
thumb_url = _normalize_xml_url(
_extract_xml_tag_text(text, "thumburl") or _extract_xml_tag_text(text, "cdnthumburl")
)
link_type, link_style = _classify_link_share( link_type, link_style = _classify_link_share(
app_type=app_type, app_type=app_type,
url=url, url=url,
@@ -1093,7 +1131,10 @@ def _parse_app_message(text: str) -> dict[str, Any]:
"quoteVoiceLength": quote_voice_length, "quoteVoiceLength": quote_voice_length,
} }
if app_type == 62 or "<patmsg" in lower or 'type="patmsg"' in lower or "type='patmsg'" in lower: # Some versions may mark pat messages via sysmsg/appmsg tag attribute: <sysmsg type="patmsg">...</sysmsg>.
# Be strict here: lots of non-pat appmsg payloads still carry a nested <patMsg>...</patMsg> metadata block.
patmsg_attr = bool(re.search(r"<(sysmsg|appmsg)\b[^>]*\btype=['\"]patmsg['\"]", lower))
if app_type == 62 or patmsg_attr:
return {"renderType": "system", "content": "[拍一拍]"} return {"renderType": "system", "content": "[拍一拍]"}
if app_type == 2000 or ( if app_type == 2000 or (

View File

@@ -2742,6 +2742,90 @@ def _postprocess_transfer_messages(merged: list[dict[str, Any]]) -> None:
# - 将原始转账消息1/8回填为“已被接收” # - 将原始转账消息1/8回填为“已被接收”
# - 若同一 transferId 同时存在原始消息与 paysubtype=3 消息,则将 paysubtype=3 的那条校正为“已收款” # - 若同一 transferId 同时存在原始消息与 paysubtype=3 消息,则将 paysubtype=3 的那条校正为“已收款”
def _is_transfer_expired_system_message(text: Any) -> bool:
content = str(text or "").strip()
if not content:
return False
if "转账" not in content or "过期" not in content:
return False
if "未接收" in content and ("24小时" in content or "二十四小时" in content):
return True
return "已过期" in content and ("收款方" in content or "转账" in content)
def _mark_pending_transfers_expired_by_system_messages() -> set[str]:
expired_system_times: list[int] = []
pending_candidates: list[tuple[int, int]] = [] # (index, createTime)
for idx, msg in enumerate(merged):
rt = str(msg.get("renderType") or "").strip()
if rt == "system":
if _is_transfer_expired_system_message(msg.get("content")):
try:
ts = int(msg.get("createTime") or 0)
except Exception:
ts = 0
if ts > 0:
expired_system_times.append(ts)
continue
if rt != "transfer":
continue
pst = str(msg.get("paySubType") or "").strip()
if pst not in ("1", "8"):
continue
try:
ts = int(msg.get("createTime") or 0)
except Exception:
ts = 0
if ts <= 0:
continue
pending_candidates.append((idx, ts))
if not expired_system_times or not pending_candidates:
return set()
used_pending_indexes: set[int] = set()
expired_transfer_ids: set[str] = set()
# 过期系统提示通常出现在转账发起约 24 小时后。
# 为避免误匹配,要求时间差落在 [22h, 26h] 范围内,并选择最接近 24h 的待收款消息。
for sys_ts in sorted(expired_system_times):
best_index = -1
best_distance = 10**9
for idx, transfer_ts in pending_candidates:
if idx in used_pending_indexes:
continue
delta = sys_ts - transfer_ts
if delta < 0:
continue
if delta < 22 * 3600 or delta > 26 * 3600:
continue
distance = abs(delta - 24 * 3600)
if distance < best_distance:
best_distance = distance
best_index = idx
if best_index < 0:
continue
used_pending_indexes.add(best_index)
transfer_msg = merged[best_index]
transfer_msg["paySubType"] = "10"
transfer_msg["transferStatus"] = "已过期"
tid = str(transfer_msg.get("transferId") or "").strip()
if tid:
expired_transfer_ids.add(tid)
return expired_transfer_ids
expired_transfer_ids = _mark_pending_transfers_expired_by_system_messages()
returned_transfer_ids: set[str] = set() # 退还状态的 transferId returned_transfer_ids: set[str] = set() # 退还状态的 transferId
received_transfer_ids: set[str] = set() # 已收款状态的 transferId received_transfer_ids: set[str] = set() # 已收款状态的 transferId
returned_amounts_with_time: list[tuple[str, int]] = [] # (金额, 时间戳) 用于退还回退匹配 returned_amounts_with_time: list[tuple[str, int]] = [] # (金额, 时间戳) 用于退还回退匹配
@@ -2828,6 +2912,8 @@ def _postprocess_transfer_messages(merged: list[dict[str, Any]]) -> None:
tid = str(m.get("transferId") or "").strip() tid = str(m.get("transferId") or "").strip()
if not tid or tid not in pending_transfer_ids: if not tid or tid not in pending_transfer_ids:
continue continue
if tid in expired_transfer_ids:
continue
mid = str(m.get("id") or "").strip() mid = str(m.get("id") or "").strip()
if mid and mid in backfilled_message_ids: if mid and mid in backfilled_message_ids:
continue continue

View File

@@ -12,17 +12,31 @@ from ..path_fix import PathFixRoute
router = APIRouter(route_class=PathFixRoute) router = APIRouter(route_class=PathFixRoute)
ExportFormat = Literal["json", "txt"] ExportFormat = Literal["json", "txt", "html"]
ExportScope = Literal["selected", "all", "groups", "singles"] ExportScope = Literal["selected", "all", "groups", "singles"]
MediaKind = Literal["image", "emoji", "video", "video_thumb", "voice", "file"] MediaKind = Literal["image", "emoji", "video", "video_thumb", "voice", "file"]
MessageType = Literal["text", "image", "emoji", "video", "voice", "file", "link", "transfer", "redPacket", "system", "quote", "voip"] MessageType = Literal[
"text",
"image",
"emoji",
"video",
"voice",
"chatHistory",
"file",
"link",
"transfer",
"redPacket",
"system",
"quote",
"voip",
]
class ChatExportCreateRequest(BaseModel): class ChatExportCreateRequest(BaseModel):
account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)") account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)")
scope: ExportScope = Field("selected", description="导出范围selected=指定会话all=全部groups=仅群聊singles=仅单聊") scope: ExportScope = Field("selected", description="导出范围selected=指定会话all=全部groups=仅群聊singles=仅单聊")
usernames: list[str] = Field(default_factory=list, description="会话 username 列表scope=selected 时使用)") usernames: list[str] = Field(default_factory=list, description="会话 username 列表scope=selected 时使用)")
format: ExportFormat = Field("json", description="导出格式json 或 txtzip 内每个会话一个文件)") format: ExportFormat = Field("json", description="导出格式json/txt/htmlzip 内每个会话一个文件html 可离线打开 index.html 查看")
start_time: Optional[int] = Field(None, description="起始时间Unix 秒,含)") start_time: Optional[int] = Field(None, description="起始时间Unix 秒,含)")
end_time: Optional[int] = Field(None, description="结束时间Unix 秒,含)") end_time: Optional[int] = Field(None, description="结束时间Unix 秒,含)")
include_hidden: bool = Field(False, description="是否包含隐藏会话scope!=selected 时)") include_hidden: bool = Field(False, description="是否包含隐藏会话scope!=selected 时)")
@@ -41,6 +55,10 @@ class ChatExportCreateRequest(BaseModel):
False, False,
description="预留字段:本项目不从微信进程提取媒体密钥,请使用 wx_key 获取并保存/批量解密", description="预留字段:本项目不从微信进程提取媒体密钥,请使用 wx_key 获取并保存/批量解密",
) )
download_remote_media: bool = Field(
False,
description="HTML 导出时允许联网下载链接/引用缩略图等远程媒体(提高离线完整性)",
)
privacy_mode: bool = Field( privacy_mode: bool = Field(
False, False,
description="隐私模式导出:隐藏会话/用户名/内容,不打包头像与媒体", description="隐私模式导出:隐藏会话/用户名/内容,不打包头像与媒体",
@@ -64,6 +82,7 @@ async def create_chat_export(req: ChatExportCreateRequest):
message_types=req.message_types, message_types=req.message_types,
output_dir=req.output_dir, output_dir=req.output_dir,
allow_process_key_extract=req.allow_process_key_extract, allow_process_key_extract=req.allow_process_key_extract,
download_remote_media=req.download_remote_media,
privacy_mode=req.privacy_mode, privacy_mode=req.privacy_mode,
file_name=req.file_name, file_name=req.file_name,
) )

View File

@@ -0,0 +1,50 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _parse_app_message
class TestChatAppMessageType4PatMsgRegression(unittest.TestCase):
def test_type4_link_with_patmsg_metadata_is_not_misclassified_as_pat(self):
raw_text = (
"<msg>"
'<appmsg appid="wxcb8d4298c6a09bcb" sdkver="0">'
"<title>【中配】抽象可能让你的代码变差 - CodeAesthetic</title>"
"<des>UP主黑纹白斑马</des>"
"<type>4</type>"
"<url>https://b23.tv/au68guF</url>"
"<appname>哔哩哔哩</appname>"
"<appattach><cdnthumburl>3057020100044b30</cdnthumburl></appattach>"
"<patMsg><chatUser /></patMsg>"
"</appmsg>"
"</msg>"
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("url"), "https://b23.tv/au68guF")
self.assertEqual(parsed.get("title"), "【中配】抽象可能让你的代码变差 - CodeAesthetic")
self.assertEqual(parsed.get("from"), "哔哩哔哩")
self.assertNotEqual(parsed.get("content"), "[拍一拍]")
def test_type62_is_still_pat(self):
raw_text = '<msg><appmsg><title>"A" 拍了拍 "B"</title><type>62</type></appmsg></msg>'
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "system")
self.assertEqual(parsed.get("content"), "[拍一拍]")
def test_sysmsg_type_patmsg_attr_is_still_pat(self):
raw_text = '<sysmsg type="patmsg"><foo>bar</foo></sysmsg>'
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "system")
self.assertEqual(parsed.get("content"), "[拍一拍]")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,218 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportChatHistoryModal(unittest.TestCase):
_MD5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
record_item = (
"<recorditem>"
"<datalist>"
"<dataitem>"
"<datatype>2</datatype>"
f"<fullmd5>{self._MD5}</fullmd5>"
"</dataitem>"
"</datalist>"
"</recorditem>"
)
chat_history_xml = (
"<msg><appmsg>"
"<type>19</type>"
"<title>聊天记录</title>"
"<des>记录预览</des>"
f"<recorditem><![CDATA[{record_item}]]></recorditem>"
"</appmsg></msg>"
)
conn.execute(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 49, 1, 2, 1735689601, chat_history_xml, None),
)
conn.commit()
finally:
conn.close()
def _seed_media_files(self, account_dir: Path) -> None:
resource_root = account_dir / "resource"
(resource_root / "aa").mkdir(parents=True, exist_ok=True)
(resource_root / "aa" / f"{self._MD5}.jpg").write_bytes(b"\xff\xd8\xff\xd9")
def _prepare_account(self, root: Path, *, account: str, username: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
self._seed_media_files(account_dir)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image"],
message_types=["chatHistory", "image"],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_chat_history_modal_has_media_index_and_record_item(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn(f"media/images/{self._MD5}.jpg", names)
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn('id="chatHistoryModal"', html_text)
self.assertIn('data-wce-chat-history="1"', html_text)
self.assertIn('data-record-item-b64="', html_text)
self.assertIn('id="wceMediaIndex"', html_text)
self.assertIn(self._MD5, html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,353 @@
import os
import json
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportHtmlFormat(unittest.TestCase):
_FILE_MD5 = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
_VOICE_SERVER_ID = 2001
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
image_xml = '<msg><img md5="aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" cdnthumburl="img_file_id_1" /></msg>'
voice_xml = '<msg><voicemsg voicelength="3000" /></msg>'
file_md5 = self._FILE_MD5
file_xml = (
"<msg><appmsg>"
"<type>6</type>"
"<title>demo.pdf</title>"
"<totallen>2048</totallen>"
f"<md5>{file_md5}</md5>"
"</appmsg></msg>"
)
link_xml = (
"<msg><appmsg>"
"<type>5</type>"
"<title>示例链接</title>"
"<des>这是描述</des>"
"<url>https://example.com/</url>"
"<thumburl>https://example.com/thumb.jpg</thumburl>"
"<sourceusername>gh_test</sourceusername>"
"<sourcedisplayname>测试公众号</sourcedisplayname>"
"</appmsg></msg>"
)
chat_history_xml = (
"<msg><appmsg>"
"<type>19</type>"
"<title>聊天记录</title>"
"<des>记录预览</des>"
"<recorditem><desc>张三: hi\n李四: ok</desc></recorditem>"
"</appmsg></msg>"
)
transfer_xml = (
"<msg><appmsg>"
"<type>2000</type>"
"<title>微信转账</title>"
"<wcpayinfo>"
"<pay_memo>转账备注</pay_memo>"
"<feedesc>¥1.23</feedesc>"
"<paysubtype>3</paysubtype>"
"<transferid>transfer_123</transferid>"
"</wcpayinfo>"
"</appmsg></msg>"
)
red_packet_xml = (
"<msg><appmsg>"
"<type>2001</type>"
"<title>红包</title>"
"<wcpayinfo>"
"<sendertitle>恭喜发财,大吉大利</sendertitle>"
"<senderdes>微信红包</senderdes>"
"</wcpayinfo>"
"</appmsg></msg>"
)
voip_xml = (
"<msg><VoIPBubbleMsg>"
"<room_type>1</room_type>"
"<msg>语音通话</msg>"
"</VoIPBubbleMsg></msg>"
)
quote_voice_xml = (
"<msg><appmsg>"
"<type>57</type>"
"<title>回复语音</title>"
"<refermsg>"
"<type>34</type>"
f"<svrid>{self._VOICE_SERVER_ID}</svrid>"
"<fromusr>wxid_friend</fromusr>"
"<displayname>测试好友</displayname>"
"<content>wxid_friend:3000:1:</content>"
"</refermsg>"
"</appmsg></msg>"
)
rows = [
(1, 1001, 3, 1, 2, 1735689601, image_xml, None),
(2, 1002, 1, 2, 2, 1735689602, "普通文本消息[微笑]", None),
(3, 1003, 49, 3, 1, 1735689603, transfer_xml, None),
(4, 1004, 49, 4, 2, 1735689604, red_packet_xml, None),
(5, 1005, 49, 5, 1, 1735689605, file_xml, None),
(6, 1006, 49, 6, 2, 1735689606, link_xml, None),
(7, 1007, 49, 7, 2, 1735689607, chat_history_xml, None),
(8, 1008, 50, 8, 2, 1735689608, voip_xml, None),
(9, self._VOICE_SERVER_ID, 34, 9, 1, 1735689609, voice_xml, None),
(10, 1010, 49, 10, 1, 1735689610, quote_voice_xml, None),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
rows,
)
conn.commit()
finally:
conn.close()
def _seed_media_files(self, account_dir: Path) -> None:
resource_root = account_dir / "resource"
(resource_root / "aa").mkdir(parents=True, exist_ok=True)
(resource_root / "aa" / "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg").write_bytes(b"\xff\xd8\xff\xd9")
(resource_root / "bb").mkdir(parents=True, exist_ok=True)
(resource_root / "bb" / f"{self._FILE_MD5}.dat").write_bytes(b"dummy")
conn = sqlite3.connect(str(account_dir / "media_0.db"))
try:
conn.execute(
"""
CREATE TABLE VoiceInfo (
svr_id INTEGER,
create_time INTEGER,
voice_data BLOB
)
"""
)
conn.execute(
"INSERT INTO VoiceInfo VALUES (?, ?, ?)",
(self._VOICE_SERVER_ID, 1735689609, b"SILK_VOICE_DATA"),
)
conn.commit()
finally:
conn.close()
def _prepare_account(self, root: Path, *, account: str, username: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
self._seed_media_files(account_dir)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image", "emoji", "video", "video_thumb", "voice", "file"],
message_types=[],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_html_export_contains_index_and_conversation_page(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
self.assertTrue(job.zip_path and job.zip_path.exists())
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn("index.html", names)
self.assertIn("assets/wechat-chat-export.css", names)
self.assertIn("assets/wechat-chat-export.js", names)
manifest = json.loads(zf.read("manifest.json").decode("utf-8"))
self.assertEqual(manifest.get("format"), "html")
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn('data-wce-rail-avatar="1"', html_text)
self.assertIn('data-wce-session-list="1"', html_text)
self.assertIn('id="sessionSearchInput"', html_text)
self.assertIn('data-wce-time-divider="1"', html_text)
self.assertIn('id="messageTypeFilter"', html_text)
self.assertIn('value="chatHistory"', html_text)
self.assertIn('id="chatHistoryModal"', html_text)
self.assertIn('data-wce-chat-history="1"', html_text)
self.assertIn('data-record-item-b64="', html_text)
self.assertIn('id="wceMediaIndex"', html_text)
self.assertIn('data-wce-quote-voice-btn="1"', html_text)
self.assertNotIn('title="刷新消息"', html_text)
self.assertNotIn('title="导出聊天记录"', html_text)
self.assertNotIn("搜索聊天记录", html_text)
self.assertNotIn("朋友圈", html_text)
self.assertNotIn("年度总结", html_text)
self.assertNotIn("设置", html_text)
self.assertNotIn("隐私模式", html_text)
self.assertTrue(any(n.startswith("media/images/") for n in names))
self.assertIn("../../media/images/", html_text)
self.assertIn("wechat-transfer-card", html_text)
self.assertIn("wechat-redpacket-card", html_text)
self.assertIn("wechat-chat-history-card", html_text)
self.assertIn("wechat-voip-bubble", html_text)
self.assertIn("wechat-link-card", html_text)
self.assertIn("wechat-file-card", html_text)
self.assertIn("wechat-voice-wrapper", html_text)
css_text = zf.read("assets/wechat-chat-export.css").decode("utf-8", errors="ignore")
self.assertIn("wechat-transfer-card", css_text)
self.assertNotIn("wechat-transfer-card[data-v-", css_text)
js_text = zf.read("assets/wechat-chat-export.js").decode("utf-8", errors="ignore")
self.assertIn("wechat-voice-bubble", js_text)
self.assertIn("voice-playing", js_text)
self.assertIn("data-wce-quote-voice-btn", js_text)
self.assertIn("assets/images/wechat/wechat-trans-icon1.png", names)
self.assertIn("assets/images/wechat/zip.png", names)
self.assertIn("assets/images/wechat/WeChat-Icon-Logo.wine.svg", names)
self.assertTrue(any(n.startswith("fonts/") and n.endswith(".woff2") for n in names))
self.assertIn("wxemoji/Expression_1@2x.png", names)
self.assertIn("../../wxemoji/Expression_1@2x.png", html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,199 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportImageMd5CandidateFallback(unittest.TestCase):
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
good_md5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
bad_md5 = "ffffffffffffffffffffffffffffffff"
image_xml = f'<msg><img md5="{bad_md5}" hdmd5="{good_md5}" cdnthumburl="img_file_id_1" /></msg>'
conn.execute(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 3, 1, 2, 1735689601, image_xml, None),
)
conn.commit()
finally:
conn.close()
def _seed_decrypted_resource(self, account_dir: Path) -> None:
resource_root = account_dir / "resource"
(resource_root / "aa").mkdir(parents=True, exist_ok=True)
(resource_root / "aa" / "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg").write_bytes(b"\xff\xd8\xff\xd9")
def _prepare_account(self, root: Path, *, account: str, username: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
self._seed_decrypted_resource(account_dir)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image"],
message_types=[],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_falls_back_to_secondary_md5_candidate(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn("media/images/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg", names)
self.assertFalse(any("ffffffffffffffffffffffffffffffff" in n for n in names if n.startswith("media/images/")))
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8", errors="ignore")
self.assertIn("../../media/images/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg", html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,235 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportImageMd5PrefersMessageResource(unittest.TestCase):
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_source_info(self, account_dir: Path) -> None:
wxid_dir = account_dir / "_wxid_dummy"
db_storage_dir = account_dir / "_db_storage_dummy"
wxid_dir.mkdir(parents=True, exist_ok=True)
db_storage_dir.mkdir(parents=True, exist_ok=True)
(account_dir / "_source.json").write_text(
'{"wxid_dir": "' + str(wxid_dir).replace("\\", "\\\\") + '", "db_storage_path": "' + str(db_storage_dir).replace("\\", "\\\\") + '"}',
encoding="utf-8",
)
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str, bad_md5: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
image_xml = f'<msg><img md5="{bad_md5}" /></msg>'
conn.execute(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 3, 1, 2, 1735689601, image_xml, None),
)
conn.commit()
finally:
conn.close()
def _seed_message_resource_db(self, path: Path, *, good_md5: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE MessageResourceInfo (
message_id INTEGER,
message_svr_id INTEGER,
message_local_type INTEGER,
chat_id INTEGER,
message_local_id INTEGER,
message_create_time INTEGER,
packed_info BLOB
)
"""
)
# packed_info may contain multiple tokens; include a realistic *.dat reference so the extractor prefers it.
packed_info = f"{good_md5}_t.dat".encode("ascii")
conn.execute(
"INSERT INTO MessageResourceInfo VALUES (?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 3, 0, 1, 1735689601, packed_info),
)
conn.commit()
finally:
conn.close()
def _seed_decrypted_resource(self, account_dir: Path, *, good_md5: str) -> None:
resource_root = account_dir / "resource"
(resource_root / good_md5[:2]).mkdir(parents=True, exist_ok=True)
# Minimal JPEG payload (valid SOI/EOI).
(resource_root / good_md5[:2] / f"{good_md5}.jpg").write_bytes(b"\xff\xd8\xff\xd9")
def _prepare_account(self, root: Path, *, account: str, username: str, bad_md5: str, good_md5: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_source_info(account_dir)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username, bad_md5=bad_md5)
self._seed_message_resource_db(account_dir / "message_resource.db", good_md5=good_md5)
self._seed_decrypted_resource(account_dir, good_md5=good_md5)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image"],
message_types=["image"],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_prefers_message_resource_md5_over_xml_md5(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
bad_md5 = "ffffffffffffffffffffffffffffffff"
good_md5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
self._prepare_account(root, account=account, username=username, bad_md5=bad_md5, good_md5=good_md5)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn(f"media/images/{good_md5}.jpg", names)
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8", errors="ignore")
self.assertIn(f"../../media/images/{good_md5}.jpg", html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -198,6 +198,7 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
message_types=message_types, message_types=message_types,
output_dir=None, output_dir=None,
allow_process_key_extract=False, allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=privacy_mode, privacy_mode=privacy_mode,
file_name=None, file_name=None,
) )

View File

@@ -0,0 +1,304 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class _FakeResponse:
def __init__(self, body: bytes, *, content_type: str) -> None:
self.status_code = 200
self.headers = {
"Content-Type": str(content_type or "").strip(),
"Content-Length": str(len(body)),
}
self._body = body
def iter_content(self, chunk_size=65536):
data = self._body or b""
for i in range(0, len(data), int(chunk_size or 65536)):
yield data[i : i + int(chunk_size or 65536)]
def close(self):
return None
class TestChatExportRemoteThumbOption(unittest.TestCase):
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> tuple[str, str]:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
link_thumb = "https://1.1.1.1/thumb.png"
quote_thumb = "https://1.1.1.1/quote.png"
link_xml = (
"<msg><appmsg>"
"<type>5</type>"
"<title>示例链接</title>"
"<des>这是描述</des>"
"<url>https://example.com/</url>"
f"<thumburl>{link_thumb}</thumburl>"
"</appmsg></msg>"
)
quote_xml = (
"<msg><appmsg>"
"<type>57</type>"
"<title>回复</title>"
"<refermsg>"
"<type>49</type>"
"<svrid>8888</svrid>"
"<fromusr>wxid_other</fromusr>"
"<displayname>对方</displayname>"
"<content>"
"<msg><appmsg><type>5</type><title>被引用链接</title><url>https://example.com/</url>"
f"<thumburl>{quote_thumb}</thumburl>"
"</appmsg></msg>"
"</content>"
"</refermsg>"
"</appmsg></msg>"
)
rows = [
(1, 1001, 49, 1, 2, 1735689601, link_xml, None),
(2, 1002, 49, 2, 2, 1735689602, quote_xml, None),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
rows,
)
conn.commit()
return link_thumb, quote_thumb
finally:
conn.close()
def _prepare_account(self, root: Path, *, account: str, username: str) -> tuple[Path, str, str]:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
link_thumb, quote_thumb = self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
return account_dir, link_thumb, quote_thumb
def _create_job(self, manager, *, account: str, username: str, download_remote_media: bool):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image", "emoji", "video", "video_thumb", "voice", "file"],
message_types=["link", "quote", "image"],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=download_remote_media,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_remote_thumb_disabled_does_not_download(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
_, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
with mock.patch.object(
svc.requests,
"get",
side_effect=AssertionError("requests.get should not be called when download_remote_media=False"),
) as m_get:
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
download_remote_media=False,
)
self.assertEqual(job.status, "done", msg=job.error)
self.assertEqual(m_get.call_count, 0)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn(f'src="{link_thumb}"', html_text)
self.assertIn(f'src="{quote_thumb}"', html_text)
self.assertFalse(any(n.startswith("media/remote/") for n in names))
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
def test_remote_thumb_enabled_downloads_and_rewrites(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
_, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
fake_png = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde"
def _fake_get(url, **_kwargs):
return _FakeResponse(fake_png, content_type="image/png")
with mock.patch.object(svc.requests, "get", side_effect=_fake_get) as m_get:
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
download_remote_media=True,
)
self.assertEqual(job.status, "done", msg=job.error)
self.assertGreaterEqual(m_get.call_count, 1)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
h1 = hashlib.sha256(link_thumb.encode("utf-8", errors="ignore")).hexdigest()
arc1 = f"media/remote/{h1[:32]}.png"
self.assertIn(arc1, names)
self.assertIn(f"../../{arc1}", html_text)
self.assertNotIn(f'src="{link_thumb}"', html_text)
h2 = hashlib.sha256(quote_thumb.encode("utf-8", errors="ignore")).hexdigest()
arc2 = f"media/remote/{h2[:32]}.png"
self.assertIn(arc2, names)
self.assertIn(f"../../{arc2}", html_text)
self.assertNotIn(f'src="{quote_thumb}"', html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,58 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _parse_app_message
class TestChatOfficialArticleCoverStyle(unittest.TestCase):
def test_mp_weixin_feed_url_is_cover_style(self):
raw_text = (
"<msg>"
"<appmsg>"
"<title>时尚穿搭「这样的jk你喜欢吗」</title>"
"<des>这样的jk你喜欢吗</des>"
"<type>5</type>"
"<url>"
"http://mp.weixin.qq.com/s?__biz=MzkxOTY4MjIxOA==&amp;mid=2247508015&amp;idx=1&amp;sn=931dce677c6e70b4365792b14e7e8ff0"
"&amp;exptype=masonry_feed_brief_content_elite_for_pcfeeds_u2i&amp;ranksessionid=1770868256_1&amp;req_id=1770867949535989#rd"
"</url>"
"<thumburl>https://mmbiz.qpic.cn/sz_mmbiz_jpg/foo/640?wx_fmt=jpeg&amp;wxfrom=401</thumburl>"
"<sourcedisplayname>甜图社</sourcedisplayname>"
"<sourceusername>gh_abc123</sourceusername>"
"</appmsg>"
"</msg>"
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("linkType"), "official_article")
self.assertEqual(parsed.get("linkStyle"), "cover")
def test_mp_weixin_non_feed_url_keeps_default_style(self):
raw_text = (
"<msg>"
"<appmsg>"
"<title>普通分享</title>"
"<des>这样的jk你喜欢吗</des>"
"<type>5</type>"
"<url>http://mp.weixin.qq.com/s?__biz=foo&amp;mid=1&amp;idx=1&amp;sn=bar#rd</url>"
"<sourcedisplayname>甜图社</sourcedisplayname>"
"<sourceusername>gh_abc123</sourceusername>"
"</appmsg>"
"</msg>"
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("linkType"), "official_article")
self.assertEqual(parsed.get("linkStyle"), "default")
if __name__ == "__main__":
unittest.main()

View File

@@ -62,7 +62,68 @@ class TestTransferPostprocess(unittest.TestCase):
self.assertEqual(merged[0].get("transferStatus"), "已被接收") self.assertEqual(merged[0].get("transferStatus"), "已被接收")
def test_pending_transfer_marked_expired_by_system_message(self):
merged = [
{
"id": "message_0:Msg_x:100",
"renderType": "transfer",
"paySubType": "1",
"transferId": "t-expired-1",
"amount": "¥500.00",
"createTime": 1770742598,
"isSent": True,
"transferStatus": "转账",
},
{
"id": "message_0:Msg_x:101",
"renderType": "system",
"type": 10000,
"createTime": 1770829000,
"content": "收款方24小时内未接收你的转账已过期",
},
]
chat_router._postprocess_transfer_messages(merged)
self.assertEqual(merged[0].get("paySubType"), "10")
self.assertEqual(merged[0].get("transferStatus"), "已过期")
def test_expired_matching_wins_over_amount_time_received_fallback(self):
merged = [
{
"id": "message_0:Msg_x:200",
"renderType": "transfer",
"paySubType": "1",
"transferId": "t-expired-2",
"amount": "¥500.00",
"createTime": 1770742598,
"isSent": True,
"transferStatus": "",
},
{
"id": "message_0:Msg_x:201",
"renderType": "transfer",
"paySubType": "3",
"transferId": "t-other",
"amount": "¥500.00",
"createTime": 1770828800,
"isSent": False,
"transferStatus": "已收款",
},
{
"id": "message_0:Msg_x:202",
"renderType": "system",
"type": 10000,
"createTime": 1770829000,
"content": "收款方24小时内未接收你的转账已过期",
},
]
chat_router._postprocess_transfer_messages(merged)
self.assertEqual(merged[0].get("paySubType"), "10")
self.assertEqual(merged[0].get("transferStatus"), "已过期")
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()