diff --git a/src/wechat_decrypt_tool/chat_helpers.py b/src/wechat_decrypt_tool/chat_helpers.py
new file mode 100644
index 0000000..21becc1
--- /dev/null
+++ b/src/wechat_decrypt_tool/chat_helpers.py
@@ -0,0 +1,1064 @@
+import base64
+import hashlib
+import html
+import os
+import re
+import sqlite3
+from collections import Counter
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Optional
+from urllib.parse import quote
+
+from fastapi import HTTPException
+
+from .logging_config import get_logger
+
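+# zstandard is optional: some message/compress content is stored as zstd frames, and decoding degrades gracefully without it.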
+try:
+ import zstandard as zstd # type: ignore
+except Exception:
+ zstd = None
+
+logger = get_logger(__name__)
+
+_REPO_ROOT = Path(__file__).resolve().parents[2]
+_OUTPUT_DATABASES_DIR = _REPO_ROOT / "output" / "databases"
+_DEBUG_SESSIONS = os.environ.get("WECHAT_TOOL_DEBUG_SESSIONS", "0") == "1"
+
+
+def _list_decrypted_accounts() -> list[str]:
+ if not _OUTPUT_DATABASES_DIR.exists():
+ return []
+
+ accounts: list[str] = []
+ for p in _OUTPUT_DATABASES_DIR.iterdir():
+ if not p.is_dir():
+ continue
+ if (p / "session.db").exists() and (p / "contact.db").exists():
+ accounts.append(p.name)
+
+ accounts.sort()
+ return accounts
+
+
+def _resolve_account_dir(account: Optional[str]) -> Path:
+ accounts = _list_decrypted_accounts()
+ if not accounts:
+ raise HTTPException(
+ status_code=404,
+ detail="No decrypted databases found. Please decrypt first.",
+ )
+
+ selected = account or accounts[0]
+ base = _OUTPUT_DATABASES_DIR.resolve()
+ candidate = (_OUTPUT_DATABASES_DIR / selected).resolve()
+
+ if candidate != base and base not in candidate.parents:
+ raise HTTPException(status_code=400, detail="Invalid account path.")
+
+ if not candidate.exists() or not candidate.is_dir():
+ raise HTTPException(status_code=404, detail="Account not found.")
+
+ if not (candidate / "session.db").exists():
+ raise HTTPException(status_code=404, detail="session.db not found for this account.")
+ if not (candidate / "contact.db").exists():
+ raise HTTPException(status_code=404, detail="contact.db not found for this account.")
+
+ return candidate
+
+
+def _should_keep_session(username: str, include_official: bool) -> bool:
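+    # Keep chatrooms, wxid_* contacts and plain usernames; drop official accounts (unless requested),
+    # built-in service/notification accounts and OpenIM customer-service sessions.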
+ if not username:
+ return False
+
+ if not include_official and username.startswith("gh_"):
+ return False
+
+ if username.startswith(("weixin", "qqmail", "fmessage", "medianote", "floatbottle", "newsapp")):
+ return False
+
+ if "@kefu.openim" in username:
+ return False
+ if "@openim" in username:
+ return False
+ if "service_" in username:
+ return False
+
+ if username in {
+ "brandsessionholder",
+ "brandservicesessionholder",
+ "notifymessage",
+ "opencustomerservicemsg",
+ "notification_messages",
+ "userexperience_alarm",
+ }:
+ return False
+
+ return username.endswith("@chatroom") or username.startswith("wxid_") or ("@" not in username)
+
+
+def _format_session_time(ts: Optional[int]) -> str:
+ if not ts:
+ return ""
+ try:
+ dt = datetime.fromtimestamp(int(ts))
+ now = datetime.now()
+ if dt.date() == now.date():
+ return dt.strftime("%H:%M")
+ return dt.strftime("%m/%d")
+ except Exception:
+ return ""
+
+
+def _infer_last_message_brief(msg_type: Optional[int], sub_type: Optional[int]) -> str:
+ t = int(msg_type or 0)
+ s = int(sub_type or 0)
+
+ if t == 1:
+ return "[Text]"
+ if t == 3:
+ return "[Image]"
+ if t == 34:
+ return "[Voice]"
+ if t == 42:
+ return "[Contact Card]"
+ if t == 43:
+ return "[Video]"
+ if t == 47:
+ return "[Emoji]"
+ if t == 48:
+ return "[Location]"
+ if t == 49:
+ if s == 5:
+ return "[Link]"
+ if s == 6:
+ return "[File]"
+ if s in (33, 36):
+ return "[Mini Program]"
+ if s == 57:
+ return "[Quote]"
+ if s in (63, 88):
+ return "[Live]"
+ if s == 87:
+ return "[Announcement]"
+ if s == 2000:
+ return "[Transfer]"
+ if s == 2003:
+ return "[Red Packet]"
+ if s == 19:
+ return "[Chat History]"
+ return "[App Message]"
+ if t == 10000:
+ return "[System]"
+ return "[Message]"
+
+
+def _infer_message_brief_by_local_type(local_type: Optional[int]) -> str:
+ t = int(local_type or 0)
+ if t == 1:
+ return ""
+ if t == 3:
+ return "[Image]"
+ if t == 34:
+ return "[Voice]"
+ if t == 43:
+ return "[Video]"
+ if t == 47:
+ return "[Emoji]"
+ if t == 48:
+ return "[Location]"
+ if t == 50:
+ return "[VoIP]"
+ if t == 10000:
+ return "[System]"
+ if t == 244813135921:
+ return "[Quote]"
+ if t == 17179869233:
+ return "[Link]"
+ if t == 21474836529:
+ return "[Article]"
+ if t == 154618822705:
+ return "[Mini Program]"
+ if t == 12884901937:
+ return "[Music]"
+ if t == 8594229559345:
+ return "[Red Packet]"
+ if t == 81604378673:
+ return "[Chat History]"
+ if t == 266287972401:
+ return "[Pat]"
+ if t == 8589934592049:
+ return "[Transfer]"
+ if t == 270582939697:
+ return "[Live]"
+ if t == 25769803825:
+ return "[File]"
+ return "[Message]"
+
+
+def _quote_ident(ident: str) -> str:
+ return '"' + ident.replace('"', '""') + '"'
+
+
+def _resolve_msg_table_name(conn: sqlite3.Connection, username: str) -> Optional[str]:
+ if not username:
+ return None
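+    # Per-chat message tables are named Msg_<md5(username)> or Chat_<md5(username)>; match case-insensitively,
+    # then fall back to progressively looser matches on the md5.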
+ md5_hex = hashlib.md5(username.encode("utf-8")).hexdigest()
+ expected = f"msg_{md5_hex}".lower()
+ expected_chat = f"chat_{md5_hex}".lower()
+
+ rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
+ names = [r[0] for r in rows if r and r[0]]
+
+ for name in names:
+ if str(name).lower() == expected:
+ return str(name)
+
+ for name in names:
+ if str(name).lower() == expected_chat:
+ return str(name)
+
+ for name in names:
+ ln = str(name).lower()
+ if ln.startswith("msg_") and md5_hex in ln:
+ return str(name)
+ if ln.startswith("chat_") and md5_hex in ln:
+ return str(name)
+
+ for name in names:
+ if md5_hex in str(name).lower():
+ return str(name)
+
+ partial = md5_hex[:24]
+ for name in names:
+ if partial in str(name).lower():
+ return str(name)
+
+ return None
+
+
+def _query_head_image_usernames(head_image_db_path: Path, usernames: list[str]) -> set[str]:
+ uniq = list(dict.fromkeys([u for u in usernames if u]))
+ if not uniq:
+ return set()
+ if not head_image_db_path.exists():
+ return set()
+
+ conn = sqlite3.connect(str(head_image_db_path))
+ try:
+ placeholders = ",".join(["?"] * len(uniq))
+ rows = conn.execute(
+ f"SELECT username FROM head_image WHERE username IN ({placeholders})",
+ uniq,
+ ).fetchall()
+ return {str(r[0]) for r in rows if r and r[0]}
+ finally:
+ conn.close()
+
+
+def _build_avatar_url(account_dir_name: str, username: str) -> str:
+ return f"/api/chat/avatar?account={quote(account_dir_name)}&username={quote(username)}"
+
+
+def _decode_sqlite_text(value: Any) -> str:
+ if value is None:
+ return ""
+ if isinstance(value, bytes):
+ try:
+ return value.decode("utf-8", errors="ignore")
+ except Exception:
+ return ""
+ if isinstance(value, memoryview):
+ try:
+ return bytes(value).decode("utf-8", errors="ignore")
+ except Exception:
+ return ""
+ return str(value)
+
+
+def _is_mostly_printable_text(s: str) -> bool:
+ if not s:
+ return False
+ sample = s[:600]
+ if not sample:
+ return False
+ printable = sum(1 for ch in sample if ch.isprintable() or ch in {"\n", "\r", "\t"})
+ return (printable / len(sample)) >= 0.85
+
+
+def _looks_like_xml(s: str) -> bool:
+ if not s:
+ return False
+ t = s.lstrip()
+ if t.startswith('"') and t.endswith('"'):
+ t = t.strip('"').lstrip()
+ return t.startswith("<")
+
+
+def _decode_message_content(compress_value: Any, message_value: Any) -> str:
+ msg_text = _decode_sqlite_text(message_value)
+
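+    # message_content may be plain text or a zstd frame; compress_content may hold raw zstd bytes,
+    # hex-encoded bytes, or base64-encoded bytes. Keep the first decode that looks like text or XML.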
+ if isinstance(message_value, (bytes, bytearray, memoryview)):
+ raw = bytes(message_value) if isinstance(message_value, memoryview) else message_value
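+        # 0x28 0xB5 0x2F 0xFD is the zstandard frame magic.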
+ if raw.startswith(b"\x28\xb5\x2f\xfd") and zstd is not None:
+ try:
+ out = zstd.decompress(raw)
+ s = out.decode("utf-8", errors="ignore")
+ s = html.unescape(s.strip())
+ if _looks_like_xml(s) or _is_mostly_printable_text(s):
+ msg_text = s
+ except Exception:
+ pass
+
+ if compress_value is None:
+ return msg_text
+
+ def try_decode_text_blob(text: str) -> Optional[str]:
+ t = (text or "").strip()
+ if not t:
+ return None
+
+ if len(t) >= 16 and len(t) % 2 == 0 and re.fullmatch(r"[0-9a-fA-F]+", t):
+ try:
+ raw = bytes.fromhex(t)
+ if zstd is not None:
+ try:
+ out = zstd.decompress(raw)
+ s2 = out.decode("utf-8", errors="ignore")
+ s2 = html.unescape(s2.strip())
+ if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
+ return s2
+ except Exception:
+ pass
+ s2 = raw.decode("utf-8", errors="ignore")
+ s2 = html.unescape(s2.strip())
+ if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
+ return s2
+ except Exception:
+ return None
+
+ if len(t) >= 24 and len(t) % 4 == 0 and re.fullmatch(r"[A-Za-z0-9+/=]+", t):
+ try:
+ raw = base64.b64decode(t)
+ if zstd is not None:
+ try:
+ out = zstd.decompress(raw)
+ s2 = out.decode("utf-8", errors="ignore")
+ s2 = html.unescape(s2.strip())
+ if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
+ return s2
+ except Exception:
+ pass
+ s2 = raw.decode("utf-8", errors="ignore")
+ s2 = html.unescape(s2.strip())
+ if _looks_like_xml(s2) or _is_mostly_printable_text(s2):
+ return s2
+ except Exception:
+ return None
+
+ return None
+
+ if isinstance(compress_value, str):
+ s = html.unescape(compress_value.strip())
+ s2 = try_decode_text_blob(s)
+ if s2:
+ return s2
+ if _looks_like_xml(s) or _is_mostly_printable_text(s):
+ return s
+ return msg_text
+
+ data: Optional[bytes] = None
+ if isinstance(compress_value, memoryview):
+ data = bytes(compress_value)
+ elif isinstance(compress_value, (bytes, bytearray)):
+ data = bytes(compress_value)
+
+ if not data:
+ return msg_text
+
+ if zstd is not None:
+ try:
+ out = zstd.decompress(data)
+ s = out.decode("utf-8", errors="ignore")
+ s = html.unescape(s.strip())
+ if _looks_like_xml(s) or _is_mostly_printable_text(s):
+ return s
+ except Exception:
+ pass
+
+ try:
+ s = data.decode("utf-8", errors="ignore")
+ s = html.unescape(s.strip())
+ s2 = try_decode_text_blob(s)
+ if s2:
+ return s2
+ if _looks_like_xml(s) or _is_mostly_printable_text(s):
+ return s
+ except Exception:
+ pass
+
+ return msg_text
+
+
+_MD5_HEX_RE = re.compile(rb"(?i)[0-9a-f]{32}")
+
+
+def _extract_md5_from_blob(blob: Any) -> str:
+ if blob is None:
+ return ""
+ if isinstance(blob, memoryview):
+ data = bytes(blob)
+ elif isinstance(blob, (bytes, bytearray)):
+ data = bytes(blob)
+ else:
+ try:
+ data = bytes(blob)
+ except Exception:
+ return ""
+
+ if not data:
+ return ""
+ m = _MD5_HEX_RE.findall(data)
+ if not m:
+ return ""
+ best = Counter([x.lower() for x in m]).most_common(1)[0][0]
+ try:
+ return best.decode("ascii", errors="ignore")
+ except Exception:
+ return ""
+
+
+def _resource_lookup_chat_id(resource_conn: sqlite3.Connection, username: str) -> Optional[int]:
+ if not username:
+ return None
+ try:
+ row = resource_conn.execute(
+ "SELECT rowid FROM ChatName2Id WHERE user_name = ? LIMIT 1",
+ (username,),
+ ).fetchone()
+ if row and row[0] is not None:
+ return int(row[0])
+ except Exception:
+ return None
+ return None
+
+
+def _lookup_resource_md5(
+ resource_conn: sqlite3.Connection,
+ chat_id: Optional[int],
+ message_local_type: int,
+ server_id: int,
+ local_id: int,
+ create_time: int,
+) -> str:
+ if server_id <= 0 and local_id <= 0:
+ return ""
+
+ where_chat = ""
+ params_prefix: list[Any] = []
+ if chat_id is not None and int(chat_id) > 0:
+ where_chat = " AND chat_id = ?"
+ params_prefix.append(int(chat_id))
+
+ where_type = ""
+ if int(message_local_type) > 0:
+ where_type = " AND message_local_type = ?"
+ params_prefix.append(int(message_local_type))
+
+ try:
+ if server_id > 0:
+ row = resource_conn.execute(
+ "SELECT packed_info FROM MessageResourceInfo WHERE message_svr_id = ?"
+ + where_chat
+ + where_type
+ + " ORDER BY message_id DESC LIMIT 1",
+ [int(server_id)] + params_prefix,
+ ).fetchone()
+ if row and row[0] is not None:
+ md5 = _extract_md5_from_blob(row[0])
+ if md5:
+ return md5
+ except Exception:
+ pass
+
+ try:
+ if local_id > 0 and create_time > 0:
+ row = resource_conn.execute(
+ "SELECT packed_info FROM MessageResourceInfo WHERE message_local_id = ? AND message_create_time = ?"
+ + where_chat
+ + where_type
+ + " ORDER BY message_id DESC LIMIT 1",
+ [int(local_id), int(create_time)] + params_prefix,
+ ).fetchone()
+ if row and row[0] is not None:
+ return _extract_md5_from_blob(row[0])
+ except Exception:
+ pass
+
+ return ""
+
+
+def _strip_cdata(s: str) -> str:
+ if not s:
+ return ""
+    out = s.replace("<![CDATA[", "").replace("]]>", "")
+ return out.strip()
+
+
+def _extract_xml_tag_text(xml_text: str, tag: str) -> str:
+ if not xml_text or not tag:
+ return ""
+ m = re.search(
+ rf"<{re.escape(tag)}>(.*?){re.escape(tag)}>",
+ xml_text,
+ flags=re.IGNORECASE | re.DOTALL,
+ )
+ if not m:
+ return ""
+ return _strip_cdata(m.group(1) or "")
+
+
+def _extract_xml_attr(xml_text: str, attr: str) -> str:
+ if not xml_text or not attr:
+ return ""
+ m = re.search(rf"{re.escape(attr)}\s*=\s*['\"]([^'\"]+)['\"]", xml_text, flags=re.IGNORECASE)
+ return (m.group(1) or "").strip() if m else ""
+
+
+def _extract_xml_tag_or_attr(xml_text: str, name: str) -> str:
+ v = _extract_xml_tag_text(xml_text, name)
+ if v:
+ return v
+ return _extract_xml_attr(xml_text, name)
+
+
+def _extract_refermsg_block(xml_text: str) -> str:
+ if not xml_text:
+ return ""
+ m = re.search(r"(]*>.*?)", xml_text, flags=re.IGNORECASE | re.DOTALL)
+ return (m.group(1) or "").strip() if m else ""
+
+
+def _infer_transfer_status_text(
+ is_sent: bool,
+ paysubtype: str,
+ receivestatus: str,
+ sendertitle: str,
+ receivertitle: str,
+ senderdes: str,
+ receiverdes: str,
+) -> str:
+ t = str(paysubtype or "").strip()
+ rs = str(receivestatus or "").strip()
+
+ if rs == "1":
+ return "已收款"
+ if rs == "2":
+ return "已退还"
+ if rs == "3":
+ return "已过期"
+
+ if t == "4":
+ return "已退还"
+ if t == "9":
+ return "已被退还"
+ if t == "10":
+ return "已过期"
+
+ if t == "8":
+ return "发起转账"
+ if t == "3":
+ return "已收款" if is_sent else "已被接收"
+ if t == "1":
+ return "转账"
+
+ title = sendertitle if is_sent else receivertitle
+ if title:
+ return title
+ des = senderdes if is_sent else receiverdes
+ if des:
+ return des
+ return "转账"
+
+
+def _split_group_sender_prefix(text: str) -> tuple[str, str]:
+ if not text:
+ return "", text
+ sep = text.find(":\n")
+ if sep <= 0:
+ return "", text
+ prefix = text[:sep].strip()
+ body = text[sep + 2 :].lstrip("\n")
+ if not prefix or len(prefix) > 128:
+ return "", text
+ if re.search(r"\s", prefix):
+ return "", text
+ if prefix.startswith("wxid_") or prefix.endswith("@chatroom") or "@" in prefix:
+ return prefix, body
+ return "", text
+
+
+def _extract_sender_from_group_xml(xml_text: str) -> str:
+ if not xml_text:
+ return ""
+
+ v = _extract_xml_tag_text(xml_text, "fromusername")
+ if v:
+ return v
+ v = _extract_xml_attr(xml_text, "fromusername")
+ if v:
+ return v
+ return ""
+
+
+def _parse_pat_message(text: str, contact_rows: dict[str, sqlite3.Row]) -> str:
+ template = _extract_xml_tag_text(text, "template")
+ if not template:
+ return "[拍一拍]"
+ wxids = list({m.group(1) for m in re.finditer(r"\$\{([^}]+)\}", template) if m.group(1)})
+ rendered = template
+ for wxid in wxids:
+ row = contact_rows.get(wxid)
+ name = _pick_display_name(row, wxid)
+ rendered = rendered.replace(f"${{{wxid}}}", name)
+ return rendered.strip() or "[拍一拍]"
+
+
+def _parse_quote_message(text: str) -> str:
+ title = _extract_xml_tag_text(text, "title")
+ if title:
+ return title
+ refer = _extract_xml_tag_text(text, "content")
+ if refer:
+ return refer
+ return "[引用消息]"
+
+
+def _parse_app_message(text: str) -> dict[str, Any]:
+ app_type_raw = _extract_xml_tag_text(text, "type")
+ try:
+ app_type = int(str(app_type_raw or "0").strip() or "0")
+ except Exception:
+ app_type = 0
+ title = _extract_xml_tag_text(text, "title")
+ des = _extract_xml_tag_text(text, "des")
+ url = _extract_xml_tag_text(text, "url")
+
+ if "" in text.lower():
+ return {"renderType": "system", "content": "[拍一拍]"}
+
+ if app_type in (5, 68) and url:
+ thumb_url = _extract_xml_tag_text(text, "thumburl")
+ return {
+ "renderType": "link",
+ "content": des or title or "[链接]",
+ "title": title or des or "",
+ "url": url,
+ "thumbUrl": thumb_url or "",
+ }
+
+ if app_type in (6, 74):
+ file_name = title or ""
+ total_len = _extract_xml_tag_text(text, "totallen")
+ file_md5 = (
+ _extract_xml_tag_or_attr(text, "md5")
+ or _extract_xml_tag_or_attr(text, "filemd5")
+ or _extract_xml_tag_or_attr(text, "file_md5")
+ )
+ return {
+ "renderType": "file",
+ "content": f"[文件] {file_name}".strip(),
+ "title": file_name,
+ "size": total_len or "",
+ "fileMd5": file_md5 or "",
+ }
+
+    if app_type == 57 or "<refermsg" in text.lower():
+        refer_block = _extract_refermsg_block(text)
+        try:
+            text_wo_refer = re.sub(
+                r"<refermsg[^>]*>.*?</refermsg>",
+ "",
+ text,
+ flags=re.IGNORECASE | re.DOTALL,
+ )
+ except Exception:
+ text_wo_refer = text
+
+ reply_text = _extract_xml_tag_text(text_wo_refer, "title") or _extract_xml_tag_text(text, "title")
+ refer_displayname = _extract_xml_tag_or_attr(refer_block, "displayname")
+ refer_content = _extract_xml_tag_text(refer_block, "content")
+ refer_type = _extract_xml_tag_or_attr(refer_block, "type")
+
+ rt = (reply_text or "").strip()
+ rc = (refer_content or "").strip()
+ if rt and rc:
+ if rc == rt:
+ refer_content = ""
+ else:
+ lines = [ln.strip() for ln in rc.splitlines()]
+ if lines and lines[0] == rt:
+ refer_content = "\n".join(rc.splitlines()[1:]).lstrip()
+ elif rc.startswith(rt):
+ rest = rc[len(rt) :].lstrip()
+ refer_content = rest
+
+ t = str(refer_type or "").strip()
+ if t == "3":
+ refer_content = "[图片]"
+ elif t == "47":
+ refer_content = "[表情]"
+ elif t == "43" or t == "62":
+ refer_content = "[视频]"
+ elif t == "34":
+ refer_content = "[语音]"
+ elif t == "49" and refer_content:
+ refer_content = f"[链接] {refer_content}".strip()
+
+ return {
+ "renderType": "quote",
+ "content": reply_text or "[引用消息]",
+ "quoteTitle": refer_displayname or "",
+ "quoteContent": refer_content or "",
+ }
+
+    if app_type == 2000 or (
+        "<wcpayinfo" in text.lower()
+    ):
+        paysubtype = _extract_xml_tag_text(text, "paysubtype")
+        receivestatus = _extract_xml_tag_text(text, "receivestatus")
+        sendertitle = _extract_xml_tag_text(text, "sendertitle")
+        receivertitle = _extract_xml_tag_text(text, "receivertitle")
+        senderdes = _extract_xml_tag_text(text, "senderdes")
+        receiverdes = _extract_xml_tag_text(text, "receiverdes")
+        return {
+            "renderType": "transfer",
+            "content": sendertitle or receivertitle or "[转账]",
+            "paySubType": paysubtype or "",
+            "receiveStatus": receivestatus or "",
+            "senderTitle": sendertitle or "",
+            "receiverTitle": receivertitle or "",
+            "senderDes": senderdes or "",
+            "receiverDes": receiverdes or "",
+        }
+
+    return {
+        "renderType": "text",
+        "content": title or des or "",
+        "title": title or "",
+        "url": url or "",
+    }
+
+
+def _iter_message_db_paths(account_dir: Path) -> list[Path]:
+ if not account_dir.exists():
+ return []
+
+ candidates: list[Path] = []
+ for p in account_dir.glob("*.db"):
+ n = p.name
+ ln = n.lower()
+ if ln in {"session.db", "contact.db", "head_image.db"}:
+ continue
+ if ln == "message_resource.db":
+ continue
+ if ln == "message_fts.db":
+ continue
+
+ if re.match(r"^message(_\d+)?\.db$", ln):
+ candidates.append(p)
+ continue
+ if re.match(r"^biz_message(_\d+)?\.db$", ln):
+ candidates.append(p)
+ continue
+ if "message" in ln and ln.endswith(".db"):
+ candidates.append(p)
+ continue
+ candidates.sort(key=lambda x: x.name)
+ return candidates
+
+
+def _resolve_msg_table_name_by_map(lower_to_actual: dict[str, str], username: str) -> Optional[str]:
+ if not username:
+ return None
+ md5_hex = hashlib.md5(username.encode("utf-8")).hexdigest()
+ expected = f"msg_{md5_hex}".lower()
+ expected_chat = f"chat_{md5_hex}".lower()
+
+ if expected in lower_to_actual:
+ return lower_to_actual[expected]
+ if expected_chat in lower_to_actual:
+ return lower_to_actual[expected_chat]
+
+ for ln, actual in lower_to_actual.items():
+ if ln.startswith("msg_") and md5_hex in ln:
+ return actual
+ if ln.startswith("chat_") and md5_hex in ln:
+ return actual
+
+ for ln, actual in lower_to_actual.items():
+ if md5_hex in ln:
+ return actual
+
+ partial = md5_hex[:24]
+ for ln, actual in lower_to_actual.items():
+ if partial in ln:
+ return actual
+
+ return None
+
+
+def _build_latest_message_preview(
+ username: str,
+ local_type: int,
+ raw_text: str,
+ is_group: bool,
+ sender_username: str = "",
+) -> str:
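+    # Turn the newest raw message row into a one-line session preview, prefixed with the group sender when known.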
+ raw_text = (raw_text or "").strip()
+ sender_prefix = ""
+ if is_group and raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')):
+ sender_prefix, raw_text = _split_group_sender_prefix(raw_text)
+ if is_group and (not sender_prefix) and sender_username:
+ sender_prefix = str(sender_username).strip()
+
+ content_text = ""
+ if local_type == 10000:
+ if "revokemsg" in raw_text:
+ content_text = "撤回了一条消息"
+ else:
+                content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
+ content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
+ elif local_type == 244813135921:
+ parsed = _parse_app_message(raw_text)
+ qt = str(parsed.get("quoteTitle") or "").strip()
+ qc = str(parsed.get("quoteContent") or "").strip()
+ c0 = str(parsed.get("content") or "").strip()
+ content_text = qc or c0 or qt or "[引用消息]"
+ elif local_type == 49:
+ parsed = _parse_app_message(raw_text)
+ rt = str(parsed.get("renderType") or "")
+ content_text = str(parsed.get("content") or "")
+ title_text = str(parsed.get("title") or "").strip()
+ if rt == "file" and title_text:
+ content_text = title_text
+ if (not content_text) and rt == "transfer":
+ content_text = (
+ str(parsed.get("senderTitle") or "")
+ or str(parsed.get("receiverTitle") or "")
+ or "转账"
+ )
+ if not content_text:
+ content_text = title_text or str(parsed.get("url") or "")
+ elif local_type == 25769803825:
+ parsed = _parse_app_message(raw_text)
+ title_text = str(parsed.get("title") or "").strip()
+ content_text = title_text or str(parsed.get("content") or "").strip() or "[文件]"
+ elif local_type == 3:
+ content_text = "[图片]"
+ elif local_type == 34:
+ duration = _extract_xml_attr(raw_text, "voicelength")
+ content_text = f"[语音 {duration}秒]" if duration else "[语音]"
+ elif local_type == 43 or local_type == 62:
+ content_text = "[视频]"
+ elif local_type == 47:
+ content_text = "[表情]"
+ else:
+ if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')):
+ content_text = raw_text
+ else:
+ content_text = _infer_message_brief_by_local_type(local_type)
+
+ content_text = (content_text or "").strip() or _infer_message_brief_by_local_type(local_type)
+ content_text = re.sub(r"\s+", " ", content_text).strip()
+ if sender_prefix and content_text:
+ return f"{sender_prefix}: {content_text}"
+ return content_text
+
+
+def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]:
+ if not usernames:
+ return {}
+
+ db_paths = _iter_message_db_paths(account_dir)
+ if not db_paths:
+ return {}
+
+ remaining = {u for u in usernames if u}
+ best: dict[str, tuple[tuple[int, int, int], str]] = {}
+
+ if _DEBUG_SESSIONS:
+ logger.info(
+ f"[sessions.preview] account_dir={account_dir} usernames={len(remaining)} dbs={len(db_paths)}"
+ )
+ logger.info(
+ f"[sessions.preview] db_paths={', '.join([p.name for p in db_paths[:8]])}{'...' if len(db_paths) > 8 else ''}"
+ )
+
+ for db_path in db_paths:
+ conn = sqlite3.connect(str(db_path))
+ conn.row_factory = sqlite3.Row
+ try:
+ rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
+ names = [str(r[0]) for r in rows if r and r[0]]
+ lower_to_actual = {n.lower(): n for n in names}
+
+ found: dict[str, str] = {}
+ for u in list(remaining):
+ tn = _resolve_msg_table_name_by_map(lower_to_actual, u)
+ if tn:
+ found[u] = tn
+
+ if not found:
+ continue
+
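+            # Switch to raw bytes so zstd-compressed blobs are not mangled by SQLite text decoding.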
+ conn.text_factory = bytes
+ for u, tn in found.items():
+ quoted = _quote_ident(tn)
+ try:
+ try:
+ r = conn.execute(
+ "SELECT "
+ "m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, "
+ "n.user_name AS sender_username "
+ f"FROM {quoted} m "
+ "LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid "
+ "ORDER BY m.create_time DESC, m.sort_seq DESC, m.local_id DESC "
+ "LIMIT 1"
+ ).fetchone()
+ except Exception:
+ r = conn.execute(
+ "SELECT "
+ "local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username "
+ f"FROM {quoted} "
+ "ORDER BY create_time DESC, sort_seq DESC, local_id DESC "
+ "LIMIT 1"
+ ).fetchone()
+ except Exception as e:
+ if _DEBUG_SESSIONS:
+ logger.info(
+ f"[sessions.preview] db={db_path.name} username={u} table={tn} query_failed={e}"
+ )
+ continue
+ if r is None:
+ continue
+
+ local_type = int(r["local_type"] or 0)
+ create_time = int(r["create_time"] or 0)
+ sort_seq = int(r["sort_seq"] or 0) if r["sort_seq"] is not None else 0
+ local_id = int(r["local_id"] or 0)
+ sort_key = (create_time, sort_seq, local_id)
+
+ raw_text = _decode_message_content(r["compress_content"], r["message_content"]).strip()
+ sender_username = _decode_sqlite_text(r["sender_username"]).strip()
+ preview = _build_latest_message_preview(
+ username=u,
+ local_type=local_type,
+ raw_text=raw_text,
+ is_group=bool(u.endswith("@chatroom")),
+ sender_username=sender_username,
+ )
+ if not preview:
+ continue
+
+ prev = best.get(u)
+ if prev is None or sort_key > prev[0]:
+ best[u] = (sort_key, preview)
+ finally:
+ conn.close()
+
+ previews = {u: v[1] for u, v in best.items() if v and v[1]}
+ if _DEBUG_SESSIONS:
+ logger.info(
+ f"[sessions.preview] built_previews={len(previews)} remaining_without_preview={len(remaining - set(previews.keys()))}"
+ )
+ return previews
+
+
+def _pick_display_name(contact_row: Optional[sqlite3.Row], fallback_username: str) -> str:
+ if contact_row is None:
+ return fallback_username
+
+ for key in ("remark", "nick_name", "alias"):
+ try:
+ v = contact_row[key]
+ except Exception:
+ v = None
+ if isinstance(v, str) and v.strip():
+ return v.strip()
+
+ return fallback_username
+
+
+def _pick_avatar_url(contact_row: Optional[sqlite3.Row]) -> Optional[str]:
+ if contact_row is None:
+ return None
+
+ for key in ("big_head_url", "small_head_url"):
+ try:
+ v = contact_row[key]
+ except Exception:
+ v = None
+ if isinstance(v, str) and v.strip():
+ return v.strip()
+
+ return None
+
+
+def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str, sqlite3.Row]:
+ uniq = list(dict.fromkeys([u for u in usernames if u]))
+ if not uniq:
+ return {}
+
+ result: dict[str, sqlite3.Row] = {}
+
+ conn = sqlite3.connect(str(contact_db_path))
+ conn.row_factory = sqlite3.Row
+ try:
+ def query_table(table: str, targets: list[str]) -> None:
+ if not targets:
+ return
+ placeholders = ",".join(["?"] * len(targets))
+ sql = f"""
+ SELECT username, remark, nick_name, alias, big_head_url, small_head_url
+ FROM {table}
+ WHERE username IN ({placeholders})
+ """
+ rows = conn.execute(sql, targets).fetchall()
+ for r in rows:
+ result[r["username"]] = r
+
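+        # Known contacts first, then fill any misses from the stranger table.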
+ query_table("contact", uniq)
+ missing = [u for u in uniq if u not in result]
+ query_table("stranger", missing)
+ return result
+ finally:
+ conn.close()
diff --git a/src/wechat_decrypt_tool/media_helpers.py b/src/wechat_decrypt_tool/media_helpers.py
new file mode 100644
index 0000000..46cf4fa
--- /dev/null
+++ b/src/wechat_decrypt_tool/media_helpers.py
@@ -0,0 +1,1550 @@
+import ctypes
+import hashlib
+import json
+import mimetypes
+import os
+import re
+import sqlite3
+import struct
+import threading
+from collections import Counter
+from concurrent.futures import ThreadPoolExecutor
+from functools import lru_cache
+from pathlib import Path
+from typing import Any, Optional
+
+from fastapi import HTTPException
+
+from .logging_config import get_logger
+
+logger = get_logger(__name__)
+
+try:
+ import psutil # type: ignore
+except Exception:
+ psutil = None
+
+
+# Repository root (used to locate output/databases)
+_REPO_ROOT = Path(__file__).resolve().parents[2]
+_OUTPUT_DATABASES_DIR = _REPO_ROOT / "output" / "databases"
+_PACKAGE_ROOT = Path(__file__).resolve().parent
+
+
+def _list_decrypted_accounts() -> list[str]:
+ """列出已解密输出的账号目录名(仅保留包含 session.db + contact.db 的账号)"""
+ if not _OUTPUT_DATABASES_DIR.exists():
+ return []
+
+ accounts: list[str] = []
+ for p in _OUTPUT_DATABASES_DIR.iterdir():
+ if not p.is_dir():
+ continue
+ if (p / "session.db").exists() and (p / "contact.db").exists():
+ accounts.append(p.name)
+
+ accounts.sort()
+ return accounts
+
+
+def _resolve_account_dir(account: Optional[str]) -> Path:
+ """解析账号目录,并进行路径安全校验(防止路径穿越)"""
+ accounts = _list_decrypted_accounts()
+ if not accounts:
+ raise HTTPException(
+ status_code=404,
+ detail="No decrypted databases found. Please decrypt first.",
+ )
+
+ selected = account or accounts[0]
+ base = _OUTPUT_DATABASES_DIR.resolve()
+ candidate = (_OUTPUT_DATABASES_DIR / selected).resolve()
+
+ if candidate != base and base not in candidate.parents:
+ raise HTTPException(status_code=400, detail="Invalid account path.")
+
+ if not candidate.exists() or not candidate.is_dir():
+ raise HTTPException(status_code=404, detail="Account not found.")
+
+ if not (candidate / "session.db").exists():
+ raise HTTPException(status_code=404, detail="session.db not found for this account.")
+ if not (candidate / "contact.db").exists():
+ raise HTTPException(status_code=404, detail="contact.db not found for this account.")
+
+ return candidate
+
+
+def _detect_image_media_type(data: bytes) -> str:
+ if not data:
+ return "application/octet-stream"
+
+ if data.startswith(b"\x89PNG\r\n\x1a\n"):
+ return "image/png"
+ if data.startswith(b"\xff\xd8\xff"):
+ return "image/jpeg"
+ if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
+ return "image/gif"
+ if data.startswith(b"RIFF") and data[8:12] == b"WEBP":
+ return "image/webp"
+ return "application/octet-stream"
+
+
+class _WxAMConfig(ctypes.Structure):
+ _fields_ = [
+ ("mode", ctypes.c_int),
+ ("reserved", ctypes.c_int),
+ ]
+
+
+@lru_cache(maxsize=1)
+def _get_wxam_decoder():
+ if os.name != "nt":
+ return None
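+    # "wxgf" is WeChat's proprietary image container; decoding uses wxam_dec_wxam2pic_5 from the bundled VoipEngine.dll (Windows only).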
+ dll_path = _PACKAGE_ROOT / "native" / "VoipEngine.dll"
+ if not dll_path.exists():
+ logger.warning(f"WxAM decoder DLL not found: {dll_path}")
+ return None
+ try:
+ voip_engine = ctypes.WinDLL(str(dll_path))
+ fn = voip_engine.wxam_dec_wxam2pic_5
+ fn.argtypes = [
+ ctypes.c_int64,
+ ctypes.c_int,
+ ctypes.c_int64,
+ ctypes.POINTER(ctypes.c_int),
+ ctypes.c_int64,
+ ]
+ fn.restype = ctypes.c_int64
+ logger.info(f"WxAM decoder loaded: {dll_path}")
+ return fn
+ except Exception as e:
+ logger.warning(f"Failed to load WxAM decoder DLL: {dll_path} ({e})")
+ return None
+
+
+def _wxgf_to_image_bytes(data: bytes) -> Optional[bytes]:
+ if not data or not data.startswith(b"wxgf"):
+ return None
+ fn = _get_wxam_decoder()
+ if fn is None:
+ return None
+
+ max_output_size = 52 * 1024 * 1024
+ for mode in (0, 3):
+ try:
+ config = _WxAMConfig()
+ config.mode = int(mode)
+ config.reserved = 0
+
+ input_buffer = ctypes.create_string_buffer(data, len(data))
+ output_buffer = ctypes.create_string_buffer(max_output_size)
+ output_size = ctypes.c_int(max_output_size)
+
+ result = fn(
+ ctypes.addressof(input_buffer),
+ int(len(data)),
+ ctypes.addressof(output_buffer),
+ ctypes.byref(output_size),
+ ctypes.addressof(config),
+ )
+ if result != 0 or output_size.value <= 0:
+ continue
+ out = output_buffer.raw[: int(output_size.value)]
+ if _detect_image_media_type(out[:32]) != "application/octet-stream":
+ return out
+ except Exception:
+ continue
+ return None
+
+
+def _try_strip_media_prefix(data: bytes) -> tuple[bytes, str]:
+ if not data:
+ return data, "application/octet-stream"
+
+ try:
+ head = data[: min(len(data), 256 * 1024)]
+ except Exception:
+ head = data
+
+ # wxgf container
+ try:
+ idx = head.find(b"wxgf")
+ except Exception:
+ idx = -1
+ if idx >= 0 and idx <= 128 * 1024:
+ try:
+ payload = data[idx:]
+ converted = _wxgf_to_image_bytes(payload)
+ if converted:
+ mtw = _detect_image_media_type(converted[:32])
+ if mtw != "application/octet-stream":
+ return converted, mtw
+ except Exception:
+ pass
+
+ # common image/video headers with small prefix
+ sigs: list[tuple[bytes, str]] = [
+ (b"\x89PNG\r\n\x1a\n", "image/png"),
+ (b"\xff\xd8\xff", "image/jpeg"),
+ (b"GIF87a", "image/gif"),
+ (b"GIF89a", "image/gif"),
+ ]
+ for sig, mt in sigs:
+ try:
+ j = head.find(sig)
+ except Exception:
+ j = -1
+ if j >= 0 and j <= 128 * 1024:
+ sliced = data[j:]
+ mt2 = _detect_image_media_type(sliced[:32])
+ if mt2 != "application/octet-stream":
+ return sliced, mt2
+
+ try:
+ j = head.find(b"RIFF")
+ except Exception:
+ j = -1
+ if j >= 0 and j <= 128 * 1024:
+ sliced = data[j:]
+ try:
+ if len(sliced) >= 12 and sliced[8:12] == b"WEBP":
+ return sliced, "image/webp"
+ except Exception:
+ pass
+
+ try:
+ j = head.find(b"ftyp")
+ except Exception:
+ j = -1
+ if j >= 4 and j <= 128 * 1024:
+ sliced = data[j - 4 :]
+ try:
+ if len(sliced) >= 8 and sliced[4:8] == b"ftyp":
+ return sliced, "video/mp4"
+ except Exception:
+ pass
+
+ return data, "application/octet-stream"
+
+
+def _load_account_source_info(account_dir: Path) -> dict[str, Any]:
+ p = account_dir / "_source.json"
+ if not p.exists():
+ return {}
+ try:
+ return json.loads(p.read_text(encoding="utf-8"))
+ except Exception:
+ return {}
+
+
+def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]:
+ try:
+ home = Path.home()
+ except Exception:
+ return None
+
+ roots = [
+ home / "Documents" / "xwechat_files",
+ home / "Documents" / "WeChat Files",
+ ]
+
+ # Exact match first
+ for root in roots:
+ c = root / account_name
+ try:
+ if c.exists() and c.is_dir():
+ return c
+ except Exception:
+ continue
+
+ # Then try prefix match: wxid_xxx_yyyy
+ for root in roots:
+ try:
+ if not root.exists() or not root.is_dir():
+ continue
+ for p in root.iterdir():
+ if not p.is_dir():
+ continue
+ if p.name.startswith(account_name + "_"):
+ return p
+ except Exception:
+ continue
+ return None
+
+
+def _resolve_account_wxid_dir(account_dir: Path) -> Optional[Path]:
+ info = _load_account_source_info(account_dir)
+ wxid_dir = str(info.get("wxid_dir") or "").strip()
+ if wxid_dir:
+ try:
+ p = Path(wxid_dir)
+ if p.exists() and p.is_dir():
+ return p
+ except Exception:
+ pass
+ return _guess_wxid_dir_from_common_paths(account_dir.name)
+
+
+def _resolve_account_db_storage_dir(account_dir: Path) -> Optional[Path]:
+ info = _load_account_source_info(account_dir)
+ db_storage_path = str(info.get("db_storage_path") or "").strip()
+ if db_storage_path:
+ try:
+ p = Path(db_storage_path)
+ if p.exists() and p.is_dir():
+ return p
+ except Exception:
+ pass
+
+ wxid_dir = _resolve_account_wxid_dir(account_dir)
+ if wxid_dir:
+ c = wxid_dir / "db_storage"
+ try:
+ if c.exists() and c.is_dir():
+ return c
+ except Exception:
+ pass
+ return None
+
+
+def _quote_ident(ident: str) -> str:
+ return '"' + ident.replace('"', '""') + '"'
+
+
+def _resolve_hardlink_table_name(conn: sqlite3.Connection, prefix: str) -> Optional[str]:
+ rows = conn.execute(
+ "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name DESC",
+ (f"{prefix}%",),
+ ).fetchall()
+ if not rows:
+ return None
+ return str(rows[0][0]) if rows[0] and rows[0][0] else None
+
+
+def _resolve_hardlink_dir2id_table_name(conn: sqlite3.Connection) -> Optional[str]:
+ rows = conn.execute(
+ "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'dir2id%' ORDER BY name DESC"
+ ).fetchall()
+ if not rows:
+ return None
+ return str(rows[0][0]) if rows[0] and rows[0][0] else None
+
+
+def _resolve_media_path_from_hardlink(
+ hardlink_db_path: Path,
+ wxid_dir: Path,
+ md5: str,
+ kind: str,
+ username: Optional[str],
+ extra_roots: Optional[list[Path]] = None,
+) -> Optional[Path]:
+ if not hardlink_db_path.exists():
+ return None
+
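+    # The hardlink db maps a media md5 to (dir1, dir2, file_name) under the account's wxid directory;
+    # dir2 may be an id that resolves to a directory name via the dir2id table.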
+ kind_key = str(kind or "").lower().strip()
+ prefixes: list[str]
+ if kind_key == "image":
+ prefixes = ["image_hardlink_info"]
+ elif kind_key == "emoji":
+ prefixes = [
+ "emoji_hardlink_info",
+ "emotion_hardlink_info",
+ "image_hardlink_info",
+ ]
+ elif kind_key == "video" or kind_key == "video_thumb":
+ prefixes = ["video_hardlink_info"]
+ elif kind_key == "file":
+ prefixes = ["file_hardlink_info"]
+ else:
+ return None
+
+ conn = sqlite3.connect(str(hardlink_db_path))
+ conn.row_factory = sqlite3.Row
+ try:
+ for prefix in prefixes:
+ table_name = _resolve_hardlink_table_name(conn, prefix)
+ if not table_name:
+ continue
+
+ quoted = _quote_ident(table_name)
+ try:
+ row = conn.execute(
+ f"SELECT dir1, dir2, file_name FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC LIMIT 1",
+ (md5,),
+ ).fetchone()
+ except Exception:
+ row = None
+ if not row:
+ continue
+
+ dir1 = str(row["dir1"] or "").strip()
+ dir2 = str(row["dir2"] or "").strip()
+ file_name = str(row["file_name"] or "").strip()
+ if not dir1 or not dir2 or not file_name:
+ continue
+
+ dir_name = dir2
+ dir2id_table = _resolve_hardlink_dir2id_table_name(conn)
+
+ if dir2id_table:
+ try:
+ drow = conn.execute(
+ f"SELECT username FROM {_quote_ident(dir2id_table)} WHERE rowid = ? LIMIT 1",
+ (int(dir2),),
+ ).fetchone()
+ if drow and drow[0]:
+ dir_name = str(drow[0])
+ except Exception:
+ if username:
+ try:
+ drow = conn.execute(
+ f"SELECT dir_name FROM {_quote_ident(dir2id_table)} WHERE dir_id = ? AND username = ? LIMIT 1",
+ (dir2, username),
+ ).fetchone()
+ if drow and drow[0]:
+ dir_name = str(drow[0])
+ except Exception:
+ pass
+
+ roots: list[Path] = []
+ for r in [wxid_dir] + (extra_roots or []):
+ if not r:
+ continue
+ try:
+ rr = r.resolve()
+ except Exception:
+ rr = r
+ if rr not in roots:
+ roots.append(rr)
+
+ file_stem = Path(file_name).stem
+ file_variants = [file_name, f"{file_stem}_h.dat", f"{file_stem}_t.dat"]
+
+ for root in roots:
+ for fv in file_variants:
+ p = (root / dir1 / dir_name / fv).resolve()
+ try:
+ if p.exists() and p.is_file():
+ return p
+ except Exception:
+ continue
+
+ if username:
+ chat_hash = hashlib.md5(username.encode()).hexdigest()
+ for fv in file_variants:
+ p = (root / "msg" / "attach" / chat_hash / dir_name / "Img" / fv).resolve()
+ try:
+ if p.exists() and p.is_file():
+ return p
+ except Exception:
+ continue
+
+ return None
+ finally:
+ conn.close()
+
+
+@lru_cache(maxsize=4096)
+def _fallback_search_media_by_md5(weixin_root_str: str, md5: str, kind: str = "") -> Optional[str]:
+ if not weixin_root_str or not md5:
+ return None
+ try:
+ root = Path(weixin_root_str)
+ except Exception:
+ return None
+
+ kind_key = str(kind or "").lower().strip()
+
+ def _fast_find_emoji_in_cache() -> Optional[str]:
+ md5_prefix = md5[:2] if len(md5) >= 2 else ""
+ if not md5_prefix:
+ return None
+ cache_root = root / "cache"
+ try:
+ if not cache_root.exists() or not cache_root.is_dir():
+ return None
+ except Exception:
+ return None
+
+ exact_names = [
+ f"{md5}_h.dat",
+ f"{md5}_t.dat",
+ f"{md5}.dat",
+ f"{md5}.gif",
+ f"{md5}.webp",
+ f"{md5}.png",
+ f"{md5}.jpg",
+ ]
+ buckets = ["Emoticon", "emoticon", "Emoji", "emoji"]
+
+ candidates: list[Path] = []
+ try:
+ children = list(cache_root.iterdir())
+ except Exception:
+ children = []
+
+ for child in children:
+ try:
+ if not child.is_dir():
+ continue
+ except Exception:
+ continue
+ for bucket in buckets:
+ candidates.append(child / bucket / md5_prefix)
+
+ for bucket in buckets:
+ candidates.append(cache_root / bucket / md5_prefix)
+
+ seen: set[str] = set()
+ uniq: list[Path] = []
+ for c in candidates:
+ try:
+ rc = str(c.resolve())
+ except Exception:
+ rc = str(c)
+ if rc in seen:
+ continue
+ seen.add(rc)
+ uniq.append(c)
+
+ for base in uniq:
+ try:
+ if not base.exists() or not base.is_dir():
+ continue
+ except Exception:
+ continue
+
+ for name in exact_names:
+ p = base / name
+ try:
+ if p.exists() and p.is_file():
+ return str(p)
+ except Exception:
+ continue
+
+ try:
+ for p in base.glob(f"{md5}*"):
+ try:
+ if p.is_file():
+ return str(p)
+ except Exception:
+ continue
+ except Exception:
+ continue
+ return None
+
+    # Pick search directories based on the media kind
+ if kind_key == "file":
+ search_dirs = [root / "msg" / "file"]
+ elif kind_key == "emoji":
+ hit_fast = _fast_find_emoji_in_cache()
+ if hit_fast:
+ return hit_fast
+ search_dirs = [
+ root / "msg" / "emoji",
+ root / "msg" / "emoticon",
+ root / "emoji",
+ root / "emoticon",
+ root / "msg" / "attach",
+ root / "msg" / "file",
+ root / "msg" / "video",
+ ]
+ else:
+ search_dirs = [
+ root / "msg" / "attach",
+ root / "msg" / "file",
+ root / "msg" / "video",
+ root / "cache",
+ ]
+
+    # Pick filename patterns based on the media kind
+ if kind_key == "file":
+ patterns = [
+ f"*{md5}*",
+ ]
+ elif kind_key == "emoji":
+ patterns = [
+ f"{md5}_h.dat",
+ f"{md5}_t.dat",
+ f"{md5}.dat",
+ f"{md5}*.dat",
+ f"{md5}*.gif",
+ f"{md5}*.webp",
+ f"{md5}*.png",
+ f"{md5}*.jpg",
+ f"*{md5}*",
+ ]
+ else:
+ patterns = [
+ f"{md5}_h.dat",
+ f"{md5}_t.dat",
+ f"{md5}.dat",
+ f"{md5}*.dat",
+ f"{md5}*.jpg",
+ f"{md5}*.jpeg",
+ f"{md5}*.png",
+ f"{md5}*.gif",
+ f"{md5}*.webp",
+ f"{md5}*.mp4",
+ ]
+
+ for d in search_dirs:
+ try:
+ if not d.exists() or not d.is_dir():
+ continue
+ except Exception:
+ continue
+ for pat in patterns:
+ try:
+ for p in d.rglob(pat):
+ try:
+ if p.is_file():
+ return str(p)
+ except Exception:
+ continue
+ except Exception:
+ continue
+ return None
+
+
+def _guess_media_type_by_path(path: Path, fallback: str = "application/octet-stream") -> str:
+ try:
+ mt = mimetypes.guess_type(str(path.name))[0]
+ if mt:
+ return mt
+ except Exception:
+ pass
+ return fallback
+
+
+def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[str]]:
+ if not data:
+ return None, None
+
+ # (offset, magic, media_type)
+ candidates: list[tuple[int, bytes, str]] = [
+ (0, b"\x89PNG\r\n\x1a\n", "image/png"),
+ (0, b"\xff\xd8\xff", "image/jpeg"),
+ (0, b"GIF87a", "image/gif"),
+ (0, b"GIF89a", "image/gif"),
+ (0, b"wxgf", "application/octet-stream"),
+ (1, b"wxgf", "application/octet-stream"),
+ (2, b"wxgf", "application/octet-stream"),
+ (3, b"wxgf", "application/octet-stream"),
+ (4, b"wxgf", "application/octet-stream"),
+ (5, b"wxgf", "application/octet-stream"),
+ (6, b"wxgf", "application/octet-stream"),
+ (7, b"wxgf", "application/octet-stream"),
+ (8, b"wxgf", "application/octet-stream"),
+ (9, b"wxgf", "application/octet-stream"),
+ (10, b"wxgf", "application/octet-stream"),
+ (11, b"wxgf", "application/octet-stream"),
+ (12, b"wxgf", "application/octet-stream"),
+ (13, b"wxgf", "application/octet-stream"),
+ (14, b"wxgf", "application/octet-stream"),
+ (15, b"wxgf", "application/octet-stream"),
+ (0, b"RIFF", "application/octet-stream"),
+ (4, b"ftyp", "video/mp4"),
+ ]
+
+ for offset, magic, mt in candidates:
+ if len(data) < offset + len(magic):
+ continue
+ key = data[offset] ^ magic[0]
+ ok = True
+ for i in range(len(magic)):
+ if (data[offset + i] ^ key) != magic[i]:
+ ok = False
+ break
+ if not ok:
+ continue
+
+ decoded = bytes(b ^ key for b in data)
+
+ if magic == b"wxgf":
+ try:
+ payload = decoded[offset:] if offset > 0 else decoded
+ converted = _wxgf_to_image_bytes(payload)
+ if converted:
+ mtw = _detect_image_media_type(converted[:32])
+ if mtw != "application/octet-stream":
+ return converted, mtw
+ except Exception:
+ pass
+ continue
+
+ if offset == 0 and magic == b"RIFF":
+ if len(decoded) >= 12 and decoded[8:12] == b"WEBP":
+ return decoded, "image/webp"
+ continue
+
+ if mt == "application/octet-stream":
+ mt2 = _detect_image_media_type(decoded[:32])
+ if mt2 != "application/octet-stream":
+ return decoded, mt2
+ continue
+
+ return decoded, mt
+
+ preview_len = 8192
+ try:
+ preview_len = min(int(preview_len), int(len(data)))
+ except Exception:
+ preview_len = 8192
+
+ if preview_len > 0:
+ for key in range(256):
+ try:
+ pv = bytes(b ^ key for b in data[:preview_len])
+ except Exception:
+ continue
+ try:
+ scan = pv
+ if (
+ (scan.find(b"wxgf") >= 0)
+ or (scan.find(b"\x89PNG\r\n\x1a\n") >= 0)
+ or (scan.find(b"\xff\xd8\xff") >= 0)
+ or (scan.find(b"GIF87a") >= 0)
+ or (scan.find(b"GIF89a") >= 0)
+ or (scan.find(b"RIFF") >= 0)
+ or (scan.find(b"ftyp") >= 0)
+ ):
+ decoded = bytes(b ^ key for b in data)
+ dec2, mt2 = _try_strip_media_prefix(decoded)
+ if mt2 != "application/octet-stream":
+ return dec2, mt2
+ except Exception:
+ continue
+
+ return None, None
+
+
+def _detect_wechat_dat_version(data: bytes) -> int:
+ if not data or len(data) < 6:
+ return -1
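+    # WeChat 4.x ".dat" files start with 07 08 "V1"/"V2" 08 07; 0 means no header (legacy XOR-only format).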
+ sig = data[:6]
+ if sig == b"\x07\x08V1\x08\x07":
+ return 1
+ if sig == b"\x07\x08V2\x08\x07":
+ return 2
+ return 0
+
+
+def _extract_yyyymm_for_sort(p: Path) -> str:
+ m = re.search(r"(\d{4}-\d{2})", str(p))
+ return m.group(1) if m else "0000-00"
+
+
+@lru_cache(maxsize=16)
+def _get_wechat_template_most_common_last2(weixin_root_str: str) -> Optional[bytes]:
+ try:
+ root = Path(weixin_root_str)
+ if not root.exists() or not root.is_dir():
+ return None
+ except Exception:
+ return None
+
+ try:
+ template_files = list(root.rglob("*_t.dat"))
+ except Exception:
+ template_files = []
+
+ if not template_files:
+ return None
+
+ template_files.sort(key=_extract_yyyymm_for_sort, reverse=True)
+ last_bytes_list: list[bytes] = []
+ for file in template_files[:16]:
+ try:
+ with open(file, "rb") as f:
+ f.seek(-2, 2)
+ b2 = f.read(2)
+ if b2 and len(b2) == 2:
+ last_bytes_list.append(b2)
+ except Exception:
+ continue
+
+ if not last_bytes_list:
+ return None
+ return Counter(last_bytes_list).most_common(1)[0][0]
+
+
+@lru_cache(maxsize=16)
+def _find_wechat_xor_key(weixin_root_str: str) -> Optional[int]:
+ try:
+ root = Path(weixin_root_str)
+ if not root.exists() or not root.is_dir():
+ return None
+ except Exception:
+ return None
+
+ most_common = _get_wechat_template_most_common_last2(weixin_root_str)
+ if not most_common or len(most_common) != 2:
+ return None
+ x, y = most_common[0], most_common[1]
+ xor_key = x ^ 0xFF
+ if xor_key != (y ^ 0xD9):
+ return None
+ return xor_key
+
+
+def _get_wechat_v2_ciphertext(weixin_root: Path, most_common_last2: bytes) -> Optional[bytes]:
+ try:
+ template_files = list(weixin_root.rglob("*_t.dat"))
+ except Exception:
+ return None
+ if not template_files:
+ return None
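+    # Pick a V2-format thumbnail whose trailing bytes match the majority fingerprint and use its first
+    # AES block (16 bytes at offset 0xF) as verification ciphertext for key scanning.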
+
+ template_files.sort(key=_extract_yyyymm_for_sort, reverse=True)
+ sig = b"\x07\x08V2\x08\x07"
+ for file in template_files:
+ try:
+ with open(file, "rb") as f:
+ if f.read(6) != sig:
+ continue
+ f.seek(-2, 2)
+ if f.read(2) != most_common_last2:
+ continue
+ f.seek(0xF)
+ ct = f.read(16)
+ if ct and len(ct) == 16:
+ return ct
+ except Exception:
+ continue
+ return None
+
+
+def _verify_wechat_aes_key(ciphertext: bytes, key16: bytes) -> bool:
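+    # A correct key decrypts the first AES-ECB block of a V2 thumbnail to a JPEG or PNG header.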
+ try:
+ from Crypto.Cipher import AES
+
+ cipher = AES.new(key16[:16], AES.MODE_ECB)
+ plain = cipher.decrypt(ciphertext)
+ if plain.startswith(b"\xff\xd8\xff"):
+ return True
+ if plain.startswith(b"\x89PNG\r\n\x1a\n"):
+ return True
+ return False
+ except Exception:
+ return False
+
+
+class _MEMORY_BASIC_INFORMATION(ctypes.Structure):
+ _fields_ = [
+ ("BaseAddress", ctypes.c_void_p),
+ ("AllocationBase", ctypes.c_void_p),
+ ("AllocationProtect", ctypes.c_ulong),
+ ("RegionSize", ctypes.c_size_t),
+ ("State", ctypes.c_ulong),
+ ("Protect", ctypes.c_ulong),
+ ("Type", ctypes.c_ulong),
+ ]
+
+
+def _find_weixin_pid() -> Optional[int]:
+ if psutil is None:
+ return None
+ for p in psutil.process_iter(["name"]):
+ try:
+ name = (p.info.get("name") or "").lower()
+ if name in {"weixin.exe", "wechat.exe"}:
+ return int(p.pid)
+ except Exception:
+ continue
+ return None
+
+
+def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
+ pid = _find_weixin_pid()
+ if not pid:
+ return None
+
+ PROCESS_VM_READ = 0x0010
+ PROCESS_QUERY_INFORMATION = 0x0400
+ MEM_COMMIT = 0x1000
+ MEM_PRIVATE = 0x20000
+
+ kernel32 = ctypes.windll.kernel32
+
+ OpenProcess = kernel32.OpenProcess
+ OpenProcess.argtypes = [ctypes.c_ulong, ctypes.c_bool, ctypes.c_ulong]
+ OpenProcess.restype = ctypes.c_void_p
+
+ ReadProcessMemory = kernel32.ReadProcessMemory
+ ReadProcessMemory.argtypes = [
+ ctypes.c_void_p,
+ ctypes.c_void_p,
+ ctypes.c_void_p,
+ ctypes.c_size_t,
+ ctypes.POINTER(ctypes.c_size_t),
+ ]
+ ReadProcessMemory.restype = ctypes.c_bool
+
+ VirtualQueryEx = kernel32.VirtualQueryEx
+ VirtualQueryEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]
+ VirtualQueryEx.restype = ctypes.c_size_t
+
+ CloseHandle = kernel32.CloseHandle
+ CloseHandle.argtypes = [ctypes.c_void_p]
+ CloseHandle.restype = ctypes.c_bool
+
+ handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
+ if not handle:
+ return None
+
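+    # Scan Weixin's committed private memory regions for 32-character alphanumeric candidates and verify each
+    # against the known thumbnail ciphertext.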
+ stop = threading.Event()
+ result: list[Optional[bytes]] = [None]
+ pattern = re.compile(rb"[^a-z0-9]([a-z0-9]{32})[^a-z0-9]", flags=re.IGNORECASE)
+
+ def read_mem(addr: int, size: int) -> Optional[bytes]:
+ buf = ctypes.create_string_buffer(size)
+ read = ctypes.c_size_t(0)
+ ok = ReadProcessMemory(handle, ctypes.c_void_p(addr), buf, size, ctypes.byref(read))
+ if not ok or read.value <= 0:
+ return None
+ return buf.raw[: read.value]
+
+ def scan_region(base: int, region_size: int) -> Optional[bytes]:
+ chunk = 4 * 1024 * 1024
+ offset = 0
+ tail = b""
+ while offset < region_size and not stop.is_set():
+ to_read = min(chunk, region_size - offset)
+ b = read_mem(base + offset, int(to_read))
+ if not b:
+ return None
+ data = tail + b
+ for m in pattern.finditer(data):
+ cand32 = m.group(1)
+ cand16 = cand32[:16]
+ if _verify_wechat_aes_key(ciphertext, cand16):
+ return cand16
+ tail = data[-64:] if len(data) > 64 else data
+ offset += to_read
+ return None
+
+ regions: list[tuple[int, int]] = []
+ mbi = _MEMORY_BASIC_INFORMATION()
+ addr = 0
+ try:
+ while VirtualQueryEx(handle, ctypes.c_void_p(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)):
+ try:
+ if int(mbi.State) == MEM_COMMIT and int(mbi.Type) == MEM_PRIVATE:
+ base = int(mbi.BaseAddress)
+ size = int(mbi.RegionSize)
+ if size > 0:
+ regions.append((base, size))
+ addr = int(mbi.BaseAddress) + int(mbi.RegionSize)
+ except Exception:
+ addr += 0x1000
+ if addr <= 0:
+ break
+
+ with ThreadPoolExecutor(max_workers=min(32, max(1, len(regions)))) as ex:
+ for found in ex.map(lambda r: scan_region(r[0], r[1]), regions):
+ if found:
+ result[0] = found
+ stop.set()
+ break
+ finally:
+ CloseHandle(handle)
+
+ return result[0]
+
+
+def _save_media_keys(account_dir: Path, xor_key: int, aes_key16: bytes) -> None:
+ try:
+ payload = {
+ "xor": int(xor_key),
+ "aes": aes_key16.decode("ascii", errors="ignore"),
+ }
+ (account_dir / "_media_keys.json").write_text(
+ json.dumps(payload, ensure_ascii=False, indent=2),
+ encoding="utf-8",
+ )
+ except Exception:
+ pass
+
+
+def _decrypt_wechat_dat_v3(data: bytes, xor_key: int) -> bytes:
+ return bytes(b ^ xor_key for b in data)
+
+
+def _decrypt_wechat_dat_v4(data: bytes, xor_key: int, aes_key: bytes) -> bytes:
+ from Crypto.Cipher import AES
+ from Crypto.Util import Padding
+
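+    # 15-byte header: 6-byte signature, uint32 AES-encrypted length, uint32 XOR-tail length, one pad byte.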
+ header, rest = data[:0xF], data[0xF:]
+ signature, aes_size, xor_size = struct.unpack("<6sLLx", header)
+ aes_size += AES.block_size - aes_size % AES.block_size
+
+ aes_data = rest[:aes_size]
+ raw_data = rest[aes_size:]
+
+ cipher = AES.new(aes_key[:16], AES.MODE_ECB)
+ decrypted_data = Padding.unpad(cipher.decrypt(aes_data), AES.block_size)
+
+ if xor_size > 0:
+ raw_data = rest[aes_size:-xor_size]
+ xor_data = rest[-xor_size:]
+ xored_data = bytes(b ^ xor_key for b in xor_data)
+ else:
+ xored_data = b""
+
+ return decrypted_data + raw_data + xored_data
+
+
+def _load_media_keys(account_dir: Path) -> dict[str, Any]:
+ p = account_dir / "_media_keys.json"
+ if not p.exists():
+ return {}
+ try:
+ return json.loads(p.read_text(encoding="utf-8"))
+ except Exception:
+ return {}
+
+
+def _get_resource_dir(account_dir: Path) -> Path:
+ """获取解密资源输出目录"""
+ return account_dir / "resource"
+
+
+def _get_decrypted_resource_path(account_dir: Path, md5: str, ext: str = "") -> Path:
+ """根据MD5获取解密后资源的路径"""
+ resource_dir = _get_resource_dir(account_dir)
+    # Use the first two hex chars of the MD5 as a sub-directory to avoid crowding a single directory
+ sub_dir = md5[:2] if len(md5) >= 2 else "00"
+ if ext:
+ return resource_dir / sub_dir / f"{md5}.{ext}"
+ return resource_dir / sub_dir / md5
+
+
+def _detect_image_extension(data: bytes) -> str:
+ """根据图片数据检测文件扩展名"""
+ if not data:
+ return "dat"
+ if data.startswith(b"\x89PNG\r\n\x1a\n"):
+ return "png"
+ if data.startswith(b"\xff\xd8\xff"):
+ return "jpg"
+ if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
+ return "gif"
+ if data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP":
+ return "webp"
+ return "dat"
+
+
+def _try_find_decrypted_resource(account_dir: Path, md5: str) -> Optional[Path]:
+ """尝试在解密资源目录中查找已解密的资源"""
+ if not md5:
+ return None
+ resource_dir = _get_resource_dir(account_dir)
+ if not resource_dir.exists():
+ return None
+ sub_dir = md5[:2] if len(md5) >= 2 else "00"
+ target_dir = resource_dir / sub_dir
+ if not target_dir.exists():
+ return None
+    # Look for a file matching the MD5 (the extension may vary)
+ for ext in ["jpg", "png", "gif", "webp", "mp4", "dat"]:
+ p = target_dir / f"{md5}.{ext}"
+ if p.exists():
+ return p
+ return None
+
+
+def _read_and_maybe_decrypt_media(
+ path: Path,
+ account_dir: Optional[Path] = None,
+ weixin_root: Optional[Path] = None,
+) -> tuple[bytes, str]:
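+    # Decryption cascade: plain image -> wxgf container -> prefixed media -> single-byte XOR -> WeChat .dat v3/v4;
+    # if nothing matches, return the raw bytes with a best-effort media type.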
+ # Fast path: already a normal image
+ with open(path, "rb") as f:
+ head = f.read(64)
+
+ mt = _detect_image_media_type(head)
+ if mt != "application/octet-stream":
+ return path.read_bytes(), mt
+
+ if head.startswith(b"wxgf"):
+ data0 = path.read_bytes()
+ converted0 = _wxgf_to_image_bytes(data0)
+ if converted0:
+ mt0 = _detect_image_media_type(converted0[:32])
+ if mt0 != "application/octet-stream":
+ return converted0, mt0
+
+ try:
+ idx = head.find(b"wxgf")
+ except Exception:
+ idx = -1
+ if 0 < idx <= 4:
+ try:
+ data0 = path.read_bytes()
+ payload0 = data0[idx:]
+ converted0 = _wxgf_to_image_bytes(payload0)
+ if converted0:
+ mt0 = _detect_image_media_type(converted0[:32])
+ if mt0 != "application/octet-stream":
+ return converted0, mt0
+ except Exception:
+ pass
+
+ try:
+ data_pref = path.read_bytes()
+ stripped, mtp = _try_strip_media_prefix(data_pref)
+ if mtp != "application/octet-stream":
+ return stripped, mtp
+ except Exception:
+ pass
+
+ data = path.read_bytes()
+
+ dec, mt2 = _try_xor_decrypt_by_magic(data)
+ if dec is not None and mt2:
+ return dec, mt2
+
+ # Try WeChat .dat v1/v2 decrypt.
+ version = _detect_wechat_dat_version(data)
+ if version in (0, 1, 2):
+ root = weixin_root
+ if root is None and account_dir is not None:
+ root = _resolve_account_wxid_dir(account_dir)
+ if root is None and account_dir is not None:
+ ds = _resolve_account_db_storage_dir(account_dir)
+ root = ds.parent if ds else None
+
+ xor_key = _find_wechat_xor_key(str(root)) if root else None
+ if xor_key is None and account_dir is not None:
+ try:
+ keys2 = _load_media_keys(account_dir)
+ x2 = keys2.get("xor")
+ if x2 is not None:
+ xor_key = int(x2)
+ if not (0 <= int(xor_key) <= 255):
+ xor_key = None
+ else:
+ logger.debug("使用 _media_keys.json 中保存的 xor key")
+ except Exception:
+ xor_key = None
+ try:
+ if version == 0 and xor_key is not None:
+ out = _decrypt_wechat_dat_v3(data, xor_key)
+ try:
+ out2, mtp2 = _try_strip_media_prefix(out)
+ if mtp2 != "application/octet-stream":
+ return out2, mtp2
+ except Exception:
+ pass
+ if out.startswith(b"wxgf"):
+ converted = _wxgf_to_image_bytes(out)
+ if converted:
+ out = converted
+ logger.info(f"wxgf->image: {path} -> {len(out)} bytes")
+ else:
+ logger.info(f"wxgf->image failed: {path}")
+ mt0 = _detect_image_media_type(out[:32])
+ if mt0 != "application/octet-stream":
+ return out, mt0
+ elif version == 1 and xor_key is not None:
+ out = _decrypt_wechat_dat_v4(data, xor_key, b"cfcd208495d565ef")
+ try:
+ out2, mtp2 = _try_strip_media_prefix(out)
+ if mtp2 != "application/octet-stream":
+ return out2, mtp2
+ except Exception:
+ pass
+ if out.startswith(b"wxgf"):
+ converted = _wxgf_to_image_bytes(out)
+ if converted:
+ out = converted
+ logger.info(f"wxgf->image: {path} -> {len(out)} bytes")
+ else:
+ logger.info(f"wxgf->image failed: {path}")
+ mt1 = _detect_image_media_type(out[:32])
+ if mt1 != "application/octet-stream":
+ return out, mt1
+ elif version == 2 and xor_key is not None and account_dir is not None and root is not None:
+ keys = _load_media_keys(account_dir)
+ aes_str = str(keys.get("aes") or "").strip()
+ aes_key16 = aes_str.encode("ascii", errors="ignore")[:16] if aes_str else b""
+
+ if not aes_key16:
+ most_common = _get_wechat_template_most_common_last2(str(root))
+ if most_common:
+ ct = _get_wechat_v2_ciphertext(Path(root), most_common)
+ else:
+ ct = None
+
+ if ct:
+ aes_key16 = _extract_wechat_aes_key_from_process(ct) or b""
+ if aes_key16:
+ _save_media_keys(account_dir, xor_key, aes_key16)
+
+ if aes_key16:
+ out = _decrypt_wechat_dat_v4(data, xor_key, aes_key16)
+ try:
+ out2, mtp2 = _try_strip_media_prefix(out)
+ if mtp2 != "application/octet-stream":
+ return out2, mtp2
+ except Exception:
+ pass
+ if out.startswith(b"wxgf"):
+ converted = _wxgf_to_image_bytes(out)
+ if converted:
+ out = converted
+ logger.info(f"wxgf->image: {path} -> {len(out)} bytes")
+ else:
+ logger.info(f"wxgf->image failed: {path}")
+ mt2b = _detect_image_media_type(out[:32])
+ if mt2b != "application/octet-stream":
+ return out, mt2b
+ except Exception:
+ pass
+
+ # Fallback: return as-is.
+ mt3 = _guess_media_type_by_path(path, fallback="application/octet-stream")
+ return data, mt3
+
+
+def _ensure_decrypted_resource_for_md5(
+ account_dir: Path,
+ md5: str,
+ source_path: Path,
+ weixin_root: Optional[Path] = None,
+) -> Optional[Path]:
+ if not md5 or not source_path:
+ return None
+
+ md5_lower = str(md5).lower()
+ existing = _try_find_decrypted_resource(account_dir, md5_lower)
+ if existing:
+ return existing
+
+ try:
+ if not source_path.exists() or not source_path.is_file():
+ return None
+ except Exception:
+ return None
+
+ data, mt0 = _read_and_maybe_decrypt_media(source_path, account_dir=account_dir, weixin_root=weixin_root)
+ mt2 = str(mt0 or "").strip()
+ if (not mt2) or mt2 == "application/octet-stream":
+ mt2 = _detect_image_media_type(data[:32])
+ if mt2 == "application/octet-stream":
+ try:
+ data2, mtp = _try_strip_media_prefix(data)
+ if mtp != "application/octet-stream":
+ data = data2
+ mt2 = mtp
+ except Exception:
+ pass
+ if mt2 == "application/octet-stream":
+ try:
+ if len(data) >= 8 and data[4:8] == b"ftyp":
+ mt2 = "video/mp4"
+ except Exception:
+ pass
+ if mt2 == "application/octet-stream":
+ return None
+
+ if str(mt2).startswith("image/"):
+ ext = _detect_image_extension(data)
+ elif str(mt2) == "video/mp4":
+ ext = "mp4"
+ else:
+ ext = Path(str(source_path.name)).suffix.lstrip(".").lower() or "dat"
+ output_path = _get_decrypted_resource_path(account_dir, md5_lower, ext)
+ try:
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ if not output_path.exists():
+ output_path.write_bytes(data)
+ except Exception:
+ return None
+
+ return output_path
+
+
+def _collect_all_dat_files(wxid_dir: Path) -> list[tuple[Path, str]]:
+ """收集所有需要解密的.dat文件,返回 (文件路径, md5) 列表"""
+ results: list[tuple[Path, str]] = []
+ if not wxid_dir or not wxid_dir.exists():
+ return results
+
+    # Directories to search for encrypted .dat files
+ search_dirs = [
+ wxid_dir / "msg" / "attach",
+ wxid_dir / "cache",
+ ]
+
+ for search_dir in search_dirs:
+ if not search_dir.exists():
+ continue
+ try:
+ for dat_file in search_dir.rglob("*.dat"):
+ if not dat_file.is_file():
+ continue
+                # Extract the MD5 from the filename
+                stem = dat_file.stem
+                # Filenames may look like: md5.dat, md5_t.dat, md5_h.dat, etc.
+                md5 = stem.split("_")[0] if "_" in stem else stem
+                # Validate that it is a well-formed MD5 (32 hex characters)
+ if len(md5) == 32 and all(c in "0123456789abcdefABCDEF" for c in md5):
+ results.append((dat_file, md5.lower()))
+ except Exception as e:
+ logger.warning(f"扫描目录失败 {search_dir}: {e}")
+
+ return results
+
+
+def _decrypt_and_save_resource(
+ dat_path: Path,
+ md5: str,
+ account_dir: Path,
+ xor_key: int,
+ aes_key: Optional[bytes],
+) -> tuple[bool, str]:
+ """解密单个资源文件并保存到resource目录
+
+ Returns:
+ (success, message)
+ """
+ try:
+ data = dat_path.read_bytes()
+ if not data:
+ return False, "文件为空"
+
+ version = _detect_wechat_dat_version(data)
+ decrypted: Optional[bytes] = None
+
+ if version == 0:
+            # V3: XOR-only decryption
+            decrypted = _decrypt_wechat_dat_v3(data, xor_key)
+        elif version == 1:
+            # V4-V1: fixed AES key
+            decrypted = _decrypt_wechat_dat_v4(data, xor_key, b"cfcd208495d565ef")
+        elif version == 2:
+            # V4-V2: requires the per-account (dynamic) AES key
+            if aes_key and len(aes_key) >= 16:
+                decrypted = _decrypt_wechat_dat_v4(data, xor_key, aes_key[:16])
+            else:
+                return False, "V4-V2 files require an AES key"
+        else:
+            # Try a simple XOR decryption based on known magic bytes
+            dec, mt = _try_xor_decrypt_by_magic(data)
+            if dec:
+                decrypted = dec
+            else:
+                return False, f"Unknown encryption version: {version}"
+
+ if not decrypted:
+ return False, "解密结果为空"
+
+ if decrypted.startswith(b"wxgf"):
+ converted = _wxgf_to_image_bytes(decrypted)
+ if converted:
+ decrypted = converted
+
+        # Detect the image type of the decrypted payload
+        ext = _detect_image_extension(decrypted)
+        mt = _detect_image_media_type(decrypted[:32])
+        if mt == "application/octet-stream":
+            # Decryption probably failed; skip this file
+            return False, "Decrypted data is not a valid image"
+
+        # Save to the decrypted-resource directory
+ output_path = _get_decrypted_resource_path(account_dir, md5, ext)
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ output_path.write_bytes(decrypted)
+
+ return True, str(output_path)
+ except Exception as e:
+ return False, str(e)
+
+
+def _convert_silk_to_wav(silk_data: bytes) -> bytes:
+ """Convert SILK audio data to WAV format for browser playback."""
+ import tempfile
+
+ try:
+ import pilk
+ except ImportError:
+ # If pilk not installed, return original data
+ return silk_data
+
+ try:
+ # pilk.silk_to_wav works with file paths, so use temp files
+ with tempfile.NamedTemporaryFile(suffix=".silk", delete=False) as silk_file:
+ silk_file.write(silk_data)
+ silk_path = silk_file.name
+
+ wav_path = silk_path.replace(".silk", ".wav")
+
+ try:
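+            # 24 kHz matches the sample rate WeChat voice notes are typically
+            # SILK-encoded at (assumption; adjust if your data differs).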
+ pilk.silk_to_wav(silk_path, wav_path, rate=24000)
+ with open(wav_path, "rb") as wav_file:
+ wav_data = wav_file.read()
+ return wav_data
+ finally:
+ # Clean up temp files
+ try:
+ os.unlink(silk_path)
+ except Exception:
+ pass
+ try:
+ os.unlink(wav_path)
+ except Exception:
+ pass
+ except Exception as e:
+ logger.warning(f"SILK to WAV conversion failed: {e}")
+ return silk_data
+
+
+def _resolve_media_path_for_kind(
+ account_dir: Path,
+ kind: str,
+ md5: str,
+ username: Optional[str],
+) -> Optional[Path]:
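+    """Locate the on-disk media file for a message attachment.
+
+    Checks the account's decrypted-resource cache first, then falls back to
+    hardlink.db lookups and a brute-force md5 search under the WeChat data
+    directories. Returns None when nothing is found.
+    """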
+ if not md5:
+ return None
+
+ kind_key = str(kind or "").strip().lower()
+
+    # Prefer the pre-decrypted resource directory (images, emoji, video thumbnails)
+    if kind_key in {"image", "emoji", "video_thumb"}:
+        decrypted_path = _try_find_decrypted_resource(account_dir, md5.lower())
+        if decrypted_path:
+            logger.debug(f"Found decrypted resource: {decrypted_path}")
+            return decrypted_path
+
+    # Fall back to the original logic: search the WeChat data directories
+ wxid_dir = _resolve_account_wxid_dir(account_dir)
+ hardlink_db_path = account_dir / "hardlink.db"
+ db_storage_dir = _resolve_account_db_storage_dir(account_dir)
+
+ roots: list[Path] = []
+ if wxid_dir:
+ roots.append(wxid_dir)
+ roots.append(wxid_dir / "msg" / "attach")
+ roots.append(wxid_dir / "msg" / "file")
+ roots.append(wxid_dir / "msg" / "video")
+ roots.append(wxid_dir / "cache")
+ if db_storage_dir:
+ roots.append(db_storage_dir)
+ if not roots:
+ return None
+
+ p = _resolve_media_path_from_hardlink(
+ hardlink_db_path,
+ roots[0],
+ md5=str(md5),
+ kind=str(kind),
+ username=username,
+ extra_roots=roots[1:],
+ )
+ if (not p) and wxid_dir:
+ hit = _fallback_search_media_by_md5(str(wxid_dir), str(md5), kind=kind_key)
+ if hit:
+ p = Path(hit)
+ return p
+
+
+def _pick_best_emoji_source_path(resolved: Path, md5: str) -> Optional[Path]:
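+    """Given a resolved path (file or directory) and an emoji md5, return the
+    most likely source file: exact *_h/_t/.dat names first, then common image
+    extensions, then glob matches. Returns None if nothing matches.
+    """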
+ if not resolved:
+ return None
+ try:
+ if resolved.exists() and resolved.is_file():
+ return resolved
+ except Exception:
+ pass
+
+ try:
+ if not (resolved.exists() and resolved.is_dir()):
+ return None
+ except Exception:
+ return None
+
+ md5s = str(md5 or "").lower().strip()
+ if not md5s:
+ return None
+
+ candidates = [
+ f"{md5s}_h.dat",
+ f"{md5s}_t.dat",
+ f"{md5s}.dat",
+ ]
+ exts = ["gif", "webp", "png", "jpg", "jpeg"]
+ for ext in exts:
+ candidates.append(f"{md5s}.{ext}")
+
+ for name in candidates:
+ p = resolved / name
+ try:
+ if p.exists() and p.is_file():
+ return p
+ except Exception:
+ continue
+
+ patterns = [f"{md5s}*.dat", f"{md5s}*", f"*{md5s}*"]
+ for pat in patterns:
+ try:
+ for p in resolved.glob(pat):
+ try:
+ if p.is_file():
+ return p
+ except Exception:
+ continue
+ except Exception:
+ continue
+ return None
+
+
+def _iter_emoji_source_candidates(resolved: Path, md5: str, limit: int = 20) -> list[Path]:
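+    """Return up to ``limit`` candidate source files for an emoji, best first.
+
+    The single best match from _pick_best_emoji_source_path leads, followed by
+    the directory's files ranked by (name contains md5, extension, size).
+    """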
+ md5s = str(md5 or "").lower().strip()
+ if not md5s:
+ return []
+
+ best = _pick_best_emoji_source_path(resolved, md5s)
+ out: list[Path] = []
+ if best:
+ out.append(best)
+
+ try:
+ if not (resolved.exists() and resolved.is_dir()):
+ return out
+ except Exception:
+ return out
+
+ try:
+ files = [p for p in resolved.iterdir() if p.is_file()]
+ except Exception:
+ files = []
+
+ def score(p: Path) -> tuple[int, int, int]:
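+        # Rank candidates by (filename contains the md5, preferred extension, file size); higher sorts first.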
+ name = str(p.name).lower()
+ contains = 1 if md5s in name else 0
+ ext = str(p.suffix).lower().lstrip(".")
+ ext_rank = 0
+ if ext == "dat":
+ ext_rank = 3
+ elif ext in {"gif", "webp"}:
+ ext_rank = 2
+ elif ext in {"png", "jpg", "jpeg"}:
+ ext_rank = 1
+ try:
+ sz = int(p.stat().st_size)
+ except Exception:
+ sz = 0
+ return (contains, ext_rank, sz)
+
+ files_sorted = sorted(files, key=score, reverse=True)
+ for p in files_sorted:
+ if p not in out:
+ out.append(p)
+ if len(out) >= int(limit):
+ break
+ return out
diff --git a/src/wechat_decrypt_tool/path_fix.py b/src/wechat_decrypt_tool/path_fix.py
new file mode 100644
index 0000000..c0ddf65
--- /dev/null
+++ b/src/wechat_decrypt_tool/path_fix.py
@@ -0,0 +1,205 @@
+import json
+import os
+import re
+from typing import Any, Callable, Optional
+
+from fastapi import HTTPException, Request
+from fastapi.routing import APIRoute
+
+from .logging_config import get_logger
+
+logger = get_logger(__name__)
+
+
+class PathFixRequest(Request):
+ """自定义Request类,自动修复JSON中的路径问题并检测相对路径"""
+
+ def _is_absolute_path(self, path: str) -> bool:
+ """检测是否为绝对路径,支持Windows、macOS、Linux"""
+ if not path:
+ return False
+
+        # Windows absolute path: starts with a drive letter (C:\, D:\, etc.)
+        if re.match(r'^[A-Za-z]:[/\\]', path):
+            return True
+
+        # Unix-like absolute path: starts with /
+ if path.startswith('/'):
+ return True
+
+ return False
+
+ def _validate_paths_in_json(self, json_data: dict) -> Optional[str]:
+ """验证JSON中的路径,返回错误信息(如果有)"""
+ logger.info(f"开始验证路径,JSON数据: {json_data}")
+ # 检查db_storage_path字段(现在是必需的)
+ if 'db_storage_path' not in json_data:
+ return "缺少必需的db_storage_path参数,请提供具体的数据库存储路径。"
+
+ if 'db_storage_path' in json_data:
+ path = json_data['db_storage_path']
+
+ # 检查路径是否为空
+ if not path or not path.strip():
+ return "db_storage_path参数不能为空,请提供具体的数据库存储路径。"
+
+ logger.info(f"检查路径: {path}")
+ is_absolute = self._is_absolute_path(path)
+ logger.info(f"是否为绝对路径: {is_absolute}")
+ if not is_absolute:
+ error_msg = f"请提供绝对路径,当前输入的是相对路径: {path}。\n" \
+ f"Windows绝对路径示例: D:\\wechatMSG\\xwechat_files\\wxid_xxx\\db_storage"
+ return error_msg
+
+ # 检查路径是否存在
+ logger.info(f"检查路径是否存在: {path}")
+ path_exists = os.path.exists(path)
+ logger.info(f"路径存在性: {path_exists}")
+ if not path_exists:
+ # 检查父目录
+ parent_path = os.path.dirname(path)
+ logger.info(f"检查父目录: {parent_path}")
+ parent_exists = os.path.exists(parent_path)
+ logger.info(f"父目录存在性: {parent_exists}")
+ if parent_exists:
+ try:
+ files = os.listdir(parent_path)
+ logger.info(f"父目录内容: {files}")
+ error_msg = f"指定的路径不存在: {path}\n" \
+ f"父目录存在但不包含 'db_storage' 文件夹。\n" \
+ f"请检查路径是否正确,或确保微信数据已生成。"
+ except PermissionError:
+ logger.info(f"无法访问父目录,权限不足")
+ error_msg = f"指定的路径不存在: {path}\n" \
+ f"无法访问父目录,可能是权限问题。"
+ else:
+ error_msg = f"指定的路径不存在: {path}\n" \
+ f"父目录也不存在,请检查路径是否正确。"
+ logger.info(f"返回路径错误: {error_msg}")
+ return error_msg
+ else:
+ logger.info(f"路径存在,使用递归方式检查数据库文件")
+ try:
+ # 使用与自动检测相同的逻辑:递归查找.db文件
+ db_files = []
+ for root, dirs, files in os.walk(path):
+ # 只处理db_storage目录下的数据库文件(与自动检测逻辑一致)
+ if "db_storage" not in root:
+ continue
+ for file_name in files:
+ if not file_name.endswith(".db"):
+ continue
+ # 排除不需要解密的数据库(与自动检测逻辑一致)
+ if file_name in ["key_info.db"]:
+ continue
+ db_path = os.path.join(root, file_name)
+ db_files.append(db_path)
+
+ logger.info(f"递归查找到的数据库文件: {db_files}")
+ if not db_files:
+ error_msg = f"路径存在但没有找到有效的数据库文件: {path}\n" \
+ f"请确保该目录或其子目录包含微信数据库文件(.db文件)。\n" \
+ f"注意:key_info.db文件会被自动排除。"
+ logger.info(f"返回错误: 递归查找未找到有效.db文件")
+ return error_msg
+ logger.info(f"路径验证通过,递归找到{len(db_files)}个有效数据库文件")
+ except PermissionError:
+ error_msg = f"无法访问路径: {path}\n" \
+ f"权限不足,请检查文件夹权限。"
+ return error_msg
+ except Exception as e:
+ logger.warning(f"检查路径内容时出错: {e}")
+ # 如果无法检查内容,继续执行,让后续逻辑处理
+
+ return None
+
+ async def body(self) -> bytes:
+ """重写body方法,预处理JSON中的路径问题"""
+ body = await super().body()
+
+        # Only process JSON requests
+ content_type = self.headers.get("content-type", "")
+ if "application/json" not in content_type:
+ return body
+
+ try:
+            # Decode the body bytes into a string
+            body_str = body.decode('utf-8')
+
+            # First try to parse the JSON as-is so the paths can be validated
+            try:
+                json_data = json.loads(body_str)
+                path_error = self._validate_paths_in_json(json_data)
+                if path_error:
+                    logger.info(f"Path validation error detected: {path_error}")
+                    # Store the error on the request state; the route handler checks it later
+                    self.state.path_validation_error = path_error
+                    return body
+            except json.JSONDecodeError as e:
+                # Malformed JSON; fall through and try to repair it
+                logger.info(f"JSON parsing failed, attempting to fix it: {e}")
+                pass
+
+            # Use a regex to safely handle backslashes in Windows paths.
+            # Two cases need to be handled:
+            # 1. Absolute paths starting with a drive letter: D:\path\to\file
+            # 2. Relative paths without a drive letter: wechatMSG\xwechat_files\...
+
+            # Match quoted strings that contain a backslash (with or without a drive letter)
+            pattern = r'"([^"]*?\\[^"]*?)"'
+
+            def fix_path(match):
+                path = match.group(1)
+                # Replace each single backslash with a doubled backslash,
+                # leaving already-escaped backslashes untouched.
+                fixed_path = re.sub(r'(?<!\\)\\(?!\\)', r'\\\\', path)
+                return f'"{fixed_path}"'
+
+            fixed_body_str = re.sub(pattern, fix_path, body_str)
+            logger.info(f"Fixed JSON body: {body_str[:100]}... -> {fixed_body_str[:100]}...")
+
+            # Re-validate the paths after fixing
+            try:
+                json_data = json.loads(fixed_body_str)
+                logger.info("Parsed JSON successfully after fixing; validating paths")
+                path_error = self._validate_paths_in_json(json_data)
+                if path_error:
+                    logger.info(f"Path validation error after fixing: {path_error}")
+                    self.state.path_validation_error = path_error
+                    return fixed_body_str.encode('utf-8')
+                else:
+                    logger.info("Path validation passed after fixing")
+            except json.JSONDecodeError as e:
+                logger.warning(f"JSON still failed to parse after fixing: {e}")
+
+ return fixed_body_str.encode('utf-8')
+
+ except Exception as e:
+            # If anything goes wrong, fall back to the original body
+            logger.warning(f"JSON path fix failed; using the original request body: {e}")
+ return body
+
+
+class PathFixRoute(APIRoute):
+ """自定义APIRoute类,使用PathFixRequest并处理路径验证错误"""
+
+ def get_route_handler(self) -> Callable:
+ original_route_handler = super().get_route_handler()
+
+        async def custom_route_handler(request: Request) -> Any:
+            # Replace the incoming Request with our path-fixing Request subclass
+            custom_request = PathFixRequest(request.scope, request.receive)
+
+            # Reading the body here triggers PathFixRequest.body(), which performs
+            # the path fixing and records any validation error on request.state.
+            await custom_request.body()
+
+            # Surface a validation error, if one was recorded
+            if hasattr(custom_request.state, 'path_validation_error'):
+ raise HTTPException(
+ status_code=400,
+ detail=custom_request.state.path_validation_error,
+ )
+
+ return await original_route_handler(custom_request)
+
+ return custom_route_handler
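+
+
+# Usage sketch (assumption: the FastAPI wiring lives in the app/router module):
+#     from fastapi import APIRouter
+#     router = APIRouter(route_class=PathFixRoute)
+#     app.include_router(router)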