improvement(media): improve image/emoticon decryption stability and add cache self-healing

- Add image validity checks and sharpen JPEG/WebP detection to reduce XOR-decryption false positives (see the sketch below)
- Try multiple image resource candidates ordered by variant (b/h/c/t) and size to improve the hit rate
- Automatically delete and rebuild corrupted cache files when they are detected (applies to both single-image fetch and batch decryption)
- When local emoticon decryption fails, fall back to a safe download via emoticon.db, with AES-CBC decryption support
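
For reference, a minimal standalone sketch of the header-plus-trailer validation and XOR key guessing described in the first bullet (illustrative only; sniff_image, looks_complete and try_single_byte_xor are hypothetical names, not helpers from this patch):

from typing import Optional

# Illustrative sketch, not the patch's code: classify by magic bytes, then require a
# matching trailer so a lucky XOR key guess is not mistaken for a real image.
def sniff_image(data: bytes) -> Optional[str]:
    if data.startswith(b"\x89PNG\r\n\x1a\n"):
        return "image/png"
    if data.startswith(b"\xff\xd8\xff"):
        return "image/jpeg"
    if data[:6] in (b"GIF87a", b"GIF89a"):
        return "image/gif"
    if data[:4] == b"RIFF" and len(data) >= 12 and data[8:12] == b"WEBP":
        return "image/webp"
    return None

def looks_complete(data: bytes, media_type: str) -> bool:
    tail = data.rstrip(b"\x00")  # tolerate trailing zero padding, as the patch does
    if media_type == "image/jpeg":
        return tail.endswith(b"\xff\xd9")        # EOI marker
    if media_type == "image/png":
        return tail.endswith(b"IEND\xaeB`\x82")  # IEND chunk + CRC
    if media_type == "image/gif":
        return tail.endswith(b"\x3b")            # GIF trailer byte
    return True  # WebP: header check only

def try_single_byte_xor(data: bytes) -> Optional[bytes]:
    # Derive a key candidate from the expected PNG signature byte and accept the
    # result only if both the header and the trailer check out.
    if not data:
        return None
    key = data[0] ^ 0x89
    decoded = bytes(b ^ key for b in data)
    mt = sniff_image(decoded)
    if mt and looks_complete(decoded, mt):
        return decoded
    return None
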
Author: 2977094657
Date: 2025-12-26 21:46:07 +08:00
parent 761648f15a
commit 69fe7fbf88
3 changed files with 523 additions and 47 deletions


@@ -2,6 +2,7 @@ import ctypes
import datetime
import glob
import hashlib
import ipaddress
import json
import mimetypes
import os
@@ -11,6 +12,7 @@ import struct
from functools import lru_cache
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
from fastapi import HTTPException
@@ -74,15 +76,391 @@ def _detect_image_media_type(data: bytes) -> str:
if data.startswith(b"\x89PNG\r\n\x1a\n"):
return "image/png"
if data.startswith(b"\xff\xd8\xff"):
return "image/jpeg"
if data.startswith(b"\xff\xd8\xff") and len(data) >= 4:
marker = data[3]
# Most JPEG marker types are in 0xC0..0xFE (APP, SOF, DQT, DHT, SOS, COM, etc.).
# This avoids false positives where random bytes start with 0xFFD8FF.
if marker not in (0x00, 0xFF) and marker >= 0xC0:
return "image/jpeg"
if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
return "image/gif"
if data.startswith(b"RIFF") and data[8:12] == b"WEBP":
if data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP":
return "image/webp"
return "application/octet-stream"
def _is_probably_valid_image(data: bytes, media_type: str) -> bool:
"""Heuristic validation to reduce false positives when guessing XOR keys.
We keep it lightweight (no full parsing), only checking common trailers.
"""
if not data:
return False
mt = str(media_type or "").strip().lower()
if not mt.startswith("image/"):
return False
if mt == "image/jpeg":
if _detect_image_media_type(data[:32]) != "image/jpeg":
return False
trimmed = data.rstrip(b"\x00")
if len(trimmed) < 4 or not trimmed.startswith(b"\xff\xd8\xff"):
return False
if trimmed.endswith(b"\xff\xd9"):
return True
tail = trimmed[-4096:] if len(trimmed) > 4096 else trimmed
i = tail.rfind(b"\xff\xd9")
return i >= 0 and i >= len(tail) - 64 - 2
if mt == "image/png":
if not data.startswith(b"\x89PNG\r\n\x1a\n"):
return False
trailer = b"\x00\x00\x00\x00IEND\xaeB`\x82"
trimmed = data.rstrip(b"\x00")
if trimmed.endswith(trailer):
return True
tail = trimmed[-256:] if len(trimmed) > 256 else trimmed
i = tail.rfind(trailer)
return i >= 0 and i >= len(tail) - 64 - len(trailer)
if mt == "image/gif":
if not (data.startswith(b"GIF87a") or data.startswith(b"GIF89a")):
return False
trimmed = data.rstrip(b"\x00")
if trimmed.endswith(b"\x3B"):
return True
tail = trimmed[-256:] if len(trimmed) > 256 else trimmed
i = tail.rfind(b"\x3B")
return i >= 0 and i >= len(tail) - 16 - 1
if mt == "image/webp":
if len(data) < 12:
return False
return bool(data.startswith(b"RIFF") and data[8:12] == b"WEBP")
# Unknown image types: fall back to header-only check.
return _detect_image_media_type(data[:32]) != "application/octet-stream"
def _normalize_variant_basename(name: str) -> str:
"""Normalize a media filename stem by stripping common variant suffixes.
Mirrors echotrace's idea of normalizing `.t/.h/.b/.c` and `_t/_h/_b/_c`.
"""
v = str(name or "").strip()
if not v:
return ""
lower = v.lower()
for suf in ("_b", "_h", "_c", "_t", ".b", ".h", ".c", ".t"):
if lower.endswith(suf) and len(lower) > len(suf):
return lower[: -len(suf)]
return lower
def _variant_rank(name: str) -> int:
"""Ordering used when trying multiple candidate resources.
Prefer: big > high > original > cache > thumb.
"""
n = str(name or "").lower()
if n.endswith(("_b", ".b")):
return 0
if n.endswith(("_h", ".h")):
return 1
if n.endswith(("_c", ".c")):
return 3
if n.endswith(("_t", ".t")):
return 4
return 2
def _iter_media_source_candidates(source: Path, *, limit: int = 30) -> list[Path]:
"""Yield sibling variant files around a resolved source path.
This is a lightweight approximation of echotrace's \"search many .dat variants then try them\".
"""
if not source:
return []
try:
if not source.exists():
return []
except Exception:
return []
try:
if source.is_dir():
return []
except Exception:
return []
out: list[Path] = []
try:
out.append(source.resolve())
except Exception:
out.append(source)
parent = source.parent
stem = str(source.stem or "")
base = _normalize_variant_basename(stem)
if not base:
return out
preferred_names = [
f"{base}_b.dat",
f"{base}_h.dat",
f"{base}.dat",
f"{base}_c.dat",
f"{base}_t.dat",
f"{base}.b.dat",
f"{base}.h.dat",
f"{base}.c.dat",
f"{base}.t.dat",
f"{base}.gif",
f"{base}.webp",
f"{base}.png",
f"{base}.jpg",
f"{base}.jpeg",
]
for name in preferred_names:
p = parent / name
try:
if p.exists() and p.is_file():
out.append(p.resolve())
except Exception:
continue
# Add any other local .dat siblings with the same normalized base (limit to avoid explosion).
try:
for p in parent.glob(f"{base}*.dat"):
try:
if p.exists() and p.is_file():
out.append(p.resolve())
except Exception:
continue
if len(out) >= int(limit):
break
except Exception:
pass
# De-dup while keeping order.
seen: set[str] = set()
uniq: list[Path] = []
for p in out:
try:
k = str(p.resolve())
except Exception:
k = str(p)
if k in seen:
continue
seen.add(k)
uniq.append(p)
return uniq
def _order_media_candidates(paths: list[Path]) -> list[Path]:
"""Sort candidate files similar to echotrace's variant preference + size heuristic."""
def _stat(p: Path) -> tuple[int, float]:
try:
st = p.stat()
return int(st.st_size), float(st.st_mtime)
except Exception:
return 0, 0.0
def key(p: Path) -> tuple[int, int, int, float, str]:
name = str(p.stem or "").lower()
rank = _variant_rank(name)
ext = str(p.suffix or "").lower()
# Prefer already-decoded formats (non-.dat) within the same variant rank.
ext_penalty = 1 if ext == ".dat" else 0
size, mtime = _stat(p)
return (rank, ext_penalty, -size, -mtime, str(p))
try:
return sorted(list(paths or []), key=key)
except Exception:
return list(paths or [])
def _is_safe_http_url(url: str) -> bool:
u = str(url or "").strip()
if not u:
return False
try:
p = urlparse(u)
except Exception:
return False
if p.scheme not in ("http", "https"):
return False
host = (p.hostname or "").strip()
if not host:
return False
if host in {"localhost"}:
return False
try:
ip = ipaddress.ip_address(host)
if ip.is_private or ip.is_loopback or ip.is_link_local:
return False
except Exception:
pass
return True
def _download_http_bytes(url: str, *, timeout: int = 20, max_bytes: int = 30 * 1024 * 1024) -> bytes:
if not _is_safe_http_url(url):
raise HTTPException(status_code=400, detail="Unsafe URL.")
try:
import requests
except Exception as e:
raise HTTPException(status_code=500, detail=f"requests not available: {e}")
try:
with requests.get(url, stream=True, timeout=timeout) as r:
r.raise_for_status()
try:
cl = int(r.headers.get("content-length") or 0)
if cl and cl > int(max_bytes):
raise HTTPException(status_code=413, detail="Remote file too large.")
except HTTPException:
raise
except Exception:
pass
chunks: list[bytes] = []
total = 0
for chunk in r.iter_content(chunk_size=256 * 1024):
if not chunk:
continue
chunks.append(chunk)
total += len(chunk)
if total > int(max_bytes):
raise HTTPException(status_code=413, detail="Remote file too large.")
return b"".join(chunks)
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=502, detail=f"Download failed: {e}")
def _decrypt_emoticon_aes_cbc(data: bytes, aes_key_hex: str) -> Optional[bytes]:
"""Decrypt WeChat emoticon payload from kNonStoreEmoticonTable.encrypt_url.
Observed scheme (WeChat 4.x):
- key = bytes.fromhex(aes_key_hex) (16 bytes)
- iv = key
- cipher = AES-128-CBC
- padding = PKCS7
"""
if not data:
return None
if len(data) % 16 != 0:
return None
khex = str(aes_key_hex or "").strip().lower()
if not re.fullmatch(r"[0-9a-f]{32}", khex):
return None
try:
key = bytes.fromhex(khex)
if len(key) != 16:
return None
except Exception:
return None
try:
from Crypto.Cipher import AES
from Crypto.Util import Padding
pt_padded = AES.new(key, AES.MODE_CBC, iv=key).decrypt(data)
pt = Padding.unpad(pt_padded, AES.block_size)
return pt
except Exception:
return None
@lru_cache(maxsize=2048)
def _lookup_emoticon_info(account_dir_str: str, md5: str) -> dict[str, str]:
account_dir = Path(account_dir_str)
md5s = str(md5 or "").strip().lower()
if not md5s:
return {}
db_path = account_dir / "emoticon.db"
if not db_path.exists():
return {}
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
try:
row = conn.execute(
"SELECT md5, aes_key, cdn_url, encrypt_url, extern_url, thumb_url, tp_url "
"FROM kNonStoreEmoticonTable WHERE lower(md5) = lower(?) LIMIT 1",
(md5s,),
).fetchone()
if not row:
return {}
return {k: str(row[k] or "") for k in row.keys()}
except Exception:
return {}
finally:
try:
conn.close()
except Exception:
pass
def _try_fetch_emoticon_from_remote(account_dir: Path, md5: str) -> tuple[Optional[bytes], Optional[str]]:
info = _lookup_emoticon_info(str(account_dir), str(md5 or "").lower())
if not info:
return None, None
aes_key_hex = str(info.get("aes_key") or "").strip()
urls: list[str] = []
# Prefer plain CDN URL first; fall back to encrypt_url (needs AES-CBC decrypt).
for k in ("cdn_url", "extern_url", "thumb_url", "tp_url", "encrypt_url"):
u = str(info.get(k) or "").strip()
if u and _is_safe_http_url(u):
urls.append(u)
for url in urls:
try:
payload = _download_http_bytes(url)
except Exception:
continue
candidates: list[bytes] = [payload]
dec = _decrypt_emoticon_aes_cbc(payload, aes_key_hex)
if dec is not None:
candidates.insert(0, dec)
for data in candidates:
if not data:
continue
try:
data2, mt = _try_strip_media_prefix(data)
except Exception:
data2, mt = data, "application/octet-stream"
if mt == "application/octet-stream":
mt = _detect_image_media_type(data2[:32])
if mt == "application/octet-stream":
try:
if len(data2) >= 8 and data2[4:8] == b"ftyp":
mt = "video/mp4"
except Exception:
pass
if mt.startswith("image/") and (not _is_probably_valid_image(data2, mt)):
continue
if mt != "application/octet-stream":
return data2, mt
return None, None
class _WxAMConfig(ctypes.Structure):
_fields_ = [
("mode", ctypes.c_int),
@@ -191,7 +569,7 @@ def _try_strip_media_prefix(data: bytes) -> tuple[bytes, str]:
if j >= 0 and j <= 128 * 1024:
sliced = data[j:]
mt2 = _detect_image_media_type(sliced[:32])
if mt2 != "application/octet-stream":
if mt2 != "application/octet-stream" and _is_probably_valid_image(sliced, mt2):
return sliced, mt2
try:
@@ -363,7 +741,7 @@ def _resolve_media_path_from_hardlink(
quoted = _quote_ident(table_name)
try:
row = conn.execute(
f"SELECT dir1, dir2, file_name, modify_time FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC LIMIT 1",
f"SELECT dir1, dir2, file_name, modify_time FROM {quoted} WHERE md5 = ? ORDER BY modify_time DESC, dir1 DESC, rowid DESC LIMIT 1",
(md5,),
).fetchone()
except Exception:
@@ -917,9 +1295,10 @@ def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[st
# (offset, magic, media_type)
candidates: list[tuple[int, bytes, str]] = [
(0, b"\x89PNG\r\n\x1a\n", "image/png"),
(0, b"\xff\xd8\xff", "image/jpeg"),
(0, b"GIF87a", "image/gif"),
(0, b"GIF89a", "image/gif"),
(0, b"RIFF", "application/octet-stream"),
(4, b"ftyp", "video/mp4"),
(0, b"wxgf", "application/octet-stream"),
(1, b"wxgf", "application/octet-stream"),
(2, b"wxgf", "application/octet-stream"),
@@ -936,8 +1315,8 @@ def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[st
(13, b"wxgf", "application/octet-stream"),
(14, b"wxgf", "application/octet-stream"),
(15, b"wxgf", "application/octet-stream"),
(0, b"RIFF", "application/octet-stream"),
(4, b"ftyp", "video/mp4"),
# JPEG magic is short (3 bytes), keep it last to reduce false positives.
(0, b"\xff\xd8\xff", "image/jpeg"),
]
for offset, magic, mt in candidates:
@@ -968,16 +1347,24 @@ def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[st
if offset == 0 and magic == b"RIFF":
if len(decoded) >= 12 and decoded[8:12] == b"WEBP":
return decoded, "image/webp"
if _is_probably_valid_image(decoded, "image/webp"):
return decoded, "image/webp"
continue
if mt == "application/octet-stream":
mt2 = _detect_image_media_type(decoded[:32])
if mt2 != "application/octet-stream":
return decoded, mt2
if mt == "video/mp4":
try:
if len(decoded) >= 8 and decoded[4:8] == b"ftyp":
return decoded, "video/mp4"
except Exception:
pass
continue
- return decoded, mt
+ mt2 = _detect_image_media_type(decoded[:32])
+ if mt2 != mt:
+ continue
+ if not _is_probably_valid_image(decoded, mt2):
+ continue
+ return decoded, mt2
preview_len = 8192
try:
@@ -1005,6 +1392,8 @@ def _try_xor_decrypt_by_magic(data: bytes) -> tuple[Optional[bytes], Optional[st
decoded = bytes(b ^ key for b in data)
dec2, mt2 = _try_strip_media_prefix(decoded)
if mt2 != "application/octet-stream":
if mt2.startswith("image/") and (not _is_probably_valid_image(dec2, mt2)):
continue
return dec2, mt2
except Exception:
continue
@@ -1193,13 +1582,15 @@ def _detect_image_extension(data: bytes) -> str:
"""根据图片数据检测文件扩展名"""
if not data:
return "dat"
if data.startswith(b"\x89PNG\r\n\x1a\n"):
head = data[:32] if len(data) > 32 else data
mt = _detect_image_media_type(head)
if mt == "image/png":
return "png"
if data.startswith(b"\xff\xd8\xff"):
if mt == "image/jpeg":
return "jpg"
if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
if mt == "image/gif":
return "gif"
if data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP":
if mt == "image/webp":
return "webp"
return "dat"
@@ -1262,18 +1653,19 @@ def _read_and_maybe_decrypt_media(
try:
data_pref = path.read_bytes()
+ # Only accept prefix stripping when it looks like a real image/video,
+ # otherwise encrypted/random bytes may trigger false positives.
stripped, mtp = _try_strip_media_prefix(data_pref)
if mtp != "application/octet-stream":
- return stripped, mtp
+ if mtp.startswith("image/") and (not _is_probably_valid_image(stripped, mtp)):
+ pass
+ else:
+ return stripped, mtp
except Exception:
pass
data = path.read_bytes()
dec, mt2 = _try_xor_decrypt_by_magic(data)
if dec is not None and mt2:
return dec, mt2
# Try WeChat .dat v1/v2 decrypt.
version = _detect_wechat_dat_version(data)
if version in (0, 1, 2):
@@ -1356,8 +1748,23 @@ def _read_and_maybe_decrypt_media(
except Exception:
pass
# Fallback: try guessing XOR key by magic (only after key-based decrypt attempts).
# For V4 signature files, XOR guessing is not applicable and may be expensive.
if version in (0, -1):
dec, mt2 = _try_xor_decrypt_by_magic(data)
if dec is not None and mt2:
return dec, mt2
# Fallback: return as-is.
mt3 = _guess_media_type_by_path(path, fallback="application/octet-stream")
if mt3.startswith("image/") and (not _is_probably_valid_image(data, mt3)):
mt3 = "application/octet-stream"
if mt3 == "video/mp4":
try:
if not (len(data) >= 8 and data[4:8] == b"ftyp"):
mt3 = "application/octet-stream"
except Exception:
mt3 = "application/octet-stream"
return data, mt3