fix(chat): 修复导出 ZIP 写入冲突并支持按消息类型导出

- 修复导出时 ZIP entry 写句柄冲突(messages 先写临时文件再写入 ZIP)
- 新增 message_types 过滤,进度/统计按所选类型计数
- manifest.json 记录 messageTypes,避免导出含义歧义
This commit is contained in:
2977094657
2025-12-29 13:40:36 +08:00
parent 46e535dde7
commit 97ceed58b6
2 changed files with 285 additions and 152 deletions

View File

@@ -1,10 +1,10 @@
from __future__ import annotations
import heapq
import io
import json
import re
import sqlite3
import tempfile
import threading
import time
import uuid
@@ -87,6 +87,57 @@ def _is_md5(s: str) -> bool:
return bool(re.fullmatch(r"(?i)[0-9a-f]{32}", str(s or "").strip()))
def _normalize_render_type_key(value: Any) -> str:
v = str(value or "").strip()
if not v:
return ""
if v == "redPacket":
return "redpacket"
lower = v.lower()
if lower in {"redpacket", "red_packet", "red-packet", "redenvelope", "red_envelope"}:
return "redpacket"
return lower
def _render_types_to_local_types(render_types: set[str]) -> Optional[set[int]]:
rt = {str(x or "").strip() for x in (render_types or set())}
rt = {x for x in rt if x}
if not rt:
return None
out: set[int] = set()
for k in rt:
if k == "text":
out.add(1)
elif k == "image":
out.add(3)
elif k == "voice":
out.add(34)
elif k == "video":
out.update({43, 62})
elif k == "emoji":
out.add(47)
elif k == "voip":
out.add(50)
elif k == "system":
out.update({10000, 266287972401})
elif k == "quote":
out.add(244813135921)
out.add(49) # Some quote messages are embedded as appmsg (local_type=49).
elif k in {"link", "file", "transfer", "redpacket"}:
out.add(49)
else:
# Unknown type: cannot safely prefilter by local_type.
return None
return out
def _should_estimate_by_local_type(render_types: set[str]) -> bool:
# Only estimate counts when every requested type maps 1:1 to local_type.
# App messages (local_type=49) are heterogeneous and cannot be counted accurately without parsing.
return not bool(render_types & {"link", "file", "transfer", "redpacket", "quote"})
@dataclass
class ExportProgress:
conversations_total: int = 0
@@ -183,6 +234,7 @@ class ChatExportManager:
include_official: bool,
include_media: bool,
media_kinds: list[MediaKind],
message_types: list[str],
allow_process_key_extract: bool,
privacy_mode: bool,
file_name: Optional[str],
@@ -204,6 +256,7 @@ class ChatExportManager:
"includeOfficial": bool(include_official),
"includeMedia": bool(include_media),
"mediaKinds": media_kinds,
"messageTypes": list(dict.fromkeys([str(t or "").strip() for t in (message_types or []) if str(t or "").strip()])),
"allowProcessKeyExtract": bool(allow_process_key_extract),
"privacyMode": bool(privacy_mode),
"fileName": str(file_name or "").strip(),
@@ -267,6 +320,18 @@ class ChatExportManager:
st = int(opts.get("startTime") or 0) or None
et = int(opts.get("endTime") or 0) or None
message_types_raw = opts.get("messageTypes") or []
want_types: Optional[set[str]] = None
if message_types_raw:
parts = [_normalize_render_type_key(x) for x in message_types_raw]
want = {p for p in parts if p}
if want:
want_types = want
local_types = _render_types_to_local_types(want_types) if want_types else None
can_estimate = (want_types is None) or _should_estimate_by_local_type(want_types)
estimate_local_types = local_types if (want_types and can_estimate) else None
target_usernames = _resolve_export_targets(
account_dir=account_dir,
scope=scope,
@@ -391,12 +456,16 @@ class ChatExportManager:
job.progress.current_conversation_messages_total = 0
try:
estimated_total = _estimate_conversation_message_count(
account_dir=account_dir,
conv_username=conv_username,
start_time=st,
end_time=et,
)
if not can_estimate:
estimated_total = 0
else:
estimated_total = _estimate_conversation_message_count(
account_dir=account_dir,
conv_username=conv_username,
start_time=st,
end_time=et,
local_types=estimate_local_types,
)
except Exception:
estimated_total = 0
@@ -430,6 +499,8 @@ class ChatExportManager:
conv_is_group=conv_is_group,
start_time=st,
end_time=et,
want_types=want_types,
local_types=local_types,
resource_conn=resource_conn,
resource_chat_id=chat_id,
head_image_conn=head_image_conn,
@@ -456,6 +527,8 @@ class ChatExportManager:
conv_is_group=conv_is_group,
start_time=st,
end_time=et,
want_types=want_types,
local_types=local_types,
resource_conn=resource_conn,
resource_chat_id=chat_id,
head_image_conn=head_image_conn,
@@ -496,6 +569,7 @@ class ChatExportManager:
"filters": {
"startTime": st,
"endTime": et,
"messageTypes": sorted(want_types) if want_types else None,
"includeHidden": include_hidden,
"includeOfficial": include_official,
},
@@ -612,6 +686,7 @@ def _estimate_conversation_message_count(
conv_username: str,
start_time: Optional[int],
end_time: Optional[int],
local_types: Optional[set[int]] = None,
) -> int:
total = 0
for db_path in _iter_message_db_paths(account_dir):
@@ -623,6 +698,12 @@ def _estimate_conversation_message_count(
quoted = _quote_ident(table)
where = []
params: list[Any] = []
if local_types:
lt = sorted({int(x) for x in local_types if int(x) != 0})
if lt:
placeholders = ",".join(["?"] * len(lt))
where.append(f"local_type IN ({placeholders})")
params.extend(lt)
if start_time is not None:
where.append("create_time >= ?")
params.append(int(start_time))
@@ -658,6 +739,7 @@ def _iter_rows_for_conversation(
conv_username: str,
start_time: Optional[int],
end_time: Optional[int],
local_types: Optional[set[int]] = None,
) -> Iterable[_Row]:
db_paths = _iter_message_db_paths(account_dir)
if not db_paths:
@@ -691,6 +773,12 @@ def _iter_rows_for_conversation(
quoted = _quote_ident(table_name)
where = []
params: list[Any] = []
if local_types:
lt = sorted({int(x) for x in local_types if int(x) != 0})
if lt:
placeholders = ",".join(["?"] * len(lt))
where.append(f"m.local_type IN ({placeholders})")
params.extend(lt)
if start_time is not None:
where.append("m.create_time >= ?")
params.append(int(start_time))
@@ -1100,6 +1188,8 @@ def _write_conversation_json(
conv_is_group: bool,
start_time: Optional[int],
end_time: Optional[int],
want_types: Optional[set[str]],
local_types: Optional[set[int]],
resource_conn: Optional[sqlite3.Connection],
resource_chat_id: Optional[int],
head_image_conn: Optional[sqlite3.Connection],
@@ -1118,97 +1208,116 @@ def _write_conversation_json(
arcname = f"{conv_dir}/messages.json"
exported = 0
with zf.open(arcname, "w") as fp:
tw = io.TextIOWrapper(fp, encoding="utf-8", newline="\n")
tw.write("{\n")
tw.write(" \"schemaVersion\": 1,\n")
tw.write(f" \"exportedAt\": {json.dumps(_now_iso(), ensure_ascii=False)},\n")
tw.write(f" \"account\": {json.dumps('hidden' if privacy_mode else account_dir.name, ensure_ascii=False)},\n")
tw.write(
" \"conversation\": "
+ json.dumps(
{
"username": "" if privacy_mode else conv_username,
"displayName": "已隐藏" if privacy_mode else conv_name,
"avatarPath": "" if privacy_mode else (conv_avatar_path or ""),
"isGroup": bool(conv_is_group),
},
ensure_ascii=False,
# NOTE: Do not keep an entry handle opened while also writing other entries (avatars/media).
# zipfile forbids interleaving writes; stream to a temp file then add it to zip at the end.
with tempfile.TemporaryDirectory(prefix="wechat_chat_export_") as tmp_dir:
tmp_path = Path(tmp_dir) / "messages.json"
with open(tmp_path, "w", encoding="utf-8", newline="\n") as tw:
tw.write("{\n")
tw.write(" \"schemaVersion\": 1,\n")
tw.write(f" \"exportedAt\": {json.dumps(_now_iso(), ensure_ascii=False)},\n")
tw.write(f" \"account\": {json.dumps('hidden' if privacy_mode else account_dir.name, ensure_ascii=False)},\n")
tw.write(
" \"conversation\": "
+ json.dumps(
{
"username": "" if privacy_mode else conv_username,
"displayName": "已隐藏" if privacy_mode else conv_name,
"avatarPath": "" if privacy_mode else (conv_avatar_path or ""),
"isGroup": bool(conv_is_group),
},
ensure_ascii=False,
)
+ ",\n"
)
+ ",\n"
)
tw.write(
" \"filters\": "
+ json.dumps(
{"startTime": int(start_time) if start_time else None, "endTime": int(end_time) if end_time else None},
ensure_ascii=False,
tw.write(
" \"filters\": "
+ json.dumps(
{
"startTime": int(start_time) if start_time else None,
"endTime": int(end_time) if end_time else None,
"messageTypes": sorted(want_types) if want_types else None,
},
ensure_ascii=False,
)
+ ",\n"
)
+ ",\n"
)
tw.write(" \"messages\": [\n")
tw.write(" \"messages\": [\n")
sender_alias_map: dict[str, int] = {}
first = True
for row in _iter_rows_for_conversation(
account_dir=account_dir,
conv_username=conv_username,
start_time=start_time,
end_time=end_time,
):
msg = _parse_message_for_export(
row=row,
sender_alias_map: dict[str, int] = {}
first = True
scanned = 0
for row in _iter_rows_for_conversation(
account_dir=account_dir,
conv_username=conv_username,
is_group=conv_is_group,
resource_conn=resource_conn,
resource_chat_id=resource_chat_id,
)
su = str(msg.get("senderUsername") or "").strip()
if privacy_mode:
_privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map)
else:
msg["senderDisplayName"] = resolve_display_name(su) if su else ""
msg["senderAvatarPath"] = (
_materialize_avatar(
zf=zf,
head_image_conn=head_image_conn,
username=su,
avatar_written=avatar_written,
)
if (su and head_image_conn is not None)
else ""
)
if include_media:
_attach_offline_media(
zf=zf,
account_dir=account_dir,
start_time=start_time,
end_time=end_time,
local_types=local_types,
):
scanned += 1
msg = _parse_message_for_export(
row=row,
conv_username=conv_username,
msg=msg,
media_written=media_written,
report=report,
media_kinds=media_kinds,
allow_process_key_extract=allow_process_key_extract,
media_db_path=media_db_path,
lock=lock,
job=job,
is_group=conv_is_group,
resource_conn=resource_conn,
resource_chat_id=resource_chat_id,
)
if want_types:
rt_key = _normalize_render_type_key(msg.get("renderType"))
if rt_key not in want_types:
if scanned % 500 == 0 and job.cancel_requested:
raise _JobCancelled()
continue
if not first:
tw.write(",\n")
tw.write(" " + json.dumps(msg, ensure_ascii=False))
first = False
su = str(msg.get("senderUsername") or "").strip()
if privacy_mode:
_privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map)
else:
msg["senderDisplayName"] = resolve_display_name(su) if su else ""
msg["senderAvatarPath"] = (
_materialize_avatar(
zf=zf,
head_image_conn=head_image_conn,
username=su,
avatar_written=avatar_written,
)
if (su and head_image_conn is not None)
else ""
)
exported += 1
with lock:
job.progress.messages_exported += 1
job.progress.current_conversation_messages_exported = exported
if include_media:
_attach_offline_media(
zf=zf,
account_dir=account_dir,
conv_username=conv_username,
msg=msg,
media_written=media_written,
report=report,
media_kinds=media_kinds,
allow_process_key_extract=allow_process_key_extract,
media_db_path=media_db_path,
lock=lock,
job=job,
)
if exported % 200 == 0 and job.cancel_requested:
raise _JobCancelled()
if not first:
tw.write(",\n")
tw.write(" " + json.dumps(msg, ensure_ascii=False))
first = False
tw.write("\n ]\n")
tw.write("}\n")
tw.flush()
exported += 1
with lock:
job.progress.messages_exported += 1
job.progress.current_conversation_messages_exported = exported
if scanned % 500 == 0 and job.cancel_requested:
raise _JobCancelled()
tw.write("\n ]\n")
tw.write("}\n")
tw.flush()
zf.write(str(tmp_path), arcname)
return exported
@@ -1223,6 +1332,8 @@ def _write_conversation_txt(
conv_is_group: bool,
start_time: Optional[int],
end_time: Optional[int],
want_types: Optional[set[str]],
local_types: Optional[set[int]],
resource_conn: Optional[sqlite3.Connection],
resource_chat_id: Optional[int],
head_image_conn: Optional[sqlite3.Connection],
@@ -1241,79 +1352,95 @@ def _write_conversation_txt(
arcname = f"{conv_dir}/messages.txt"
exported = 0
with zf.open(arcname, "w") as fp:
tw = io.TextIOWrapper(fp, encoding="utf-8", newline="\n")
if privacy_mode:
tw.write("会话: 已隐藏\n")
tw.write("账号: hidden\n")
else:
tw.write(f"会话: {conv_name} ({conv_username})\n")
tw.write(f"账号: {account_dir.name}\n")
if conv_avatar_path:
tw.write(f"会话头像: {conv_avatar_path}\n")
if start_time or end_time:
st = _format_ts(int(start_time)) if start_time else "不限"
et = _format_ts(int(end_time)) if end_time else "不限"
tw.write(f"时间范围: {st} ~ {et}\n")
tw.write(f"导出时间: {_now_iso()}\n")
tw.write("\n")
sender_alias_map: dict[str, int] = {}
for row in _iter_rows_for_conversation(
account_dir=account_dir,
conv_username=conv_username,
start_time=start_time,
end_time=end_time,
):
msg = _parse_message_for_export(
row=row,
conv_username=conv_username,
is_group=conv_is_group,
resource_conn=resource_conn,
resource_chat_id=resource_chat_id,
)
su = str(msg.get("senderUsername") or "").strip()
# Same as JSON: write to temp file first to avoid zip interleaving writes.
with tempfile.TemporaryDirectory(prefix="wechat_chat_export_") as tmp_dir:
tmp_path = Path(tmp_dir) / "messages.txt"
with open(tmp_path, "w", encoding="utf-8", newline="\n") as tw:
if privacy_mode:
_privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map)
tw.write("会话: 已隐藏\n")
tw.write("账号: hidden\n")
else:
msg["senderDisplayName"] = resolve_display_name(su) if su else ""
msg["senderAvatarPath"] = (
_materialize_avatar(
zf=zf,
head_image_conn=head_image_conn,
username=su,
avatar_written=avatar_written,
)
if (su and head_image_conn is not None)
else ""
)
tw.write(f"会话: {conv_name} ({conv_username})\n")
tw.write(f"账号: {account_dir.name}\n")
if conv_avatar_path:
tw.write(f"会话头像: {conv_avatar_path}\n")
if start_time or end_time:
st = _format_ts(int(start_time)) if start_time else "不限"
et = _format_ts(int(end_time)) if end_time else "不限"
tw.write(f"时间范围: {st} ~ {et}\n")
if want_types:
tw.write(f"消息类型: {', '.join(sorted(want_types))}\n")
tw.write(f"导出时间: {_now_iso()}\n")
tw.write("\n")
if include_media:
_attach_offline_media(
zf=zf,
account_dir=account_dir,
sender_alias_map: dict[str, int] = {}
scanned = 0
for row in _iter_rows_for_conversation(
account_dir=account_dir,
conv_username=conv_username,
start_time=start_time,
end_time=end_time,
local_types=local_types,
):
scanned += 1
msg = _parse_message_for_export(
row=row,
conv_username=conv_username,
msg=msg,
media_written=media_written,
report=report,
media_kinds=media_kinds,
allow_process_key_extract=allow_process_key_extract,
media_db_path=media_db_path,
lock=lock,
job=job,
is_group=conv_is_group,
resource_conn=resource_conn,
resource_chat_id=resource_chat_id,
)
if want_types:
rt_key = _normalize_render_type_key(msg.get("renderType"))
if rt_key not in want_types:
if scanned % 500 == 0 and job.cancel_requested:
raise _JobCancelled()
continue
tw.write(_format_message_line_txt(msg=msg) + "\n")
su = str(msg.get("senderUsername") or "").strip()
if privacy_mode:
_privacy_scrub_message(msg, conv_is_group=conv_is_group, sender_alias_map=sender_alias_map)
else:
msg["senderDisplayName"] = resolve_display_name(su) if su else ""
msg["senderAvatarPath"] = (
_materialize_avatar(
zf=zf,
head_image_conn=head_image_conn,
username=su,
avatar_written=avatar_written,
)
if (su and head_image_conn is not None)
else ""
)
exported += 1
with lock:
job.progress.messages_exported += 1
job.progress.current_conversation_messages_exported = exported
if include_media:
_attach_offline_media(
zf=zf,
account_dir=account_dir,
conv_username=conv_username,
msg=msg,
media_written=media_written,
report=report,
media_kinds=media_kinds,
allow_process_key_extract=allow_process_key_extract,
media_db_path=media_db_path,
lock=lock,
job=job,
)
if exported % 200 == 0 and job.cancel_requested:
raise _JobCancelled()
tw.write(_format_message_line_txt(msg=msg) + "\n")
tw.flush()
exported += 1
with lock:
job.progress.messages_exported += 1
job.progress.current_conversation_messages_exported = exported
if scanned % 500 == 0 and job.cancel_requested:
raise _JobCancelled()
tw.flush()
zf.write(str(tmp_path), arcname)
return exported

View File

@@ -15,6 +15,7 @@ router = APIRouter(route_class=PathFixRoute)
ExportFormat = Literal["json", "txt"]
ExportScope = Literal["selected", "all", "groups", "singles"]
MediaKind = Literal["image", "emoji", "video", "video_thumb", "voice", "file"]
MessageType = Literal["text", "image", "emoji", "video", "voice", "file", "link", "transfer", "redPacket", "system", "quote", "voip"]
class ChatExportCreateRequest(BaseModel):
@@ -31,6 +32,10 @@ class ChatExportCreateRequest(BaseModel):
default_factory=lambda: ["image", "emoji", "video", "video_thumb", "voice", "file"],
description="打包的媒体类型",
)
message_types: list[MessageType] = Field(
default_factory=list,
description="导出消息类型renderType过滤为空=导出全部消息;可多选(如仅 voice / 仅 transfer / 仅 redPacket 等)",
)
allow_process_key_extract: bool = Field(
False,
description="预留字段:本项目不从微信进程提取媒体密钥,请使用 wx_key 获取并保存/批量解密",
@@ -55,6 +60,7 @@ async def create_chat_export(req: ChatExportCreateRequest):
include_official=req.include_official,
include_media=req.include_media,
media_kinds=req.media_kinds,
message_types=req.message_types,
allow_process_key_extract=req.allow_process_key_extract,
privacy_mode=req.privacy_mode,
file_name=req.file_name,