diff --git a/pyproject.toml b/pyproject.toml index 191a4c5..ce86e93 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -32,3 +32,4 @@ build-backend = "hatchling.build" [tool.hatch.build.targets.wheel] packages = ["src/wechat_decrypt_tool"] +include = ["src/wechat_decrypt_tool/native/VoipEngine.dll"] diff --git a/src/wechat_decrypt_tool/api.py b/src/wechat_decrypt_tool/api.py index 40b6340..1e21dce 100644 --- a/src/wechat_decrypt_tool/api.py +++ b/src/wechat_decrypt_tool/api.py @@ -20,6 +20,9 @@ from functools import lru_cache from pathlib import Path from typing import Optional, Callable, Any from urllib.parse import quote +from urllib.parse import urlparse +import ipaddress +import requests try: import zstandard as zstd # type: ignore @@ -48,6 +51,8 @@ logger = get_logger(__name__) # 仓库根目录(用于定位 output/databases) _REPO_ROOT = Path(__file__).resolve().parents[2] _OUTPUT_DATABASES_DIR = _REPO_ROOT / "output" / "databases" +_PACKAGE_ROOT = Path(__file__).resolve().parent +_DEBUG_SESSIONS = os.environ.get("WECHAT_TOOL_DEBUG_SESSIONS", "0") == "1" def _list_decrypted_accounts() -> list[str]: @@ -284,6 +289,76 @@ def _detect_image_media_type(data: bytes) -> str: return "application/octet-stream" +def _try_strip_media_prefix(data: bytes) -> tuple[bytes, str]: + if not data: + return data, "application/octet-stream" + + try: + head = data[: min(len(data), 256 * 1024)] + except Exception: + head = data + + # wxgf container + try: + idx = head.find(b"wxgf") + except Exception: + idx = -1 + if idx >= 0 and idx <= 128 * 1024: + try: + payload = data[idx:] + converted = _wxgf_to_image_bytes(payload) + if converted: + mtw = _detect_image_media_type(converted[:32]) + if mtw != "application/octet-stream": + return converted, mtw + except Exception: + pass + + # common image/video headers with small prefix + sigs: list[tuple[bytes, str]] = [ + (b"\x89PNG\r\n\x1a\n", "image/png"), + (b"\xff\xd8\xff", "image/jpeg"), + (b"GIF87a", "image/gif"), + (b"GIF89a", "image/gif"), + ] + for sig, mt in sigs: + try: + j = head.find(sig) + except Exception: + j = -1 + if j >= 0 and j <= 128 * 1024: + sliced = data[j:] + mt2 = _detect_image_media_type(sliced[:32]) + if mt2 != "application/octet-stream": + return sliced, mt2 + + try: + j = head.find(b"RIFF") + except Exception: + j = -1 + if j >= 0 and j <= 128 * 1024: + sliced = data[j:] + try: + if len(sliced) >= 12 and sliced[8:12] == b"WEBP": + return sliced, "image/webp" + except Exception: + pass + + try: + j = head.find(b"ftyp") + except Exception: + j = -1 + if j >= 4 and j <= 128 * 1024: + sliced = data[j - 4 :] + try: + if len(sliced) >= 8 and sliced[4:8] == b"ftyp": + return sliced, "video/mp4" + except Exception: + pass + + return data, "application/octet-stream" + + def _load_account_source_info(account_dir: Path) -> dict[str, Any]: p = account_dir / "_source.json" if not p.exists(): @@ -395,100 +470,105 @@ def _resolve_media_path_from_hardlink( return None kind_key = str(kind or "").lower().strip() - if kind_key == "image" or kind_key == "emoji": - prefix = "image_hardlink_info" + prefixes: list[str] + if kind_key == "image": + prefixes = ["image_hardlink_info"] + elif kind_key == "emoji": + prefixes = [ + "emoji_hardlink_info", + "emotion_hardlink_info", + "image_hardlink_info", + ] elif kind_key == "video" or kind_key == "video_thumb": - prefix = "video_hardlink_info" + prefixes = ["video_hardlink_info"] elif kind_key == "file": - prefix = "file_hardlink_info" + prefixes = ["file_hardlink_info"] else: return None conn = sqlite3.connect(str(hardlink_db_path)) conn.row_factory = sqlite3.Row try: - table_name = _resolve_hardlink_table_name(conn, prefix) - if not table_name: - return None - - quoted = _quote_ident(table_name) - row = conn.execute( - f"SELECT dir1, dir2, file_name FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC LIMIT 1", - (md5,), - ).fetchone() - if not row: - return None - - dir1 = str(row["dir1"] or "").strip() - dir2 = str(row["dir2"] or "").strip() - file_name = str(row["file_name"] or "").strip() - if not dir1 or not dir2 or not file_name: - return None - - dir_name = dir2 - dir2id_table = _resolve_hardlink_dir2id_table_name(conn) - - # WeChat 4.x: dir2id table only has 'username' column, use rowid to lookup - if dir2id_table: - try: - # First try WeChat 4.x schema: lookup by rowid - drow = conn.execute( - f"SELECT username FROM {_quote_ident(dir2id_table)} WHERE rowid = ? LIMIT 1", - (int(dir2),), - ).fetchone() - if drow and drow[0]: - dir_name = str(drow[0]) - except Exception: - # Fallback to old schema with dir_id and username columns - if username: - try: - drow = conn.execute( - f"SELECT dir_name FROM {_quote_ident(dir2id_table)} WHERE dir_id = ? AND username = ? LIMIT 1", - (dir2, username), - ).fetchone() - if drow and drow[0]: - dir_name = str(drow[0]) - except Exception: - pass - - roots: list[Path] = [] - for r in [wxid_dir] + (extra_roots or []): - if not r: + for prefix in prefixes: + table_name = _resolve_hardlink_table_name(conn, prefix) + if not table_name: continue - try: - rr = r.resolve() - except Exception: - rr = r - if rr not in roots: - roots.append(rr) - # Try multiple path patterns for different WeChat versions - file_stem = Path(file_name).stem - file_variants = [file_name, f"{file_stem}_h.dat", f"{file_stem}_t.dat"] - - for root in roots: - # Pattern 1: Old structure - {root}/{dir1}/{dir_name}/{file} - for fv in file_variants: - p = (root / dir1 / dir_name / fv).resolve() + quoted = _quote_ident(table_name) + try: + row = conn.execute( + f"SELECT dir1, dir2, file_name FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC LIMIT 1", + (md5,), + ).fetchone() + except Exception: + row = None + if not row: + continue + + dir1 = str(row["dir1"] or "").strip() + dir2 = str(row["dir2"] or "").strip() + file_name = str(row["file_name"] or "").strip() + if not dir1 or not dir2 or not file_name: + continue + + dir_name = dir2 + dir2id_table = _resolve_hardlink_dir2id_table_name(conn) + + if dir2id_table: try: - if p.exists() and p.is_file(): - return p + drow = conn.execute( + f"SELECT username FROM {_quote_ident(dir2id_table)} WHERE rowid = ? LIMIT 1", + (int(dir2),), + ).fetchone() + if drow and drow[0]: + dir_name = str(drow[0]) except Exception: + if username: + try: + drow = conn.execute( + f"SELECT dir_name FROM {_quote_ident(dir2id_table)} WHERE dir_id = ? AND username = ? LIMIT 1", + (dir2, username), + ).fetchone() + if drow and drow[0]: + dir_name = str(drow[0]) + except Exception: + pass + + roots: list[Path] = [] + for r in [wxid_dir] + (extra_roots or []): + if not r: continue - - # Pattern 2: WeChat 4.x - {root}/msg/attach/{chat_hash}/{dir_name}/Img/{file} - # chat_hash is MD5 of the username/chat_id - if username: - import hashlib - chat_hash = hashlib.md5(username.encode()).hexdigest() + try: + rr = r.resolve() + except Exception: + rr = r + if rr not in roots: + roots.append(rr) + + file_stem = Path(file_name).stem + file_variants = [file_name, f"{file_stem}_h.dat", f"{file_stem}_t.dat"] + + for root in roots: for fv in file_variants: - p = (root / "msg" / "attach" / chat_hash / dir_name / "Img" / fv).resolve() + p = (root / dir1 / dir_name / fv).resolve() try: if p.exists() and p.is_file(): return p except Exception: continue + if username: + import hashlib + + chat_hash = hashlib.md5(username.encode()).hexdigest() + for fv in file_variants: + p = (root / "msg" / "attach" / chat_hash / dir_name / "Img" / fv).resolve() + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + return None finally: conn.close() @@ -504,10 +584,101 @@ def _fallback_search_media_by_md5(weixin_root_str: str, md5: str, kind: str = "" return None kind_key = str(kind or "").lower().strip() + + def _fast_find_emoji_in_cache() -> Optional[str]: + md5_prefix = md5[:2] if len(md5) >= 2 else "" + if not md5_prefix: + return None + cache_root = root / "cache" + try: + if not cache_root.exists() or not cache_root.is_dir(): + return None + except Exception: + return None + + exact_names = [ + f"{md5}_h.dat", + f"{md5}_t.dat", + f"{md5}.dat", + f"{md5}.gif", + f"{md5}.webp", + f"{md5}.png", + f"{md5}.jpg", + ] + buckets = ["Emoticon", "emoticon", "Emoji", "emoji"] + + candidates: list[Path] = [] + try: + children = list(cache_root.iterdir()) + except Exception: + children = [] + + for child in children: + try: + if not child.is_dir(): + continue + except Exception: + continue + for bucket in buckets: + candidates.append(child / bucket / md5_prefix) + + for bucket in buckets: + candidates.append(cache_root / bucket / md5_prefix) + + seen: set[str] = set() + uniq: list[Path] = [] + for c in candidates: + try: + rc = str(c.resolve()) + except Exception: + rc = str(c) + if rc in seen: + continue + seen.add(rc) + uniq.append(c) + + for base in uniq: + try: + if not base.exists() or not base.is_dir(): + continue + except Exception: + continue + + for name in exact_names: + p = base / name + try: + if p.exists() and p.is_file(): + return str(p) + except Exception: + continue + + try: + for p in base.glob(f"{md5}*"): + try: + if p.is_file(): + return str(p) + except Exception: + continue + except Exception: + continue + return None # 根据类型选择搜索目录 if kind_key == "file": search_dirs = [root / "msg" / "file"] + elif kind_key == "emoji": + hit_fast = _fast_find_emoji_in_cache() + if hit_fast: + return hit_fast + search_dirs = [ + root / "msg" / "emoji", + root / "msg" / "emoticon", + root / "emoji", + root / "emoticon", + root / "msg" / "attach", + root / "msg" / "file", + root / "msg" / "video", + ] else: search_dirs = [ root / "msg" / "attach", @@ -522,6 +693,18 @@ def _fallback_search_media_by_md5(weixin_root_str: str, md5: str, kind: str = "" patterns = [ f"*{md5}*", # 任何包含md5的文件 ] + elif kind_key == "emoji": + patterns = [ + f"{md5}_h.dat", + f"{md5}_t.dat", + f"{md5}.dat", + f"{md5}*.dat", + f"{md5}*.gif", + f"{md5}*.webp", + f"{md5}*.png", + f"{md5}*.jpg", + f"*{md5}*", + ] else: # 优先顺序: _h.dat (高清) > _t.dat (缩略图) > 普通 .dat > 其他格式 # 因为基础 .dat 可能是 wxgf 容器格式,而 _h.dat/_t.dat 是真正的图片 @@ -577,6 +760,22 @@ def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[st (0, b"\xff\xd8\xff", "image/jpeg"), (0, b"GIF87a", "image/gif"), (0, b"GIF89a", "image/gif"), + (0, b"wxgf", "application/octet-stream"), + (1, b"wxgf", "application/octet-stream"), + (2, b"wxgf", "application/octet-stream"), + (3, b"wxgf", "application/octet-stream"), + (4, b"wxgf", "application/octet-stream"), + (5, b"wxgf", "application/octet-stream"), + (6, b"wxgf", "application/octet-stream"), + (7, b"wxgf", "application/octet-stream"), + (8, b"wxgf", "application/octet-stream"), + (9, b"wxgf", "application/octet-stream"), + (10, b"wxgf", "application/octet-stream"), + (11, b"wxgf", "application/octet-stream"), + (12, b"wxgf", "application/octet-stream"), + (13, b"wxgf", "application/octet-stream"), + (14, b"wxgf", "application/octet-stream"), + (15, b"wxgf", "application/octet-stream"), (0, b"RIFF", "application/octet-stream"), (4, b"ftyp", "video/mp4"), ] @@ -595,6 +794,18 @@ def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[st decoded = bytes(b ^ key for b in data) + if magic == b"wxgf": + try: + payload = decoded[offset:] if offset > 0 else decoded + converted = _wxgf_to_image_bytes(payload) + if converted: + mtw = _detect_image_media_type(converted[:32]) + if mtw != "application/octet-stream": + return converted, mtw + except Exception: + pass + continue + if offset == 0 and magic == b"RIFF": if len(decoded) >= 12 and decoded[8:12] == b"WEBP": return decoded, "image/webp" @@ -608,6 +819,36 @@ def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[st return decoded, mt + preview_len = 8192 + try: + preview_len = min(int(preview_len), int(len(data))) + except Exception: + preview_len = 8192 + + if preview_len > 0: + for key in range(256): + try: + pv = bytes(b ^ key for b in data[:preview_len]) + except Exception: + continue + try: + scan = pv + if ( + (scan.find(b"wxgf") >= 0) + or (scan.find(b"\x89PNG\r\n\x1a\n") >= 0) + or (scan.find(b"\xff\xd8\xff") >= 0) + or (scan.find(b"GIF87a") >= 0) + or (scan.find(b"GIF89a") >= 0) + or (scan.find(b"RIFF") >= 0) + or (scan.find(b"ftyp") >= 0) + ): + decoded = bytes(b ^ key for b in data) + dec2, mt2 = _try_strip_media_prefix(decoded) + if mt2 != "application/octet-stream": + return dec2, mt2 + except Exception: + continue + return None, None @@ -897,6 +1138,74 @@ def _load_media_keys(account_dir: Path) -> dict[str, Any]: return {} +class _WxAMConfig(ctypes.Structure): + _fields_ = [ + ("mode", ctypes.c_int), + ("reserved", ctypes.c_int), + ] + + +@lru_cache(maxsize=1) +def _get_wxam_decoder(): + if os.name != "nt": + return None + dll_path = _PACKAGE_ROOT / "native" / "VoipEngine.dll" + if not dll_path.exists(): + logger.warning(f"WxAM decoder DLL not found: {dll_path}") + return None + try: + voip_engine = ctypes.WinDLL(str(dll_path)) + fn = voip_engine.wxam_dec_wxam2pic_5 + fn.argtypes = [ + ctypes.c_int64, + ctypes.c_int, + ctypes.c_int64, + ctypes.POINTER(ctypes.c_int), + ctypes.c_int64, + ] + fn.restype = ctypes.c_int64 + logger.info(f"WxAM decoder loaded: {dll_path}") + return fn + except Exception as e: + logger.warning(f"Failed to load WxAM decoder DLL: {dll_path} ({e})") + return None + + +def _wxgf_to_image_bytes(data: bytes) -> Optional[bytes]: + if not data or not data.startswith(b"wxgf"): + return None + fn = _get_wxam_decoder() + if fn is None: + return None + + max_output_size = 52 * 1024 * 1024 + for mode in (0, 3): + try: + config = _WxAMConfig() + config.mode = int(mode) + config.reserved = 0 + + input_buffer = ctypes.create_string_buffer(data, len(data)) + output_buffer = ctypes.create_string_buffer(max_output_size) + output_size = ctypes.c_int(max_output_size) + + result = fn( + ctypes.addressof(input_buffer), + int(len(data)), + ctypes.addressof(output_buffer), + ctypes.byref(output_size), + ctypes.addressof(config), + ) + if result != 0 or output_size.value <= 0: + continue + out = output_buffer.raw[: int(output_size.value)] + if _detect_image_media_type(out[:32]) != "application/octet-stream": + return out + except Exception: + continue + return None + + # ===================== 解密资源目录相关辅助函数 ===================== def _get_resource_dir(account_dir: Path) -> Path: @@ -941,13 +1250,71 @@ def _try_find_decrypted_resource(account_dir: Path, md5: str) -> Optional[Path]: if not target_dir.exists(): return None # 查找匹配MD5的文件(可能有不同扩展名) - for ext in ["jpg", "png", "gif", "webp", "dat"]: + for ext in ["jpg", "png", "gif", "webp", "mp4", "dat"]: p = target_dir / f"{md5}.{ext}" if p.exists(): return p return None +def _ensure_decrypted_resource_for_md5( + account_dir: Path, + md5: str, + source_path: Path, + weixin_root: Optional[Path] = None, +) -> Optional[Path]: + if not md5 or not source_path: + return None + + md5_lower = str(md5).lower() + existing = _try_find_decrypted_resource(account_dir, md5_lower) + if existing: + return existing + + try: + if not source_path.exists() or not source_path.is_file(): + return None + except Exception: + return None + + data, mt0 = _read_and_maybe_decrypt_media(source_path, account_dir=account_dir, weixin_root=weixin_root) + mt2 = str(mt0 or "").strip() + if (not mt2) or mt2 == "application/octet-stream": + mt2 = _detect_image_media_type(data[:32]) + if mt2 == "application/octet-stream": + try: + data2, mtp = _try_strip_media_prefix(data) + if mtp != "application/octet-stream": + data = data2 + mt2 = mtp + except Exception: + pass + if mt2 == "application/octet-stream": + try: + if len(data) >= 8 and data[4:8] == b"ftyp": + mt2 = "video/mp4" + except Exception: + pass + if mt2 == "application/octet-stream": + return None + + if str(mt2).startswith("image/"): + ext = _detect_image_extension(data) + elif str(mt2) == "video/mp4": + ext = "mp4" + else: + ext = Path(str(source_path.name)).suffix.lstrip(".").lower() or "dat" + output_path = _get_decrypted_resource_path(account_dir, md5_lower, ext) + try: + output_path.parent.mkdir(parents=True, exist_ok=True) + if not output_path.exists(): + output_path.write_bytes(data) + except Exception: + return None + + return output_path + + def _collect_all_dat_files(wxid_dir: Path) -> list[tuple[Path, str]]: """收集所有需要解密的.dat文件,返回 (文件路径, md5) 列表""" results: list[tuple[Path, str]] = [] @@ -1022,6 +1389,11 @@ def _decrypt_and_save_resource( if not decrypted: return False, "解密结果为空" + + if decrypted.startswith(b"wxgf"): + converted = _wxgf_to_image_bytes(decrypted) + if converted: + decrypted = converted # 检测图片类型 ext = _detect_image_extension(decrypted) @@ -1049,6 +1421,38 @@ def _read_and_maybe_decrypt_media(path: Path, account_dir: Optional[Path] = None if mt != "application/octet-stream": return path.read_bytes(), mt + if head.startswith(b"wxgf"): + data0 = path.read_bytes() + converted0 = _wxgf_to_image_bytes(data0) + if converted0: + mt0 = _detect_image_media_type(converted0[:32]) + if mt0 != "application/octet-stream": + return converted0, mt0 + + try: + idx = head.find(b"wxgf") + except Exception: + idx = -1 + if 0 < idx <= 4: + try: + data0 = path.read_bytes() + payload0 = data0[idx:] + converted0 = _wxgf_to_image_bytes(payload0) + if converted0: + mt0 = _detect_image_media_type(converted0[:32]) + if mt0 != "application/octet-stream": + return converted0, mt0 + except Exception: + pass + + try: + data_pref = path.read_bytes() + stripped, mtp = _try_strip_media_prefix(data_pref) + if mtp != "application/octet-stream": + return stripped, mtp + except Exception: + pass + data = path.read_bytes() dec, mt2 = _try_xor_decrypt_by_magic(data) @@ -1066,14 +1470,52 @@ def _read_and_maybe_decrypt_media(path: Path, account_dir: Optional[Path] = None root = ds.parent if ds else None xor_key = _find_wechat_xor_key(str(root)) if root else None + if xor_key is None and account_dir is not None: + try: + keys2 = _load_media_keys(account_dir) + x2 = keys2.get("xor") + if x2 is not None: + xor_key = int(x2) + if not (0 <= int(xor_key) <= 255): + xor_key = None + else: + logger.debug("使用 _media_keys.json 中保存的 xor key") + except Exception: + xor_key = None try: if version == 0 and xor_key is not None: out = _decrypt_wechat_dat_v3(data, xor_key) + try: + out2, mtp2 = _try_strip_media_prefix(out) + if mtp2 != "application/octet-stream": + return out2, mtp2 + except Exception: + pass + if out.startswith(b"wxgf"): + converted = _wxgf_to_image_bytes(out) + if converted: + out = converted + logger.info(f"wxgf->image: {path} -> {len(out)} bytes") + else: + logger.info(f"wxgf->image failed: {path}") mt0 = _detect_image_media_type(out[:32]) if mt0 != "application/octet-stream": return out, mt0 elif version == 1 and xor_key is not None: out = _decrypt_wechat_dat_v4(data, xor_key, b"cfcd208495d565ef") + try: + out2, mtp2 = _try_strip_media_prefix(out) + if mtp2 != "application/octet-stream": + return out2, mtp2 + except Exception: + pass + if out.startswith(b"wxgf"): + converted = _wxgf_to_image_bytes(out) + if converted: + out = converted + logger.info(f"wxgf->image: {path} -> {len(out)} bytes") + else: + logger.info(f"wxgf->image failed: {path}") mt1 = _detect_image_media_type(out[:32]) if mt1 != "application/octet-stream": return out, mt1 @@ -1096,6 +1538,19 @@ def _read_and_maybe_decrypt_media(path: Path, account_dir: Optional[Path] = None if aes_key16: out = _decrypt_wechat_dat_v4(data, xor_key, aes_key16) + try: + out2, mtp2 = _try_strip_media_prefix(out) + if mtp2 != "application/octet-stream": + return out2, mtp2 + except Exception: + pass + if out.startswith(b"wxgf"): + converted = _wxgf_to_image_bytes(out) + if converted: + out = converted + logger.info(f"wxgf->image: {path} -> {len(out)} bytes") + else: + logger.info(f"wxgf->image failed: {path}") mt2b = _detect_image_media_type(out[:32]) if mt2b != "application/octet-stream": return out, mt2b @@ -1671,6 +2126,8 @@ def _iter_message_db_paths(account_dir: Path) -> list[Path]: continue if ln == "message_resource.db": continue + if ln == "message_fts.db": + continue if re.match(r"^message(_\d+)?\.db$", ln): candidates.append(p) @@ -1685,6 +2142,200 @@ def _iter_message_db_paths(account_dir: Path) -> list[Path]: return candidates +def _resolve_msg_table_name_by_map(lower_to_actual: dict[str, str], username: str) -> Optional[str]: + if not username: + return None + md5_hex = hashlib.md5(username.encode("utf-8")).hexdigest() + expected = f"msg_{md5_hex}".lower() + expected_chat = f"chat_{md5_hex}".lower() + + if expected in lower_to_actual: + return lower_to_actual[expected] + if expected_chat in lower_to_actual: + return lower_to_actual[expected_chat] + + for ln, actual in lower_to_actual.items(): + if ln.startswith("msg_") and md5_hex in ln: + return actual + if ln.startswith("chat_") and md5_hex in ln: + return actual + + for ln, actual in lower_to_actual.items(): + if md5_hex in ln: + return actual + + partial = md5_hex[:24] + for ln, actual in lower_to_actual.items(): + if partial in ln: + return actual + + return None + + +def _build_latest_message_preview( + username: str, + local_type: int, + raw_text: str, + is_group: bool, + sender_username: str = "", +) -> str: + raw_text = (raw_text or "").strip() + sender_prefix = "" + if is_group and raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')): + sender_prefix, raw_text = _split_group_sender_prefix(raw_text) + if is_group and (not sender_prefix) and sender_username: + sender_prefix = str(sender_username).strip() + + content_text = "" + if local_type == 10000: + if "revokemsg" in raw_text: + content_text = "撤回了一条消息" + else: + content_text = re.sub(r"]*>", "", raw_text) + content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]" + elif local_type == 244813135921: + parsed = _parse_app_message(raw_text) + qt = str(parsed.get("quoteTitle") or "").strip() + qc = str(parsed.get("quoteContent") or "").strip() + c0 = str(parsed.get("content") or "").strip() + content_text = qc or c0 or qt or "[引用消息]" + elif local_type == 49: + parsed = _parse_app_message(raw_text) + rt = str(parsed.get("renderType") or "") + content_text = str(parsed.get("content") or "") + title_text = str(parsed.get("title") or "").strip() + if rt == "file" and title_text: + content_text = title_text + if (not content_text) and rt == "transfer": + content_text = str(parsed.get("senderTitle") or "") or str(parsed.get("receiverTitle") or "") or "转账" + if not content_text: + content_text = title_text or str(parsed.get("url") or "") + elif local_type == 25769803825: + parsed = _parse_app_message(raw_text) + title_text = str(parsed.get("title") or "").strip() + content_text = title_text or str(parsed.get("content") or "").strip() or "[文件]" + elif local_type == 3: + content_text = "[图片]" + elif local_type == 34: + duration = _extract_xml_attr(raw_text, "voicelength") + content_text = f"[语音 {duration}秒]" if duration else "[语音]" + elif local_type == 43 or local_type == 62: + content_text = "[视频]" + elif local_type == 47: + content_text = "[表情]" + else: + if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')): + content_text = raw_text + else: + content_text = _infer_message_brief_by_local_type(local_type) + + content_text = (content_text or "").strip() or _infer_message_brief_by_local_type(local_type) + content_text = re.sub(r"\s+", " ", content_text).strip() + if sender_prefix and content_text: + return f"{sender_prefix}: {content_text}" + return content_text + + +def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]: + if not usernames: + return {} + + db_paths = _iter_message_db_paths(account_dir) + if not db_paths: + return {} + + remaining = {u for u in usernames if u} + best: dict[str, tuple[tuple[int, int, int], str]] = {} + + if _DEBUG_SESSIONS: + logger.info( + f"[sessions.preview] account_dir={account_dir} usernames={len(remaining)} dbs={len(db_paths)}" + ) + logger.info( + f"[sessions.preview] db_paths=" + f"{', '.join([p.name for p in db_paths[:8]])}{'...' if len(db_paths) > 8 else ''}" + ) + + for db_path in db_paths: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + try: + rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + names = [str(r[0]) for r in rows if r and r[0]] + lower_to_actual = {n.lower(): n for n in names} + + found: dict[str, str] = {} + for u in list(remaining): + tn = _resolve_msg_table_name_by_map(lower_to_actual, u) + if tn: + found[u] = tn + + if not found: + continue + + conn.text_factory = bytes + for u, tn in found.items(): + quoted = _quote_ident(tn) + try: + try: + r = conn.execute( + "SELECT " + "m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, " + "n.user_name AS sender_username " + f"FROM {quoted} m " + "LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid " + "ORDER BY m.create_time DESC, m.sort_seq DESC, m.local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + r = conn.execute( + "SELECT " + "local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username " + f"FROM {quoted} " + "ORDER BY create_time DESC, sort_seq DESC, local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception as e: + if _DEBUG_SESSIONS: + logger.info( + f"[sessions.preview] db={db_path.name} username={u} table={tn} query_failed={e}" + ) + continue + if r is None: + continue + + local_type = int(r["local_type"] or 0) + create_time = int(r["create_time"] or 0) + sort_seq = int(r["sort_seq"] or 0) if r["sort_seq"] is not None else 0 + local_id = int(r["local_id"] or 0) + sort_key = (create_time, sort_seq, local_id) + + raw_text = _decode_message_content(r["compress_content"], r["message_content"]).strip() + sender_username = _decode_sqlite_text(r["sender_username"]).strip() + preview = _build_latest_message_preview( + username=u, + local_type=local_type, + raw_text=raw_text, + is_group=bool(u.endswith("@chatroom")), + sender_username=sender_username, + ) + if not preview: + continue + + prev = best.get(u) + if prev is None or sort_key > prev[0]: + best[u] = (sort_key, preview) + finally: + conn.close() + + previews = {u: v[1] for u, v in best.items() if v and v[1]} + if _DEBUG_SESSIONS: + logger.info( + f"[sessions.preview] built_previews={len(previews)} remaining_without_preview={len(remaining - set(previews.keys()))}" + ) + return previews + + def _pick_display_name(contact_row: Optional[sqlite3.Row], fallback_username: str) -> str: """显示名优先级:remark > nick_name > alias > username""" if contact_row is None: @@ -1991,6 +2642,173 @@ async def get_chat_avatar(username: str, account: Optional[str] = None): return Response(content=data, media_type=media_type) +class EmojiDownloadRequest(BaseModel): + account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)") + md5: str = Field(..., description="表情 MD5") + emoji_url: str = Field(..., description="表情 CDN URL") + force: bool = Field(False, description="是否强制重新下载并覆盖") + + +def _is_valid_md5(s: str) -> bool: + v = str(s or "").strip().lower() + return bool(re.fullmatch(r"[0-9a-f]{32}", v)) + + +def _is_safe_http_url(url: str) -> bool: + u = str(url or "").strip() + if not u: + return False + try: + p = urlparse(u) + except Exception: + return False + if p.scheme not in ("http", "https"): + return False + host = (p.hostname or "").strip() + if not host: + return False + if host in {"localhost"}: + return False + try: + ip = ipaddress.ip_address(host) + if ip.is_private or ip.is_loopback or ip.is_link_local: + return False + except Exception: + pass + return True + + +def _detect_media_type_and_ext(data: bytes) -> tuple[bytes, str, str]: + payload = data + media_type = "application/octet-stream" + ext = "dat" + + try: + payload2, mt2 = _try_strip_media_prefix(payload) + if mt2 != "application/octet-stream": + payload = payload2 + media_type = mt2 + except Exception: + pass + + if media_type == "application/octet-stream": + mt0 = _detect_image_media_type(payload[:32]) + if mt0 != "application/octet-stream": + media_type = mt0 + + if media_type == "application/octet-stream": + try: + if len(payload) >= 8 and payload[4:8] == b"ftyp": + media_type = "video/mp4" + except Exception: + pass + + if media_type.startswith("image/"): + ext = _detect_image_extension(payload) + elif media_type == "video/mp4": + ext = "mp4" + else: + ext = "dat" + + return payload, media_type, ext + + +@app.post("/api/chat/media/emoji/download", summary="下载表情消息资源到本地 resource") +async def download_chat_emoji(req: EmojiDownloadRequest): + md5 = str(req.md5 or "").strip().lower() + emoji_url = str(req.emoji_url or "").strip() + + if not _is_valid_md5(md5): + raise HTTPException(status_code=400, detail="Invalid md5.") + if not _is_safe_http_url(emoji_url): + raise HTTPException(status_code=400, detail="Invalid emoji_url (only public http/https allowed).") + + account_dir = _resolve_account_dir(req.account) + + existing = _try_find_decrypted_resource(account_dir, md5) + if existing and existing.exists() and (not req.force): + return { + "status": "success", + "account": account_dir.name, + "md5": md5, + "saved": True, + "already_exists": True, + "path": str(existing), + "resource_dir": str(existing.parent), + } + + def _download_bytes() -> bytes: + headers = { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120 Safari/537.36", + "Accept": "*/*", + } + r = requests.get(emoji_url, headers=headers, timeout=20, stream=True) + try: + r.raise_for_status() + max_bytes = 30 * 1024 * 1024 + chunks: list[bytes] = [] + total = 0 + for ch in r.iter_content(chunk_size=64 * 1024): + if not ch: + continue + chunks.append(ch) + total += len(ch) + if total > max_bytes: + raise HTTPException(status_code=400, detail="Emoji download too large (>30MB).") + return b"".join(chunks) + finally: + try: + r.close() + except Exception: + pass + + try: + data = await asyncio.to_thread(_download_bytes) + except HTTPException: + raise + except Exception as e: + logger.warning(f"emoji_download failed: md5={md5} url={emoji_url} err={e}") + raise HTTPException(status_code=500, detail=f"Emoji download failed: {e}") + + if not data: + raise HTTPException(status_code=500, detail="Emoji download returned empty body.") + + payload, media_type, ext = _detect_media_type_and_ext(data) + out_path = _get_decrypted_resource_path(account_dir, md5, ext) + out_path.parent.mkdir(parents=True, exist_ok=True) + + if out_path.exists() and (not req.force): + return { + "status": "success", + "account": account_dir.name, + "md5": md5, + "saved": True, + "already_exists": True, + "path": str(out_path), + "resource_dir": str(out_path.parent), + "media_type": media_type, + "bytes": len(payload), + } + + try: + out_path.write_bytes(payload) + except Exception as e: + raise HTTPException(status_code=500, detail=f"Failed to write emoji file: {e}") + + logger.info(f"emoji_download: md5={md5} url={emoji_url} -> {out_path} bytes={len(payload)} mt={media_type}") + return { + "status": "success", + "account": account_dir.name, + "md5": md5, + "saved": True, + "already_exists": False, + "path": str(out_path), + "resource_dir": str(out_path.parent), + "media_type": media_type, + "bytes": len(payload), + } + + @app.get("/api/chat/media/image", summary="获取图片消息资源") async def get_chat_image(md5: str, account: Optional[str] = None, username: Optional[str] = None): if not md5: @@ -2002,6 +2820,10 @@ async def get_chat_image(md5: str, account: Optional[str] = None, username: Opti if decrypted_path: data = decrypted_path.read_bytes() media_type = _detect_image_media_type(data[:32]) + if media_type == "application/octet-stream": + guessed = mimetypes.guess_type(str(decrypted_path))[0] + if guessed: + media_type = guessed return Response(content=data, media_type=media_type) # 回退到原始逻辑:从微信数据目录实时解密 @@ -2038,7 +2860,20 @@ async def get_chat_image(md5: str, account: Optional[str] = None, username: Opti if not p: raise HTTPException(status_code=404, detail="Image not found.") + logger.info(f"chat_image: md5={md5} resolved_source={p}") + data, media_type = _read_and_maybe_decrypt_media(p, account_dir=account_dir, weixin_root=wxid_dir) + if media_type.startswith("image/"): + try: + out_md5 = str(md5).lower() + ext = _detect_image_extension(data) + out_path = _get_decrypted_resource_path(account_dir, out_md5, ext) + out_path.parent.mkdir(parents=True, exist_ok=True) + if not out_path.exists(): + out_path.write_bytes(data) + except Exception: + pass + logger.info(f"chat_image: md5={md5} media_type={media_type} bytes={len(data)}") return Response(content=data, media_type=media_type) @@ -2053,39 +2888,28 @@ async def get_chat_emoji(md5: str, account: Optional[str] = None, username: Opti if decrypted_path: data = decrypted_path.read_bytes() media_type = _detect_image_media_type(data[:32]) + if media_type == "application/octet-stream": + guessed = mimetypes.guess_type(str(decrypted_path))[0] + if guessed: + media_type = guessed return Response(content=data, media_type=media_type) - # 回退到原始逻辑 wxid_dir = _resolve_account_wxid_dir(account_dir) - hardlink_db_path = account_dir / "hardlink.db" - extra_roots: list[Path] = [] - db_storage_dir = _resolve_account_db_storage_dir(account_dir) - if db_storage_dir: - extra_roots.append(db_storage_dir) - - roots: list[Path] = [] - if wxid_dir: - roots.append(wxid_dir) - if db_storage_dir: - roots.append(db_storage_dir) - if not roots: - raise HTTPException(status_code=404, detail="wxid_dir/db_storage_path not found. Please decrypt with db_storage_path to enable media lookup.") - p = _resolve_media_path_from_hardlink( - hardlink_db_path, - roots[0], - md5=str(md5), - kind="emoji", - username=username, - extra_roots=roots[1:], - ) - if (not p) and wxid_dir: - hit = _fallback_search_media_by_md5(str(wxid_dir), str(md5)) - if hit: - p = Path(hit) + p = _resolve_media_path_for_kind(account_dir, kind="emoji", md5=str(md5), username=username) if not p: raise HTTPException(status_code=404, detail="Emoji not found.") data, media_type = _read_and_maybe_decrypt_media(p, account_dir=account_dir, weixin_root=wxid_dir) + if media_type.startswith("image/"): + try: + out_md5 = str(md5).lower() + ext = _detect_image_extension(data) + out_path = _get_decrypted_resource_path(account_dir, out_md5, ext) + out_path.parent.mkdir(parents=True, exist_ok=True) + if not out_path.exists(): + out_path.write_bytes(data) + except Exception: + pass return Response(content=data, media_type=media_type) @@ -2306,6 +3130,103 @@ def _resolve_media_path_for_kind( return p +def _pick_best_emoji_source_path(resolved: Path, md5: str) -> Optional[Path]: + if not resolved: + return None + try: + if resolved.exists() and resolved.is_file(): + return resolved + except Exception: + pass + + try: + if not (resolved.exists() and resolved.is_dir()): + return None + except Exception: + return None + + md5s = str(md5 or "").lower().strip() + if not md5s: + return None + + candidates = [ + f"{md5s}_h.dat", + f"{md5s}_t.dat", + f"{md5s}.dat", + ] + exts = ["gif", "webp", "png", "jpg", "jpeg"] + for ext in exts: + candidates.append(f"{md5s}.{ext}") + + for name in candidates: + p = resolved / name + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + + patterns = [f"{md5s}*.dat", f"{md5s}*", f"*{md5s}*"] + for pat in patterns: + try: + for p in resolved.glob(pat): + try: + if p.is_file(): + return p + except Exception: + continue + except Exception: + continue + return None + + +def _iter_emoji_source_candidates(resolved: Path, md5: str, limit: int = 20) -> list[Path]: + md5s = str(md5 or "").lower().strip() + if not md5s: + return [] + + best = _pick_best_emoji_source_path(resolved, md5s) + out: list[Path] = [] + if best: + out.append(best) + + try: + if not (resolved.exists() and resolved.is_dir()): + return out + except Exception: + return out + + try: + files = [p for p in resolved.iterdir() if p.is_file()] + except Exception: + files = [] + + def score(p: Path) -> tuple[int, int, int]: + name = str(p.name).lower() + contains = 1 if md5s in name else 0 + ext = str(p.suffix).lower().lstrip(".") + ext_rank = 0 + if ext == "dat": + ext_rank = 3 + elif ext in {"gif", "webp"}: + ext_rank = 2 + elif ext in {"png", "jpg", "jpeg"}: + ext_rank = 1 + try: + sz = int(p.stat().st_size) + except Exception: + sz = 0 + return (contains, ext_rank, sz) + + files_sorted = sorted(files, key=score, reverse=True) + for p in files_sorted: + if p not in out: + out.append(p) + if len(out) >= int(limit): + break + return out + + @app.post("/api/chat/media/open_folder", summary="在资源管理器中打开媒体文件所在位置") async def open_chat_media_folder( kind: str, @@ -2360,23 +3281,200 @@ async def open_chat_media_folder( raise HTTPException(status_code=400, detail="Missing md5.") p = _resolve_media_path_for_kind(account_dir, kind=kind_key, md5=str(md5), username=username) + resolved_before_materialize = p + materialized_ok = False + opened_kind = "resolved" + + if p and kind_key in {"image", "emoji", "video_thumb"}: + wxid_dir = _resolve_account_wxid_dir(account_dir) + source_path = p + if kind_key == "emoji": + candidates: list[Path] = [] + try: + md5s = str(md5 or "").lower().strip() + except Exception: + md5s = str(md5) + + try: + if p is not None and p.exists() and p.is_file(): + if (not str(p.suffix or "")) and md5s and str(p.name).lower() == md5s: + candidates.extend(_iter_emoji_source_candidates(p.parent, md5s)) + if p not in candidates: + candidates.append(p) + else: + candidates.append(p) + candidates.extend(_iter_emoji_source_candidates(p.parent, md5s)) + else: + candidates = _iter_emoji_source_candidates(p, md5s) + except Exception: + candidates = _iter_emoji_source_candidates(p, str(md5)) + + # de-dup while keeping order + seen: set[str] = set() + uniq: list[Path] = [] + for c in candidates: + try: + k = str(c.resolve()) + except Exception: + k = str(c) + if k in seen: + continue + seen.add(k) + uniq.append(c) + candidates = uniq + + try: + preferred: list[Path] = [] + if md5s: + for c in candidates: + try: + if md5s in str(c.name).lower(): + preferred.append(c) + except Exception: + continue + if preferred: + rest = [c for c in candidates if c not in preferred] + candidates = preferred + rest + except Exception: + pass + if not candidates and p is not None: + candidates = [p] + for cand in candidates: + source_path = cand + materialized = _ensure_decrypted_resource_for_md5( + account_dir, + md5=str(md5), + source_path=source_path, + weixin_root=wxid_dir, + ) + if materialized: + p = materialized + materialized_ok = True + opened_kind = "decrypted" + break + + if not materialized_ok: + try: + sz = -1 + head_hex = "" + try: + if source_path and source_path.exists() and source_path.is_file(): + sz = int(source_path.stat().st_size) + with open(source_path, "rb") as f: + head_hex = f.read(32).hex() + except Exception: + pass + logger.info( + f"open_folder: emoji materialize failed: resolved={str(resolved_before_materialize)} source={str(source_path)} size={sz} head32={head_hex}" + ) + except Exception: + pass + + try: + resource_dir = _get_resource_dir(account_dir) + sub_dir = str(md5).lower()[:2] if len(str(md5)) >= 2 else "00" + fallback_dir = resource_dir / sub_dir + fallback_dir.mkdir(parents=True, exist_ok=True) + p = fallback_dir + opened_kind = "resource_dir" + except Exception: + try: + resource_dir = _get_resource_dir(account_dir) + sub_dir = str(md5).lower()[:2] if len(str(md5)) >= 2 else "00" + fallback_dir = resource_dir / sub_dir + fallback_dir.mkdir(parents=True, exist_ok=True) + p = fallback_dir + opened_kind = "resource_dir" + except Exception: + pass + else: + materialized = _ensure_decrypted_resource_for_md5( + account_dir, + md5=str(md5), + source_path=source_path, + weixin_root=wxid_dir, + ) + if materialized: + p = materialized + materialized_ok = True + opened_kind = "decrypted" + + if kind_key == "emoji" and md5: + try: + existing2 = _try_find_decrypted_resource(account_dir, str(md5).lower()) + if existing2: + p = existing2 + opened_kind = "decrypted" + except Exception: + pass + if not p: - raise HTTPException(status_code=404, detail="File not found.") + if kind_key == "emoji": + wxid_dir = _resolve_account_wxid_dir(account_dir) + resource_dir = _get_resource_dir(account_dir) + candidates: list[Path] = [] + if md5: + sub_dir = str(md5).lower()[:2] if len(str(md5)) >= 2 else "00" + c1 = resource_dir / sub_dir + if c1.exists() and c1.is_dir(): + candidates.append(c1) + if resource_dir.exists() and resource_dir.is_dir(): + candidates.append(resource_dir) + if wxid_dir: + for c in [ + wxid_dir / "msg" / "emoji", + wxid_dir / "msg" / "emoticon", + wxid_dir / "emoji", + wxid_dir, + ]: + try: + if c.exists() and c.is_dir(): + candidates.append(c) + except Exception: + continue + candidates.append(account_dir) + p = candidates[0] + else: + raise HTTPException(status_code=404, detail="File not found.") try: target = str(p.resolve()) except Exception: target = str(p) + logger.info(f"open_folder: kind={kind_key} md5={md5} server_id={server_id} -> {target}") + if os.name != "nt": raise HTTPException(status_code=400, detail="open_folder is only supported on Windows.") try: - subprocess.Popen(["explorer", "/select,", target]) + tp = Path(target) + if tp.exists() and tp.is_dir(): + subprocess.Popen(["explorer.exe", str(tp)]) + elif tp.exists(): + subprocess.Popen(["explorer.exe", "/select,", str(tp)]) + else: + subprocess.Popen(["explorer.exe", str(tp.parent)]) except Exception as e: raise HTTPException(status_code=500, detail=f"Failed to open explorer: {e}") - return {"status": "success", "path": target} + file_found = False + try: + tp2 = Path(target) + if kind_key == "emoji": + file_found = bool(tp2.exists()) + else: + if tp2.exists() and tp2.is_file(): + file_found = True + except Exception: + pass + + resp = {"status": "success", "path": target} + if kind_key == "emoji": + resp["file_found"] = bool(file_found) + resp["materialized"] = bool(materialized_ok) if "materialized_ok" in locals() else bool(file_found) + resp["opened"] = str(opened_kind) if "opened_kind" in locals() else "unknown" + return resp @app.middleware("http") @@ -2609,6 +3707,11 @@ async def list_chat_sessions( contact_rows = _load_contact_rows(contact_db_path, usernames) local_avatar_usernames = _query_head_image_usernames(head_image_db_path, usernames) + latest_previews = _load_latest_message_previews(account_dir, usernames) + if _DEBUG_SESSIONS: + logger.info( + f"[sessions.preview] endpoint account={account_dir.name} sessions={len(usernames)} previews={len(latest_previews)}" + ) sessions: list[dict[str, Any]] = [] for r in filtered: @@ -2620,13 +3723,8 @@ async def list_chat_sessions( if not avatar_url and username in local_avatar_usernames: avatar_url = base_url + _build_avatar_url(account_dir.name, username) - summary = (r["summary"] or "").strip() if isinstance(r["summary"], str) else (r["summary"] or "") - draft = (r["draft"] or "").strip() if isinstance(r["draft"], str) else (r["draft"] or "") - - if draft: - last_message = f"[Draft] {draft}" - elif summary: - last_message = summary + if str(latest_previews.get(username) or "").strip(): + last_message = str(latest_previews.get(username) or "").strip() else: last_message = _infer_last_message_brief(r["last_msg_type"], r["last_msg_sub_type"]) @@ -3121,12 +4219,31 @@ async def list_chat_messages( + f"/api/chat/media/image?account={quote(account_dir.name)}&md5={quote(md5)}&username={quote(username)}" ) elif rt == "emoji": - if (not str(m.get("emojiUrl") or "")) and str(m.get("emojiMd5") or ""): - md5 = str(m.get("emojiMd5") or "") - m["emojiUrl"] = ( - base_url - + f"/api/chat/media/emoji?account={quote(account_dir.name)}&md5={quote(md5)}&username={quote(username)}" - ) + md5 = str(m.get("emojiMd5") or "") + if md5: + existing_local: Optional[Path] = None + try: + existing_local = _try_find_decrypted_resource(account_dir, str(md5).lower()) + except Exception: + existing_local = None + + if existing_local: + try: + cur = str(m.get("emojiUrl") or "") + if cur and re.match(r"^https?://", cur, flags=re.I) and ("/api/chat/media/emoji" not in cur): + m["emojiRemoteUrl"] = cur + except Exception: + pass + + m["emojiUrl"] = ( + base_url + + f"/api/chat/media/emoji?account={quote(account_dir.name)}&md5={quote(md5)}&username={quote(username)}" + ) + elif (not str(m.get("emojiUrl") or "")): + m["emojiUrl"] = ( + base_url + + f"/api/chat/media/emoji?account={quote(account_dir.name)}&md5={quote(md5)}&username={quote(username)}" + ) elif rt == "video": if (not str(m.get("videoThumbUrl") or "")) and str(m.get("videoThumbMd5") or ""): md5 = str(m.get("videoThumbMd5") or "") @@ -3160,8 +4277,7 @@ async def list_chat_messages( sseq = int(m.get("sortSeq") or 0) cts = int(m.get("createTime") or 0) lid = int(m.get("localId") or 0) - primary = sseq or cts - return (primary, cts, lid) + return (cts, sseq, lid) merged.sort(key=sort_key, reverse=True) has_more_global = bool(has_more_any or (len(merged) > (int(offset) + int(limit)))) diff --git a/src/wechat_decrypt_tool/media_key_finder.py b/src/wechat_decrypt_tool/media_key_finder.py new file mode 100644 index 0000000..1d09de6 --- /dev/null +++ b/src/wechat_decrypt_tool/media_key_finder.py @@ -0,0 +1,238 @@ +import ctypes +import json +import os +import re +import threading +from collections import Counter +from concurrent.futures import ThreadPoolExecutor +from ctypes import wintypes +from functools import lru_cache +from pathlib import Path +from typing import Any + +import pymem +import yara +from Crypto.Cipher import AES + +PROCESS_ALL_ACCESS = 0x1F0FFF +MEM_COMMIT = 0x1000 +MEM_PRIVATE = 0x20000 + +kernel32 = ctypes.windll.kernel32 + + +class MEMORY_BASIC_INFORMATION(ctypes.Structure): + _fields_ = [ + ("BaseAddress", ctypes.c_void_p), + ("AllocationBase", ctypes.c_void_p), + ("AllocationProtect", ctypes.c_ulong), + ("RegionSize", ctypes.c_size_t), + ("State", ctypes.c_ulong), + ("Protect", ctypes.c_ulong), + ("Type", ctypes.c_ulong), + ] + + +def _open_process(pid: int): + return kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid) + + +def _read_process_memory(process_handle, address: int, size: int) -> bytes | None: + buffer = ctypes.create_string_buffer(size) + bytes_read = ctypes.c_size_t(0) + success = kernel32.ReadProcessMemory( + process_handle, + ctypes.c_void_p(address), + buffer, + size, + ctypes.byref(bytes_read), + ) + if not success: + return None + return buffer.raw + + +def _get_memory_regions(process_handle) -> list[tuple[int, int]]: + regions: list[tuple[int, int]] = [] + mbi = MEMORY_BASIC_INFORMATION() + address = 0 + while kernel32.VirtualQueryEx( + process_handle, + ctypes.c_void_p(address), + ctypes.byref(mbi), + ctypes.sizeof(mbi), + ): + if mbi.State == MEM_COMMIT and mbi.Type == MEM_PRIVATE: + regions.append((int(mbi.BaseAddress), int(mbi.RegionSize))) + address += int(mbi.RegionSize) + return regions + + +@lru_cache +def _verify(encrypted: bytes, key: bytes) -> bool: + aes_key = key[:16] + cipher = AES.new(aes_key, AES.MODE_ECB) + text = cipher.decrypt(encrypted) + return bool(text.startswith(b"\xff\xd8\xff")) + + +def _search_memory_chunk(process_handle, base_address: int, region_size: int, encrypted: bytes, rules): + memory = _read_process_memory(process_handle, base_address, region_size) + if not memory: + return None + + matches = rules.match(data=memory) + if matches: + for match in matches: + if match.rule == "AesKey": + for string in match.strings: + for instance in string.instances: + content = instance.matched_data[1:-1] + if _verify(encrypted, content): + return content[:16] + return None + + +def _get_aes_key(encrypted: bytes, pid: int) -> Any: + process_handle = _open_process(pid) + if not process_handle: + raise RuntimeError(f"无法打开进程 {pid}") + + rules_key = r""" + rule AesKey { + strings: + $pattern = /[^a-z0-9][a-z0-9]{32}[^a-z0-9]/ + condition: + $pattern + } + """ + rules = yara.compile(source=rules_key) + + process_infos = _get_memory_regions(process_handle) + + found_result = threading.Event() + result = [None] + + def process_chunk(args): + if found_result.is_set(): + return None + base_address, region_size = args + res = _search_memory_chunk(process_handle, base_address, region_size, encrypted, rules) + if res: + result[0] = res + found_result.set() + return res + + with ThreadPoolExecutor(max_workers=min(32, len(process_infos) or 1)) as executor: + executor.map(process_chunk, process_infos) + + kernel32.CloseHandle(process_handle) + return result[0] + + +def _dump_wechat_info_v4(encrypted: bytes, pid: int) -> bytes: + result = _get_aes_key(encrypted, pid) + if isinstance(result, bytes): + return result[:16] + raise RuntimeError("未找到 AES 密钥") + + +def _sort_template_files_by_date(template_files: list[Path]) -> list[Path]: + def get_date_from_path(filepath: Path) -> str: + match = re.search(r"(\d{4}-\d{2})", str(filepath)) + if match: + return match.group(1) + return "0000-00" + + return sorted(template_files, key=get_date_from_path, reverse=True) + + +def find_key( + weixin_dir: Path, + version: int = 4, + xor_key_: int | None = None, + aes_key_: bytes | None = None, +) -> tuple[int, bytes]: + if os.name != "nt": + raise RuntimeError("仅支持 Windows") + if version not in (3, 4): + raise RuntimeError("version must be 3 or 4") + + template_files = _sort_template_files_by_date(list(weixin_dir.rglob("*_t.dat"))) + if not template_files: + raise RuntimeError("未找到模板文件") + + last_bytes_list: list[bytes] = [] + for file in template_files[:16]: + try: + with open(file, "rb") as f: + f.seek(-2, 2) + last_bytes_list.append(f.read(2)) + except Exception: + continue + + if not last_bytes_list: + raise RuntimeError("对于 XOR, 未能成功读取任何模板文件") + + counter = Counter(last_bytes_list) + most_common = counter.most_common(1)[0][0] + + x, y = most_common + xor_key = x ^ 0xFF + if xor_key != (y ^ 0xD9): + raise RuntimeError("未能找到 XOR 密钥") + + if xor_key_ is not None: + if xor_key_ != xor_key: + raise RuntimeError("XOR 密钥校验失败") + return xor_key_, aes_key_ or b"" + + if version == 3: + return xor_key, b"cfcd208495d565ef" + + ciphertext: bytes | None = None + for file in template_files: + with open(file, "rb") as f: + if f.read(6) != b"\x07\x08V2\x08\x07": + continue + f.seek(-2, 2) + if f.read(2) != most_common: + continue + f.seek(0xF) + ciphertext = f.read(16) + break + + if not ciphertext: + raise RuntimeError("对于 AES, 未能成功读取任何模板文件") + + try: + pm = pymem.Pymem("Weixin.exe") + pid = pm.process_id + if not isinstance(pid, int): + raise RuntimeError("找不到微信进程") + except Exception: + raise RuntimeError("找不到微信进程") + + aes_key = _dump_wechat_info_v4(ciphertext, pid) + return xor_key, aes_key + + +CONFIG_FILE = "config.json" + + +def read_key_from_config() -> tuple[int, bytes]: + if os.path.exists(CONFIG_FILE): + with open(CONFIG_FILE, "r", encoding="utf-8") as f: + key_dict = json.loads(f.read()) + x, y = key_dict["xor"], key_dict["aes"] + return x, y.encode()[:16] + return 0, b"" + + +def store_key(xor_k: int, aes_k: bytes) -> None: + key_dict = { + "xor": xor_k, + "aes": aes_k.decode(), + } + with open(CONFIG_FILE, "w", encoding="utf-8") as f: + f.write(json.dumps(key_dict)) diff --git a/src/wechat_decrypt_tool/native/VoipEngine.dll b/src/wechat_decrypt_tool/native/VoipEngine.dll new file mode 100644 index 0000000..af9c8ff Binary files /dev/null and b/src/wechat_decrypt_tool/native/VoipEngine.dll differ diff --git a/src/wechat_decrypt_tool/native/__init__.py b/src/wechat_decrypt_tool/native/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/debug_decrypt_file.py b/tools/debug_decrypt_file.py index 6be95f6..b9b51ba 100644 --- a/tools/debug_decrypt_file.py +++ b/tools/debug_decrypt_file.py @@ -112,33 +112,12 @@ def decrypt_v4(data: bytes, xor_key: int, aes_key: bytes) -> bytes: if version == 2 and xor_key is not None and aes_key16: print("\n[4.1] 使用本地 decrypt_v4 函数:") decrypted = decrypt_v4(data, xor_key, aes_key16) - + # 保存解密后的文件 output_file = Path("test_decrypted_manual.jpg") with open(output_file, "wb") as f: f.write(decrypted) print(f" 已保存: {output_file} ({len(decrypted)} bytes)") - - # 使用 WxDatDecrypt 的函数 - print("\n[4.2] 使用 WxDatDecrypt 的 decrypt_dat_v4:") - sys.path.insert(0, "WxDatDecrypt") - from decrypt import decrypt_dat_v4 as wx_decrypt_v4 - - decrypted_wx = wx_decrypt_v4(TEST_FILE, xor_key, aes_key16) - print(f" 结果长度: {len(decrypted_wx)}") - print(f" 结果前 16 字节: {decrypted_wx[:16].hex()}") - - if decrypted_wx[:3] == b"\xff\xd8\xff": - print(" [OK] 解密成功! 是 JPEG 图片") - elif decrypted_wx[:8] == b"\x89PNG\r\n\x1a\n": - print(" [OK] 解密成功! 是 PNG 图片") - else: - print(" [WARN] 解密后不是有效图片头") - - output_file2 = Path("test_decrypted_wxdat.jpg") - with open(output_file2, "wb") as f: - f.write(decrypted_wx) - print(f" 已保存: {output_file2} ({len(decrypted_wx)} bytes)") else: print(" [ERROR] 无法解密: 缺少必要参数") diff --git a/tools/extract_media_keys.py b/tools/extract_media_keys.py index 8a3add5..8a1aa23 100644 --- a/tools/extract_media_keys.py +++ b/tools/extract_media_keys.py @@ -11,15 +11,14 @@ import sys sys.path.insert(0, "src") -sys.path.insert(0, "WxDatDecrypt") import json from pathlib import Path try: - from key import find_key + from wechat_decrypt_tool.media_key_finder import find_key except ImportError as e: - print(f"[ERROR] 无法导入 WxDatDecrypt: {e}") + print(f"[ERROR] 无法导入 media_key_finder: {e}") print("请确保 pymem, yara-python, pycryptodome 已安装") sys.exit(1)