improvement(media): 移除进程提取密钥并优化媒体解密

- 移除 pymem/yara-python 依赖,明确仅使用 wx_key 获取密钥

- 删除 media_key_finder.py,简化媒体密钥与资源定位逻辑

- 更新媒体接口/脚本与导出说明,避免误导进程提取能力
This commit is contained in:
2977094657
2025-12-25 20:26:56 +08:00
parent 1184dfcc33
commit 7a7069dcf7
8 changed files with 67 additions and 916 deletions

View File

@@ -8,13 +8,9 @@ import os
import re
import sqlite3
import struct
import threading
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache
from pathlib import Path
from typing import Any, Optional
from ctypes import wintypes
from fastapi import HTTPException
@@ -22,11 +18,6 @@ from .logging_config import get_logger
logger = get_logger(__name__)
try:
import psutil # type: ignore
except Exception:
psutil = None
# 仓库根目录(用于定位 output/databases
_REPO_ROOT = Path(__file__).resolve().parents[2]
@@ -1031,376 +1022,6 @@ def _detect_wechat_dat_version(data: bytes) -> int:
return 2
return 0
def _extract_yyyymm_for_sort(p: Path) -> str:
m = re.search(r"(\d{4}-\d{2})", str(p))
return m.group(1) if m else "0000-00"
@lru_cache(maxsize=16)
def _get_wechat_template_most_common_last2(weixin_root_str: str) -> Optional[bytes]:
try:
root = Path(weixin_root_str)
if not root.exists() or not root.is_dir():
return None
except Exception:
return None
try:
template_files = list(root.rglob("*_t.dat"))
except Exception:
template_files = []
if not template_files:
return None
template_files.sort(key=_extract_yyyymm_for_sort, reverse=True)
last_bytes_list: list[bytes] = []
for file in template_files[:16]:
try:
with open(file, "rb") as f:
f.seek(-2, 2)
b2 = f.read(2)
if b2 and len(b2) == 2:
last_bytes_list.append(b2)
except Exception:
continue
if not last_bytes_list:
return None
return Counter(last_bytes_list).most_common(1)[0][0]
@lru_cache(maxsize=16)
def _find_wechat_xor_key(weixin_root_str: str) -> Optional[int]:
try:
root = Path(weixin_root_str)
if not root.exists() or not root.is_dir():
return None
except Exception:
return None
most_common = _get_wechat_template_most_common_last2(weixin_root_str)
if not most_common or len(most_common) != 2:
return None
x, y = most_common[0], most_common[1]
xor_key = x ^ 0xFF
if xor_key != (y ^ 0xD9):
return None
return xor_key
def _get_wechat_v2_ciphertext(weixin_root: Path, most_common_last2: bytes) -> Optional[bytes]:
try:
template_files = list(weixin_root.rglob("*_t.dat"))
except Exception:
return None
if not template_files:
return None
template_files.sort(key=_extract_yyyymm_for_sort, reverse=True)
sig = b"\x07\x08V2\x08\x07"
def try_read_ct(file: Path, require_last2: bool) -> Optional[bytes]:
try:
with open(file, "rb") as f:
if f.read(6) != sig:
return None
if require_last2 and most_common_last2 and len(most_common_last2) == 2:
try:
f.seek(-2, 2)
if f.read(2) != most_common_last2:
return None
except Exception:
return None
f.seek(0xF)
ct = f.read(16)
if ct and len(ct) == 16:
return ct
except Exception:
return None
return None
# Prefer matching last2 bytes (older heuristic), but fall back to any V2 template like wx_key.
if most_common_last2 and len(most_common_last2) == 2:
for file in template_files:
ct = try_read_ct(file, require_last2=True)
if ct:
return ct
for file in template_files:
ct = try_read_ct(file, require_last2=False)
if ct:
return ct
return None
def _verify_wechat_aes_key(ciphertext: bytes, key16: bytes) -> bool:
try:
from Crypto.Cipher import AES
cipher = AES.new(key16[:16], AES.MODE_ECB)
plain = cipher.decrypt(ciphertext)
if plain.startswith(b"\xff\xd8\xff"):
return True
if plain.startswith(b"\x89PNG\r\n\x1a\n"):
return True
if plain.startswith(b"GIF87a") or plain.startswith(b"GIF89a"):
return True
if plain.startswith(b"wxgf"):
return True
if len(plain) >= 12 and plain.startswith(b"RIFF") and plain[8:12] == b"WEBP":
return True
if len(plain) >= 8 and plain[4:8] == b"ftyp":
return True
return False
except Exception:
return False
class _MEMORY_BASIC_INFORMATION(ctypes.Structure):
_fields_ = [
("BaseAddress", ctypes.c_void_p),
("AllocationBase", ctypes.c_void_p),
("AllocationProtect", ctypes.c_ulong),
("RegionSize", ctypes.c_size_t),
("State", ctypes.c_ulong),
("Protect", ctypes.c_ulong),
("Type", ctypes.c_ulong),
]
def _find_weixin_pids() -> list[int]:
if psutil is None:
return []
preferred = ["weixin.exe", "wechat.exe", "wechatappex.exe", "wechatapp.exe"]
preferred_set = set(preferred)
pids_by_name: dict[str, list[int]] = {n: [] for n in preferred}
extra: list[int] = []
for p in psutil.process_iter(["pid", "name"]):
try:
name = (p.info.get("name") or "").lower()
pid = int(p.info.get("pid") or 0)
except Exception:
continue
if pid <= 0:
continue
if name in preferred_set:
pids_by_name[name].append(pid)
continue
if name.startswith("wechat") or name.startswith("weixin"):
extra.append(pid)
ordered: list[int] = []
for n in preferred:
ordered.extend(pids_by_name.get(n, []))
ordered.extend(extra)
seen: set[int] = set()
out: list[int] = []
for pid in ordered:
if pid in seen:
continue
seen.add(pid)
out.append(pid)
return out
def _try_enable_windows_debug_privilege() -> None:
if os.name != "nt":
return
try:
advapi32 = ctypes.windll.advapi32
kernel32 = ctypes.windll.kernel32
TOKEN_ADJUST_PRIVILEGES = 0x0020
TOKEN_QUERY = 0x0008
SE_PRIVILEGE_ENABLED = 0x0002
class _LUID(ctypes.Structure):
_fields_ = [("LowPart", wintypes.DWORD), ("HighPart", wintypes.LONG)]
class _LUID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [("Luid", _LUID), ("Attributes", wintypes.DWORD)]
class _TOKEN_PRIVILEGES(ctypes.Structure):
_fields_ = [("PrivilegeCount", wintypes.DWORD), ("Privileges", _LUID_AND_ATTRIBUTES * 1)]
token = wintypes.HANDLE()
if not advapi32.OpenProcessToken(
kernel32.GetCurrentProcess(),
TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY,
ctypes.byref(token),
):
return
try:
luid = _LUID()
if not advapi32.LookupPrivilegeValueW(None, "SeDebugPrivilege", ctypes.byref(luid)):
return
tp = _TOKEN_PRIVILEGES()
tp.PrivilegeCount = 1
tp.Privileges[0].Luid = luid
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED
advapi32.AdjustTokenPrivileges(token, False, ctypes.byref(tp), 0, None, None)
finally:
kernel32.CloseHandle(token)
except Exception:
return
def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
_try_enable_windows_debug_privilege()
pids = _find_weixin_pids()
if not pids:
return None
PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400
MEM_COMMIT = 0x1000
MEM_PRIVATE = 0x20000
PAGE_NOACCESS = 0x01
PAGE_READONLY = 0x02
PAGE_READWRITE = 0x04
PAGE_WRITECOPY = 0x08
PAGE_EXECUTE_READ = 0x20
PAGE_EXECUTE_READWRITE = 0x40
PAGE_EXECUTE_WRITECOPY = 0x80
PAGE_GUARD = 0x100
kernel32 = ctypes.windll.kernel32
OpenProcess = kernel32.OpenProcess
OpenProcess.argtypes = [ctypes.c_ulong, ctypes.c_bool, ctypes.c_ulong]
OpenProcess.restype = ctypes.c_void_p
ReadProcessMemory = kernel32.ReadProcessMemory
ReadProcessMemory.argtypes = [
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_size_t,
ctypes.POINTER(ctypes.c_size_t),
]
ReadProcessMemory.restype = ctypes.c_bool
VirtualQueryEx = kernel32.VirtualQueryEx
VirtualQueryEx.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_size_t]
VirtualQueryEx.restype = ctypes.c_size_t
CloseHandle = kernel32.CloseHandle
CloseHandle.argtypes = [ctypes.c_void_p]
CloseHandle.restype = ctypes.c_bool
readable_mask = (
PAGE_READONLY
| PAGE_READWRITE
| PAGE_WRITECOPY
| PAGE_EXECUTE_READ
| PAGE_EXECUTE_READWRITE
| PAGE_EXECUTE_WRITECOPY
)
def is_readable(protect: int) -> bool:
if protect & PAGE_GUARD:
return False
if protect & PAGE_NOACCESS:
return False
return bool(protect & readable_mask)
# Keep pattern consistent with wx_key: search for 16/32 lower/upper alpha-num strings with word-boundary-like guards.
# (Using 32 first reduces false positives in some builds.)
pattern = re.compile(rb"(?i)(?<![0-9a-z])([0-9a-z]{32}|[0-9a-z]{16})(?![0-9a-z])")
def scan_pid(pid: int) -> Optional[bytes]:
handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
if not handle:
return None
stop = threading.Event()
result: list[Optional[bytes]] = [None]
def read_mem(addr: int, size: int) -> Optional[bytes]:
buf = ctypes.create_string_buffer(size)
read = ctypes.c_size_t(0)
ok = ReadProcessMemory(handle, ctypes.c_void_p(addr), buf, size, ctypes.byref(read))
if not ok or read.value <= 0:
return None
return buf.raw[: read.value]
def scan_region(base: int, region_size: int) -> Optional[bytes]:
chunk = 4 * 1024 * 1024
offset = 0
tail = b""
while offset < region_size and not stop.is_set():
to_read = min(chunk, region_size - offset)
b = read_mem(base + offset, int(to_read))
if not b:
# Don't abort the whole region on a single read failure (wx_key keeps scanning).
offset += to_read
tail = b""
continue
data = tail + b
for m in pattern.finditer(data):
cand = m.group(1)
if len(cand) == 32:
# wx_key uses key[:16] to validate; keep that but also try the second half for compatibility.
candidates = [cand[:16], cand[16:]]
else:
candidates = [cand]
for cand16 in candidates:
if _verify_wechat_aes_key(ciphertext, cand16):
return cand16
tail = data[-64:] if len(data) > 64 else data
offset += to_read
return None
regions: list[tuple[int, int]] = []
mbi = _MEMORY_BASIC_INFORMATION()
addr = 0
try:
while VirtualQueryEx(handle, ctypes.c_void_p(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)):
try:
if int(mbi.State) == MEM_COMMIT and int(mbi.Type) == MEM_PRIVATE:
protect = int(mbi.Protect)
if is_readable(protect):
base = int(mbi.BaseAddress)
size = int(mbi.RegionSize)
if size > 0:
# Skip extremely large regions to keep runtime bounded (same idea as wx_key).
if size <= 100 * 1024 * 1024:
regions.append((base, size))
addr = int(mbi.BaseAddress) + int(mbi.RegionSize)
except Exception:
addr += 0x1000
if addr <= 0:
break
with ThreadPoolExecutor(max_workers=min(32, max(1, len(regions)))) as ex:
for found in ex.map(lambda r: scan_region(r[0], r[1]), regions):
if found:
result[0] = found
stop.set()
break
finally:
CloseHandle(handle)
return result[0]
for pid in pids:
found = scan_pid(pid)
if found:
return found
return None
@lru_cache(maxsize=4096)
def _fallback_search_media_by_file_id(
weixin_root_str: str,
@@ -1495,11 +1116,17 @@ def _fallback_search_media_by_file_id(
return None
def _save_media_keys(account_dir: Path, xor_key: int, aes_key16: bytes) -> None:
def _save_media_keys(account_dir: Path, xor_key: int, aes_key16: Optional[bytes] = None) -> None:
try:
aes_str = ""
if aes_key16:
try:
aes_str = aes_key16.decode("ascii", errors="ignore")[:16]
except Exception:
aes_str = ""
payload = {
"xor": int(xor_key),
"aes": aes_key16.decode("ascii", errors="ignore"),
"aes": aes_str,
}
(account_dir / "_media_keys.json").write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
@@ -1650,17 +1277,13 @@ def _read_and_maybe_decrypt_media(
# Try WeChat .dat v1/v2 decrypt.
version = _detect_wechat_dat_version(data)
if version in (0, 1, 2):
root = weixin_root
if root is None and account_dir is not None:
root = _resolve_account_wxid_dir(account_dir)
if root is None and account_dir is not None:
ds = _resolve_account_db_storage_dir(account_dir)
root = ds.parent if ds else None
xor_key = _find_wechat_xor_key(str(root)) if root else None
if xor_key is None and account_dir is not None:
# 不在本项目内做任何密钥提取仅使用用户保存的密钥_media_keys.json
xor_key: Optional[int] = None
aes_key16 = b""
if account_dir is not None:
try:
keys2 = _load_media_keys(account_dir)
x2 = keys2.get("xor")
if x2 is not None:
xor_key = int(x2)
@@ -1668,8 +1291,13 @@ def _read_and_maybe_decrypt_media(
xor_key = None
else:
logger.debug("使用 _media_keys.json 中保存的 xor key")
aes_str = str(keys2.get("aes") or "").strip()
if len(aes_str) >= 16:
aes_key16 = aes_str[:16].encode("ascii", errors="ignore")
except Exception:
xor_key = None
aes_key16 = b""
try:
if version == 0 and xor_key is not None:
out = _decrypt_wechat_dat_v3(data, xor_key)
@@ -1707,41 +1335,24 @@ def _read_and_maybe_decrypt_media(
mt1 = _detect_image_media_type(out[:32])
if mt1 != "application/octet-stream":
return out, mt1
elif version == 2 and xor_key is not None and account_dir is not None and root is not None:
keys = _load_media_keys(account_dir)
aes_str = str(keys.get("aes") or "").strip()
aes_key16 = aes_str.encode("ascii", errors="ignore")[:16] if aes_str else b""
if not aes_key16:
most_common = _get_wechat_template_most_common_last2(str(root))
if most_common:
ct = _get_wechat_v2_ciphertext(Path(root), most_common)
elif version == 2 and xor_key is not None and aes_key16:
out = _decrypt_wechat_dat_v4(data, xor_key, aes_key16)
try:
out2, mtp2 = _try_strip_media_prefix(out)
if mtp2 != "application/octet-stream":
return out2, mtp2
except Exception:
pass
if out.startswith(b"wxgf"):
converted = _wxgf_to_image_bytes(out)
if converted:
out = converted
logger.info(f"wxgf->image: {path} -> {len(out)} bytes")
else:
ct = None
if ct:
aes_key16 = _extract_wechat_aes_key_from_process(ct) or b""
if aes_key16:
_save_media_keys(account_dir, xor_key, aes_key16)
if aes_key16:
out = _decrypt_wechat_dat_v4(data, xor_key, aes_key16)
try:
out2, mtp2 = _try_strip_media_prefix(out)
if mtp2 != "application/octet-stream":
return out2, mtp2
except Exception:
pass
if out.startswith(b"wxgf"):
converted = _wxgf_to_image_bytes(out)
if converted:
out = converted
logger.info(f"wxgf->image: {path} -> {len(out)} bytes")
else:
logger.info(f"wxgf->image failed: {path}")
mt2b = _detect_image_media_type(out[:32])
if mt2b != "application/octet-stream":
return out, mt2b
logger.info(f"wxgf->image failed: {path}")
mt2b = _detect_image_media_type(out[:32])
if mt2b != "application/octet-stream":
return out, mt2b
except Exception:
pass