improvement(media): 聊天媒体支持 file_id 兜底定位

- 图片/视频消息无 MD5 时,解析并下发 file_id,用于本地资源兜底定位与展示
- 后端 chat_media/open_folder 支持 md5/file_id;视频优先可 Range 的文件响应,并在需要时解密落盘
- 前端聊天页与 API 调用适配 file_id;补充媒体 URL 可用性判断
- 解密页补充“获取密钥”提示,支持手动输入/保存密钥;README 同步说明;更新音频图标资源
This commit is contained in:
2977094657
2025-12-23 16:39:38 +08:00
parent a4d652230f
commit 36f5067730
9 changed files with 790 additions and 126 deletions

View File

@@ -1,5 +1,6 @@
import ctypes
import datetime
import glob
import hashlib
import json
import mimetypes
@@ -371,7 +372,7 @@ def _resolve_media_path_from_hardlink(
quoted = _quote_ident(table_name)
try:
row = conn.execute(
f"SELECT dir1, dir2, file_name FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC LIMIT 1",
f"SELECT dir1, dir2, file_name, modify_time FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC LIMIT 1",
(md5,),
).fetchone()
except Exception:
@@ -383,6 +384,132 @@ def _resolve_media_path_from_hardlink(
if not file_name:
continue
if kind_key in {"video", "video_thumb"}:
roots: list[Path] = []
for r in [wxid_dir] + (extra_roots or []):
if not r:
continue
try:
rr = r.resolve()
except Exception:
rr = r
if rr not in roots:
roots.append(rr)
def _iter_video_base_dirs(r: Path) -> list[Path]:
bases: list[Path] = []
try:
if r.exists() and r.is_dir():
pass
else:
return bases
except Exception:
return bases
candidates = [
r / "msg" / "video",
r / "video",
r if str(r.name).lower() == "video" else None,
]
for c in candidates:
if not c:
continue
try:
if c.exists() and c.is_dir():
bases.append(c)
except Exception:
continue
# de-dup while keeping order
seen: set[str] = set()
uniq: list[Path] = []
for b in bases:
try:
k = str(b.resolve())
except Exception:
k = str(b)
if k in seen:
continue
seen.add(k)
uniq.append(b)
return uniq
modify_time = None
try:
if row["modify_time"] is not None:
modify_time = int(row["modify_time"])
except Exception:
modify_time = None
guessed_month: Optional[str] = None
if modify_time and modify_time > 0:
try:
dt = datetime.datetime.fromtimestamp(int(modify_time))
guessed_month = f"{dt.year:04d}-{dt.month:02d}"
except Exception:
guessed_month = None
stem = Path(file_name).stem
if kind_key == "video":
file_variants = [file_name]
else:
# Prefer real thumbnails when possible.
file_variants = [
f"{stem}_thumb.jpg",
f"{stem}_thumb.jpeg",
f"{stem}_thumb.png",
f"{stem}_thumb.webp",
f"{stem}.jpg",
f"{stem}.jpeg",
f"{stem}.png",
f"{stem}.gif",
f"{stem}.webp",
f"{stem}.dat",
file_name,
]
for root in roots:
for base_dir in _iter_video_base_dirs(root):
dirs_to_check: list[Path] = []
if guessed_month:
dirs_to_check.append(base_dir / guessed_month)
dirs_to_check.append(base_dir)
for d in dirs_to_check:
try:
if not d.exists() or not d.is_dir():
continue
except Exception:
continue
for fv in file_variants:
p = d / fv
try:
if p.exists() and p.is_file():
return p
except Exception:
continue
# Fallback: scan within the month directory for the exact file_name.
if guessed_month:
try:
for p in d.rglob(file_name):
try:
if p.is_file():
return p
except Exception:
continue
except Exception:
pass
# Final fallback: locate by name under msg/video and cache.
for base in _iter_video_base_dirs(wxid_dir):
try:
for p in base.rglob(file_name):
if p.is_file():
return p
except Exception:
pass
return None
if kind_key == "file":
try:
full_row = conn.execute(
@@ -973,20 +1100,38 @@ def _get_wechat_v2_ciphertext(weixin_root: Path, most_common_last2: bytes) -> Op
template_files.sort(key=_extract_yyyymm_for_sort, reverse=True)
sig = b"\x07\x08V2\x08\x07"
for file in template_files:
def try_read_ct(file: Path, require_last2: bool) -> Optional[bytes]:
try:
with open(file, "rb") as f:
if f.read(6) != sig:
continue
f.seek(-2, 2)
if f.read(2) != most_common_last2:
continue
return None
if require_last2 and most_common_last2 and len(most_common_last2) == 2:
try:
f.seek(-2, 2)
if f.read(2) != most_common_last2:
return None
except Exception:
return None
f.seek(0xF)
ct = f.read(16)
if ct and len(ct) == 16:
return ct
except Exception:
continue
return None
return None
# Prefer matching last2 bytes (older heuristic), but fall back to any V2 template like wx_key.
if most_common_last2 and len(most_common_last2) == 2:
for file in template_files:
ct = try_read_ct(file, require_last2=True)
if ct:
return ct
for file in template_files:
ct = try_read_ct(file, require_last2=False)
if ct:
return ct
return None
@@ -1120,8 +1265,6 @@ def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
PROCESS_QUERY_INFORMATION = 0x0400
MEM_COMMIT = 0x1000
MEM_PRIVATE = 0x20000
MEM_MAPPED = 0x40000
MEM_IMAGE = 0x1000000
PAGE_NOACCESS = 0x01
PAGE_READONLY = 0x02
@@ -1172,7 +1315,9 @@ def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
return False
return bool(protect & readable_mask)
pattern = re.compile(rb"(?i)(?<![0-9a-z])([0-9a-z]{16}|[0-9a-z]{32})(?![0-9a-z])")
# Keep pattern consistent with wx_key: search for 16/32 lower/upper alpha-num strings with word-boundary-like guards.
# (Using 32 first reduces false positives in some builds.)
pattern = re.compile(rb"(?i)(?<![0-9a-z])([0-9a-z]{32}|[0-9a-z]{16})(?![0-9a-z])")
def scan_pid(pid: int) -> Optional[bytes]:
handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
@@ -1198,14 +1343,18 @@ def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
to_read = min(chunk, region_size - offset)
b = read_mem(base + offset, int(to_read))
if not b:
return None
# Don't abort the whole region on a single read failure (wx_key keeps scanning).
offset += to_read
tail = b""
continue
data = tail + b
for m in pattern.finditer(data):
cand = m.group(1)
if len(cand) == 16:
candidates = [cand]
else:
if len(cand) == 32:
# wx_key uses key[:16] to validate; keep that but also try the second half for compatibility.
candidates = [cand[:16], cand[16:]]
else:
candidates = [cand]
for cand16 in candidates:
if _verify_wechat_aes_key(ciphertext, cand16):
return cand16
@@ -1219,13 +1368,15 @@ def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
try:
while VirtualQueryEx(handle, ctypes.c_void_p(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)):
try:
if int(mbi.State) == MEM_COMMIT and int(mbi.Type) in {MEM_PRIVATE, MEM_MAPPED, MEM_IMAGE}:
if int(mbi.State) == MEM_COMMIT and int(mbi.Type) == MEM_PRIVATE:
protect = int(mbi.Protect)
if is_readable(protect):
base = int(mbi.BaseAddress)
size = int(mbi.RegionSize)
if size > 0:
regions.append((base, size))
# Skip extremely large regions to keep runtime bounded (same idea as wx_key).
if size <= 100 * 1024 * 1024:
regions.append((base, size))
addr = int(mbi.BaseAddress) + int(mbi.RegionSize)
except Exception:
addr += 0x1000
@@ -1250,6 +1401,100 @@ def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
return None
@lru_cache(maxsize=4096)
def _fallback_search_media_by_file_id(
weixin_root_str: str,
file_id: str,
kind: str = "",
username: str = "",
) -> Optional[str]:
"""在微信数据目录里按文件名file_id兜底查找媒体文件。
一些微信版本的图片消息不再直接提供 32 位 MD5而是提供形如 `cdnthumburl` 的长串标识,
本函数用于按文件名/前缀在 msg/attach、cache 等目录中定位对应的 .dat 资源文件。
"""
if not weixin_root_str or not file_id:
return None
try:
root = Path(weixin_root_str)
except Exception:
return None
kind_key = str(kind or "").lower().strip()
fid = str(file_id or "").strip()
if not fid:
return None
# 优先在当前会话的 attach 子目录中查找(显著减少扫描范围)
search_dirs: list[Path] = []
if username:
try:
chat_hash = hashlib.md5(str(username).encode()).hexdigest()
search_dirs.append(root / "msg" / "attach" / chat_hash)
except Exception:
pass
if kind_key == "file":
search_dirs.extend([root / "msg" / "file"])
elif kind_key == "video" or kind_key == "video_thumb":
search_dirs.extend([root / "msg" / "video", root / "cache"])
else:
search_dirs.extend([root / "msg" / "attach", root / "cache", root / "msg" / "file", root / "msg" / "video"])
# de-dup while keeping order
seen: set[str] = set()
uniq_dirs: list[Path] = []
for d in search_dirs:
try:
k = str(d.resolve())
except Exception:
k = str(d)
if k in seen:
continue
seen.add(k)
uniq_dirs.append(d)
base = glob.escape(fid)
has_suffix = bool(Path(fid).suffix)
patterns: list[str] = []
if has_suffix:
patterns.append(base)
else:
patterns.extend(
[
f"{base}_h.dat",
f"{base}_t.dat",
f"{base}.dat",
f"{base}*.dat",
f"{base}.jpg",
f"{base}.jpeg",
f"{base}.png",
f"{base}.gif",
f"{base}.webp",
f"{base}*",
]
)
for d in uniq_dirs:
try:
if not d.exists() or not d.is_dir():
continue
except Exception:
continue
for pat in patterns:
try:
for p in d.rglob(pat):
try:
if p.is_file():
return str(p)
except Exception:
continue
except Exception:
continue
return None
def _save_media_keys(account_dir: Path, xor_key: int, aes_key16: bytes) -> None:
try:
payload = {