From 7d911b4580cf066b1f9c7e3ae3ea852e99623c6f Mon Sep 17 00:00:00 2001 From: 2977094657 <2977094657@qq.com> Date: Fri, 19 Dec 2025 16:17:07 +0800 Subject: [PATCH] =?UTF-8?q?refactor(api):=20=E6=8F=90=E5=8F=96=E8=BE=85?= =?UTF-8?q?=E5=8A=A9=E5=87=BD=E6=95=B0=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增chat_helpers.py: 聊天相关辅助函数(1064行) - 新增media_helpers.py: 媒体处理辅助函数(1550行) - 新增path_fix.py: 路径修复与验证功能(205行) --- src/wechat_decrypt_tool/chat_helpers.py | 1064 +++++++++++++++ src/wechat_decrypt_tool/media_helpers.py | 1550 ++++++++++++++++++++++ src/wechat_decrypt_tool/path_fix.py | 205 +++ 3 files changed, 2819 insertions(+) create mode 100644 src/wechat_decrypt_tool/chat_helpers.py create mode 100644 src/wechat_decrypt_tool/media_helpers.py create mode 100644 src/wechat_decrypt_tool/path_fix.py diff --git a/src/wechat_decrypt_tool/chat_helpers.py b/src/wechat_decrypt_tool/chat_helpers.py new file mode 100644 index 0000000..21becc1 --- /dev/null +++ b/src/wechat_decrypt_tool/chat_helpers.py @@ -0,0 +1,1064 @@ +import base64 +import hashlib +import html +import os +import re +import sqlite3 +from collections import Counter +from datetime import datetime +from pathlib import Path +from typing import Any, Optional +from urllib.parse import quote + +from fastapi import HTTPException + +from .logging_config import get_logger + +try: + import zstandard as zstd # type: ignore +except Exception: + zstd = None + +logger = get_logger(__name__) + +_REPO_ROOT = Path(__file__).resolve().parents[2] +_OUTPUT_DATABASES_DIR = _REPO_ROOT / "output" / "databases" +_DEBUG_SESSIONS = os.environ.get("WECHAT_TOOL_DEBUG_SESSIONS", "0") == "1" + + +def _list_decrypted_accounts() -> list[str]: + if not _OUTPUT_DATABASES_DIR.exists(): + return [] + + accounts: list[str] = [] + for p in _OUTPUT_DATABASES_DIR.iterdir(): + if not p.is_dir(): + continue + if (p / "session.db").exists() and (p / "contact.db").exists(): + 
accounts.append(p.name) + + accounts.sort() + return accounts + + +def _resolve_account_dir(account: Optional[str]) -> Path: + accounts = _list_decrypted_accounts() + if not accounts: + raise HTTPException( + status_code=404, + detail="No decrypted databases found. Please decrypt first.", + ) + + selected = account or accounts[0] + base = _OUTPUT_DATABASES_DIR.resolve() + candidate = (_OUTPUT_DATABASES_DIR / selected).resolve() + + if candidate != base and base not in candidate.parents: + raise HTTPException(status_code=400, detail="Invalid account path.") + + if not candidate.exists() or not candidate.is_dir(): + raise HTTPException(status_code=404, detail="Account not found.") + + if not (candidate / "session.db").exists(): + raise HTTPException(status_code=404, detail="session.db not found for this account.") + if not (candidate / "contact.db").exists(): + raise HTTPException(status_code=404, detail="contact.db not found for this account.") + + return candidate + + +def _should_keep_session(username: str, include_official: bool) -> bool: + if not username: + return False + + if not include_official and username.startswith("gh_"): + return False + + if username.startswith(("weixin", "qqmail", "fmessage", "medianote", "floatbottle", "newsapp")): + return False + + if "@kefu.openim" in username: + return False + if "@openim" in username: + return False + if "service_" in username: + return False + + if username in { + "brandsessionholder", + "brandservicesessionholder", + "notifymessage", + "opencustomerservicemsg", + "notification_messages", + "userexperience_alarm", + }: + return False + + return username.endswith("@chatroom") or username.startswith("wxid_") or ("@" not in username) + + +def _format_session_time(ts: Optional[int]) -> str: + if not ts: + return "" + try: + dt = datetime.fromtimestamp(int(ts)) + now = datetime.now() + if dt.date() == now.date(): + return dt.strftime("%H:%M") + return dt.strftime("%m/%d") + except Exception: + return "" + + +def 
_infer_last_message_brief(msg_type: Optional[int], sub_type: Optional[int]) -> str: + t = int(msg_type or 0) + s = int(sub_type or 0) + + if t == 1: + return "[Text]" + if t == 3: + return "[Image]" + if t == 34: + return "[Voice]" + if t == 42: + return "[Contact Card]" + if t == 43: + return "[Video]" + if t == 47: + return "[Emoji]" + if t == 48: + return "[Location]" + if t == 49: + if s == 5: + return "[Link]" + if s == 6: + return "[File]" + if s in (33, 36): + return "[Mini Program]" + if s == 57: + return "[Quote]" + if s in (63, 88): + return "[Live]" + if s == 87: + return "[Announcement]" + if s == 2000: + return "[Transfer]" + if s == 2003: + return "[Red Packet]" + if s == 19: + return "[Chat History]" + return "[App Message]" + if t == 10000: + return "[System]" + return "[Message]" + + +def _infer_message_brief_by_local_type(local_type: Optional[int]) -> str: + t = int(local_type or 0) + if t == 1: + return "" + if t == 3: + return "[Image]" + if t == 34: + return "[Voice]" + if t == 43: + return "[Video]" + if t == 47: + return "[Emoji]" + if t == 48: + return "[Location]" + if t == 50: + return "[VoIP]" + if t == 10000: + return "[System]" + if t == 244813135921: + return "[Quote]" + if t == 17179869233: + return "[Link]" + if t == 21474836529: + return "[Article]" + if t == 154618822705: + return "[Mini Program]" + if t == 12884901937: + return "[Music]" + if t == 8594229559345: + return "[Red Packet]" + if t == 81604378673: + return "[Chat History]" + if t == 266287972401: + return "[Pat]" + if t == 8589934592049: + return "[Transfer]" + if t == 270582939697: + return "[Live]" + if t == 25769803825: + return "[File]" + return "[Message]" + + +def _quote_ident(ident: str) -> str: + return '"' + ident.replace('"', '""') + '"' + + +def _resolve_msg_table_name(conn: sqlite3.Connection, username: str) -> Optional[str]: + if not username: + return None + md5_hex = hashlib.md5(username.encode("utf-8")).hexdigest() + expected = f"msg_{md5_hex}".lower() + 
expected_chat = f"chat_{md5_hex}".lower() + + rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + names = [r[0] for r in rows if r and r[0]] + + for name in names: + if str(name).lower() == expected: + return str(name) + + for name in names: + if str(name).lower() == expected_chat: + return str(name) + + for name in names: + ln = str(name).lower() + if ln.startswith("msg_") and md5_hex in ln: + return str(name) + if ln.startswith("chat_") and md5_hex in ln: + return str(name) + + for name in names: + if md5_hex in str(name).lower(): + return str(name) + + partial = md5_hex[:24] + for name in names: + if partial in str(name).lower(): + return str(name) + + return None + + +def _query_head_image_usernames(head_image_db_path: Path, usernames: list[str]) -> set[str]: + uniq = list(dict.fromkeys([u for u in usernames if u])) + if not uniq: + return set() + if not head_image_db_path.exists(): + return set() + + conn = sqlite3.connect(str(head_image_db_path)) + try: + placeholders = ",".join(["?"] * len(uniq)) + rows = conn.execute( + f"SELECT username FROM head_image WHERE username IN ({placeholders})", + uniq, + ).fetchall() + return {str(r[0]) for r in rows if r and r[0]} + finally: + conn.close() + + +def _build_avatar_url(account_dir_name: str, username: str) -> str: + return f"/api/chat/avatar?account={quote(account_dir_name)}&username={quote(username)}" + + +def _decode_sqlite_text(value: Any) -> str: + if value is None: + return "" + if isinstance(value, bytes): + try: + return value.decode("utf-8", errors="ignore") + except Exception: + return "" + if isinstance(value, memoryview): + try: + return bytes(value).decode("utf-8", errors="ignore") + except Exception: + return "" + return str(value) + + +def _is_mostly_printable_text(s: str) -> bool: + if not s: + return False + sample = s[:600] + if not sample: + return False + printable = sum(1 for ch in sample if ch.isprintable() or ch in {"\n", "\r", "\t"}) + return (printable / 
len(sample)) >= 0.85 + + +def _looks_like_xml(s: str) -> bool: + if not s: + return False + t = s.lstrip() + if t.startswith('"') and t.endswith('"'): + t = t.strip('"').lstrip() + return t.startswith("<") + + +def _decode_message_content(compress_value: Any, message_value: Any) -> str: + msg_text = _decode_sqlite_text(message_value) + + if isinstance(message_value, (bytes, bytearray, memoryview)): + raw = bytes(message_value) if isinstance(message_value, memoryview) else message_value + if raw.startswith(b"\x28\xb5\x2f\xfd") and zstd is not None: + try: + out = zstd.decompress(raw) + s = out.decode("utf-8", errors="ignore") + s = html.unescape(s.strip()) + if _looks_like_xml(s) or _is_mostly_printable_text(s): + msg_text = s + except Exception: + pass + + if compress_value is None: + return msg_text + + def try_decode_text_blob(text: str) -> Optional[str]: + t = (text or "").strip() + if not t: + return None + + if len(t) >= 16 and len(t) % 2 == 0 and re.fullmatch(r"[0-9a-fA-F]+", t): + try: + raw = bytes.fromhex(t) + if zstd is not None: + try: + out = zstd.decompress(raw) + s2 = out.decode("utf-8", errors="ignore") + s2 = html.unescape(s2.strip()) + if _looks_like_xml(s2) or _is_mostly_printable_text(s2): + return s2 + except Exception: + pass + s2 = raw.decode("utf-8", errors="ignore") + s2 = html.unescape(s2.strip()) + if _looks_like_xml(s2) or _is_mostly_printable_text(s2): + return s2 + except Exception: + return None + + if len(t) >= 24 and len(t) % 4 == 0 and re.fullmatch(r"[A-Za-z0-9+/=]+", t): + try: + raw = base64.b64decode(t) + if zstd is not None: + try: + out = zstd.decompress(raw) + s2 = out.decode("utf-8", errors="ignore") + s2 = html.unescape(s2.strip()) + if _looks_like_xml(s2) or _is_mostly_printable_text(s2): + return s2 + except Exception: + pass + s2 = raw.decode("utf-8", errors="ignore") + s2 = html.unescape(s2.strip()) + if _looks_like_xml(s2) or _is_mostly_printable_text(s2): + return s2 + except Exception: + return None + + return None + 
+ if isinstance(compress_value, str): + s = html.unescape(compress_value.strip()) + s2 = try_decode_text_blob(s) + if s2: + return s2 + if _looks_like_xml(s) or _is_mostly_printable_text(s): + return s + return msg_text + + data: Optional[bytes] = None + if isinstance(compress_value, memoryview): + data = bytes(compress_value) + elif isinstance(compress_value, (bytes, bytearray)): + data = bytes(compress_value) + + if not data: + return msg_text + + if zstd is not None: + try: + out = zstd.decompress(data) + s = out.decode("utf-8", errors="ignore") + s = html.unescape(s.strip()) + if _looks_like_xml(s) or _is_mostly_printable_text(s): + return s + except Exception: + pass + + try: + s = data.decode("utf-8", errors="ignore") + s = html.unescape(s.strip()) + s2 = try_decode_text_blob(s) + if s2: + return s2 + if _looks_like_xml(s) or _is_mostly_printable_text(s): + return s + except Exception: + pass + + return msg_text + + +_MD5_HEX_RE = re.compile(rb"(?i)[0-9a-f]{32}") + + +def _extract_md5_from_blob(blob: Any) -> str: + if blob is None: + return "" + if isinstance(blob, memoryview): + data = bytes(blob) + elif isinstance(blob, (bytes, bytearray)): + data = bytes(blob) + else: + try: + data = bytes(blob) + except Exception: + return "" + + if not data: + return "" + m = _MD5_HEX_RE.findall(data) + if not m: + return "" + best = Counter([x.lower() for x in m]).most_common(1)[0][0] + try: + return best.decode("ascii", errors="ignore") + except Exception: + return "" + + +def _resource_lookup_chat_id(resource_conn: sqlite3.Connection, username: str) -> Optional[int]: + if not username: + return None + try: + row = resource_conn.execute( + "SELECT rowid FROM ChatName2Id WHERE user_name = ? 
LIMIT 1", + (username,), + ).fetchone() + if row and row[0] is not None: + return int(row[0]) + except Exception: + return None + return None + + +def _lookup_resource_md5( + resource_conn: sqlite3.Connection, + chat_id: Optional[int], + message_local_type: int, + server_id: int, + local_id: int, + create_time: int, +) -> str: + if server_id <= 0 and local_id <= 0: + return "" + + where_chat = "" + params_prefix: list[Any] = [] + if chat_id is not None and int(chat_id) > 0: + where_chat = " AND chat_id = ?" + params_prefix.append(int(chat_id)) + + where_type = "" + if int(message_local_type) > 0: + where_type = " AND message_local_type = ?" + params_prefix.append(int(message_local_type)) + + try: + if server_id > 0: + row = resource_conn.execute( + "SELECT packed_info FROM MessageResourceInfo WHERE message_svr_id = ?" + + where_chat + + where_type + + " ORDER BY message_id DESC LIMIT 1", + [int(server_id)] + params_prefix, + ).fetchone() + if row and row[0] is not None: + md5 = _extract_md5_from_blob(row[0]) + if md5: + return md5 + except Exception: + pass + + try: + if local_id > 0 and create_time > 0: + row = resource_conn.execute( + "SELECT packed_info FROM MessageResourceInfo WHERE message_local_id = ? AND message_create_time = ?" 
+ + where_chat + + where_type + + " ORDER BY message_id DESC LIMIT 1", + [int(local_id), int(create_time)] + params_prefix, + ).fetchone() + if row and row[0] is not None: + return _extract_md5_from_blob(row[0]) + except Exception: + pass + + return "" + + +def _strip_cdata(s: str) -> str: + if not s: + return "" + out = s.replace("", "") + return out.strip() + + +def _extract_xml_tag_text(xml_text: str, tag: str) -> str: + if not xml_text or not tag: + return "" + m = re.search( + rf"<{re.escape(tag)}>(.*?)", + xml_text, + flags=re.IGNORECASE | re.DOTALL, + ) + if not m: + return "" + return _strip_cdata(m.group(1) or "") + + +def _extract_xml_attr(xml_text: str, attr: str) -> str: + if not xml_text or not attr: + return "" + m = re.search(rf"{re.escape(attr)}\s*=\s*['\"]([^'\"]+)['\"]", xml_text, flags=re.IGNORECASE) + return (m.group(1) or "").strip() if m else "" + + +def _extract_xml_tag_or_attr(xml_text: str, name: str) -> str: + v = _extract_xml_tag_text(xml_text, name) + if v: + return v + return _extract_xml_attr(xml_text, name) + + +def _extract_refermsg_block(xml_text: str) -> str: + if not xml_text: + return "" + m = re.search(r"(]*>.*?)", xml_text, flags=re.IGNORECASE | re.DOTALL) + return (m.group(1) or "").strip() if m else "" + + +def _infer_transfer_status_text( + is_sent: bool, + paysubtype: str, + receivestatus: str, + sendertitle: str, + receivertitle: str, + senderdes: str, + receiverdes: str, +) -> str: + t = str(paysubtype or "").strip() + rs = str(receivestatus or "").strip() + + if rs == "1": + return "已收款" + if rs == "2": + return "已退还" + if rs == "3": + return "已过期" + + if t == "4": + return "已退还" + if t == "9": + return "已被退还" + if t == "10": + return "已过期" + + if t == "8": + return "发起转账" + if t == "3": + return "已收款" if is_sent else "已被接收" + if t == "1": + return "转账" + + title = sendertitle if is_sent else receivertitle + if title: + return title + des = senderdes if is_sent else receiverdes + if des: + return des + return "转账" + + 
+def _split_group_sender_prefix(text: str) -> tuple[str, str]: + if not text: + return "", text + sep = text.find(":\n") + if sep <= 0: + return "", text + prefix = text[:sep].strip() + body = text[sep + 2 :].lstrip("\n") + if not prefix or len(prefix) > 128: + return "", text + if re.search(r"\s", prefix): + return "", text + if prefix.startswith("wxid_") or prefix.endswith("@chatroom") or "@" in prefix: + return prefix, body + return "", text + + +def _extract_sender_from_group_xml(xml_text: str) -> str: + if not xml_text: + return "" + + v = _extract_xml_tag_text(xml_text, "fromusername") + if v: + return v + v = _extract_xml_attr(xml_text, "fromusername") + if v: + return v + return "" + + +def _parse_pat_message(text: str, contact_rows: dict[str, sqlite3.Row]) -> str: + template = _extract_xml_tag_text(text, "template") + if not template: + return "[拍一拍]" + wxids = list({m.group(1) for m in re.finditer(r"\$\{([^}]+)\}", template) if m.group(1)}) + rendered = template + for wxid in wxids: + row = contact_rows.get(wxid) + name = _pick_display_name(row, wxid) + rendered = rendered.replace(f"${{{wxid}}}", name) + return rendered.strip() or "[拍一拍]" + + +def _parse_quote_message(text: str) -> str: + title = _extract_xml_tag_text(text, "title") + if title: + return title + refer = _extract_xml_tag_text(text, "content") + if refer: + return refer + return "[引用消息]" + + +def _parse_app_message(text: str) -> dict[str, Any]: + app_type_raw = _extract_xml_tag_text(text, "type") + try: + app_type = int(str(app_type_raw or "0").strip() or "0") + except Exception: + app_type = 0 + title = _extract_xml_tag_text(text, "title") + des = _extract_xml_tag_text(text, "des") + url = _extract_xml_tag_text(text, "url") + + if "" in text.lower(): + return {"renderType": "system", "content": "[拍一拍]"} + + if app_type in (5, 68) and url: + thumb_url = _extract_xml_tag_text(text, "thumburl") + return { + "renderType": "link", + "content": des or title or "[链接]", + "title": title or des or 
"", + "url": url, + "thumbUrl": thumb_url or "", + } + + if app_type in (6, 74): + file_name = title or "" + total_len = _extract_xml_tag_text(text, "totallen") + file_md5 = ( + _extract_xml_tag_or_attr(text, "md5") + or _extract_xml_tag_or_attr(text, "filemd5") + or _extract_xml_tag_or_attr(text, "file_md5") + ) + return { + "renderType": "file", + "content": f"[文件] {file_name}".strip(), + "title": file_name, + "size": total_len or "", + "fileMd5": file_md5 or "", + } + + if app_type == 57 or "]*>.*?)", + "", + text, + flags=re.IGNORECASE | re.DOTALL, + ) + except Exception: + text_wo_refer = text + + reply_text = _extract_xml_tag_text(text_wo_refer, "title") or _extract_xml_tag_text(text, "title") + refer_displayname = _extract_xml_tag_or_attr(refer_block, "displayname") + refer_content = _extract_xml_tag_text(refer_block, "content") + refer_type = _extract_xml_tag_or_attr(refer_block, "type") + + rt = (reply_text or "").strip() + rc = (refer_content or "").strip() + if rt and rc: + if rc == rt: + refer_content = "" + else: + lines = [ln.strip() for ln in rc.splitlines()] + if lines and lines[0] == rt: + refer_content = "\n".join(rc.splitlines()[1:]).lstrip() + elif rc.startswith(rt): + rest = rc[len(rt) :].lstrip() + refer_content = rest + + t = str(refer_type or "").strip() + if t == "3": + refer_content = "[图片]" + elif t == "47": + refer_content = "[表情]" + elif t == "43" or t == "62": + refer_content = "[视频]" + elif t == "34": + refer_content = "[语音]" + elif t == "49" and refer_content: + refer_content = f"[链接] {refer_content}".strip() + + return { + "renderType": "quote", + "content": reply_text or "[引用消息]", + "quoteTitle": refer_displayname or "", + "quoteContent": refer_content or "", + } + + if app_type == 2000 or ( + " list[Path]: + if not account_dir.exists(): + return [] + + candidates: list[Path] = [] + for p in account_dir.glob("*.db"): + n = p.name + ln = n.lower() + if ln in {"session.db", "contact.db", "head_image.db"}: + continue + if ln == 
"message_resource.db": + continue + if ln == "message_fts.db": + continue + + if re.match(r"^message(_\d+)?\.db$", ln): + candidates.append(p) + continue + if re.match(r"^biz_message(_\d+)?\.db$", ln): + candidates.append(p) + continue + if "message" in ln and ln.endswith(".db"): + candidates.append(p) + continue + candidates.sort(key=lambda x: x.name) + return candidates + + +def _resolve_msg_table_name_by_map(lower_to_actual: dict[str, str], username: str) -> Optional[str]: + if not username: + return None + md5_hex = hashlib.md5(username.encode("utf-8")).hexdigest() + expected = f"msg_{md5_hex}".lower() + expected_chat = f"chat_{md5_hex}".lower() + + if expected in lower_to_actual: + return lower_to_actual[expected] + if expected_chat in lower_to_actual: + return lower_to_actual[expected_chat] + + for ln, actual in lower_to_actual.items(): + if ln.startswith("msg_") and md5_hex in ln: + return actual + if ln.startswith("chat_") and md5_hex in ln: + return actual + + for ln, actual in lower_to_actual.items(): + if md5_hex in ln: + return actual + + partial = md5_hex[:24] + for ln, actual in lower_to_actual.items(): + if partial in ln: + return actual + + return None + + +def _build_latest_message_preview( + username: str, + local_type: int, + raw_text: str, + is_group: bool, + sender_username: str = "", +) -> str: + raw_text = (raw_text or "").strip() + sender_prefix = "" + if is_group and raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')): + sender_prefix, raw_text = _split_group_sender_prefix(raw_text) + if is_group and (not sender_prefix) and sender_username: + sender_prefix = str(sender_username).strip() + + content_text = "" + if local_type == 10000: + if "revokemsg" in raw_text: + content_text = "撤回了一条消息" + else: + content_text = re.sub(r"]*>", "", raw_text) + content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]" + elif local_type == 244813135921: + parsed = _parse_app_message(raw_text) + qt = 
str(parsed.get("quoteTitle") or "").strip() + qc = str(parsed.get("quoteContent") or "").strip() + c0 = str(parsed.get("content") or "").strip() + content_text = qc or c0 or qt or "[引用消息]" + elif local_type == 49: + parsed = _parse_app_message(raw_text) + rt = str(parsed.get("renderType") or "") + content_text = str(parsed.get("content") or "") + title_text = str(parsed.get("title") or "").strip() + if rt == "file" and title_text: + content_text = title_text + if (not content_text) and rt == "transfer": + content_text = ( + str(parsed.get("senderTitle") or "") + or str(parsed.get("receiverTitle") or "") + or "转账" + ) + if not content_text: + content_text = title_text or str(parsed.get("url") or "") + elif local_type == 25769803825: + parsed = _parse_app_message(raw_text) + title_text = str(parsed.get("title") or "").strip() + content_text = title_text or str(parsed.get("content") or "").strip() or "[文件]" + elif local_type == 3: + content_text = "[图片]" + elif local_type == 34: + duration = _extract_xml_attr(raw_text, "voicelength") + content_text = f"[语音 {duration}秒]" if duration else "[语音]" + elif local_type == 43 or local_type == 62: + content_text = "[视频]" + elif local_type == 47: + content_text = "[表情]" + else: + if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')): + content_text = raw_text + else: + content_text = _infer_message_brief_by_local_type(local_type) + + content_text = (content_text or "").strip() or _infer_message_brief_by_local_type(local_type) + content_text = re.sub(r"\s+", " ", content_text).strip() + if sender_prefix and content_text: + return f"{sender_prefix}: {content_text}" + return content_text + + +def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]: + if not usernames: + return {} + + db_paths = _iter_message_db_paths(account_dir) + if not db_paths: + return {} + + remaining = {u for u in usernames if u} + best: dict[str, tuple[tuple[int, int, int], str]] = {} + + if 
_DEBUG_SESSIONS: + logger.info( + f"[sessions.preview] account_dir={account_dir} usernames={len(remaining)} dbs={len(db_paths)}" + ) + logger.info( + f"[sessions.preview] db_paths={', '.join([p.name for p in db_paths[:8]])}{'...' if len(db_paths) > 8 else ''}" + ) + + for db_path in db_paths: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + names = [str(r[0]) for r in rows if r and r[0]] + lower_to_actual = {n.lower(): n for n in names} + + found: dict[str, str] = {} + for u in list(remaining): + tn = _resolve_msg_table_name_by_map(lower_to_actual, u) + if tn: + found[u] = tn + + if not found: + continue + + conn.text_factory = bytes + for u, tn in found.items(): + quoted = _quote_ident(tn) + try: + try: + r = conn.execute( + "SELECT " + "m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, " + "n.user_name AS sender_username " + f"FROM {quoted} m " + "LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid " + "ORDER BY m.create_time DESC, m.sort_seq DESC, m.local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + r = conn.execute( + "SELECT " + "local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username " + f"FROM {quoted} " + "ORDER BY create_time DESC, sort_seq DESC, local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception as e: + if _DEBUG_SESSIONS: + logger.info( + f"[sessions.preview] db={db_path.name} username={u} table={tn} query_failed={e}" + ) + continue + if r is None: + continue + + local_type = int(r["local_type"] or 0) + create_time = int(r["create_time"] or 0) + sort_seq = int(r["sort_seq"] or 0) if r["sort_seq"] is not None else 0 + local_id = int(r["local_id"] or 0) + sort_key = (create_time, sort_seq, local_id) + + raw_text = _decode_message_content(r["compress_content"], r["message_content"]).strip() + sender_username = 
_decode_sqlite_text(r["sender_username"]).strip() + preview = _build_latest_message_preview( + username=u, + local_type=local_type, + raw_text=raw_text, + is_group=bool(u.endswith("@chatroom")), + sender_username=sender_username, + ) + if not preview: + continue + + prev = best.get(u) + if prev is None or sort_key > prev[0]: + best[u] = (sort_key, preview) + finally: + conn.close() + + previews = {u: v[1] for u, v in best.items() if v and v[1]} + if _DEBUG_SESSIONS: + logger.info( + f"[sessions.preview] built_previews={len(previews)} remaining_without_preview={len(remaining - set(previews.keys()))}" + ) + return previews + + +def _pick_display_name(contact_row: Optional[sqlite3.Row], fallback_username: str) -> str: + if contact_row is None: + return fallback_username + + for key in ("remark", "nick_name", "alias"): + try: + v = contact_row[key] + except Exception: + v = None + if isinstance(v, str) and v.strip(): + return v.strip() + + return fallback_username + + +def _pick_avatar_url(contact_row: Optional[sqlite3.Row]) -> Optional[str]: + if contact_row is None: + return None + + for key in ("big_head_url", "small_head_url"): + try: + v = contact_row[key] + except Exception: + v = None + if isinstance(v, str) and v.strip(): + return v.strip() + + return None + + +def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str, sqlite3.Row]: + uniq = list(dict.fromkeys([u for u in usernames if u])) + if not uniq: + return {} + + result: dict[str, sqlite3.Row] = {} + + conn = sqlite3.connect(str(contact_db_path)) + conn.row_factory = sqlite3.Row + try: + def query_table(table: str, targets: list[str]) -> None: + if not targets: + return + placeholders = ",".join(["?"] * len(targets)) + sql = f""" + SELECT username, remark, nick_name, alias, big_head_url, small_head_url + FROM {table} + WHERE username IN ({placeholders}) + """ + rows = conn.execute(sql, targets).fetchall() + for r in rows: + result[r["username"]] = r + + query_table("contact", uniq) 
+ missing = [u for u in uniq if u not in result] + query_table("stranger", missing) + return result + finally: + conn.close() diff --git a/src/wechat_decrypt_tool/media_helpers.py b/src/wechat_decrypt_tool/media_helpers.py new file mode 100644 index 0000000..46cf4fa --- /dev/null +++ b/src/wechat_decrypt_tool/media_helpers.py @@ -0,0 +1,1550 @@ +import ctypes +import hashlib +import json +import mimetypes +import os +import re +import sqlite3 +import struct +import threading +from collections import Counter +from concurrent.futures import ThreadPoolExecutor +from functools import lru_cache +from pathlib import Path +from typing import Any, Optional + +from fastapi import HTTPException + +from .logging_config import get_logger + +logger = get_logger(__name__) + +try: + import psutil # type: ignore +except Exception: + psutil = None + + +# 仓库根目录(用于定位 output/databases) +_REPO_ROOT = Path(__file__).resolve().parents[2] +_OUTPUT_DATABASES_DIR = _REPO_ROOT / "output" / "databases" +_PACKAGE_ROOT = Path(__file__).resolve().parent + + +def _list_decrypted_accounts() -> list[str]: + """列出已解密输出的账号目录名(仅保留包含 session.db + contact.db 的账号)""" + if not _OUTPUT_DATABASES_DIR.exists(): + return [] + + accounts: list[str] = [] + for p in _OUTPUT_DATABASES_DIR.iterdir(): + if not p.is_dir(): + continue + if (p / "session.db").exists() and (p / "contact.db").exists(): + accounts.append(p.name) + + accounts.sort() + return accounts + + +def _resolve_account_dir(account: Optional[str]) -> Path: + """解析账号目录,并进行路径安全校验(防止路径穿越)""" + accounts = _list_decrypted_accounts() + if not accounts: + raise HTTPException( + status_code=404, + detail="No decrypted databases found. 
Please decrypt first.", + ) + + selected = account or accounts[0] + base = _OUTPUT_DATABASES_DIR.resolve() + candidate = (_OUTPUT_DATABASES_DIR / selected).resolve() + + if candidate != base and base not in candidate.parents: + raise HTTPException(status_code=400, detail="Invalid account path.") + + if not candidate.exists() or not candidate.is_dir(): + raise HTTPException(status_code=404, detail="Account not found.") + + if not (candidate / "session.db").exists(): + raise HTTPException(status_code=404, detail="session.db not found for this account.") + if not (candidate / "contact.db").exists(): + raise HTTPException(status_code=404, detail="contact.db not found for this account.") + + return candidate + + +def _detect_image_media_type(data: bytes) -> str: + if not data: + return "application/octet-stream" + + if data.startswith(b"\x89PNG\r\n\x1a\n"): + return "image/png" + if data.startswith(b"\xff\xd8\xff"): + return "image/jpeg" + if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"): + return "image/gif" + if data.startswith(b"RIFF") and data[8:12] == b"WEBP": + return "image/webp" + return "application/octet-stream" + + +class _WxAMConfig(ctypes.Structure): + _fields_ = [ + ("mode", ctypes.c_int), + ("reserved", ctypes.c_int), + ] + + +@lru_cache(maxsize=1) +def _get_wxam_decoder(): + if os.name != "nt": + return None + dll_path = _PACKAGE_ROOT / "native" / "VoipEngine.dll" + if not dll_path.exists(): + logger.warning(f"WxAM decoder DLL not found: {dll_path}") + return None + try: + voip_engine = ctypes.WinDLL(str(dll_path)) + fn = voip_engine.wxam_dec_wxam2pic_5 + fn.argtypes = [ + ctypes.c_int64, + ctypes.c_int, + ctypes.c_int64, + ctypes.POINTER(ctypes.c_int), + ctypes.c_int64, + ] + fn.restype = ctypes.c_int64 + logger.info(f"WxAM decoder loaded: {dll_path}") + return fn + except Exception as e: + logger.warning(f"Failed to load WxAM decoder DLL: {dll_path} ({e})") + return None + + +def _wxgf_to_image_bytes(data: bytes) -> Optional[bytes]: + if not 
data or not data.startswith(b"wxgf"): + return None + fn = _get_wxam_decoder() + if fn is None: + return None + + max_output_size = 52 * 1024 * 1024 + for mode in (0, 3): + try: + config = _WxAMConfig() + config.mode = int(mode) + config.reserved = 0 + + input_buffer = ctypes.create_string_buffer(data, len(data)) + output_buffer = ctypes.create_string_buffer(max_output_size) + output_size = ctypes.c_int(max_output_size) + + result = fn( + ctypes.addressof(input_buffer), + int(len(data)), + ctypes.addressof(output_buffer), + ctypes.byref(output_size), + ctypes.addressof(config), + ) + if result != 0 or output_size.value <= 0: + continue + out = output_buffer.raw[: int(output_size.value)] + if _detect_image_media_type(out[:32]) != "application/octet-stream": + return out + except Exception: + continue + return None + + +def _try_strip_media_prefix(data: bytes) -> tuple[bytes, str]: + if not data: + return data, "application/octet-stream" + + try: + head = data[: min(len(data), 256 * 1024)] + except Exception: + head = data + + # wxgf container + try: + idx = head.find(b"wxgf") + except Exception: + idx = -1 + if idx >= 0 and idx <= 128 * 1024: + try: + payload = data[idx:] + converted = _wxgf_to_image_bytes(payload) + if converted: + mtw = _detect_image_media_type(converted[:32]) + if mtw != "application/octet-stream": + return converted, mtw + except Exception: + pass + + # common image/video headers with small prefix + sigs: list[tuple[bytes, str]] = [ + (b"\x89PNG\r\n\x1a\n", "image/png"), + (b"\xff\xd8\xff", "image/jpeg"), + (b"GIF87a", "image/gif"), + (b"GIF89a", "image/gif"), + ] + for sig, mt in sigs: + try: + j = head.find(sig) + except Exception: + j = -1 + if j >= 0 and j <= 128 * 1024: + sliced = data[j:] + mt2 = _detect_image_media_type(sliced[:32]) + if mt2 != "application/octet-stream": + return sliced, mt2 + + try: + j = head.find(b"RIFF") + except Exception: + j = -1 + if j >= 0 and j <= 128 * 1024: + sliced = data[j:] + try: + if len(sliced) >= 12 
and sliced[8:12] == b"WEBP": + return sliced, "image/webp" + except Exception: + pass + + try: + j = head.find(b"ftyp") + except Exception: + j = -1 + if j >= 4 and j <= 128 * 1024: + sliced = data[j - 4 :] + try: + if len(sliced) >= 8 and sliced[4:8] == b"ftyp": + return sliced, "video/mp4" + except Exception: + pass + + return data, "application/octet-stream" + + +def _load_account_source_info(account_dir: Path) -> dict[str, Any]: + p = account_dir / "_source.json" + if not p.exists(): + return {} + try: + return json.loads(p.read_text(encoding="utf-8")) + except Exception: + return {} + + +def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]: + try: + home = Path.home() + except Exception: + return None + + roots = [ + home / "Documents" / "xwechat_files", + home / "Documents" / "WeChat Files", + ] + + # Exact match first + for root in roots: + c = root / account_name + try: + if c.exists() and c.is_dir(): + return c + except Exception: + continue + + # Then try prefix match: wxid_xxx_yyyy + for root in roots: + try: + if not root.exists() or not root.is_dir(): + continue + for p in root.iterdir(): + if not p.is_dir(): + continue + if p.name.startswith(account_name + "_"): + return p + except Exception: + continue + return None + + +def _resolve_account_wxid_dir(account_dir: Path) -> Optional[Path]: + info = _load_account_source_info(account_dir) + wxid_dir = str(info.get("wxid_dir") or "").strip() + if wxid_dir: + try: + p = Path(wxid_dir) + if p.exists() and p.is_dir(): + return p + except Exception: + pass + return _guess_wxid_dir_from_common_paths(account_dir.name) + + +def _resolve_account_db_storage_dir(account_dir: Path) -> Optional[Path]: + info = _load_account_source_info(account_dir) + db_storage_path = str(info.get("db_storage_path") or "").strip() + if db_storage_path: + try: + p = Path(db_storage_path) + if p.exists() and p.is_dir(): + return p + except Exception: + pass + + wxid_dir = _resolve_account_wxid_dir(account_dir) + 
if wxid_dir: + c = wxid_dir / "db_storage" + try: + if c.exists() and c.is_dir(): + return c + except Exception: + pass + return None + + +def _quote_ident(ident: str) -> str: + return '"' + ident.replace('"', '""') + '"' + + +def _resolve_hardlink_table_name(conn: sqlite3.Connection, prefix: str) -> Optional[str]: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE ? ORDER BY name DESC", + (f"{prefix}%",), + ).fetchall() + if not rows: + return None + return str(rows[0][0]) if rows[0] and rows[0][0] else None + + +def _resolve_hardlink_dir2id_table_name(conn: sqlite3.Connection) -> Optional[str]: + rows = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name LIKE 'dir2id%' ORDER BY name DESC" + ).fetchall() + if not rows: + return None + return str(rows[0][0]) if rows[0] and rows[0][0] else None + + +def _resolve_media_path_from_hardlink( + hardlink_db_path: Path, + wxid_dir: Path, + md5: str, + kind: str, + username: Optional[str], + extra_roots: Optional[list[Path]] = None, +) -> Optional[Path]: + if not hardlink_db_path.exists(): + return None + + kind_key = str(kind or "").lower().strip() + prefixes: list[str] + if kind_key == "image": + prefixes = ["image_hardlink_info"] + elif kind_key == "emoji": + prefixes = [ + "emoji_hardlink_info", + "emotion_hardlink_info", + "image_hardlink_info", + ] + elif kind_key == "video" or kind_key == "video_thumb": + prefixes = ["video_hardlink_info"] + elif kind_key == "file": + prefixes = ["file_hardlink_info"] + else: + return None + + conn = sqlite3.connect(str(hardlink_db_path)) + conn.row_factory = sqlite3.Row + try: + for prefix in prefixes: + table_name = _resolve_hardlink_table_name(conn, prefix) + if not table_name: + continue + + quoted = _quote_ident(table_name) + try: + row = conn.execute( + f"SELECT dir1, dir2, file_name FROM {quoted} WHERE md5 = ? 
ORDER BY modify_time DESC LIMIT 1", + (md5,), + ).fetchone() + except Exception: + row = None + if not row: + continue + + dir1 = str(row["dir1"] or "").strip() + dir2 = str(row["dir2"] or "").strip() + file_name = str(row["file_name"] or "").strip() + if not dir1 or not dir2 or not file_name: + continue + + dir_name = dir2 + dir2id_table = _resolve_hardlink_dir2id_table_name(conn) + + if dir2id_table: + try: + drow = conn.execute( + f"SELECT username FROM {_quote_ident(dir2id_table)} WHERE rowid = ? LIMIT 1", + (int(dir2),), + ).fetchone() + if drow and drow[0]: + dir_name = str(drow[0]) + except Exception: + if username: + try: + drow = conn.execute( + f"SELECT dir_name FROM {_quote_ident(dir2id_table)} WHERE dir_id = ? AND username = ? LIMIT 1", + (dir2, username), + ).fetchone() + if drow and drow[0]: + dir_name = str(drow[0]) + except Exception: + pass + + roots: list[Path] = [] + for r in [wxid_dir] + (extra_roots or []): + if not r: + continue + try: + rr = r.resolve() + except Exception: + rr = r + if rr not in roots: + roots.append(rr) + + file_stem = Path(file_name).stem + file_variants = [file_name, f"{file_stem}_h.dat", f"{file_stem}_t.dat"] + + for root in roots: + for fv in file_variants: + p = (root / dir1 / dir_name / fv).resolve() + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + + if username: + chat_hash = hashlib.md5(username.encode()).hexdigest() + for fv in file_variants: + p = (root / "msg" / "attach" / chat_hash / dir_name / "Img" / fv).resolve() + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + + return None + finally: + conn.close() + + +@lru_cache(maxsize=4096) +def _fallback_search_media_by_md5(weixin_root_str: str, md5: str, kind: str = "") -> Optional[str]: + if not weixin_root_str or not md5: + return None + try: + root = Path(weixin_root_str) + except Exception: + return None + + kind_key = str(kind or "").lower().strip() + + def _fast_find_emoji_in_cache() -> 
Optional[str]: + md5_prefix = md5[:2] if len(md5) >= 2 else "" + if not md5_prefix: + return None + cache_root = root / "cache" + try: + if not cache_root.exists() or not cache_root.is_dir(): + return None + except Exception: + return None + + exact_names = [ + f"{md5}_h.dat", + f"{md5}_t.dat", + f"{md5}.dat", + f"{md5}.gif", + f"{md5}.webp", + f"{md5}.png", + f"{md5}.jpg", + ] + buckets = ["Emoticon", "emoticon", "Emoji", "emoji"] + + candidates: list[Path] = [] + try: + children = list(cache_root.iterdir()) + except Exception: + children = [] + + for child in children: + try: + if not child.is_dir(): + continue + except Exception: + continue + for bucket in buckets: + candidates.append(child / bucket / md5_prefix) + + for bucket in buckets: + candidates.append(cache_root / bucket / md5_prefix) + + seen: set[str] = set() + uniq: list[Path] = [] + for c in candidates: + try: + rc = str(c.resolve()) + except Exception: + rc = str(c) + if rc in seen: + continue + seen.add(rc) + uniq.append(c) + + for base in uniq: + try: + if not base.exists() or not base.is_dir(): + continue + except Exception: + continue + + for name in exact_names: + p = base / name + try: + if p.exists() and p.is_file(): + return str(p) + except Exception: + continue + + try: + for p in base.glob(f"{md5}*"): + try: + if p.is_file(): + return str(p) + except Exception: + continue + except Exception: + continue + return None + + # 根据类型选择搜索目录 + if kind_key == "file": + search_dirs = [root / "msg" / "file"] + elif kind_key == "emoji": + hit_fast = _fast_find_emoji_in_cache() + if hit_fast: + return hit_fast + search_dirs = [ + root / "msg" / "emoji", + root / "msg" / "emoticon", + root / "emoji", + root / "emoticon", + root / "msg" / "attach", + root / "msg" / "file", + root / "msg" / "video", + ] + else: + search_dirs = [ + root / "msg" / "attach", + root / "msg" / "file", + root / "msg" / "video", + root / "cache", + ] + + # 根据类型选择搜索模式 + if kind_key == "file": + patterns = [ + f"*{md5}*", + ] + 
elif kind_key == "emoji": + patterns = [ + f"{md5}_h.dat", + f"{md5}_t.dat", + f"{md5}.dat", + f"{md5}*.dat", + f"{md5}*.gif", + f"{md5}*.webp", + f"{md5}*.png", + f"{md5}*.jpg", + f"*{md5}*", + ] + else: + patterns = [ + f"{md5}_h.dat", + f"{md5}_t.dat", + f"{md5}.dat", + f"{md5}*.dat", + f"{md5}*.jpg", + f"{md5}*.jpeg", + f"{md5}*.png", + f"{md5}*.gif", + f"{md5}*.webp", + f"{md5}*.mp4", + ] + + for d in search_dirs: + try: + if not d.exists() or not d.is_dir(): + continue + except Exception: + continue + for pat in patterns: + try: + for p in d.rglob(pat): + try: + if p.is_file(): + return str(p) + except Exception: + continue + except Exception: + continue + return None + + +def _guess_media_type_by_path(path: Path, fallback: str = "application/octet-stream") -> str: + try: + mt = mimetypes.guess_type(str(path.name))[0] + if mt: + return mt + except Exception: + pass + return fallback + + +def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[str]]: + if not data: + return None, None + + # (offset, magic, media_type) + candidates: list[tuple[int, bytes, str]] = [ + (0, b"\x89PNG\r\n\x1a\n", "image/png"), + (0, b"\xff\xd8\xff", "image/jpeg"), + (0, b"GIF87a", "image/gif"), + (0, b"GIF89a", "image/gif"), + (0, b"wxgf", "application/octet-stream"), + (1, b"wxgf", "application/octet-stream"), + (2, b"wxgf", "application/octet-stream"), + (3, b"wxgf", "application/octet-stream"), + (4, b"wxgf", "application/octet-stream"), + (5, b"wxgf", "application/octet-stream"), + (6, b"wxgf", "application/octet-stream"), + (7, b"wxgf", "application/octet-stream"), + (8, b"wxgf", "application/octet-stream"), + (9, b"wxgf", "application/octet-stream"), + (10, b"wxgf", "application/octet-stream"), + (11, b"wxgf", "application/octet-stream"), + (12, b"wxgf", "application/octet-stream"), + (13, b"wxgf", "application/octet-stream"), + (14, b"wxgf", "application/octet-stream"), + (15, b"wxgf", "application/octet-stream"), + (0, b"RIFF", 
"application/octet-stream"), + (4, b"ftyp", "video/mp4"), + ] + + for offset, magic, mt in candidates: + if len(data) < offset + len(magic): + continue + key = data[offset] ^ magic[0] + ok = True + for i in range(len(magic)): + if (data[offset + i] ^ key) != magic[i]: + ok = False + break + if not ok: + continue + + decoded = bytes(b ^ key for b in data) + + if magic == b"wxgf": + try: + payload = decoded[offset:] if offset > 0 else decoded + converted = _wxgf_to_image_bytes(payload) + if converted: + mtw = _detect_image_media_type(converted[:32]) + if mtw != "application/octet-stream": + return converted, mtw + except Exception: + pass + continue + + if offset == 0 and magic == b"RIFF": + if len(decoded) >= 12 and decoded[8:12] == b"WEBP": + return decoded, "image/webp" + continue + + if mt == "application/octet-stream": + mt2 = _detect_image_media_type(decoded[:32]) + if mt2 != "application/octet-stream": + return decoded, mt2 + continue + + return decoded, mt + + preview_len = 8192 + try: + preview_len = min(int(preview_len), int(len(data))) + except Exception: + preview_len = 8192 + + if preview_len > 0: + for key in range(256): + try: + pv = bytes(b ^ key for b in data[:preview_len]) + except Exception: + continue + try: + scan = pv + if ( + (scan.find(b"wxgf") >= 0) + or (scan.find(b"\x89PNG\r\n\x1a\n") >= 0) + or (scan.find(b"\xff\xd8\xff") >= 0) + or (scan.find(b"GIF87a") >= 0) + or (scan.find(b"GIF89a") >= 0) + or (scan.find(b"RIFF") >= 0) + or (scan.find(b"ftyp") >= 0) + ): + decoded = bytes(b ^ key for b in data) + dec2, mt2 = _try_strip_media_prefix(decoded) + if mt2 != "application/octet-stream": + return dec2, mt2 + except Exception: + continue + + return None, None + + +def _detect_wechat_dat_version(data: bytes) -> int: + if not data or len(data) < 6: + return -1 + sig = data[:6] + if sig == b"\x07\x08V1\x08\x07": + return 1 + if sig == b"\x07\x08V2\x08\x07": + return 2 + return 0 + + +def _extract_yyyymm_for_sort(p: Path) -> str: + m = 
re.search(r"(\d{4}-\d{2})", str(p)) + return m.group(1) if m else "0000-00" + + +@lru_cache(maxsize=16) +def _get_wechat_template_most_common_last2(weixin_root_str: str) -> Optional[bytes]: + try: + root = Path(weixin_root_str) + if not root.exists() or not root.is_dir(): + return None + except Exception: + return None + + try: + template_files = list(root.rglob("*_t.dat")) + except Exception: + template_files = [] + + if not template_files: + return None + + template_files.sort(key=_extract_yyyymm_for_sort, reverse=True) + last_bytes_list: list[bytes] = [] + for file in template_files[:16]: + try: + with open(file, "rb") as f: + f.seek(-2, 2) + b2 = f.read(2) + if b2 and len(b2) == 2: + last_bytes_list.append(b2) + except Exception: + continue + + if not last_bytes_list: + return None + return Counter(last_bytes_list).most_common(1)[0][0] + + +@lru_cache(maxsize=16) +def _find_wechat_xor_key(weixin_root_str: str) -> Optional[int]: + try: + root = Path(weixin_root_str) + if not root.exists() or not root.is_dir(): + return None + except Exception: + return None + + most_common = _get_wechat_template_most_common_last2(weixin_root_str) + if not most_common or len(most_common) != 2: + return None + x, y = most_common[0], most_common[1] + xor_key = x ^ 0xFF + if xor_key != (y ^ 0xD9): + return None + return xor_key + + +def _get_wechat_v2_ciphertext(weixin_root: Path, most_common_last2: bytes) -> Optional[bytes]: + try: + template_files = list(weixin_root.rglob("*_t.dat")) + except Exception: + return None + if not template_files: + return None + + template_files.sort(key=_extract_yyyymm_for_sort, reverse=True) + sig = b"\x07\x08V2\x08\x07" + for file in template_files: + try: + with open(file, "rb") as f: + if f.read(6) != sig: + continue + f.seek(-2, 2) + if f.read(2) != most_common_last2: + continue + f.seek(0xF) + ct = f.read(16) + if ct and len(ct) == 16: + return ct + except Exception: + continue + return None + + +def _verify_wechat_aes_key(ciphertext: bytes, 
key16: bytes) -> bool: + try: + from Crypto.Cipher import AES + + cipher = AES.new(key16[:16], AES.MODE_ECB) + plain = cipher.decrypt(ciphertext) + if plain.startswith(b"\xff\xd8\xff"): + return True + if plain.startswith(b"\x89PNG\r\n\x1a\n"): + return True + return False + except Exception: + return False + + +class _MEMORY_BASIC_INFORMATION(ctypes.Structure): + _fields_ = [ + ("BaseAddress", ctypes.c_void_p), + ("AllocationBase", ctypes.c_void_p), + ("AllocationProtect", ctypes.c_ulong), + ("RegionSize", ctypes.c_size_t), + ("State", ctypes.c_ulong), + ("Protect", ctypes.c_ulong), + ("Type", ctypes.c_ulong), + ] + + +def _find_weixin_pid() -> Optional[int]: + if psutil is None: + return None + for p in psutil.process_iter(["name"]): + try: + name = (p.info.get("name") or "").lower() + if name in {"weixin.exe", "wechat.exe"}: + return int(p.pid) + except Exception: + continue + return None + + +def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]: + pid = _find_weixin_pid() + if not pid: + return None + + PROCESS_VM_READ = 0x0010 + PROCESS_QUERY_INFORMATION = 0x0400 + MEM_COMMIT = 0x1000 + MEM_PRIVATE = 0x20000 + + kernel32 = ctypes.windll.kernel32 + + OpenProcess = kernel32.OpenProcess + OpenProcess.argtypes = [ctypes.c_ulong, ctypes.c_bool, ctypes.c_ulong] + OpenProcess.restype = ctypes.c_void_p + + ReadProcessMemory = kernel32.ReadProcessMemory + ReadProcessMemory.argtypes = [ + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_void_p, + ctypes.c_size_t, + ctypes.POINTER(ctypes.c_size_t), + ] + ReadProcessMemory.restype = ctypes.c_bool + + VirtualQueryEx = kernel32.VirtualQueryEx + VirtualQueryEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t] + VirtualQueryEx.restype = ctypes.c_size_t + + CloseHandle = kernel32.CloseHandle + CloseHandle.argtypes = [ctypes.c_void_p] + CloseHandle.restype = ctypes.c_bool + + handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid) + if not handle: + 
def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
    """Scan the running WeChat process's memory for its media AES key.

    Windows only. Walks committed private memory regions via
    VirtualQueryEx/ReadProcessMemory looking for 32-char alphanumeric
    strings; the first 16 bytes of a candidate are verified against
    *ciphertext* with ``_verify_wechat_aes_key``. Regions are scanned in
    parallel and scanning stops at the first verified key. Returns the
    16-byte key, or None.
    """
    pid = _find_weixin_pid()
    if not pid:
        return None

    PROCESS_VM_READ = 0x0010
    PROCESS_QUERY_INFORMATION = 0x0400
    MEM_COMMIT = 0x1000
    MEM_PRIVATE = 0x20000

    kernel32 = ctypes.windll.kernel32

    OpenProcess = kernel32.OpenProcess
    OpenProcess.argtypes = [ctypes.c_ulong, ctypes.c_bool, ctypes.c_ulong]
    OpenProcess.restype = ctypes.c_void_p

    ReadProcessMemory = kernel32.ReadProcessMemory
    ReadProcessMemory.argtypes = [
        ctypes.c_void_p,
        ctypes.c_void_p,
        ctypes.c_void_p,
        ctypes.c_size_t,
        ctypes.POINTER(ctypes.c_size_t),
    ]
    ReadProcessMemory.restype = ctypes.c_bool

    VirtualQueryEx = kernel32.VirtualQueryEx
    VirtualQueryEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]
    VirtualQueryEx.restype = ctypes.c_size_t

    CloseHandle = kernel32.CloseHandle
    CloseHandle.argtypes = [ctypes.c_void_p]
    CloseHandle.restype = ctypes.c_bool

    handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
    if not handle:
        return None

    stop = threading.Event()
    result: list[Optional[bytes]] = [None]
    # 32 alphanumerics delimited by non-alphanumerics — the key's printable form.
    pattern = re.compile(rb"[^a-z0-9]([a-z0-9]{32})[^a-z0-9]", flags=re.IGNORECASE)

    def read_mem(addr: int, size: int) -> Optional[bytes]:
        buf = ctypes.create_string_buffer(size)
        nread = ctypes.c_size_t(0)
        ok = ReadProcessMemory(handle, ctypes.c_void_p(addr), buf, size, ctypes.byref(nread))
        if not ok or nread.value <= 0:
            return None
        return buf.raw[: nread.value]

    def scan_region(base: int, region_size: int) -> Optional[bytes]:
        # Read in 4 MiB chunks; keep a 64-byte tail so matches spanning
        # a chunk boundary are still found.
        chunk = 4 * 1024 * 1024
        offset = 0
        tail = b""
        while offset < region_size and not stop.is_set():
            to_read = min(chunk, region_size - offset)
            blob = read_mem(base + offset, int(to_read))
            if not blob:
                return None
            window = tail + blob
            for m in pattern.finditer(window):
                candidate = m.group(1)[:16]
                if _verify_wechat_aes_key(ciphertext, candidate):
                    return candidate
            tail = window[-64:] if len(window) > 64 else window
            offset += to_read
        return None

    regions: list[tuple[int, int]] = []
    mbi = _MEMORY_BASIC_INFORMATION()
    addr = 0
    try:
        # Enumerate committed, private regions across the address space.
        while VirtualQueryEx(handle, ctypes.c_void_p(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)):
            try:
                if int(mbi.State) == MEM_COMMIT and int(mbi.Type) == MEM_PRIVATE:
                    base = int(mbi.BaseAddress)
                    size = int(mbi.RegionSize)
                    if size > 0:
                        regions.append((base, size))
                addr = int(mbi.BaseAddress) + int(mbi.RegionSize)
            except Exception:
                addr += 0x1000
            if addr <= 0:
                break

        with ThreadPoolExecutor(max_workers=min(32, max(1, len(regions)))) as pool:
            for found in pool.map(lambda region: scan_region(region[0], region[1]), regions):
                if found:
                    result[0] = found
                    stop.set()
                    break
    finally:
        CloseHandle(handle)

    return result[0]
ensure_ascii=False, indent=2), + encoding="utf-8", + ) + except Exception: + pass + + +def _decrypt_wechat_dat_v3(data: bytes, xor_key: int) -> bytes: + return bytes(b ^ xor_key for b in data) + + +def _decrypt_wechat_dat_v4(data: bytes, xor_key: int, aes_key: bytes) -> bytes: + from Crypto.Cipher import AES + from Crypto.Util import Padding + + header, rest = data[:0xF], data[0xF:] + signature, aes_size, xor_size = struct.unpack("<6sLLx", header) + aes_size += AES.block_size - aes_size % AES.block_size + + aes_data = rest[:aes_size] + raw_data = rest[aes_size:] + + cipher = AES.new(aes_key[:16], AES.MODE_ECB) + decrypted_data = Padding.unpad(cipher.decrypt(aes_data), AES.block_size) + + if xor_size > 0: + raw_data = rest[aes_size:-xor_size] + xor_data = rest[-xor_size:] + xored_data = bytes(b ^ xor_key for b in xor_data) + else: + xored_data = b"" + + return decrypted_data + raw_data + xored_data + + +def _load_media_keys(account_dir: Path) -> dict[str, Any]: + p = account_dir / "_media_keys.json" + if not p.exists(): + return {} + try: + return json.loads(p.read_text(encoding="utf-8")) + except Exception: + return {} + + +def _get_resource_dir(account_dir: Path) -> Path: + """获取解密资源输出目录""" + return account_dir / "resource" + + +def _get_decrypted_resource_path(account_dir: Path, md5: str, ext: str = "") -> Path: + """根据MD5获取解密后资源的路径""" + resource_dir = _get_resource_dir(account_dir) + # 使用MD5前2位作为子目录,避免单目录文件过多 + sub_dir = md5[:2] if len(md5) >= 2 else "00" + if ext: + return resource_dir / sub_dir / f"{md5}.{ext}" + return resource_dir / sub_dir / md5 + + +def _detect_image_extension(data: bytes) -> str: + """根据图片数据检测文件扩展名""" + if not data: + return "dat" + if data.startswith(b"\x89PNG\r\n\x1a\n"): + return "png" + if data.startswith(b"\xff\xd8\xff"): + return "jpg" + if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"): + return "gif" + if data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP": + return "webp" + return "dat" + + +def 
def _try_find_decrypted_resource(account_dir: Path, md5: str) -> Optional[Path]:
    """Look for an already-decrypted resource for *md5* under resource/."""
    if not md5:
        return None
    resource_dir = _get_resource_dir(account_dir)
    if not resource_dir.exists():
        return None
    bucket = resource_dir / (md5[:2] if len(md5) >= 2 else "00")
    if not bucket.exists():
        return None
    # The decrypted file may carry any of these extensions.
    for ext in ["jpg", "png", "gif", "webp", "mp4", "dat"]:
        candidate = bucket / f"{md5}.{ext}"
        if candidate.exists():
            return candidate
    return None


def _read_and_maybe_decrypt_media(
    path: Path,
    account_dir: Optional[Path] = None,
    weixin_root: Optional[Path] = None,
) -> tuple[bytes, str]:
    """Read a media file, decrypting/unwrapping it if needed.

    Tries, in order: plain image passthrough, wxgf decode (also with a
    tiny prefix), prefix stripping, single-byte-XOR recovery, and the
    WeChat ``.dat`` v3/v4 schemes (XOR / fixed-key AES / process-derived
    AES). Returns (bytes, media type); falls back to the raw bytes with
    an extension-guessed type.
    """
    # Fast path: already a normal image.
    with open(path, "rb") as f:
        head = f.read(64)

    head_type = _detect_image_media_type(head)
    if head_type != "application/octet-stream":
        return path.read_bytes(), head_type

    if head.startswith(b"wxgf"):
        raw = path.read_bytes()
        converted = _wxgf_to_image_bytes(raw)
        if converted:
            converted_type = _detect_image_media_type(converted[:32])
            if converted_type != "application/octet-stream":
                return converted, converted_type

    # wxgf preceded by a tiny (1-4 byte) prefix.
    try:
        wxgf_at = head.find(b"wxgf")
    except Exception:
        wxgf_at = -1
    if 0 < wxgf_at <= 4:
        try:
            raw = path.read_bytes()
            converted = _wxgf_to_image_bytes(raw[wxgf_at:])
            if converted:
                converted_type = _detect_image_media_type(converted[:32])
                if converted_type != "application/octet-stream":
                    return converted, converted_type
        except Exception:
            pass

    try:
        raw = path.read_bytes()
        stripped, stripped_type = _try_strip_media_prefix(raw)
        if stripped_type != "application/octet-stream":
            return stripped, stripped_type
    except Exception:
        pass

    data = path.read_bytes()

    xor_decoded, xor_type = _try_xor_decrypt_by_magic(data)
    if xor_decoded is not None and xor_type:
        return xor_decoded, xor_type

    def _finalize(out: bytes) -> Optional[tuple[bytes, str]]:
        # Shared post-processing for every decrypted candidate: strip any
        # residual prefix, convert wxgf output, and validate the result.
        try:
            stripped2, stripped_type2 = _try_strip_media_prefix(out)
            if stripped_type2 != "application/octet-stream":
                return stripped2, stripped_type2
        except Exception:
            pass
        if out.startswith(b"wxgf"):
            converted2 = _wxgf_to_image_bytes(out)
            if converted2:
                out = converted2
                logger.info(f"wxgf->image: {path} -> {len(out)} bytes")
            else:
                logger.info(f"wxgf->image failed: {path}")
        final_type = _detect_image_media_type(out[:32])
        if final_type != "application/octet-stream":
            return out, final_type
        return None

    # Try WeChat .dat v1/v2 decrypt.
    version = _detect_wechat_dat_version(data)
    if version in (0, 1, 2):
        root = weixin_root
        if root is None and account_dir is not None:
            root = _resolve_account_wxid_dir(account_dir)
        if root is None and account_dir is not None:
            db_storage = _resolve_account_db_storage_dir(account_dir)
            root = db_storage.parent if db_storage else None

        xor_key = _find_wechat_xor_key(str(root)) if root else None
        if xor_key is None and account_dir is not None:
            # Fall back to the cached key from a previous run.
            try:
                cached = _load_media_keys(account_dir)
                cached_xor = cached.get("xor")
                if cached_xor is not None:
                    xor_key = int(cached_xor)
                    if not (0 <= int(xor_key) <= 255):
                        xor_key = None
                    else:
                        logger.debug("使用 _media_keys.json 中保存的 xor key")
            except Exception:
                xor_key = None

        try:
            if version == 0 and xor_key is not None:
                # V3: whole-file XOR.
                finalized = _finalize(_decrypt_wechat_dat_v3(data, xor_key))
                if finalized:
                    return finalized
            elif version == 1 and xor_key is not None:
                # V4-V1: fixed, well-known AES key.
                finalized = _finalize(_decrypt_wechat_dat_v4(data, xor_key, b"cfcd208495d565ef"))
                if finalized:
                    return finalized
            elif version == 2 and xor_key is not None and account_dir is not None and root is not None:
                # V4-V2: per-install AES key — cached, or pulled from the live process.
                cached = _load_media_keys(account_dir)
                aes_str = str(cached.get("aes") or "").strip()
                aes_key16 = aes_str.encode("ascii", errors="ignore")[:16] if aes_str else b""

                if not aes_key16:
                    trailer = _get_wechat_template_most_common_last2(str(root))
                    ciphertext = _get_wechat_v2_ciphertext(Path(root), trailer) if trailer else None
                    if ciphertext:
                        aes_key16 = _extract_wechat_aes_key_from_process(ciphertext) or b""
                        if aes_key16:
                            _save_media_keys(account_dir, xor_key, aes_key16)

                if aes_key16:
                    finalized = _finalize(_decrypt_wechat_dat_v4(data, xor_key, aes_key16))
                    if finalized:
                        return finalized
        except Exception:
            pass

    # Fallback: return as-is.
    fallback_type = _guess_media_type_by_path(path, fallback="application/octet-stream")
    return data, fallback_type
"application/octet-stream": + try: + if len(data) >= 8 and data[4:8] == b"ftyp": + mt2 = "video/mp4" + except Exception: + pass + if mt2 == "application/octet-stream": + return None + + if str(mt2).startswith("image/"): + ext = _detect_image_extension(data) + elif str(mt2) == "video/mp4": + ext = "mp4" + else: + ext = Path(str(source_path.name)).suffix.lstrip(".").lower() or "dat" + output_path = _get_decrypted_resource_path(account_dir, md5_lower, ext) + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + if not output_path.exists(): + output_path.write_bytes(data) + except Exception: + return None + + return output_path + + +def _collect_all_dat_files(wxid_dir: Path) -> list[tuple[Path, str]]: + """收集所有需要解密的.dat文件,返回 (文件路径, md5) 列表""" + results: list[tuple[Path, str]] = [] + if not wxid_dir or not wxid_dir.exists(): + return results + + # 搜索目录 + search_dirs = [ + wxid_dir / "msg" / "attach", + wxid_dir / "cache", + ] + + for search_dir in search_dirs: + if not search_dir.exists(): + continue + try: + for dat_file in search_dir.rglob("*.dat"): + if not dat_file.is_file(): + continue + # 从文件名提取MD5 + stem = dat_file.stem + # 文件名格式可能是: md5.dat, md5_t.dat, md5_h.dat 等 + md5 = stem.split("_")[0] if "_" in stem else stem + # 验证是否是有效的MD5(32位十六进制) + if len(md5) == 32 and all(c in "0123456789abcdefABCDEF" for c in md5): + results.append((dat_file, md5.lower())) + except Exception as e: + logger.warning(f"扫描目录失败 {search_dir}: {e}") + + return results + + +def _decrypt_and_save_resource( + dat_path: Path, + md5: str, + account_dir: Path, + xor_key: int, + aes_key: Optional[bytes], +) -> tuple[bool, str]: + """解密单个资源文件并保存到resource目录 + + Returns: + (success, message) + """ + try: + data = dat_path.read_bytes() + if not data: + return False, "文件为空" + + version = _detect_wechat_dat_version(data) + decrypted: Optional[bytes] = None + + if version == 0: + # V3: 纯XOR解密 + decrypted = _decrypt_wechat_dat_v3(data, xor_key) + elif version == 1: + # V4-V1: 使用固定AES密钥 + 
decrypted = _decrypt_wechat_dat_v4(data, xor_key, b"cfcd208495d565ef") + elif version == 2: + # V4-V2: 需要动态AES密钥 + if aes_key and len(aes_key) >= 16: + decrypted = _decrypt_wechat_dat_v4(data, xor_key, aes_key[:16]) + else: + return False, "V4-V2版本需要AES密钥" + else: + # 尝试简单XOR解密 + dec, mt = _try_xor_decrypt_by_magic(data) + if dec: + decrypted = dec + else: + return False, f"未知加密版本: {version}" + + if not decrypted: + return False, "解密结果为空" + + if decrypted.startswith(b"wxgf"): + converted = _wxgf_to_image_bytes(decrypted) + if converted: + decrypted = converted + + # 检测图片类型 + ext = _detect_image_extension(decrypted) + mt = _detect_image_media_type(decrypted[:32]) + if mt == "application/octet-stream": + # 解密可能失败,跳过 + return False, "解密后非有效图片" + + # 保存到resource目录 + output_path = _get_decrypted_resource_path(account_dir, md5, ext) + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(decrypted) + + return True, str(output_path) + except Exception as e: + return False, str(e) + + +def _convert_silk_to_wav(silk_data: bytes) -> bytes: + """Convert SILK audio data to WAV format for browser playback.""" + import tempfile + + try: + import pilk + except ImportError: + # If pilk not installed, return original data + return silk_data + + try: + # pilk.silk_to_wav works with file paths, so use temp files + with tempfile.NamedTemporaryFile(suffix=".silk", delete=False) as silk_file: + silk_file.write(silk_data) + silk_path = silk_file.name + + wav_path = silk_path.replace(".silk", ".wav") + + try: + pilk.silk_to_wav(silk_path, wav_path, rate=24000) + with open(wav_path, "rb") as wav_file: + wav_data = wav_file.read() + return wav_data + finally: + # Clean up temp files + import os + + try: + os.unlink(silk_path) + except Exception: + pass + try: + os.unlink(wav_path) + except Exception: + pass + except Exception as e: + logger.warning(f"SILK to WAV conversion failed: {e}") + return silk_data + + +def _resolve_media_path_for_kind( + account_dir: Path, 
+ kind: str, + md5: str, + username: Optional[str], +) -> Optional[Path]: + if not md5: + return None + + kind_key = str(kind or "").strip().lower() + + # 优先查找解密后的资源目录(图片、表情、视频缩略图) + if kind_key in {"image", "emoji", "video_thumb"}: + decrypted_path = _try_find_decrypted_resource(account_dir, md5.lower()) + if decrypted_path: + logger.debug(f"找到解密资源: {decrypted_path}") + return decrypted_path + + # 回退到原始逻辑:从微信数据目录查找 + wxid_dir = _resolve_account_wxid_dir(account_dir) + hardlink_db_path = account_dir / "hardlink.db" + db_storage_dir = _resolve_account_db_storage_dir(account_dir) + + roots: list[Path] = [] + if wxid_dir: + roots.append(wxid_dir) + roots.append(wxid_dir / "msg" / "attach") + roots.append(wxid_dir / "msg" / "file") + roots.append(wxid_dir / "msg" / "video") + roots.append(wxid_dir / "cache") + if db_storage_dir: + roots.append(db_storage_dir) + if not roots: + return None + + p = _resolve_media_path_from_hardlink( + hardlink_db_path, + roots[0], + md5=str(md5), + kind=str(kind), + username=username, + extra_roots=roots[1:], + ) + if (not p) and wxid_dir: + hit = _fallback_search_media_by_md5(str(wxid_dir), str(md5), kind=kind_key) + if hit: + p = Path(hit) + return p + + +def _pick_best_emoji_source_path(resolved: Path, md5: str) -> Optional[Path]: + if not resolved: + return None + try: + if resolved.exists() and resolved.is_file(): + return resolved + except Exception: + pass + + try: + if not (resolved.exists() and resolved.is_dir()): + return None + except Exception: + return None + + md5s = str(md5 or "").lower().strip() + if not md5s: + return None + + candidates = [ + f"{md5s}_h.dat", + f"{md5s}_t.dat", + f"{md5s}.dat", + ] + exts = ["gif", "webp", "png", "jpg", "jpeg"] + for ext in exts: + candidates.append(f"{md5s}.{ext}") + + for name in candidates: + p = resolved / name + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + + patterns = [f"{md5s}*.dat", f"{md5s}*", f"*{md5s}*"] + for pat in patterns: + try: + for 
def _iter_emoji_source_candidates(resolved: Path, md5: str, limit: int = 20) -> list[Path]:
    """Rank up to *limit* candidate emoji files for *md5*.

    The best pick from ``_pick_best_emoji_source_path`` comes first;
    remaining files in the directory are ordered by (name contains md5,
    extension preference ``.dat`` > gif/webp > png/jpg/jpeg, size
    descending). Returns an empty list for a blank md5.
    """
    md5_key = str(md5 or "").lower().strip()
    if not md5_key:
        return []

    ranked: list[Path] = []
    best = _pick_best_emoji_source_path(resolved, md5_key)
    if best:
        ranked.append(best)

    try:
        if not (resolved.exists() and resolved.is_dir()):
            return ranked
    except Exception:
        return ranked

    try:
        files = [entry for entry in resolved.iterdir() if entry.is_file()]
    except Exception:
        files = []

    def score(candidate: Path) -> tuple[int, int, int]:
        name = str(candidate.name).lower()
        name_match = 1 if md5_key in name else 0
        ext = str(candidate.suffix).lower().lstrip(".")
        if ext == "dat":
            ext_rank = 3
        elif ext in {"gif", "webp"}:
            ext_rank = 2
        elif ext in {"png", "jpg", "jpeg"}:
            ext_rank = 1
        else:
            ext_rank = 0
        try:
            size = int(candidate.stat().st_size)
        except Exception:
            size = 0
        return (name_match, ext_rank, size)

    for candidate in sorted(files, key=score, reverse=True):
        if candidate not in ranked:
            ranked.append(candidate)
        if len(ranked) >= int(limit):
            break
    return ranked
+ if re.match(r'^[A-Za-z]:[/\\]', path): + return True + + # Unix-like系统绝对路径:以 / 开头 + if path.startswith('/'): + return True + + return False + + def _validate_paths_in_json(self, json_data: dict) -> Optional[str]: + """验证JSON中的路径,返回错误信息(如果有)""" + logger.info(f"开始验证路径,JSON数据: {json_data}") + # 检查db_storage_path字段(现在是必需的) + if 'db_storage_path' not in json_data: + return "缺少必需的db_storage_path参数,请提供具体的数据库存储路径。" + + if 'db_storage_path' in json_data: + path = json_data['db_storage_path'] + + # 检查路径是否为空 + if not path or not path.strip(): + return "db_storage_path参数不能为空,请提供具体的数据库存储路径。" + + logger.info(f"检查路径: {path}") + is_absolute = self._is_absolute_path(path) + logger.info(f"是否为绝对路径: {is_absolute}") + if not is_absolute: + error_msg = f"请提供绝对路径,当前输入的是相对路径: {path}。\n" \ + f"Windows绝对路径示例: D:\\wechatMSG\\xwechat_files\\wxid_xxx\\db_storage" + return error_msg + + # 检查路径是否存在 + logger.info(f"检查路径是否存在: {path}") + path_exists = os.path.exists(path) + logger.info(f"路径存在性: {path_exists}") + if not path_exists: + # 检查父目录 + parent_path = os.path.dirname(path) + logger.info(f"检查父目录: {parent_path}") + parent_exists = os.path.exists(parent_path) + logger.info(f"父目录存在性: {parent_exists}") + if parent_exists: + try: + files = os.listdir(parent_path) + logger.info(f"父目录内容: {files}") + error_msg = f"指定的路径不存在: {path}\n" \ + f"父目录存在但不包含 'db_storage' 文件夹。\n" \ + f"请检查路径是否正确,或确保微信数据已生成。" + except PermissionError: + logger.info(f"无法访问父目录,权限不足") + error_msg = f"指定的路径不存在: {path}\n" \ + f"无法访问父目录,可能是权限问题。" + else: + error_msg = f"指定的路径不存在: {path}\n" \ + f"父目录也不存在,请检查路径是否正确。" + logger.info(f"返回路径错误: {error_msg}") + return error_msg + else: + logger.info(f"路径存在,使用递归方式检查数据库文件") + try: + # 使用与自动检测相同的逻辑:递归查找.db文件 + db_files = [] + for root, dirs, files in os.walk(path): + # 只处理db_storage目录下的数据库文件(与自动检测逻辑一致) + if "db_storage" not in root: + continue + for file_name in files: + if not file_name.endswith(".db"): + continue + # 排除不需要解密的数据库(与自动检测逻辑一致) + if file_name in ["key_info.db"]: + continue + 
db_path = os.path.join(root, file_name) + db_files.append(db_path) + + logger.info(f"递归查找到的数据库文件: {db_files}") + if not db_files: + error_msg = f"路径存在但没有找到有效的数据库文件: {path}\n" \ + f"请确保该目录或其子目录包含微信数据库文件(.db文件)。\n" \ + f"注意:key_info.db文件会被自动排除。" + logger.info(f"返回错误: 递归查找未找到有效.db文件") + return error_msg + logger.info(f"路径验证通过,递归找到{len(db_files)}个有效数据库文件") + except PermissionError: + error_msg = f"无法访问路径: {path}\n" \ + f"权限不足,请检查文件夹权限。" + return error_msg + except Exception as e: + logger.warning(f"检查路径内容时出错: {e}") + # 如果无法检查内容,继续执行,让后续逻辑处理 + + return None + + async def body(self) -> bytes: + """重写body方法,预处理JSON中的路径问题""" + body = await super().body() + + # 只处理JSON请求 + content_type = self.headers.get("content-type", "") + if "application/json" not in content_type: + return body + + try: + # 将bytes转换为字符串 + body_str = body.decode('utf-8') + + # 首先尝试解析JSON以验证路径 + try: + json_data = json.loads(body_str) + path_error = self._validate_paths_in_json(json_data) + if path_error: + logger.info(f"检测到路径错误: {path_error}") + # 我们将错误信息存储在请求中,稍后在路由处理器中检查 + self.state.path_validation_error = path_error + return body + except json.JSONDecodeError as e: + # JSON格式错误,继续尝试修复 + logger.info(f"JSON解析失败,尝试修复: {e}") + pass + + # 使用正则表达式安全地处理Windows路径中的反斜杠 + # 需要处理两种情况: + # 1. 以盘符开头的绝对路径:D:\path\to\file + # 2. 不以盘符开头的相对路径:wechatMSG\xwechat_files\... + + # 匹配引号内包含反斜杠的路径(不管是否以盘符开头) + pattern = r'"([^"]*?\\[^"]*?)"' + + def fix_path(match): + path = match.group(1) + # 将单个反斜杠替换为双反斜杠,但避免替换已经转义的反斜杠 + fixed_path = re.sub(r'(? 
class PathFixRoute(APIRoute):
    """APIRoute that wraps requests in PathFixRequest and rejects bad paths.

    PathFixRequest.body() records any path-validation failure on
    request.state; this route awaits the body so that check actually runs,
    then converts a recorded failure into an HTTP 400 response.
    """

    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        # NOTE: the original annotated this as `-> any`, i.e. the builtin
        # function, not a type; the annotation is dropped rather than guessed.
        async def custom_route_handler(request: Request):
            # Replace the incoming Request with our path-fixing subclass.
            custom_request = PathFixRequest(request.scope, request.receive)

            # FIX: state.path_validation_error is only set once body() has
            # been awaited.  The original checked state *before* the body was
            # ever read, so the 400 below could never trigger.  Starlette
            # caches the body, so the downstream handler re-reads it safely.
            await custom_request.body()

            if hasattr(custom_request.state, 'path_validation_error'):
                raise HTTPException(
                    status_code=400,
                    detail=custom_request.state.path_validation_error,
                )

            return await original_route_handler(custom_request)

        return custom_route_handler