From 2a1ae2150f257dacd5b34fc11d24824f805934f6 Mon Sep 17 00:00:00 2001 From: 2977094657 <2977094657@qq.com> Date: Fri, 13 Feb 2026 22:38:28 +0800 Subject: [PATCH] =?UTF-8?q?feat(chat-export):=20=E6=94=AF=E6=8C=81=20HTML?= =?UTF-8?q?=20=E5=AF=BC=E5=87=BA=EF=BC=88=E5=90=88=E5=B9=B6=E6=B6=88?= =?UTF-8?q?=E6=81=AF/=E8=BF=9C=E7=A8=8B=E7=BC=A9=E7=95=A5=E5=9B=BE?= =?UTF-8?q?=E5=8F=AF=E9=80=89=E4=B8=8B=E8=BD=BD=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 导出格式新增 html:生成 index.html + 会话 messages.html,离线浏览 - 支持 chatHistory(合并消息)解析/渲染与弹窗查看 - 图片资源解析增强:MessageResourceInfo 优先 + md5/hdmd5 兜底 - HTML 导出可选下载远程缩略图(仅公网主机/图片类型/5MB 限制) - 修复拍一拍误判、公众号封面样式识别;转账过期状态与前端展示 --- frontend/composables/useApi.js | 1 + frontend/pages/chat/[[username]].vue | 71 +- .../public/assets/images/wechat/overdue.png | Bin 0 -> 1580 bytes .../chat_export_service.py | 3063 ++++++++++++++++- src/wechat_decrypt_tool/chat_helpers.py | 51 +- src/wechat_decrypt_tool/routers/chat.py | 86 + .../routers/chat_export.py | 25 +- ...hat_app_message_type4_patmsg_regression.py | 50 + tests/test_chat_export_chat_history_modal.py | 218 ++ tests/test_chat_export_html_format.py | 353 ++ ...hat_export_image_md5_candidate_fallback.py | 199 ++ ...port_image_md5_prefers_message_resource.py | 235 ++ ...est_chat_export_message_types_semantics.py | 1 + tests/test_chat_export_remote_thumb_option.py | 304 ++ .../test_chat_official_article_cover_style.py | 58 + tests/test_transfer_postprocess.py | 63 +- 16 files changed, 4696 insertions(+), 82 deletions(-) create mode 100644 frontend/public/assets/images/wechat/overdue.png create mode 100644 tests/test_chat_app_message_type4_patmsg_regression.py create mode 100644 tests/test_chat_export_chat_history_modal.py create mode 100644 tests/test_chat_export_html_format.py create mode 100644 tests/test_chat_export_image_md5_candidate_fallback.py create mode 100644 tests/test_chat_export_image_md5_prefers_message_resource.py create mode 100644 tests/test_chat_export_remote_thumb_option.py create mode 100644 tests/test_chat_official_article_cover_style.py diff --git a/frontend/composables/useApi.js b/frontend/composables/useApi.js index c1ce542..f30f7d3 100644 --- a/frontend/composables/useApi.js +++ b/frontend/composables/useApi.js @@ -294,6 +294,7 @@ export const useApi = () => { media_kinds: Array.isArray(data.media_kinds) ? data.media_kinds : ['image', 'emoji', 'video', 'video_thumb', 'voice', 'file'], output_dir: data.output_dir == null ? null : String(data.output_dir || '').trim(), allow_process_key_extract: !!data.allow_process_key_extract, + download_remote_media: !!data.download_remote_media, privacy_mode: !!data.privacy_mode, file_name: data.file_name || null } diff --git a/frontend/pages/chat/[[username]].vue b/frontend/pages/chat/[[username]].vue index 7366a49..6d600f5 100644 --- a/frontend/pages/chat/[[username]].vue +++ b/frontend/pages/chat/[[username]].vue @@ -722,7 +722,7 @@ @click.stop="openChatHistoryModal(message)" >
-
{{ message.title || '聊天记录' }}
+
{{ message.title || '合并消息' }}
- 聊天记录 + 合并消息
+ :class="[{ 'wechat-transfer-received': message.transferReceived, 'wechat-transfer-returned': isTransferReturned(message), 'wechat-transfer-overdue': isTransferOverdue(message) }, message.isSent ? 'wechat-transfer-sent-side' : 'wechat-transfer-received-side']">
+
@@ -1233,7 +1234,7 @@ @click.stop >
-
{{ chatHistoryModalTitle || '聊天记录' }}
+
{{ chatHistoryModalTitle || '合并消息' }}
@@ -1516,6 +1521,19 @@
+
+
HTML 选项
+
+ +
+ 仅 HTML 生效;会在导出时尝试下载远程缩略图并写入 ZIP(已做安全限制)。隐私模式下自动忽略。 +
+
+
+
\n") + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + tw.write('
\n') + + conv_dir_norm = str(conv_dir or "").strip().strip("/").replace("\\", "/") + for item in session_items: + item_conv_dir = str(item.get("convDir") or "").strip().strip("/").replace("\\", "/") + if not item_conv_dir: + continue + + href = f"{rel_root}{item_conv_dir}/messages.html" + item_display_name = str(item.get("displayName") or "").strip() or "会话" + item_avatar_path = str(item.get("avatarPath") or "").strip() + item_avatar_src = rel_path(item_avatar_path) if item_avatar_path else "" + item_last_time = str(item.get("lastTimeText") or "").strip() + item_preview = str(item.get("previewText") or "").strip() + + is_active = False + try: + is_active = (str(item.get("username") or "").strip() == conv_username) or (item_conv_dir == conv_dir_norm) + except Exception: + is_active = item_conv_dir == conv_dir_norm + + safe_char = (item_display_name[:1] or "?").strip() or "?" + classes = ( + "px-3 cursor-pointer transition-colors duration-150 border-b border-gray-100 " + "h-[calc(80px/var(--dpr))] flex items-center" + ) + if is_active: + classes += " bg-[#DEDEDE]" + else: + classes += " hover:bg-[#F5F5F5]" + + item_username = str(item.get("username") or "").strip() + tw.write( + f' \n") + tw.write('
\n') + tw.write( + '
\n' + ) + if item_avatar_src and (not privacy_mode): + tw.write( + f' {esc_attr(item_display_name)}\n' + ) + else: + tw.write( + f'
{esc_text(safe_char)}
\n' + ) + tw.write("
\n") + tw.write("
\n") + tw.write('
\n') + tw.write('
\n') + tw.write( + f'

{esc_text(item_display_name)}

\n' + ) + tw.write('
\n') + tw.write(f' {esc_text(item_last_time)}\n') + tw.write("
\n") + tw.write("
\n") + tw.write( + f'

{render_text_with_emojis(item_preview)}

\n' + ) + tw.write("
\n") + tw.write("
\n") + + tw.write("
\n") + tw.write("\n") + + # Right chat area + tw.write('
\n') + tw.write('
\n') + tw.write('
\n') + tw.write('
\n') + + tw.write('
\n') + tw.write('
\n') + tw.write(f'

{esc_text(chat_title)}

\n') + tw.write("
\n") + tw.write('
\n') + tw.write(f' \n") + tw.write("
\n") + tw.write("
\n") + + tw.write('
\n') + + sender_alias_map: dict[str, int] = {} + prev_ts = 0 + scanned = 0 + for row in _iter_rows_for_conversation( + account_dir=account_dir, + conv_username=conv_username, + start_time=start_time, + end_time=end_time, + local_types=local_types, + ): + scanned += 1 + + msg = _parse_message_for_export( + row=row, + conv_username=conv_username, + is_group=conv_is_group, + resource_conn=resource_conn, + resource_chat_id=resource_chat_id, + sender_alias="", + ) + if not _is_render_type_selected(msg.get("renderType"), want_types): + continue + + sender_username = str(msg.get("senderUsername") or "").strip() + if privacy_mode: + _privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map) + else: + msg["senderDisplayName"] = resolve_display_name(sender_username) if sender_username else "" + msg["senderAvatarPath"] = ( + _materialize_avatar( + zf=zf, + head_image_conn=head_image_conn, + username=sender_username, + avatar_written=avatar_written, + ) + if (sender_username and head_image_conn is not None) + else "" + ) + + if include_media: + _attach_offline_media( + zf=zf, + account_dir=account_dir, + conv_username=conv_username, + msg=msg, + media_written=media_written, + report=report, + media_kinds=media_kinds, + allow_process_key_extract=allow_process_key_extract, + media_db_path=media_db_path, + lock=lock, + job=job, + ) + _remember_offline_media(msg) + + rt = str(msg.get("renderType") or "text").strip() or "text" + create_time_text = str(msg.get("createTimeText") or "").strip() + try: + ts = int(msg.get("createTime") or 0) + except Exception: + ts = 0 + + show_divider = False + if ts and ((prev_ts == 0) or (abs(ts - prev_ts) >= 300)): + show_divider = True + + if show_divider: + divider_text = _format_session_time(ts) + if divider_text: + tw.write('
\n') + tw.write(f'
{esc_text(divider_text)}
\n') + tw.write("
\n") + + # Wrapper (for filter) + tw.write(f'
\n') + + if rt == "system": + tw.write('
\n') + tw.write(f'
{esc_text(msg.get("content") or "")}
\n') + tw.write("
\n") + tw.write("
\n") + exported += 1 + with lock: + job.progress.messages_exported += 1 + job.progress.current_conversation_messages_exported = exported + if ts: + prev_ts = ts + continue + + is_sent = bool(msg.get("isSent")) + row_cls = "wce-msg-row wce-msg-row-sent flex items-center justify-end" if is_sent else "wce-msg-row wce-msg-row-received flex items-center justify-start" + msg_cls = "wce-msg wce-msg-sent flex items-start max-w-md flex-row-reverse" if is_sent else "wce-msg flex items-start max-w-md" + avatar_extra = "wce-avatar-sent ml-3" if is_sent else "wce-avatar-received mr-3" + + tw.write(f'
\n') + tw.write(f'
\n') + + avatar_src = rel_path(str(msg.get("senderAvatarPath") or "").strip()) + display_name = str(msg.get("senderDisplayName") or "").strip() + fallback_char = (display_name or sender_username or "?")[:1] + tw.write(" " + build_avatar_html(src=avatar_src, fallback_text=fallback_char, extra_class=avatar_extra) + "\n") + + align_cls = "items-end" if is_sent else "items-start" + tw.write(f'
\n') + if conv_is_group and (not is_sent) and display_name: + tw.write(f'
{esc_text(display_name)}
\n') + + pos_cls = "right-0" if is_sent else "left-0" + tw.write( + '
{esc_text(create_time_text)}
\n' + ) + + # Message body + bubble_dir_cls = "bg-[#95EC69] text-black bubble-tail-r" if is_sent else "bg-white text-gray-800 bubble-tail-l" + bubble_base_cls = "px-3 py-2 text-sm max-w-sm relative msg-bubble whitespace-pre-wrap break-words leading-relaxed" + bubble_unknown_cls = ( + "px-3 py-2 text-xs max-w-sm relative msg-bubble whitespace-pre-wrap break-words leading-relaxed text-gray-700" + ) + + if rt == "image": + src = offline_path(msg, "image") + if not src: + url = str(msg.get("imageUrl") or "").strip() + src = url if is_http_url(url) else "" + if src: + tw.write('
\n') + tw.write('
\n') + tw.write(f' \n') + tw.write(f' 图片\n') + tw.write(" \n") + tw.write("
\n") + tw.write("
\n") + else: + tw.write(f'
{render_text_with_emojis(msg.get("content") or "")}
\n') + elif rt == "emoji": + src = offline_path(msg, "emoji") + if not src: + url = str(msg.get("emojiUrl") or "").strip() + src = url if is_http_url(url) else "" + if src: + emoji_dir = " flex-row-reverse" if is_sent else "" + tw.write(f'
\n') + tw.write(f' 表情\n') + tw.write("
\n") + else: + tw.write(f'
{render_text_with_emojis(msg.get("content") or "")}
\n') + elif rt == "video": + thumb = offline_path(msg, "video_thumb") + if not thumb: + url = str(msg.get("videoThumbUrl") or "").strip() + thumb = url if is_http_url(url) else "" + video = offline_path(msg, "video") + if not video: + url = str(msg.get("videoUrl") or "").strip() + video = url if is_http_url(url) else "" + if thumb: + tw.write('
\n') + tw.write('
\n') + tw.write(f' 视频\n') + if video: + tw.write(f' \n') + tw.write('
\n') + tw.write(' \n') + tw.write("
\n") + tw.write("
\n") + else: + tw.write('
\n') + tw.write('
\n') + tw.write(' \n') + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + else: + tw.write(f'
{render_text_with_emojis(msg.get("content") or "")}
\n') + elif rt == "voice": + voice = offline_path(msg, "voice") + if voice: + duration_ms = msg.get("voiceLength") + width = get_voice_width(duration_ms) + seconds = get_voice_duration_in_seconds(duration_ms) + voice_dir_cls = "wechat-voice-sent" if is_sent else "wechat-voice-received" + content_dir_cls = " flex-row-reverse" if is_sent else "" + icon_dir_cls = "voice-icon-sent" if is_sent else "voice-icon-received" + voice_id = str(msg.get("id") or "").strip() + + tw.write('
\n') + tw.write( + f'
\n' + ) + tw.write(f'
\n') + tw.write( + f' \n' + ) + tw.write( + ' \n' + ) + tw.write( + ' \n' + ) + tw.write( + ' \n' + ) + tw.write(" \n") + tw.write(f' {esc_text(seconds)}"\n') + tw.write("
\n") + tw.write("
\n") + tw.write(f' \n') + tw.write("
\n") + else: + tw.write(f'
{render_text_with_emojis(msg.get("content") or "")}
\n') + elif rt == "file": + fsrc = offline_path(msg, "file") + title = str(msg.get("title") or msg.get("content") or "文件").strip() + size = str(msg.get("fileSize") or "").strip() + size_text = format_file_size(size) + sent_side_cls = " wechat-special-sent-side" if is_sent else "" + cls = f"wechat-redpacket-card wechat-special-card wechat-file-card msg-radius{sent_side_cls}" + tag = "a" if fsrc else "div" + attrs = f' href="{esc_attr(fsrc)}" download' if fsrc else "" + tw.write(f' <{tag}{attrs} class="{esc_attr(cls)}">\n') + tw.write('
\n') + tw.write('
\n') + tw.write(f' {esc_text(title or "文件")}\n') + if size_text: + tw.write(f' {esc_text(size_text)}\n') + tw.write("
\n") + tw.write(f' \n') + tw.write("
\n") + tw.write('
\n') + tw.write(f' \n') + tw.write(" 微信电脑版\n") + tw.write("
\n") + tw.write(f" \n") + elif rt == "link": + url = str(msg.get("url") or "").strip() + safe_url = url if is_http_url(url) else "" + if safe_url: + heading = str(msg.get("title") or msg.get("content") or safe_url).strip() + abstract = str(msg.get("content") or "").strip() + preview = str(msg.get("thumbUrl") or "").strip() + preview_url = "" + if is_http_url(preview): + local = maybe_download_remote_image(preview) + preview_url = local or preview + variant = str(msg.get("linkStyle") or "").strip().lower() + + from_text = get_link_from_text(msg, url=safe_url) + from_avatar_text = first_glyph(from_text) or "\u200B" + from_text = from_text or "\u200B" + sent_side_cls = " wechat-special-sent-side" if is_sent else "" + + if variant == "cover": + cls = f"wechat-link-card-cover wechat-special-card msg-radius{sent_side_cls}" + tw.write( + f' \n' + ) + if preview_url: + tw.write(' \n") + else: + tw.write(' \n") + tw.write(f' \n') + tw.write(" \n") + else: + cls = f"wechat-link-card wechat-special-card msg-radius{sent_side_cls}" + tw.write( + f' \n' + ) + tw.write(' \n") + tw.write(' \n") + tw.write(" \n") + else: + tw.write(f'
{render_text_with_emojis(msg.get("content") or "")}
\n') + elif rt == "voip": + voip_dir_cls = "wechat-voip-sent" if is_sent else "wechat-voip-received" + content_dir_cls = " flex-row-reverse" if is_sent else "" + voip_type = str(msg.get("voipType") or "").strip().lower() + icon = "wechat-video-light.png" if voip_type == "video" else "wechat-audio-light.png" + tw.write(f'
\n') + tw.write(f'
\n') + tw.write(f' \n') + tw.write(f' {esc_text(msg.get("content") or "通话")}\n') + tw.write("
\n") + tw.write("
\n") + elif rt == "quote": + tw.write( + f'
{render_text_with_emojis(msg.get("content") or "")}
\n' + ) + + qt = str(msg.get("quoteTitle") or "").strip() + qc = str(msg.get("quoteContent") or "").strip() + qthumb = str(msg.get("quoteThumbUrl") or "").strip() + qtype = str(msg.get("quoteType") or "").strip() + qsid_raw = str(msg.get("quoteServerId") or "").strip() + qsid = int(qsid_raw) if qsid_raw.isdigit() else 0 + + def is_quoted_voice() -> bool: + if qtype == "34": + return True + return (qc == "[语音]") and bool(qsid_raw) + + def is_quoted_image() -> bool: + if qtype == "3": + return True + return (qc == "[图片]") and bool(qsid_raw) + + def is_quoted_link() -> bool: + if qtype == "49": + return True + return bool(re.match(r"^\[链接\]\s*", qc)) + + def get_quoted_link_text() -> str: + if not qc: + return "" + return re.sub(r"^\[链接\]\s*", "", qc).strip() or qc + + quoted_voice = is_quoted_voice() + quoted_image = is_quoted_image() + quoted_link = is_quoted_link() + + quote_voice_url = "" + if include_media and ("voice" in media_kinds) and quoted_voice and qsid: + try: + arc, is_new = _materialize_voice( + zf=zf, + media_db_path=media_db_path, + server_id=int(qsid), + media_written=media_written, + ) + except Exception: + arc, is_new = "", False + if arc: + quote_voice_url = rel_path(arc) + if is_new: + with lock: + job.progress.media_copied += 1 + + quote_image_url = "" + if include_media and ("image" in media_kinds) and quoted_image and qsid and resource_conn is not None: + md5_hit = "" + try: + md5_hit = _lookup_resource_md5( + resource_conn, + resource_chat_id, + message_local_type=3, + server_id=int(qsid), + local_id=0, + create_time=0, + ) + except Exception: + md5_hit = "" + + if md5_hit: + try: + arc, is_new = _materialize_media( + zf=zf, + account_dir=account_dir, + conv_username=conv_username, + kind="image", + md5=str(md5_hit or "").strip().lower(), + file_id="", + media_written=media_written, + suggested_name="", + ) + except Exception: + arc, is_new = "", False + if arc: + quote_image_url = rel_path(arc) + if is_new: + with lock: + job.progress.media_copied += 1 + + qthumb_url = "" + if is_http_url(qthumb): + qthumb_local = maybe_download_remote_image(qthumb) if download_remote_media else "" + qthumb_url = qthumb_local or qthumb + + if qt or qc: + tw.write( + '
\n' + ) + tw.write('
\n') + if quoted_voice: + seconds = get_voice_duration_in_seconds(msg.get("quoteVoiceLength")) + disabled = not bool(quote_voice_url) + btn_cls = "flex items-center gap-1 min-w-0 hover:opacity-80" + if disabled: + btn_cls += " opacity-60 cursor-not-allowed" + dis_attr = " disabled" if disabled else "" + tw.write('
\n') + if qt: + tw.write(f' {esc_text(qt)}:\n') + tw.write( + f' \n") + if quote_voice_url: + tw.write( + f' \n' + ) + tw.write("
\n") + else: + tw.write('
\n') + if quoted_link: + link_text = get_quoted_link_text() + tw.write('
\n') + if qt: + tw.write(f' {esc_text(qt)}:\n') + if link_text: + ml = ' class="ml-1"' if qt else "" + tw.write(f' 🔗 {esc_text(link_text)}\n') + tw.write("
\n") + else: + hide_qc = quoted_image and qt and bool(quote_image_url) + tw.write('
\n') + if qt: + tw.write(f' {esc_text(qt)}:\n') + if qc and (not hide_qc): + ml = ' class="ml-1"' if qt else "" + tw.write(f' {esc_text(qc)}\n') + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + + if quoted_link and qthumb_url: + tw.write( + f' \n' + ) + tw.write( + f' 引用链接缩略图\n' + ) + tw.write(" \n") + + if (not quoted_link) and quoted_image and quote_image_url: + tw.write( + f' \n' + ) + tw.write( + f' 引用图片\n' + ) + tw.write(" \n") + + tw.write("
\n") + elif rt == "chatHistory": + title = str(msg.get("title") or "").strip() or "合并消息" + record_item = str(msg.get("recordItem") or "").strip() + record_item_b64 = "" + if record_item: + try: + record_item_b64 = base64.b64encode(record_item.encode("utf-8", errors="replace")).decode("ascii") + except Exception: + record_item_b64 = "" + + if record_item and include_media and (not privacy_mode): + try: + for m in _CHAT_HISTORY_MD5_TAG_RE.findall(record_item): + _ensure_chat_history_md5(m) + except Exception: + pass + if resource_conn is not None: + try: + server_map = page_media_index.get("serverMd5") + if not isinstance(server_map, dict): + server_map = {} + page_media_index["serverMd5"] = server_map + + for sid_raw in _CHAT_HISTORY_SERVER_ID_TAG_RE.findall(record_item): + sid_text = str(sid_raw or "").strip() + if not sid_text or sid_text in server_map: + continue + if (len(sid_text) > 24) or (not sid_text.isdigit()): + continue + sid = int(sid_text) + if sid <= 0: + continue + + md5_hit = "" + try: + md5_hit = _lookup_resource_md5( + resource_conn, + None, # do NOT filter by chat_id: merged-forward records come from other chats + 0, # do NOT filter by local_type + int(sid), + 0, + 0, + ) + except Exception: + md5_hit = "" + + md5_hit = str(md5_hit or "").strip().lower() + if not _is_md5(md5_hit): + continue + if _ensure_chat_history_md5(md5_hit): + server_map[sid_text] = md5_hit + except Exception: + pass + if download_remote_media: + try: + for u in _CHAT_HISTORY_URL_TAG_RE.findall(record_item): + maybe_download_remote_image(u) + except Exception: + pass + + lines = get_chat_history_preview_lines(msg) + sent_side_cls = " wechat-special-sent-side" if is_sent else "" + cls = f"wechat-chat-history-card wechat-special-card msg-radius{sent_side_cls} cursor-pointer" + tw.write( + f'
\n' + ) + tw.write('
\n') + tw.write(f'
{esc_text(title)}
\n') + if lines: + tw.write('
\n') + for line in lines: + tw.write(f'
{esc_text(line)}
\n') + tw.write("
\n") + tw.write("
\n") + tw.write('
合并消息
\n') + tw.write("
\n") + elif rt == "transfer": + received = is_transfer_received(msg) + returned = is_transfer_returned(msg) + overdue = is_transfer_overdue(msg) + side_cls = "wechat-transfer-sent-side" if is_sent else "wechat-transfer-received-side" + cls_parts = ["wechat-transfer-card", "msg-radius", side_cls] + if received: + cls_parts.append("wechat-transfer-received") + if returned: + cls_parts.append("wechat-transfer-returned") + if overdue: + cls_parts.append("wechat-transfer-overdue") + cls = " ".join(cls_parts) + if returned: + icon = "wechat-returned.png" + elif overdue: + icon = "overdue.png" + elif received: + icon = "wechat-trans-icon2.png" + else: + icon = "wechat-trans-icon1.png" + amount = format_transfer_amount(msg.get("amount")) + status = get_transfer_title(msg, is_sent=is_sent) + tw.write(f'
\n') + tw.write('
\n') + tw.write(f' \n') + tw.write('
\n') + if amount: + tw.write(f' ¥{esc_text(amount)}\n') + tw.write(f' {esc_text(status)}\n') + tw.write("
\n") + tw.write("
\n") + tw.write('
微信转账
\n') + tw.write("
\n") + elif rt == "redPacket": + received = False + cls_parts = ["wechat-redpacket-card", "wechat-special-card", "msg-radius"] + if received: + cls_parts.append("wechat-redpacket-received") + if is_sent: + cls_parts.append("wechat-special-sent-side") + icon = "wechat-trans-icon4.png" if received else "wechat-trans-icon3.png" + tw.write(f'
\n') + tw.write('
\n') + tw.write(f' \n') + tw.write('
\n') + tw.write(f' {esc_text(get_red_packet_text(msg))}\n') + if received: + tw.write(' 已领取\n') + tw.write("
\n") + tw.write("
\n") + tw.write('
微信红包
\n') + tw.write("
\n") + elif rt == "text": + tw.write(f'
{render_text_with_emojis(msg.get("content") or "")}
\n') + else: + content = str(msg.get("content") or "").strip() + if not content: + content = f"[{str(msg.get('type') or 'unknown')}] 消息" + tw.write(f'
{render_text_with_emojis(content)}
\n') + + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + + exported += 1 + with lock: + job.progress.messages_exported += 1 + job.progress.current_conversation_messages_exported = exported + if ts: + prev_ts = ts + + if scanned % 500 == 0 and job.cancel_requested: + raise _JobCancelled() + + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + tw.write("
\n") + tw.write("\n") + tw.write("\n") + + try: + media_index_payload = json.dumps(page_media_index, ensure_ascii=False) + except Exception: + media_index_payload = "{}" + media_index_payload = media_index_payload.replace("{media_index_payload}\n') + + tw.write( + '\n") + + tw.write("\n") + tw.write("\n") + tw.flush() + + zf.write(str(tmp_path), arcname) + + return exported + + def _format_message_line_txt(*, msg: dict[str, Any]) -> str: ts = int(msg.get("createTime") or 0) time_text = _format_ts(ts) @@ -1685,9 +4480,16 @@ def _privacy_scrub_message( for k in ( "title", "url", + "from", + "fromUsername", + "linkType", + "linkStyle", "thumbUrl", + "recordItem", "imageMd5", "imageFileId", + "imageMd5Candidates", + "imageFileIdCandidates", "imageUrl", "emojiMd5", "emojiUrl", @@ -1698,6 +4500,11 @@ def _privacy_scrub_message( "videoUrl", "videoThumbUrl", "voiceLength", + "quoteUsername", + "quoteServerId", + "quoteType", + "quoteThumbUrl", + "quoteVoiceLength", "quoteTitle", "quoteContent", "amount", @@ -1752,25 +4559,88 @@ def _attach_offline_media( offline: list[dict[str, Any]] = [] if rt == "image" and "image" in media_kinds: - md5 = str(msg.get("imageMd5") or "").strip().lower() - file_id = str(msg.get("imageFileId") or "").strip() - arc, is_new = _materialize_media( - zf=zf, - account_dir=account_dir, - conv_username=conv_username, - kind="image", - md5=md5 if _is_md5(md5) else "", - file_id=file_id, - media_written=media_written, - suggested_name="", - ) + primary_md5 = str(msg.get("imageMd5") or "").strip().lower() + primary_file_id = str(msg.get("imageFileId") or "").strip() + + md5_candidates_raw = msg.get("imageMd5Candidates") or [] + file_id_candidates_raw = msg.get("imageFileIdCandidates") or [] + md5_candidates = md5_candidates_raw if isinstance(md5_candidates_raw, list) else [] + file_id_candidates = file_id_candidates_raw if isinstance(file_id_candidates_raw, list) else [] + + md5s: list[str] = [] + file_ids: list[str] = [] + + def add_md5(v: Any) -> None: + s = str(v or "").strip().lower() + if _is_md5(s) and s not in md5s: + md5s.append(s) + + def add_file_id(v: Any) -> None: + s = str(v or "").strip() + if s and s not in file_ids: + file_ids.append(s) + + add_md5(primary_md5) + for v in md5_candidates: + add_md5(v) + + add_file_id(primary_file_id) + for v in file_id_candidates: + add_file_id(v) + + arc = "" + is_new = False + used_md5 = "" + used_file_id = "" + + # Prefer md5-based resolution first (more reliable), then fall back to file_id search. + for md5 in md5s: + arc, is_new = _materialize_media( + zf=zf, + account_dir=account_dir, + conv_username=conv_username, + kind="image", + md5=md5, + file_id="", + media_written=media_written, + suggested_name="", + ) + if arc: + used_md5 = md5 + break + + if not arc: + for file_id in file_ids: + arc, is_new = _materialize_media( + zf=zf, + account_dir=account_dir, + conv_username=conv_username, + kind="image", + md5="", + file_id=file_id, + media_written=media_written, + suggested_name="", + ) + if arc: + used_file_id = file_id + break + if arc: - offline.append({"kind": "image", "path": arc, "md5": md5, "fileId": file_id}) + # Keep primary fields in sync with what actually resolved. + try: + if used_md5: + msg["imageMd5"] = used_md5 + if used_file_id: + msg["imageFileId"] = used_file_id + except Exception: + pass + + offline.append({"kind": "image", "path": arc, "md5": used_md5 or primary_md5, "fileId": used_file_id or primary_file_id}) if is_new: with lock: job.progress.media_copied += 1 else: - record_missing("image", md5 or file_id) + record_missing("image", primary_md5 or primary_file_id) if rt == "emoji" and "emoji" in media_kinds: md5 = str(msg.get("emojiMd5") or "").strip().lower() @@ -2045,20 +4915,27 @@ def _materialize_media( except Exception: return "", False + try: + with open(src, "rb") as f: + head = f.read(64) + except Exception: + head = b"" + + head_mt = _detect_image_media_type(head[:32]) + looks_like_mp4 = len(head) >= 8 and head[4:8] == b"ftyp" + ext = src.suffix.lstrip(".").lower() if not ext: - try: - head = src.read_bytes()[:32] - except Exception: - head = b"" - mt = _detect_image_media_type(head) - if mt.startswith("image/"): - ext = mt.split("/", 1)[-1] - elif len(head) >= 8 and head[4:8] == b"ftyp": + if head_mt.startswith("image/"): + ext = head_mt.split("/", 1)[-1] + elif looks_like_mp4: ext = "mp4" else: ext = "dat" + if ext == "jpeg": + ext = "jpg" + folder = "misc" if kind == "image": folder = "images" @@ -2080,10 +4957,62 @@ def _materialize_media( arc_name = arc_name[:160] arc = f"media/{folder}/{arc_name}" - try: - zf.write(src, arcname=arc) - except Exception: - return "", False + should_stream_copy = False + if kind == "file": + should_stream_copy = True + elif kind in {"image", "emoji", "video_thumb"}: + should_stream_copy = ( + (ext == "jpg" and head_mt == "image/jpeg") + or (ext == "png" and head_mt == "image/png") + or (ext == "gif" and head_mt == "image/gif") + or (ext == "webp" and head_mt == "image/webp") + ) + elif kind == "video": + should_stream_copy = ext == "mp4" and looks_like_mp4 + + if should_stream_copy or (kind not in {"image", "emoji", "video", "video_thumb"}): + try: + zf.write(src, arcname=arc) + except Exception: + return "", False + else: + try: + data, mt = _read_and_maybe_decrypt_media(src, account_dir=account_dir) + except Exception: + try: + zf.write(src, arcname=arc) + except Exception: + return "", False + media_written[key] = arc + return arc, True + + mt = str(mt or "").strip() + if mt == "image/png": + ext2 = "png" + elif mt == "image/jpeg": + ext2 = "jpg" + elif mt == "image/gif": + ext2 = "gif" + elif mt == "image/webp": + ext2 = "webp" + elif mt == "video/mp4": + ext2 = "mp4" + else: + ext2 = "dat" if mt == "application/octet-stream" else (ext or "dat") + + if ext2 != ext: + if nice and kind == "file": + arc_name = f"{nice}_{ident}.{ext2}" if ext2 else f"{nice}_{ident}" + else: + arc_name = f"{ident}.{ext2}" if ext2 else ident + if len(arc_name) > 160: + arc_name = arc_name[:160] + arc = f"media/{folder}/{arc_name}" + + try: + zf.writestr(arc, data) + except Exception: + return "", False media_written[key] = arc return arc, True diff --git a/src/wechat_decrypt_tool/chat_helpers.py b/src/wechat_decrypt_tool/chat_helpers.py index 8efdfc3..8c4301f 100644 --- a/src/wechat_decrypt_tool/chat_helpers.py +++ b/src/wechat_decrypt_tool/chat_helpers.py @@ -8,7 +8,7 @@ from collections import Counter from datetime import datetime from pathlib import Path from typing import Any, Optional -from urllib.parse import quote, urlparse +from urllib.parse import parse_qs, quote, urlparse from fastapi import HTTPException @@ -634,6 +634,32 @@ def _is_mp_weixin_article_url(url: str) -> bool: return "mp.weixin.qq.com/" in lu +def _is_mp_weixin_feed_article_url(url: str) -> bool: + """Detect WeChat's PC feed/recommendation mp.weixin.qq.com share URLs. + + These links often carry an `exptype` like: + masonry_feed_brief_content_elite_for_pcfeeds_u2i + + WeChat desktop tends to render them in a cover-card style (image + bottom title), + so we use this as a hint to choose the 'cover' linkStyle. + """ + + u = str(url or "").strip() + if not u: + return False + + try: + parsed = urlparse(u) + q = parse_qs(parsed.query or "") + for v in (q.get("exptype") or []): + if "masonry_feed" in str(v or "").lower(): + return True + except Exception: + pass + + return "exptype=masonry_feed" in u.lower() + + def _classify_link_share(*, app_type: int, url: str, source_username: str, desc: str) -> tuple[str, str]: src = str(source_username or "").strip().lower() is_official_article = bool( @@ -647,7 +673,15 @@ def _classify_link_share(*, app_type: int, url: str, source_username: str, desc: hashtag_count = len(re.findall(r"#[^#\s]+", d)) # 公众号文章中「封面图 + 底栏标题」卡片特征:摘要以 #话题# 风格为主。 - link_style = "cover" if (is_official_article and (d.startswith("#") or hashtag_count >= 2)) else "default" + cover_like = bool( + is_official_article + and ( + d.startswith("#") + or hashtag_count >= 2 + or _is_mp_weixin_feed_article_url(url) + ) + ) + link_style = "cover" if cover_like else "default" return link_type, link_style @@ -948,8 +982,12 @@ def _parse_app_message(text: str) -> dict[str, Any]: "recordItem": record_item or "", } - if app_type in (5, 68) and url: - thumb_url = _normalize_xml_url(_extract_xml_tag_text(text, "thumburl")) + if app_type in (4, 5, 68) and url: + # Many appmsg link cards (notably Bilibili shares with 4) include a metadata block. + # DO NOT treat " dict[str, Any]: "quoteVoiceLength": quote_voice_length, } - if app_type == 62 or ".... + # Be strict here: lots of non-pat appmsg payloads still carry a nested ... metadata block. + patmsg_attr = bool(re.search(r"<(sysmsg|appmsg)\b[^>]*\btype=['\"]patmsg['\"]", lower)) + if app_type == 62 or patmsg_attr: return {"renderType": "system", "content": "[拍一拍]"} if app_type == 2000 or ( diff --git a/src/wechat_decrypt_tool/routers/chat.py b/src/wechat_decrypt_tool/routers/chat.py index 278b229..c896066 100644 --- a/src/wechat_decrypt_tool/routers/chat.py +++ b/src/wechat_decrypt_tool/routers/chat.py @@ -2742,6 +2742,90 @@ def _postprocess_transfer_messages(merged: list[dict[str, Any]]) -> None: # - 将原始转账消息(1/8)回填为“已被接收” # - 若同一 transferId 同时存在原始消息与 paysubtype=3 消息,则将 paysubtype=3 的那条校正为“已收款” + def _is_transfer_expired_system_message(text: Any) -> bool: + content = str(text or "").strip() + if not content: + return False + if "转账" not in content or "过期" not in content: + return False + if "未接收" in content and ("24小时" in content or "二十四小时" in content): + return True + return "已过期" in content and ("收款方" in content or "转账" in content) + + def _mark_pending_transfers_expired_by_system_messages() -> set[str]: + expired_system_times: list[int] = [] + pending_candidates: list[tuple[int, int]] = [] # (index, createTime) + + for idx, msg in enumerate(merged): + rt = str(msg.get("renderType") or "").strip() + if rt == "system": + if _is_transfer_expired_system_message(msg.get("content")): + try: + ts = int(msg.get("createTime") or 0) + except Exception: + ts = 0 + if ts > 0: + expired_system_times.append(ts) + continue + + if rt != "transfer": + continue + + pst = str(msg.get("paySubType") or "").strip() + if pst not in ("1", "8"): + continue + + try: + ts = int(msg.get("createTime") or 0) + except Exception: + ts = 0 + if ts <= 0: + continue + + pending_candidates.append((idx, ts)) + + if not expired_system_times or not pending_candidates: + return set() + + used_pending_indexes: set[int] = set() + expired_transfer_ids: set[str] = set() + + # 过期系统提示通常出现在转账发起约 24 小时后。 + # 为避免误匹配,要求时间差落在 [22h, 26h] 范围内,并选择最接近 24h 的待收款消息。 + for sys_ts in sorted(expired_system_times): + best_index = -1 + best_distance = 10**9 + + for idx, transfer_ts in pending_candidates: + if idx in used_pending_indexes: + continue + delta = sys_ts - transfer_ts + if delta < 0: + continue + if delta < 22 * 3600 or delta > 26 * 3600: + continue + + distance = abs(delta - 24 * 3600) + if distance < best_distance: + best_distance = distance + best_index = idx + + if best_index < 0: + continue + + used_pending_indexes.add(best_index) + transfer_msg = merged[best_index] + transfer_msg["paySubType"] = "10" + transfer_msg["transferStatus"] = "已过期" + + tid = str(transfer_msg.get("transferId") or "").strip() + if tid: + expired_transfer_ids.add(tid) + + return expired_transfer_ids + + expired_transfer_ids = _mark_pending_transfers_expired_by_system_messages() + returned_transfer_ids: set[str] = set() # 退还状态的 transferId received_transfer_ids: set[str] = set() # 已收款状态的 transferId returned_amounts_with_time: list[tuple[str, int]] = [] # (金额, 时间戳) 用于退还回退匹配 @@ -2828,6 +2912,8 @@ def _postprocess_transfer_messages(merged: list[dict[str, Any]]) -> None: tid = str(m.get("transferId") or "").strip() if not tid or tid not in pending_transfer_ids: continue + if tid in expired_transfer_ids: + continue mid = str(m.get("id") or "").strip() if mid and mid in backfilled_message_ids: continue diff --git a/src/wechat_decrypt_tool/routers/chat_export.py b/src/wechat_decrypt_tool/routers/chat_export.py index 7a94f10..3082294 100644 --- a/src/wechat_decrypt_tool/routers/chat_export.py +++ b/src/wechat_decrypt_tool/routers/chat_export.py @@ -12,17 +12,31 @@ from ..path_fix import PathFixRoute router = APIRouter(route_class=PathFixRoute) -ExportFormat = Literal["json", "txt"] +ExportFormat = Literal["json", "txt", "html"] ExportScope = Literal["selected", "all", "groups", "singles"] MediaKind = Literal["image", "emoji", "video", "video_thumb", "voice", "file"] -MessageType = Literal["text", "image", "emoji", "video", "voice", "file", "link", "transfer", "redPacket", "system", "quote", "voip"] +MessageType = Literal[ + "text", + "image", + "emoji", + "video", + "voice", + "chatHistory", + "file", + "link", + "transfer", + "redPacket", + "system", + "quote", + "voip", +] class ChatExportCreateRequest(BaseModel): account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)") scope: ExportScope = Field("selected", description="导出范围:selected=指定会话;all=全部;groups=仅群聊;singles=仅单聊") usernames: list[str] = Field(default_factory=list, description="会话 username 列表(scope=selected 时使用)") - format: ExportFormat = Field("json", description="导出格式:json 或 txt(zip 内每个会话一个文件)") + format: ExportFormat = Field("json", description="导出格式:json/txt/html(zip 内每个会话一个文件;html 可离线打开 index.html 查看)") start_time: Optional[int] = Field(None, description="起始时间(Unix 秒,含)") end_time: Optional[int] = Field(None, description="结束时间(Unix 秒,含)") include_hidden: bool = Field(False, description="是否包含隐藏会话(scope!=selected 时)") @@ -41,6 +55,10 @@ class ChatExportCreateRequest(BaseModel): False, description="预留字段:本项目不从微信进程提取媒体密钥,请使用 wx_key 获取并保存/批量解密", ) + download_remote_media: bool = Field( + False, + description="HTML 导出时允许联网下载链接/引用缩略图等远程媒体(提高离线完整性)", + ) privacy_mode: bool = Field( False, description="隐私模式导出:隐藏会话/用户名/内容,不打包头像与媒体", @@ -64,6 +82,7 @@ async def create_chat_export(req: ChatExportCreateRequest): message_types=req.message_types, output_dir=req.output_dir, allow_process_key_extract=req.allow_process_key_extract, + download_remote_media=req.download_remote_media, privacy_mode=req.privacy_mode, file_name=req.file_name, ) diff --git a/tests/test_chat_app_message_type4_patmsg_regression.py b/tests/test_chat_app_message_type4_patmsg_regression.py new file mode 100644 index 0000000..d5e7777 --- /dev/null +++ b/tests/test_chat_app_message_type4_patmsg_regression.py @@ -0,0 +1,50 @@ +import sys +import unittest +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + +from wechat_decrypt_tool.chat_helpers import _parse_app_message + + +class TestChatAppMessageType4PatMsgRegression(unittest.TestCase): + def test_type4_link_with_patmsg_metadata_is_not_misclassified_as_pat(self): + raw_text = ( + "" + '' + "【中配】抽象可能让你的代码变差 - CodeAesthetic" + "UP主:黑纹白斑马" + "4" + "https://b23.tv/au68guF" + "哔哩哔哩" + "3057020100044b30" + "" + "" + "" + ) + + parsed = _parse_app_message(raw_text) + self.assertEqual(parsed.get("renderType"), "link") + self.assertEqual(parsed.get("url"), "https://b23.tv/au68guF") + self.assertEqual(parsed.get("title"), "【中配】抽象可能让你的代码变差 - CodeAesthetic") + self.assertEqual(parsed.get("from"), "哔哩哔哩") + self.assertNotEqual(parsed.get("content"), "[拍一拍]") + + def test_type62_is_still_pat(self): + raw_text = '"A" 拍了拍 "B"62' + parsed = _parse_app_message(raw_text) + self.assertEqual(parsed.get("renderType"), "system") + self.assertEqual(parsed.get("content"), "[拍一拍]") + + def test_sysmsg_type_patmsg_attr_is_still_pat(self): + raw_text = 'bar' + parsed = _parse_app_message(raw_text) + self.assertEqual(parsed.get("renderType"), "system") + self.assertEqual(parsed.get("content"), "[拍一拍]") + + +if __name__ == "__main__": + unittest.main() + diff --git a/tests/test_chat_export_chat_history_modal.py b/tests/test_chat_export_chat_history_modal.py new file mode 100644 index 0000000..192e487 --- /dev/null +++ b/tests/test_chat_export_chat_history_modal.py @@ -0,0 +1,218 @@ +import os +import hashlib +import sqlite3 +import sys +import unittest +import zipfile +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestChatExportChatHistoryModal(unittest.TestCase): + _MD5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + def _reload_export_modules(self): + import wechat_decrypt_tool.app_paths as app_paths + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.media_helpers as media_helpers + import wechat_decrypt_tool.chat_export_service as chat_export_service + + importlib.reload(app_paths) + importlib.reload(chat_helpers) + importlib.reload(media_helpers) + importlib.reload(chat_export_service) + return chat_export_service + + def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (account, "", "我", "", 1, 0, "", ""), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (username, "", "测试好友", "", 1, 0, "", ""), + ) + conn.commit() + finally: + conn.close() + + def _seed_session_db(self, path: Path, *, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE SessionTable ( + username TEXT, + is_hidden INTEGER, + sort_timestamp INTEGER + ) + """ + ) + conn.execute( + "INSERT INTO SessionTable VALUES (?, ?, ?)", + (username, 0, 1735689600), + ) + conn.commit() + finally: + conn.close() + + def _seed_message_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)") + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account)) + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username)) + + table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}" + conn.execute( + f""" + CREATE TABLE {table_name} ( + local_id INTEGER, + server_id INTEGER, + local_type INTEGER, + sort_seq INTEGER, + real_sender_id INTEGER, + create_time INTEGER, + message_content TEXT, + compress_content BLOB + ) + """ + ) + + record_item = ( + "" + "" + "" + "2" + f"{self._MD5}" + "" + "" + "" + ) + chat_history_xml = ( + "" + "19" + "聊天记录" + "记录预览" + f"" + "" + ) + + conn.execute( + f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (1, 1001, 49, 1, 2, 1735689601, chat_history_xml, None), + ) + conn.commit() + finally: + conn.close() + + def _seed_media_files(self, account_dir: Path) -> None: + resource_root = account_dir / "resource" + (resource_root / "aa").mkdir(parents=True, exist_ok=True) + (resource_root / "aa" / f"{self._MD5}.jpg").write_bytes(b"\xff\xd8\xff\xd9") + + def _prepare_account(self, root: Path, *, account: str, username: str) -> Path: + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + self._seed_contact_db(account_dir / "contact.db", account=account, username=username) + self._seed_session_db(account_dir / "session.db", username=username) + self._seed_message_db(account_dir / "message_0.db", account=account, username=username) + self._seed_media_files(account_dir) + return account_dir + + def _create_job(self, manager, *, account: str, username: str): + job = manager.create_job( + account=account, + scope="selected", + usernames=[username], + export_format="html", + start_time=None, + end_time=None, + include_hidden=False, + include_official=False, + include_media=True, + media_kinds=["image"], + message_types=["chatHistory", "image"], + output_dir=None, + allow_process_key_extract=False, + download_remote_media=False, + privacy_mode=False, + file_name=None, + ) + + for _ in range(200): + latest = manager.get_job(job.export_id) + if latest and latest.status in {"done", "error", "cancelled"}: + return latest + import time as _time + + _time.sleep(0.05) + self.fail("export job did not finish in time") + + def test_chat_history_modal_has_media_index_and_record_item(self): + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + username = "wxid_friend" + self._prepare_account(root, account=account, username=username) + + prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR") + try: + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + svc = self._reload_export_modules() + job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username) + self.assertEqual(job.status, "done", msg=job.error) + + with zipfile.ZipFile(job.zip_path, "r") as zf: + names = set(zf.namelist()) + self.assertIn(f"media/images/{self._MD5}.jpg", names) + + html_path = next((n for n in names if n.endswith("/messages.html")), "") + self.assertTrue(html_path) + html_text = zf.read(html_path).decode("utf-8") + self.assertIn('id="chatHistoryModal"', html_text) + self.assertIn('data-wce-chat-history="1"', html_text) + self.assertIn('data-record-item-b64="', html_text) + self.assertIn('id="wceMediaIndex"', html_text) + self.assertIn(self._MD5, html_text) + finally: + if prev_data is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data diff --git a/tests/test_chat_export_html_format.py b/tests/test_chat_export_html_format.py new file mode 100644 index 0000000..d8e5bcf --- /dev/null +++ b/tests/test_chat_export_html_format.py @@ -0,0 +1,353 @@ +import os +import json +import hashlib +import sqlite3 +import sys +import unittest +import zipfile +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestChatExportHtmlFormat(unittest.TestCase): + _FILE_MD5 = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + _VOICE_SERVER_ID = 2001 + + def _reload_export_modules(self): + import wechat_decrypt_tool.app_paths as app_paths + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.media_helpers as media_helpers + import wechat_decrypt_tool.chat_export_service as chat_export_service + + importlib.reload(app_paths) + importlib.reload(chat_helpers) + importlib.reload(media_helpers) + importlib.reload(chat_export_service) + return chat_export_service + + def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (account, "", "我", "", 1, 0, "", ""), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (username, "", "测试好友", "", 1, 0, "", ""), + ) + conn.commit() + finally: + conn.close() + + def _seed_session_db(self, path: Path, *, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE SessionTable ( + username TEXT, + is_hidden INTEGER, + sort_timestamp INTEGER + ) + """ + ) + conn.execute( + "INSERT INTO SessionTable VALUES (?, ?, ?)", + (username, 0, 1735689600), + ) + conn.commit() + finally: + conn.close() + + def _seed_message_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)") + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account)) + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username)) + + table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}" + conn.execute( + f""" + CREATE TABLE {table_name} ( + local_id INTEGER, + server_id INTEGER, + local_type INTEGER, + sort_seq INTEGER, + real_sender_id INTEGER, + create_time INTEGER, + message_content TEXT, + compress_content BLOB + ) + """ + ) + + image_xml = '' + voice_xml = '' + file_md5 = self._FILE_MD5 + file_xml = ( + "" + "6" + "demo.pdf" + "2048" + f"{file_md5}" + "" + ) + link_xml = ( + "" + "5" + "示例链接" + "这是描述" + "https://example.com/" + "https://example.com/thumb.jpg" + "gh_test" + "测试公众号" + "" + ) + chat_history_xml = ( + "" + "19" + "聊天记录" + "记录预览" + "张三: hi\n李四: ok" + "" + ) + transfer_xml = ( + "" + "2000" + "微信转账" + "" + "转账备注" + "¥1.23" + "3" + "transfer_123" + "" + "" + ) + red_packet_xml = ( + "" + "2001" + "红包" + "" + "恭喜发财,大吉大利" + "微信红包" + "" + "" + ) + voip_xml = ( + "" + "1" + "语音通话" + "" + ) + quote_voice_xml = ( + "" + "57" + "回复语音" + "" + "34" + f"{self._VOICE_SERVER_ID}" + "wxid_friend" + "测试好友" + "wxid_friend:3000:1:" + "" + "" + ) + rows = [ + (1, 1001, 3, 1, 2, 1735689601, image_xml, None), + (2, 1002, 1, 2, 2, 1735689602, "普通文本消息[微笑]", None), + (3, 1003, 49, 3, 1, 1735689603, transfer_xml, None), + (4, 1004, 49, 4, 2, 1735689604, red_packet_xml, None), + (5, 1005, 49, 5, 1, 1735689605, file_xml, None), + (6, 1006, 49, 6, 2, 1735689606, link_xml, None), + (7, 1007, 49, 7, 2, 1735689607, chat_history_xml, None), + (8, 1008, 50, 8, 2, 1735689608, voip_xml, None), + (9, self._VOICE_SERVER_ID, 34, 9, 1, 1735689609, voice_xml, None), + (10, 1010, 49, 10, 1, 1735689610, quote_voice_xml, None), + ] + conn.executemany( + f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + rows, + ) + conn.commit() + finally: + conn.close() + + def _seed_media_files(self, account_dir: Path) -> None: + resource_root = account_dir / "resource" + (resource_root / "aa").mkdir(parents=True, exist_ok=True) + (resource_root / "aa" / "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg").write_bytes(b"\xff\xd8\xff\xd9") + (resource_root / "bb").mkdir(parents=True, exist_ok=True) + (resource_root / "bb" / f"{self._FILE_MD5}.dat").write_bytes(b"dummy") + + conn = sqlite3.connect(str(account_dir / "media_0.db")) + try: + conn.execute( + """ + CREATE TABLE VoiceInfo ( + svr_id INTEGER, + create_time INTEGER, + voice_data BLOB + ) + """ + ) + conn.execute( + "INSERT INTO VoiceInfo VALUES (?, ?, ?)", + (self._VOICE_SERVER_ID, 1735689609, b"SILK_VOICE_DATA"), + ) + conn.commit() + finally: + conn.close() + + def _prepare_account(self, root: Path, *, account: str, username: str) -> Path: + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + self._seed_contact_db(account_dir / "contact.db", account=account, username=username) + self._seed_session_db(account_dir / "session.db", username=username) + self._seed_message_db(account_dir / "message_0.db", account=account, username=username) + self._seed_media_files(account_dir) + return account_dir + + def _create_job(self, manager, *, account: str, username: str): + job = manager.create_job( + account=account, + scope="selected", + usernames=[username], + export_format="html", + start_time=None, + end_time=None, + include_hidden=False, + include_official=False, + include_media=True, + media_kinds=["image", "emoji", "video", "video_thumb", "voice", "file"], + message_types=[], + output_dir=None, + allow_process_key_extract=False, + download_remote_media=False, + privacy_mode=False, + file_name=None, + ) + + for _ in range(200): + latest = manager.get_job(job.export_id) + if latest and latest.status in {"done", "error", "cancelled"}: + return latest + import time as _time + + _time.sleep(0.05) + self.fail("export job did not finish in time") + + def test_html_export_contains_index_and_conversation_page(self): + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + username = "wxid_friend" + self._prepare_account(root, account=account, username=username) + + prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR") + try: + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + svc = self._reload_export_modules() + job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username) + self.assertEqual(job.status, "done", msg=job.error) + + self.assertTrue(job.zip_path and job.zip_path.exists()) + with zipfile.ZipFile(job.zip_path, "r") as zf: + names = set(zf.namelist()) + + self.assertIn("index.html", names) + self.assertIn("assets/wechat-chat-export.css", names) + self.assertIn("assets/wechat-chat-export.js", names) + + manifest = json.loads(zf.read("manifest.json").decode("utf-8")) + self.assertEqual(manifest.get("format"), "html") + + html_path = next((n for n in names if n.endswith("/messages.html")), "") + self.assertTrue(html_path) + + html_text = zf.read(html_path).decode("utf-8") + self.assertIn('data-wce-rail-avatar="1"', html_text) + self.assertIn('data-wce-session-list="1"', html_text) + self.assertIn('id="sessionSearchInput"', html_text) + self.assertIn('data-wce-time-divider="1"', html_text) + self.assertIn('id="messageTypeFilter"', html_text) + self.assertIn('value="chatHistory"', html_text) + self.assertIn('id="chatHistoryModal"', html_text) + self.assertIn('data-wce-chat-history="1"', html_text) + self.assertIn('data-record-item-b64="', html_text) + self.assertIn('id="wceMediaIndex"', html_text) + self.assertIn('data-wce-quote-voice-btn="1"', html_text) + self.assertNotIn('title="刷新消息"', html_text) + self.assertNotIn('title="导出聊天记录"', html_text) + self.assertNotIn("搜索聊天记录", html_text) + self.assertNotIn("朋友圈", html_text) + self.assertNotIn("年度总结", html_text) + self.assertNotIn("设置", html_text) + self.assertNotIn("隐私模式", html_text) + + self.assertTrue(any(n.startswith("media/images/") for n in names)) + self.assertIn("../../media/images/", html_text) + + self.assertIn("wechat-transfer-card", html_text) + self.assertIn("wechat-redpacket-card", html_text) + self.assertIn("wechat-chat-history-card", html_text) + self.assertIn("wechat-voip-bubble", html_text) + self.assertIn("wechat-link-card", html_text) + self.assertIn("wechat-file-card", html_text) + self.assertIn("wechat-voice-wrapper", html_text) + + css_text = zf.read("assets/wechat-chat-export.css").decode("utf-8", errors="ignore") + self.assertIn("wechat-transfer-card", css_text) + self.assertNotIn("wechat-transfer-card[data-v-", css_text) + + js_text = zf.read("assets/wechat-chat-export.js").decode("utf-8", errors="ignore") + self.assertIn("wechat-voice-bubble", js_text) + self.assertIn("voice-playing", js_text) + self.assertIn("data-wce-quote-voice-btn", js_text) + + self.assertIn("assets/images/wechat/wechat-trans-icon1.png", names) + self.assertIn("assets/images/wechat/zip.png", names) + self.assertIn("assets/images/wechat/WeChat-Icon-Logo.wine.svg", names) + self.assertTrue(any(n.startswith("fonts/") and n.endswith(".woff2") for n in names)) + self.assertIn("wxemoji/Expression_1@2x.png", names) + self.assertIn("../../wxemoji/Expression_1@2x.png", html_text) + finally: + if prev_data is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data diff --git a/tests/test_chat_export_image_md5_candidate_fallback.py b/tests/test_chat_export_image_md5_candidate_fallback.py new file mode 100644 index 0000000..401b716 --- /dev/null +++ b/tests/test_chat_export_image_md5_candidate_fallback.py @@ -0,0 +1,199 @@ +import os +import hashlib +import sqlite3 +import sys +import unittest +import zipfile +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestChatExportImageMd5CandidateFallback(unittest.TestCase): + def _reload_export_modules(self): + import wechat_decrypt_tool.app_paths as app_paths + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.media_helpers as media_helpers + import wechat_decrypt_tool.chat_export_service as chat_export_service + + importlib.reload(app_paths) + importlib.reload(chat_helpers) + importlib.reload(media_helpers) + importlib.reload(chat_export_service) + return chat_export_service + + def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (account, "", "我", "", 1, 0, "", ""), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (username, "", "测试好友", "", 1, 0, "", ""), + ) + conn.commit() + finally: + conn.close() + + def _seed_session_db(self, path: Path, *, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE SessionTable ( + username TEXT, + is_hidden INTEGER, + sort_timestamp INTEGER + ) + """ + ) + conn.execute( + "INSERT INTO SessionTable VALUES (?, ?, ?)", + (username, 0, 1735689600), + ) + conn.commit() + finally: + conn.close() + + def _seed_message_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)") + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account)) + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username)) + + table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}" + conn.execute( + f""" + CREATE TABLE {table_name} ( + local_id INTEGER, + server_id INTEGER, + local_type INTEGER, + sort_seq INTEGER, + real_sender_id INTEGER, + create_time INTEGER, + message_content TEXT, + compress_content BLOB + ) + """ + ) + + good_md5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + bad_md5 = "ffffffffffffffffffffffffffffffff" + image_xml = f'' + + conn.execute( + f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (1, 1001, 3, 1, 2, 1735689601, image_xml, None), + ) + conn.commit() + finally: + conn.close() + + def _seed_decrypted_resource(self, account_dir: Path) -> None: + resource_root = account_dir / "resource" + (resource_root / "aa").mkdir(parents=True, exist_ok=True) + (resource_root / "aa" / "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg").write_bytes(b"\xff\xd8\xff\xd9") + + def _prepare_account(self, root: Path, *, account: str, username: str) -> Path: + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + self._seed_contact_db(account_dir / "contact.db", account=account, username=username) + self._seed_session_db(account_dir / "session.db", username=username) + self._seed_message_db(account_dir / "message_0.db", account=account, username=username) + self._seed_decrypted_resource(account_dir) + return account_dir + + def _create_job(self, manager, *, account: str, username: str): + job = manager.create_job( + account=account, + scope="selected", + usernames=[username], + export_format="html", + start_time=None, + end_time=None, + include_hidden=False, + include_official=False, + include_media=True, + media_kinds=["image"], + message_types=[], + output_dir=None, + allow_process_key_extract=False, + download_remote_media=False, + privacy_mode=False, + file_name=None, + ) + + for _ in range(200): + latest = manager.get_job(job.export_id) + if latest and latest.status in {"done", "error", "cancelled"}: + return latest + import time as _time + + _time.sleep(0.05) + self.fail("export job did not finish in time") + + def test_falls_back_to_secondary_md5_candidate(self): + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + username = "wxid_friend" + self._prepare_account(root, account=account, username=username) + + prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR") + try: + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + svc = self._reload_export_modules() + job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username) + self.assertEqual(job.status, "done", msg=job.error) + + with zipfile.ZipFile(job.zip_path, "r") as zf: + names = set(zf.namelist()) + self.assertIn("media/images/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg", names) + self.assertFalse(any("ffffffffffffffffffffffffffffffff" in n for n in names if n.startswith("media/images/"))) + + html_path = next((n for n in names if n.endswith("/messages.html")), "") + self.assertTrue(html_path) + html_text = zf.read(html_path).decode("utf-8", errors="ignore") + self.assertIn("../../media/images/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg", html_text) + finally: + if prev_data is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data + diff --git a/tests/test_chat_export_image_md5_prefers_message_resource.py b/tests/test_chat_export_image_md5_prefers_message_resource.py new file mode 100644 index 0000000..1b9d942 --- /dev/null +++ b/tests/test_chat_export_image_md5_prefers_message_resource.py @@ -0,0 +1,235 @@ +import os +import hashlib +import sqlite3 +import sys +import unittest +import zipfile +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestChatExportImageMd5PrefersMessageResource(unittest.TestCase): + def _reload_export_modules(self): + import wechat_decrypt_tool.app_paths as app_paths + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.media_helpers as media_helpers + import wechat_decrypt_tool.chat_export_service as chat_export_service + + importlib.reload(app_paths) + importlib.reload(chat_helpers) + importlib.reload(media_helpers) + importlib.reload(chat_export_service) + return chat_export_service + + def _seed_source_info(self, account_dir: Path) -> None: + wxid_dir = account_dir / "_wxid_dummy" + db_storage_dir = account_dir / "_db_storage_dummy" + wxid_dir.mkdir(parents=True, exist_ok=True) + db_storage_dir.mkdir(parents=True, exist_ok=True) + (account_dir / "_source.json").write_text( + '{"wxid_dir": "' + str(wxid_dir).replace("\\", "\\\\") + '", "db_storage_path": "' + str(db_storage_dir).replace("\\", "\\\\") + '"}', + encoding="utf-8", + ) + + def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (account, "", "我", "", 1, 0, "", ""), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (username, "", "测试好友", "", 1, 0, "", ""), + ) + conn.commit() + finally: + conn.close() + + def _seed_session_db(self, path: Path, *, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE SessionTable ( + username TEXT, + is_hidden INTEGER, + sort_timestamp INTEGER + ) + """ + ) + conn.execute( + "INSERT INTO SessionTable VALUES (?, ?, ?)", + (username, 0, 1735689600), + ) + conn.commit() + finally: + conn.close() + + def _seed_message_db(self, path: Path, *, account: str, username: str, bad_md5: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)") + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account)) + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username)) + + table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}" + conn.execute( + f""" + CREATE TABLE {table_name} ( + local_id INTEGER, + server_id INTEGER, + local_type INTEGER, + sort_seq INTEGER, + real_sender_id INTEGER, + create_time INTEGER, + message_content TEXT, + compress_content BLOB + ) + """ + ) + + image_xml = f'' + conn.execute( + f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (1, 1001, 3, 1, 2, 1735689601, image_xml, None), + ) + conn.commit() + finally: + conn.close() + + def _seed_message_resource_db(self, path: Path, *, good_md5: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE MessageResourceInfo ( + message_id INTEGER, + message_svr_id INTEGER, + message_local_type INTEGER, + chat_id INTEGER, + message_local_id INTEGER, + message_create_time INTEGER, + packed_info BLOB + ) + """ + ) + # packed_info may contain multiple tokens; include a realistic *.dat reference so the extractor prefers it. + packed_info = f"{good_md5}_t.dat".encode("ascii") + conn.execute( + "INSERT INTO MessageResourceInfo VALUES (?, ?, ?, ?, ?, ?, ?)", + (1, 1001, 3, 0, 1, 1735689601, packed_info), + ) + conn.commit() + finally: + conn.close() + + def _seed_decrypted_resource(self, account_dir: Path, *, good_md5: str) -> None: + resource_root = account_dir / "resource" + (resource_root / good_md5[:2]).mkdir(parents=True, exist_ok=True) + # Minimal JPEG payload (valid SOI/EOI). + (resource_root / good_md5[:2] / f"{good_md5}.jpg").write_bytes(b"\xff\xd8\xff\xd9") + + def _prepare_account(self, root: Path, *, account: str, username: str, bad_md5: str, good_md5: str) -> Path: + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + self._seed_source_info(account_dir) + self._seed_contact_db(account_dir / "contact.db", account=account, username=username) + self._seed_session_db(account_dir / "session.db", username=username) + self._seed_message_db(account_dir / "message_0.db", account=account, username=username, bad_md5=bad_md5) + self._seed_message_resource_db(account_dir / "message_resource.db", good_md5=good_md5) + self._seed_decrypted_resource(account_dir, good_md5=good_md5) + return account_dir + + def _create_job(self, manager, *, account: str, username: str): + job = manager.create_job( + account=account, + scope="selected", + usernames=[username], + export_format="html", + start_time=None, + end_time=None, + include_hidden=False, + include_official=False, + include_media=True, + media_kinds=["image"], + message_types=["image"], + output_dir=None, + allow_process_key_extract=False, + download_remote_media=False, + privacy_mode=False, + file_name=None, + ) + + for _ in range(200): + latest = manager.get_job(job.export_id) + if latest and latest.status in {"done", "error", "cancelled"}: + return latest + import time as _time + + _time.sleep(0.05) + self.fail("export job did not finish in time") + + def test_prefers_message_resource_md5_over_xml_md5(self): + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + username = "wxid_friend" + bad_md5 = "ffffffffffffffffffffffffffffffff" + good_md5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + self._prepare_account(root, account=account, username=username, bad_md5=bad_md5, good_md5=good_md5) + + prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR") + try: + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + svc = self._reload_export_modules() + job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username) + self.assertEqual(job.status, "done", msg=job.error) + + with zipfile.ZipFile(job.zip_path, "r") as zf: + names = set(zf.namelist()) + self.assertIn(f"media/images/{good_md5}.jpg", names) + + html_path = next((n for n in names if n.endswith("/messages.html")), "") + self.assertTrue(html_path) + html_text = zf.read(html_path).decode("utf-8", errors="ignore") + self.assertIn(f"../../media/images/{good_md5}.jpg", html_text) + finally: + if prev_data is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data + diff --git a/tests/test_chat_export_message_types_semantics.py b/tests/test_chat_export_message_types_semantics.py index d641bb8..7152753 100644 --- a/tests/test_chat_export_message_types_semantics.py +++ b/tests/test_chat_export_message_types_semantics.py @@ -198,6 +198,7 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase): message_types=message_types, output_dir=None, allow_process_key_extract=False, + download_remote_media=False, privacy_mode=privacy_mode, file_name=None, ) diff --git a/tests/test_chat_export_remote_thumb_option.py b/tests/test_chat_export_remote_thumb_option.py new file mode 100644 index 0000000..e587fef --- /dev/null +++ b/tests/test_chat_export_remote_thumb_option.py @@ -0,0 +1,304 @@ +import os +import hashlib +import sqlite3 +import sys +import unittest +import zipfile +import importlib +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest import mock + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class _FakeResponse: + def __init__(self, body: bytes, *, content_type: str) -> None: + self.status_code = 200 + self.headers = { + "Content-Type": str(content_type or "").strip(), + "Content-Length": str(len(body)), + } + self._body = body + + def iter_content(self, chunk_size=65536): + data = self._body or b"" + for i in range(0, len(data), int(chunk_size or 65536)): + yield data[i : i + int(chunk_size or 65536)] + + def close(self): + return None + + +class TestChatExportRemoteThumbOption(unittest.TestCase): + def _reload_export_modules(self): + import wechat_decrypt_tool.app_paths as app_paths + import wechat_decrypt_tool.chat_helpers as chat_helpers + import wechat_decrypt_tool.media_helpers as media_helpers + import wechat_decrypt_tool.chat_export_service as chat_export_service + + importlib.reload(app_paths) + importlib.reload(chat_helpers) + importlib.reload(media_helpers) + importlib.reload(chat_export_service) + return chat_export_service + + def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (account, "", "我", "", 1, 0, "", ""), + ) + conn.execute( + "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (username, "", "测试好友", "", 1, 0, "", ""), + ) + conn.commit() + finally: + conn.close() + + def _seed_session_db(self, path: Path, *, username: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE SessionTable ( + username TEXT, + is_hidden INTEGER, + sort_timestamp INTEGER + ) + """ + ) + conn.execute( + "INSERT INTO SessionTable VALUES (?, ?, ?)", + (username, 0, 1735689600), + ) + conn.commit() + finally: + conn.close() + + def _seed_message_db(self, path: Path, *, account: str, username: str) -> tuple[str, str]: + conn = sqlite3.connect(str(path)) + try: + conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)") + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account)) + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username)) + + table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}" + conn.execute( + f""" + CREATE TABLE {table_name} ( + local_id INTEGER, + server_id INTEGER, + local_type INTEGER, + sort_seq INTEGER, + real_sender_id INTEGER, + create_time INTEGER, + message_content TEXT, + compress_content BLOB + ) + """ + ) + + link_thumb = "https://1.1.1.1/thumb.png" + quote_thumb = "https://1.1.1.1/quote.png" + + link_xml = ( + "" + "5" + "示例链接" + "这是描述" + "https://example.com/" + f"{link_thumb}" + "" + ) + quote_xml = ( + "" + "57" + "回复" + "" + "49" + "8888" + "wxid_other" + "对方" + "" + "5被引用链接https://example.com/" + f"{quote_thumb}" + "" + "" + "" + "" + ) + + rows = [ + (1, 1001, 49, 1, 2, 1735689601, link_xml, None), + (2, 1002, 49, 2, 2, 1735689602, quote_xml, None), + ] + conn.executemany( + f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + rows, + ) + conn.commit() + return link_thumb, quote_thumb + finally: + conn.close() + + def _prepare_account(self, root: Path, *, account: str, username: str) -> tuple[Path, str, str]: + account_dir = root / "output" / "databases" / account + account_dir.mkdir(parents=True, exist_ok=True) + + self._seed_contact_db(account_dir / "contact.db", account=account, username=username) + self._seed_session_db(account_dir / "session.db", username=username) + link_thumb, quote_thumb = self._seed_message_db(account_dir / "message_0.db", account=account, username=username) + return account_dir, link_thumb, quote_thumb + + def _create_job(self, manager, *, account: str, username: str, download_remote_media: bool): + job = manager.create_job( + account=account, + scope="selected", + usernames=[username], + export_format="html", + start_time=None, + end_time=None, + include_hidden=False, + include_official=False, + include_media=True, + media_kinds=["image", "emoji", "video", "video_thumb", "voice", "file"], + message_types=["link", "quote", "image"], + output_dir=None, + allow_process_key_extract=False, + download_remote_media=download_remote_media, + privacy_mode=False, + file_name=None, + ) + + for _ in range(200): + latest = manager.get_job(job.export_id) + if latest and latest.status in {"done", "error", "cancelled"}: + return latest + import time as _time + + _time.sleep(0.05) + self.fail("export job did not finish in time") + + def test_remote_thumb_disabled_does_not_download(self): + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + username = "wxid_friend" + _, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username) + + prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR") + try: + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + svc = self._reload_export_modules() + + with mock.patch.object( + svc.requests, + "get", + side_effect=AssertionError("requests.get should not be called when download_remote_media=False"), + ) as m_get: + job = self._create_job( + svc.CHAT_EXPORT_MANAGER, + account=account, + username=username, + download_remote_media=False, + ) + self.assertEqual(job.status, "done", msg=job.error) + self.assertEqual(m_get.call_count, 0) + + with zipfile.ZipFile(job.zip_path, "r") as zf: + names = set(zf.namelist()) + html_path = next((n for n in names if n.endswith("/messages.html")), "") + self.assertTrue(html_path) + html_text = zf.read(html_path).decode("utf-8") + self.assertIn(f'src="{link_thumb}"', html_text) + self.assertIn(f'src="{quote_thumb}"', html_text) + self.assertFalse(any(n.startswith("media/remote/") for n in names)) + finally: + if prev_data is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data + + def test_remote_thumb_enabled_downloads_and_rewrites(self): + with TemporaryDirectory() as td: + root = Path(td) + account = "wxid_test" + username = "wxid_friend" + _, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username) + + prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR") + try: + os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) + svc = self._reload_export_modules() + + fake_png = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde" + + def _fake_get(url, **_kwargs): + return _FakeResponse(fake_png, content_type="image/png") + + with mock.patch.object(svc.requests, "get", side_effect=_fake_get) as m_get: + job = self._create_job( + svc.CHAT_EXPORT_MANAGER, + account=account, + username=username, + download_remote_media=True, + ) + self.assertEqual(job.status, "done", msg=job.error) + self.assertGreaterEqual(m_get.call_count, 1) + + with zipfile.ZipFile(job.zip_path, "r") as zf: + names = set(zf.namelist()) + html_path = next((n for n in names if n.endswith("/messages.html")), "") + self.assertTrue(html_path) + html_text = zf.read(html_path).decode("utf-8") + + h1 = hashlib.sha256(link_thumb.encode("utf-8", errors="ignore")).hexdigest() + arc1 = f"media/remote/{h1[:32]}.png" + self.assertIn(arc1, names) + self.assertIn(f"../../{arc1}", html_text) + self.assertNotIn(f'src="{link_thumb}"', html_text) + + h2 = hashlib.sha256(quote_thumb.encode("utf-8", errors="ignore")).hexdigest() + arc2 = f"media/remote/{h2[:32]}.png" + self.assertIn(arc2, names) + self.assertIn(f"../../{arc2}", html_text) + self.assertNotIn(f'src="{quote_thumb}"', html_text) + finally: + if prev_data is None: + os.environ.pop("WECHAT_TOOL_DATA_DIR", None) + else: + os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data + diff --git a/tests/test_chat_official_article_cover_style.py b/tests/test_chat_official_article_cover_style.py new file mode 100644 index 0000000..40f303c --- /dev/null +++ b/tests/test_chat_official_article_cover_style.py @@ -0,0 +1,58 @@ +import sys +import unittest +from pathlib import Path + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + +from wechat_decrypt_tool.chat_helpers import _parse_app_message + + +class TestChatOfficialArticleCoverStyle(unittest.TestCase): + def test_mp_weixin_feed_url_is_cover_style(self): + raw_text = ( + "" + "" + "时尚穿搭:「这样的jk你喜欢吗」" + "这样的jk你喜欢吗?" + "5" + "" + "http://mp.weixin.qq.com/s?__biz=MzkxOTY4MjIxOA==&mid=2247508015&idx=1&sn=931dce677c6e70b4365792b14e7e8ff0" + "&exptype=masonry_feed_brief_content_elite_for_pcfeeds_u2i&ranksessionid=1770868256_1&req_id=1770867949535989#rd" + "" + "https://mmbiz.qpic.cn/sz_mmbiz_jpg/foo/640?wx_fmt=jpeg&wxfrom=401" + "甜图社" + "gh_abc123" + "" + "" + ) + + parsed = _parse_app_message(raw_text) + self.assertEqual(parsed.get("renderType"), "link") + self.assertEqual(parsed.get("linkType"), "official_article") + self.assertEqual(parsed.get("linkStyle"), "cover") + + def test_mp_weixin_non_feed_url_keeps_default_style(self): + raw_text = ( + "" + "" + "普通分享" + "这样的jk你喜欢吗?" + "5" + "http://mp.weixin.qq.com/s?__biz=foo&mid=1&idx=1&sn=bar#rd" + "甜图社" + "gh_abc123" + "" + "" + ) + + parsed = _parse_app_message(raw_text) + self.assertEqual(parsed.get("renderType"), "link") + self.assertEqual(parsed.get("linkType"), "official_article") + self.assertEqual(parsed.get("linkStyle"), "default") + + +if __name__ == "__main__": + unittest.main() + diff --git a/tests/test_transfer_postprocess.py b/tests/test_transfer_postprocess.py index 965ceea..3f6fea4 100644 --- a/tests/test_transfer_postprocess.py +++ b/tests/test_transfer_postprocess.py @@ -62,7 +62,68 @@ class TestTransferPostprocess(unittest.TestCase): self.assertEqual(merged[0].get("transferStatus"), "已被接收") + def test_pending_transfer_marked_expired_by_system_message(self): + merged = [ + { + "id": "message_0:Msg_x:100", + "renderType": "transfer", + "paySubType": "1", + "transferId": "t-expired-1", + "amount": "¥500.00", + "createTime": 1770742598, + "isSent": True, + "transferStatus": "转账", + }, + { + "id": "message_0:Msg_x:101", + "renderType": "system", + "type": 10000, + "createTime": 1770829000, + "content": "收款方24小时内未接收你的转账,已过期", + }, + ] + + chat_router._postprocess_transfer_messages(merged) + + self.assertEqual(merged[0].get("paySubType"), "10") + self.assertEqual(merged[0].get("transferStatus"), "已过期") + + def test_expired_matching_wins_over_amount_time_received_fallback(self): + merged = [ + { + "id": "message_0:Msg_x:200", + "renderType": "transfer", + "paySubType": "1", + "transferId": "t-expired-2", + "amount": "¥500.00", + "createTime": 1770742598, + "isSent": True, + "transferStatus": "", + }, + { + "id": "message_0:Msg_x:201", + "renderType": "transfer", + "paySubType": "3", + "transferId": "t-other", + "amount": "¥500.00", + "createTime": 1770828800, + "isSent": False, + "transferStatus": "已收款", + }, + { + "id": "message_0:Msg_x:202", + "renderType": "system", + "type": 10000, + "createTime": 1770829000, + "content": "收款方24小时内未接收你的转账,已过期", + }, + ] + + chat_router._postprocess_transfer_messages(merged) + + self.assertEqual(merged[0].get("paySubType"), "10") + self.assertEqual(merged[0].get("transferStatus"), "已过期") + if __name__ == "__main__": unittest.main() -