diff --git a/src/wechat_decrypt_tool/isaac64.py b/src/wechat_decrypt_tool/isaac64.py index c1a8236..c0ee1a1 100644 --- a/src/wechat_decrypt_tool/isaac64.py +++ b/src/wechat_decrypt_tool/isaac64.py @@ -1,13 +1,23 @@ from __future__ import annotations -"""ISAAC-64 PRNG (WeFlow compatible). +"""ISAAC-64 PRNG (best-effort fallback). -WeChat SNS live photo/video decryption uses a keystream generated by ISAAC-64 and -XORs the first 128KB of the mp4 file. WeFlow's implementation reverses the -generated byte array, so we mirror that behavior for compatibility. +In this repo, Moments (SNS) *video* decryption uses a keystream generator that +matches WeFlow's WxIsaac64 (WASM) behavior and XORs only the first 128KB of the +MP4. + +This module provides a pure-Python ISAAC-64 implementation so the backend can +still attempt to generate a keystream when the WASM helper is unavailable. + +Notes: +- Moments *image* decryption is handled via `wcdb_api.dll` (`wcdb_decrypt_sns_image`) + because "ISAAC-64 full-file XOR" is not reliably reproducible for images across + different versions/samples. +- This ISAAC-64 implementation may not perfectly match WxIsaac64; treat it as + best-effort. """ -from typing import Any +from typing import Any, Literal _MASK_64 = 0xFFFFFFFFFFFFFFFF @@ -143,27 +153,58 @@ class Isaac64: self.bb = _u64(self.mm[(y >> 11) & 255] + x) self.randrsl[i] = self.bb - def get_next(self) -> int: + def rand_u64(self) -> int: + """Return the next ISAAC-64 output as an unsigned 64-bit integer. + + Note: The original reference `rand()` consumes `randrsl[]` in reverse order. + """ if self.randcnt == 0: self._isaac64() self.randcnt = 256 - idx = 256 - self.randcnt self.randcnt -= 1 - return _u64(self.randrsl[idx]) + return _u64(self.randrsl[self.randcnt]) - def generate_keystream(self, size: int) -> bytes: - """Generate a keystream of `size` bytes (must be multiple of 8).""" - if size <= 0: + # Backward-compatible alias (older callers used `get_next()`). + def get_next(self) -> int: # pragma: no cover + return self.rand_u64() + + KeystreamWordFormat = Literal["raw_le", "raw_be", "be_swap32", "le_swap32"] + + @staticmethod + def _raw_to_bytes(raw: int, word_format: KeystreamWordFormat) -> bytes: + """Serialize one 64-bit `rand()` output to 8 bytes. + + - raw_le/raw_be: direct endianness of the 64-bit integer. + - be_swap32: big-endian bytes with 32-bit halves swapped (BE(lo32)||BE(hi32)). + This matches the byte layout implied by the doc's `htonl(hi32)||htonl(lo32)` + pattern when the resulting u64 is read as bytes on little-endian hosts. + - le_swap32: little-endian bytes with 32-bit halves swapped. + """ + v = _u64(raw) + if word_format == "raw_le": + return int(v).to_bytes(8, "little", signed=False) + if word_format == "raw_be": + return int(v).to_bytes(8, "big", signed=False) + if word_format == "be_swap32": + b = int(v).to_bytes(8, "big", signed=False) + return b[4:8] + b[0:4] + if word_format == "le_swap32": + b = int(v).to_bytes(8, "little", signed=False) + return b[4:8] + b[0:4] + raise ValueError(f"Unknown ISAAC64 word_format: {word_format}") + + def generate_keystream(self, size: int, *, word_format: KeystreamWordFormat = "be_swap32") -> bytes: + """Generate a keystream of `size` bytes. + + This mirrors the decryption loop behavior: produce a new 8-byte keyblock + for every 8 bytes of input, and slice for tail bytes. + """ + want = int(size or 0) + if want <= 0: return b"" - if size % 8 != 0: - raise ValueError("ISAAC64 keystream size must be multiple of 8 bytes.") + blocks = (want + 7) // 8 out = bytearray() - count = size // 8 - for _ in range(count): - out.extend(int(self.get_next()).to_bytes(8, "little", signed=False)) - - # WeFlow reverses the entire byte array (Uint8Array.reverse()). - out.reverse() - return bytes(out) - + for _ in range(blocks): + out.extend(self._raw_to_bytes(self.rand_u64(), word_format)) + return bytes(out[:want]) diff --git a/src/wechat_decrypt_tool/routers/sns.py b/src/wechat_decrypt_tool/routers/sns.py index 0127cef..fc93776 100644 --- a/src/wechat_decrypt_tool/routers/sns.py +++ b/src/wechat_decrypt_tool/routers/sns.py @@ -26,6 +26,7 @@ from ..chat_helpers import _load_contact_rows, _pick_display_name, _resolve_acco from ..logging_config import get_logger from ..media_helpers import _read_and_maybe_decrypt_media, _resolve_account_wxid_dir from ..path_fix import PathFixRoute +from .. import sns_media as _sns_media from ..wcdb_realtime import ( WCDBRealtimeError, WCDB_REALTIME, @@ -2387,62 +2388,11 @@ def list_sns_media_candidates( def _is_allowed_sns_media_host(host: str) -> bool: - h = str(host or "").strip().lower() - if not h: - return False - # Images: qpic/qlogo. Thumbs: *.tc.qq.com. Videos/live photos: *.video.qq.com. - return ( - h.endswith(".qpic.cn") - or h.endswith(".qlogo.cn") - or h.endswith(".tc.qq.com") - or h.endswith(".video.qq.com") - ) + return _sns_media.is_allowed_sns_media_host(host) def _fix_sns_cdn_url(url: str, *, token: str = "", is_video: bool = False) -> str: - """WeFlow-compatible SNS CDN URL normalization. - - - Force https for Tencent CDNs. - - For images, replace `/150` with `/0` to request the original. - - If token is provided and url doesn't contain it, append `token=&idx=1`. - """ - u = html.unescape(str(url or "")).strip() - if not u: - return "" - - # Only touch Tencent CDNs; keep other URLs intact. - try: - p = urlparse(u) - host = str(p.hostname or "").lower() - if not _is_allowed_sns_media_host(host): - return u - except Exception: - return u - - # http -> https - u = re.sub(r"^http://", "https://", u, flags=re.I) - - # /150 -> /0 (image only) - if not is_video: - u = re.sub(r"/150(?=($|\\?))", "/0", u) - - tok = str(token or "").strip() - if tok and ("token=" not in u): - if is_video: - # Match WeFlow: place `token&idx=1` in front of existing query params. - base, sep, qs = u.partition("?") - if sep: - qs = qs.lstrip("&") - u = f"{base}?token={tok}&idx=1" - if qs: - u = f"{u}&{qs}" - else: - u = f"{u}?token={tok}&idx=1" - else: - connector = "&" if "?" in u else "?" - u = f"{u}{connector}token={tok}&idx=1" - - return u + return _sns_media.fix_sns_cdn_url(url, token=token, is_video=is_video) def _detect_mp4_ftyp(head: bytes) -> bool: @@ -2461,40 +2411,7 @@ def _weflow_wxisaac64_script_path() -> str: @lru_cache(maxsize=64) def _weflow_wxisaac64_keystream(key: str, size: int) -> bytes: - """Generate keystream via WeFlow's WASM (preferred; matches real decryption).""" - key_text = str(key or "").strip() - if not key_text or size <= 0: - return b"" - - # WeFlow is the source-of-truth; use its WASM first, then fall back to our pure-python ISAAC64. - script = _weflow_wxisaac64_script_path() - if not script: - script = "" - - if script: - try: - # The JS helper prints ONLY base64 bytes to stdout; keep stderr for debugging. - proc = subprocess.run( - ["node", script, key_text, str(int(size))], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - timeout=30, - check=False, - ) - if proc.returncode == 0: - out_b64 = (proc.stdout or b"").strip() - if out_b64: - return base64.b64decode(out_b64, validate=False) - except Exception: - pass - - # Fallback: pure python ISAAC64 (WeFlow-compatible reverse). - from ..isaac64 import Isaac64 # pylint: disable=import-outside-toplevel - - want = int(size) - # ISAAC64 generates 8-byte words; generate enough and slice. - size8 = ((want + 7) // 8) * 8 - return Isaac64(key_text).generate_keystream(size8)[:want] + return _sns_media.weflow_wxisaac64_keystream(key, size) _SNS_REMOTE_VIDEO_CACHE_EXTS = [ @@ -2595,55 +2512,7 @@ async def _download_sns_remote_to_file(url: str, dest_path: Path, *, max_bytes: def _maybe_decrypt_sns_video_file(path: Path, key: str) -> bool: - """Decrypt the first 128KB of an encrypted mp4 file in-place (WeFlow/Isaac64). - - Returns True if decryption was performed, False otherwise. - """ - key_text = str(key or "").strip() - if not key_text: - return False - - try: - size = int(path.stat().st_size) - except Exception: - return False - - if size <= 8: - return False - - decrypt_size = min(131072, size) - if decrypt_size <= 0: - return False - - try: - with path.open("r+b") as f: - head = f.read(8) - if _detect_mp4_ftyp(head): - return False - - f.seek(0) - buf = bytearray(f.read(decrypt_size)) - if not buf: - return False - - # Prefer WeFlow's real keystream generator (WASM) to ensure compatibility. - ks = _weflow_wxisaac64_keystream(key_text, decrypt_size) - n = min(len(buf), len(ks)) - for i in range(n): - buf[i] ^= ks[i] - - f.seek(0) - f.write(buf) - f.flush() - - f.seek(0) - head2 = f.read(8) - if _detect_mp4_ftyp(head2): - return True - # Still return True to indicate we mutated bytes; caller may treat as failure if desired. - return True - except Exception: - return False + return _sns_media.maybe_decrypt_sns_video_file(path, key) async def _materialize_sns_remote_video( @@ -2654,124 +2523,21 @@ async def _materialize_sns_remote_video( token: str, use_cache: bool, ) -> Optional[Path]: - """Download SNS video from CDN, decrypt (if needed), and return a local mp4 path.""" - fixed_url = _fix_sns_cdn_url(str(url or ""), token=str(token or ""), is_video=True) - if not fixed_url: - return None - - cache_dir, cache_stem = _sns_remote_video_cache_dir_and_stem(account_dir, url=fixed_url, key=str(key or "")) - - if use_cache: - existing = _sns_remote_video_cache_existing_path(cache_dir, cache_stem) - if existing is not None: - # Best-effort migrate legacy `.bin` -> `.mp4` when it's already decrypted. - try: - if existing.suffix.lower() == ".bin": - with existing.open("rb") as f: - head = f.read(8) - if _detect_mp4_ftyp(head): - target = cache_dir / f"{cache_stem}.mp4" - cache_dir.mkdir(parents=True, exist_ok=True) - os.replace(str(existing), str(target)) - existing = target - except Exception: - pass - return existing - - # Download to a temp file first. - cache_dir.mkdir(parents=True, exist_ok=True) - tmp_path = cache_dir / f"{cache_stem}.mp4.{time.time_ns()}.tmp" - try: - await _download_sns_remote_to_file(fixed_url, tmp_path, max_bytes=200 * 1024 * 1024) - except Exception: - try: - tmp_path.unlink(missing_ok=True) - except Exception: - pass - return None - - # Decrypt in-place (WeFlow ISAAC64) if the file isn't already a mp4. - _maybe_decrypt_sns_video_file(tmp_path, str(key or "")) - - # Validate: mp4 must have `ftyp` at offset 4. - ok_mp4 = False - try: - with tmp_path.open("rb") as f: - head = f.read(8) - ok_mp4 = _detect_mp4_ftyp(head) - except Exception: - ok_mp4 = False - - if not ok_mp4: - try: - tmp_path.unlink(missing_ok=True) - except Exception: - pass - return None - - if use_cache: - final_path = cache_dir / f"{cache_stem}.mp4" - try: - os.replace(str(tmp_path), str(final_path)) - except Exception: - # If rename fails, keep tmp_path as fallback. - final_path = tmp_path - - # Remove other extensions for the same cache key. - for other_ext in _SNS_REMOTE_VIDEO_CACHE_EXTS: - if other_ext.lower() == ".mp4": - continue - other = cache_dir / f"{cache_stem}{other_ext}" - try: - if other.exists() and other.is_file(): - other.unlink(missing_ok=True) - except Exception: - continue - - return final_path - - # Cache disabled: keep the decrypted tmp_path (caller should delete it). - return tmp_path + return await _sns_media.materialize_sns_remote_video( + account_dir=account_dir, + url=url, + key=key, + token=token, + use_cache=use_cache, + ) def _best_effort_unlink(path: str) -> None: - try: - Path(path).unlink(missing_ok=True) - except Exception: - pass + _sns_media.best_effort_unlink(path) def _detect_image_mime(data: bytes) -> str: - """Sniff image mime type by magic bytes. - - IMPORTANT: Do NOT trust HTTP Content-Type as a fallback here. We use this for - validating decrypted bytes. If we blindly trust `image/*`, a failed decrypt - would poison the disk cache and the frontend would keep showing broken images. - """ - if not data: - return "" - - if data.startswith(b"\xFF\xD8\xFF"): - return "image/jpeg" - if data.startswith(b"\x89PNG\r\n\x1a\n"): - return "image/png" - if len(data) >= 6 and data[:6] in (b"GIF87a", b"GIF89a"): - return "image/gif" - if len(data) >= 12 and data[:4] == b"RIFF" and data[8:12] == b"WEBP": - return "image/webp" - if len(data) >= 12 and data[4:8] == b"ftyp": - # ISO BMFF based image formats (HEIF/HEIC/AVIF). - brand = data[8:12] - if brand == b"avif": - return "image/avif" - if brand in (b"heic", b"heix", b"hevc", b"hevx"): - return "image/heic" - if brand in (b"heif", b"mif1", b"msf1"): - return "image/heif" - if data.startswith(b"BM"): - return "image/bmp" - - return "" + return _sns_media.detect_image_mime(data) _SNS_REMOTE_CACHE_EXTS = [ @@ -2907,146 +2673,25 @@ async def _try_fetch_and_decrypt_sns_remote( token: str, use_cache: bool, ) -> Optional[Response]: - """Try WeFlow-style: download from CDN -> decrypt via wcdb_decrypt_sns_image -> return bytes. + """Try remote download+decrypt first (accurate when keys are present). Returns a Response on success, or None on failure so caller can fall back to local cache matching. """ - u_fixed = _fix_sns_cdn_url(url, token=token, is_video=False) - if not u_fixed: + res = await _sns_media.try_fetch_and_decrypt_sns_image_remote( + account_dir=account_dir, + url=str(url or ""), + key=str(key or ""), + token=str(token or ""), + use_cache=bool(use_cache), + ) + if res is None: return None - try: - p = urlparse(u_fixed) - host = str(p.hostname or "").strip().lower() - except Exception: - return None - if not _is_allowed_sns_media_host(host): - return None - - cache_dir, cache_stem = _sns_remote_cache_dir_and_stem(account_dir, url=u_fixed, key=str(key or "")) - if use_cache: - try: - existing = _sns_remote_cache_existing_path(cache_dir, cache_stem) - if existing is not None: - mt = _ext_to_mime(existing.suffix) - - # Upgrade legacy `.bin` cache to a proper image extension once. - if (existing.suffix or "").lower() == ".bin" or (not mt): - mt2 = _sniff_image_mime_from_file(existing) - if not mt2: - try: - existing.unlink(missing_ok=True) - except Exception: - pass - existing = None - else: - ext2 = _mime_to_ext(mt2) - if ext2 != ".bin": - try: - cache_dir.mkdir(parents=True, exist_ok=True) - desired = cache_dir / f"{cache_stem}{ext2}" - if desired.exists(): - # Another process/version already wrote the real file; drop legacy bin. - existing.unlink(missing_ok=True) - existing = desired - else: - os.replace(str(existing), str(desired)) - existing = desired - except Exception: - pass - mt = mt2 - - if existing is not None and mt: - return FileResponse( - existing, - media_type=mt, - headers={ - "Cache-Control": "public, max-age=86400", - "X-SNS-Source": "remote-cache", - }, - ) - except Exception: - pass - - try: - raw, content_type, x_enc = await _download_sns_remote_bytes(u_fixed) - except Exception as e: - logger.info("[sns] remote download failed: %s", e) - return None - - if not raw: - return None - - # First, validate whether the CDN already returned a real image. - mt_raw = _detect_image_mime(raw) - - decoded = raw - mt = mt_raw - decrypted = False - k = str(key or "").strip() - - # Only attempt decryption when bytes do NOT look like an image, or when CDN explicitly - # signals encryption (x-enc). Some endpoints return already-decoded PNG/JPEG even when - # urlAttrs.enc_idx == 1, and decrypting those would corrupt the bytes. - need_decrypt = bool(k) and (not mt_raw) and bool(raw) - if k and x_enc and str(x_enc).strip() not in ("0", "false", "False"): - need_decrypt = True - - if need_decrypt: - try: - decoded2 = _wcdb_decrypt_sns_image(raw, k) - mt2 = _detect_image_mime(decoded2) - if mt2: - decoded = decoded2 - mt = mt2 - decrypted = decoded2 != raw - else: - # Decrypt failed; if raw is a real image, keep it. Otherwise treat as failure. - if mt_raw: - decoded = raw - mt = mt_raw - decrypted = False - else: - return None - except Exception as e: - logger.info("[sns] remote decrypt failed: %s", e) - if not mt_raw: - return None - decoded = raw - mt = mt_raw - decrypted = False - - if not mt: - return None - - if use_cache: - try: - ext = _mime_to_ext(mt) - cache_dir.mkdir(parents=True, exist_ok=True) - cache_path = cache_dir / f"{cache_stem}{ext}" - - tmp = cache_path.with_suffix(cache_path.suffix + f".{time.time_ns()}.tmp") - tmp.write_bytes(decoded) - os.replace(str(tmp), str(cache_path)) - - # Remove other extensions for the same cache key to avoid stale duplicates. - for other_ext in _SNS_REMOTE_CACHE_EXTS: - if other_ext.lower() == ext.lower(): - continue - other = cache_dir / f"{cache_stem}{other_ext}" - try: - if other.exists() and other.is_file(): - other.unlink(missing_ok=True) - except Exception: - continue - except Exception: - pass - - resp = Response(content=decoded, media_type=mt) + resp = Response(content=res.payload, media_type=res.media_type) resp.headers["Cache-Control"] = "public, max-age=86400" if use_cache else "no-store" - resp.headers["X-SNS-Source"] = "remote-decrypt" if decrypted else "remote" - if x_enc: - resp.headers["X-SNS-X-Enc"] = x_enc + resp.headers["X-SNS-Source"] = str(res.source or "remote") + if res.x_enc: + resp.headers["X-SNS-X-Enc"] = str(res.x_enc) return resp diff --git a/src/wechat_decrypt_tool/sns_export_service.py b/src/wechat_decrypt_tool/sns_export_service.py index 273f249..0e4fa6e 100644 --- a/src/wechat_decrypt_tool/sns_export_service.py +++ b/src/wechat_decrypt_tool/sns_export_service.py @@ -31,19 +31,23 @@ from .chat_export_service import ( # pylint: disable=protected-access _zip_write_tree, ) -# Reuse WeFlow-compatible SNS remote download/decrypt helpers. +# Reuse SNS timeline/local cache helpers. from .routers.sns import ( # pylint: disable=protected-access - _fix_sns_cdn_url, _generate_sns_cache_key, - _materialize_sns_remote_video, _resolve_sns_cached_image_path, _resolve_sns_cached_image_path_by_cache_key, _resolve_sns_cached_image_path_by_md5, _resolve_sns_cached_video_path, - _try_fetch_and_decrypt_sns_remote, list_sns_timeline, ) +# SNS remote download+decrypt helpers (shared with API endpoints). +from .sns_media import ( # pylint: disable=protected-access + fix_sns_cdn_url as _fix_sns_cdn_url, + materialize_sns_remote_video as _materialize_sns_remote_video, + try_fetch_and_decrypt_sns_image_remote as _try_fetch_and_decrypt_sns_image_remote, +) + logger = get_logger(__name__) ExportStatus = Literal["queued", "running", "done", "error", "cancelled"] @@ -624,8 +628,8 @@ class SnsExportManager: # 0) Prefer WeFlow-style remote download+decrypt (accurate when keys are present). if fixed: should_cancel() - resp = run_async( - _try_fetch_and_decrypt_sns_remote( + res = run_async( + _try_fetch_and_decrypt_sns_image_remote( account_dir=account_dir, url=fixed, key=str(key or ""), @@ -633,8 +637,9 @@ class SnsExportManager: use_cache=use_cache, ) ) - if resp is not None: - payload, mt = _response_bytes(resp) + if res is not None: + payload = bytes(res.payload or b"") + mt = str(res.media_type or "") # 1) Local cache fallback (only when cache is enabled; mirrors `/api/sns/media` semantics). if (not payload) and use_cache: diff --git a/src/wechat_decrypt_tool/sns_media.py b/src/wechat_decrypt_tool/sns_media.py new file mode 100644 index 0000000..8cf0bcd --- /dev/null +++ b/src/wechat_decrypt_tool/sns_media.py @@ -0,0 +1,710 @@ +from __future__ import annotations + +"""SNS (Moments) remote media download + decryption helpers. + +This module centralizes the "remote URL -> download -> decrypt -> validate -> cache" pipeline +so it can be reused by: +- FastAPI endpoints (`routers/sns.py`) +- Offline export (`sns_export_service.py`) + +Important notes (empirical, matches current repo behavior): +- SNS images: prefer `wcdb_api.dll` export `wcdb_decrypt_sns_image` (black-box). Pure ISAAC64 + keystream XOR is NOT reliable for images across versions. +- SNS videos: encrypted only for the first 128KB; decrypt via WeFlow's WxIsaac64 (WASM keystream) + and XOR in-place. +""" + +from dataclasses import dataclass +from functools import lru_cache +from pathlib import Path +from typing import Optional +from urllib.parse import urlparse +import base64 +import hashlib +import html +import os +import re +import subprocess +import time + +import httpx +from fastapi import HTTPException + +from .logging_config import get_logger +from .wcdb_realtime import decrypt_sns_image as _wcdb_decrypt_sns_image + +logger = get_logger(__name__) + + +def is_allowed_sns_media_host(host: str) -> bool: + h = str(host or "").strip().lower() + if not h: + return False + # Images: qpic/qlogo. Thumbs: *.tc.qq.com. Videos/live photos: *.video.qq.com. + return h.endswith(".qpic.cn") or h.endswith(".qlogo.cn") or h.endswith(".tc.qq.com") or h.endswith(".video.qq.com") + + +def fix_sns_cdn_url(url: str, *, token: str = "", is_video: bool = False) -> str: + """WeFlow-compatible SNS CDN URL normalization. + + - Force https for Tencent CDNs. + - For images, replace `/150` with `/0` to request the original. + - If token is provided and url doesn't contain it, append `token=&idx=1`. + """ + u = html.unescape(str(url or "")).strip() + if not u: + return "" + + # Only touch Tencent CDNs; keep other URLs intact. + try: + p = urlparse(u) + host = str(p.hostname or "").lower() + if not is_allowed_sns_media_host(host): + return u + except Exception: + return u + + # http -> https + u = re.sub(r"^http://", "https://", u, flags=re.I) + + # /150 -> /0 (image only) + if not is_video: + u = re.sub(r"/150(?=($|\\?))", "/0", u) + + tok = str(token or "").strip() + if tok and ("token=" not in u): + if is_video: + # Match WeFlow: place `token&idx=1` in front of existing query params. + base, sep, qs = u.partition("?") + if sep: + qs = qs.lstrip("&") + u = f"{base}?token={tok}&idx=1" + if qs: + u = f"{u}&{qs}" + else: + u = f"{u}?token={tok}&idx=1" + else: + connector = "&" if "?" in u else "?" + u = f"{u}{connector}token={tok}&idx=1" + + return u + + +def _detect_mp4_ftyp(head: bytes) -> bool: + return bool(head) and len(head) >= 8 and head[4:8] == b"ftyp" + + +@lru_cache(maxsize=1) +def _weflow_wxisaac64_script_path() -> str: + """Locate the Node helper that wraps WeFlow's wasm_video_decode.* assets.""" + repo_root = Path(__file__).resolve().parents[2] + script = repo_root / "tools" / "weflow_wasm_keystream.js" + if script.exists() and script.is_file(): + return str(script) + return "" + + +@lru_cache(maxsize=64) +def weflow_wxisaac64_keystream(key: str, size: int) -> bytes: + """Generate keystream via WeFlow's WASM (preferred; matches real video decryption).""" + key_text = str(key or "").strip() + if not key_text or size <= 0: + return b"" + + # WeFlow is the source-of-truth; use its WASM first, then fall back to our pure-python ISAAC64. + script = _weflow_wxisaac64_script_path() + if script: + try: + # The JS helper prints ONLY base64 bytes to stdout; keep stderr for debugging. + proc = subprocess.run( + ["node", script, key_text, str(int(size))], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=30, + check=False, + ) + if proc.returncode == 0: + out_b64 = (proc.stdout or b"").strip() + if out_b64: + return base64.b64decode(out_b64, validate=False) + except Exception: + pass + + # Fallback: pure python ISAAC64 (best-effort; may not match WxIsaac64 for all versions). + from .isaac64 import Isaac64 # pylint: disable=import-outside-toplevel + + want = int(size) + # ISAAC64 generates 8-byte words; generate enough and slice. + size8 = ((want + 7) // 8) * 8 + return Isaac64(key_text).generate_keystream(size8)[:want] + + +_SNS_REMOTE_VIDEO_CACHE_EXTS = [ + ".mp4", + ".bin", # legacy/unknown +] + + +def _sns_remote_video_cache_dir_and_stem(account_dir: Path, *, url: str, key: str) -> tuple[Path, str]: + digest = hashlib.md5(f"video|{url}|{key}".encode("utf-8", errors="ignore")).hexdigest() + cache_dir = account_dir / "sns_remote_video_cache" / digest[:2] + return cache_dir, digest + + +def _sns_remote_video_cache_existing_path(cache_dir: Path, stem: str) -> Optional[Path]: + for ext in _SNS_REMOTE_VIDEO_CACHE_EXTS: + p = cache_dir / f"{stem}{ext}" + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + return None + + +async def _download_sns_remote_to_file(url: str, dest_path: Path, *, max_bytes: int) -> tuple[str, str]: + """Download SNS media to file (streaming) from Tencent CDN. + + Returns: (content_type, x_enc) + """ + u = str(url or "").strip() + if not u: + return "", "" + + # Safety: only allow Tencent CDN hosts. + try: + p = urlparse(u) + host = str(p.hostname or "").lower() + if not is_allowed_sns_media_host(host): + raise HTTPException(status_code=400, detail="SNS media host not allowed.") + except HTTPException: + raise + except Exception: + raise HTTPException(status_code=400, detail="Invalid SNS media URL.") + + base_headers = { + "User-Agent": "MicroMessenger Client", + "Accept": "*/*", + # Do not request compression for video streams. + "Connection": "keep-alive", + } + + header_variants = [ + {}, + # WeFlow/Electron: MicroMessenger UA + servicewechat.com referer passes some CDN anti-hotlink checks. + { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63090719) XWEB/8351", + "Referer": "https://servicewechat.com/", + "Origin": "https://servicewechat.com", + }, + {"Referer": "https://wx.qq.com/", "Origin": "https://wx.qq.com"}, + {"Referer": "https://mp.weixin.qq.com/", "Origin": "https://mp.weixin.qq.com"}, + ] + + last_err: Exception | None = None + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: + for extra in header_variants: + headers = dict(base_headers) + headers.update(extra) + try: + if dest_path.exists(): + try: + dest_path.unlink(missing_ok=True) + except Exception: + pass + + total = 0 + async with client.stream("GET", u, headers=headers) as resp: + resp.raise_for_status() + content_type = str(resp.headers.get("Content-Type") or "").strip() + x_enc = str(resp.headers.get("x-enc") or "").strip() + dest_path.parent.mkdir(parents=True, exist_ok=True) + with dest_path.open("wb") as f: + async for chunk in resp.aiter_bytes(): + if not chunk: + continue + total += len(chunk) + if total > max_bytes: + raise HTTPException(status_code=400, detail="SNS video too large.") + f.write(chunk) + return content_type, x_enc + except HTTPException: + raise + except Exception as e: + last_err = e + continue + + raise last_err or RuntimeError("sns remote download failed") + + +def maybe_decrypt_sns_video_file(path: Path, key: str) -> bool: + """Decrypt the first 128KB of an encrypted mp4 file in-place (WeFlow/Isaac64). + + Returns True if decryption was performed, False otherwise. + """ + key_text = str(key or "").strip() + if not key_text: + return False + + try: + size = int(path.stat().st_size) + except Exception: + return False + + if size <= 8: + return False + + decrypt_size = min(131072, size) + if decrypt_size <= 0: + return False + + try: + with path.open("r+b") as f: + head = f.read(8) + if _detect_mp4_ftyp(head): + return False + + f.seek(0) + buf = bytearray(f.read(decrypt_size)) + if not buf: + return False + + ks = weflow_wxisaac64_keystream(key_text, decrypt_size) + n = min(len(buf), len(ks)) + for i in range(n): + buf[i] ^= ks[i] + + f.seek(0) + f.write(buf) + f.flush() + + f.seek(0) + head2 = f.read(8) + if _detect_mp4_ftyp(head2): + return True + # Still return True to indicate we mutated bytes; caller may treat as failure if desired. + return True + except Exception: + return False + + +async def materialize_sns_remote_video( + *, + account_dir: Path, + url: str, + key: str, + token: str, + use_cache: bool, +) -> Optional[Path]: + """Download SNS video from CDN, decrypt (if needed), and return a local mp4 path.""" + fixed_url = fix_sns_cdn_url(str(url or ""), token=str(token or ""), is_video=True) + if not fixed_url: + return None + + cache_dir, cache_stem = _sns_remote_video_cache_dir_and_stem(account_dir, url=fixed_url, key=str(key or "")) + + if use_cache: + existing = _sns_remote_video_cache_existing_path(cache_dir, cache_stem) + if existing is not None: + # Best-effort migrate legacy `.bin` -> `.mp4` when it's already decrypted. + try: + if existing.suffix.lower() == ".bin": + with existing.open("rb") as f: + head = f.read(8) + if _detect_mp4_ftyp(head): + target = cache_dir / f"{cache_stem}.mp4" + cache_dir.mkdir(parents=True, exist_ok=True) + os.replace(str(existing), str(target)) + existing = target + except Exception: + pass + return existing + + # Download to a temp file first. + cache_dir.mkdir(parents=True, exist_ok=True) + tmp_path = cache_dir / f"{cache_stem}.mp4.{time.time_ns()}.tmp" + try: + await _download_sns_remote_to_file(fixed_url, tmp_path, max_bytes=200 * 1024 * 1024) + except Exception: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass + return None + + # Decrypt in-place if the file isn't already a mp4. + maybe_decrypt_sns_video_file(tmp_path, str(key or "")) + + # Validate: mp4 must have `ftyp` at offset 4. + ok_mp4 = False + try: + with tmp_path.open("rb") as f: + head = f.read(8) + ok_mp4 = _detect_mp4_ftyp(head) + except Exception: + ok_mp4 = False + + if not ok_mp4: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass + return None + + if use_cache: + final_path = cache_dir / f"{cache_stem}.mp4" + try: + os.replace(str(tmp_path), str(final_path)) + except Exception: + # If rename fails, keep tmp_path as fallback. + final_path = tmp_path + + # Remove other extensions for the same cache key. + for other_ext in _SNS_REMOTE_VIDEO_CACHE_EXTS: + if other_ext.lower() == ".mp4": + continue + other = cache_dir / f"{cache_stem}{other_ext}" + try: + if other.exists() and other.is_file(): + other.unlink(missing_ok=True) + except Exception: + continue + + return final_path + + # Cache disabled: keep the decrypted tmp_path (caller should delete it). + return tmp_path + + +def best_effort_unlink(path: str) -> None: + try: + Path(path).unlink(missing_ok=True) + except Exception: + pass + + +def detect_image_mime(data: bytes) -> str: + """Sniff image mime type by magic bytes. + + IMPORTANT: Do NOT trust HTTP Content-Type as a fallback here. We use this for + validating decrypted bytes. If we blindly trust `image/*`, a failed decrypt + would poison the disk cache and the frontend would keep showing broken images. + """ + if not data: + return "" + + if data.startswith(b"\xFF\xD8\xFF"): + return "image/jpeg" + if data.startswith(b"\x89PNG\r\n\x1a\n"): + return "image/png" + if len(data) >= 6 and data[:6] in (b"GIF87a", b"GIF89a"): + return "image/gif" + if len(data) >= 12 and data[:4] == b"RIFF" and data[8:12] == b"WEBP": + return "image/webp" + if len(data) >= 12 and data[4:8] == b"ftyp": + # ISO BMFF based image formats (HEIF/HEIC/AVIF). + brand = data[8:12] + if brand == b"avif": + return "image/avif" + if brand in (b"heic", b"heix", b"hevc", b"hevx"): + return "image/heic" + if brand in (b"heif", b"mif1", b"msf1"): + return "image/heif" + if data.startswith(b"BM"): + return "image/bmp" + + return "" + + +_SNS_REMOTE_CACHE_EXTS = [ + ".jpg", + ".jpeg", + ".png", + ".gif", + ".webp", + ".bmp", + ".avif", + ".heic", + ".heif", + ".bin", # legacy/unknown +] + + +def _mime_to_ext(mt: str) -> str: + m = str(mt or "").split(";", 1)[0].strip().lower() + return { + "image/jpeg": ".jpg", + "image/jpg": ".jpg", + "image/png": ".png", + "image/gif": ".gif", + "image/webp": ".webp", + "image/bmp": ".bmp", + "image/avif": ".avif", + "image/heic": ".heic", + "image/heif": ".heif", + }.get(m, ".bin") + + +def _ext_to_mime(ext: str) -> str: + e = str(ext or "").strip().lower().lstrip(".") + return { + "jpg": "image/jpeg", + "jpeg": "image/jpeg", + "png": "image/png", + "gif": "image/gif", + "webp": "image/webp", + "bmp": "image/bmp", + "avif": "image/avif", + "heic": "image/heic", + "heif": "image/heif", + }.get(e, "") + + +def _sns_remote_cache_dir_and_stem(account_dir: Path, *, url: str, key: str) -> tuple[Path, str]: + digest = hashlib.md5(f"{url}|{key}".encode("utf-8", errors="ignore")).hexdigest() + cache_dir = account_dir / "sns_remote_cache" / digest[:2] + return cache_dir, digest + + +def _sns_remote_cache_existing_path(cache_dir: Path, stem: str) -> Optional[Path]: + for ext in _SNS_REMOTE_CACHE_EXTS: + p = cache_dir / f"{stem}{ext}" + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + return None + + +def _sniff_image_mime_from_file(path: Path) -> str: + try: + with path.open("rb") as f: + head = f.read(64) + return detect_image_mime(head) + except Exception: + return "" + + +async def _download_sns_remote_bytes(url: str) -> tuple[bytes, str, str]: + """Download SNS media bytes from Tencent CDN with a few safe header variants.""" + u = str(url or "").strip() + if not u: + return b"", "", "" + + max_bytes = 25 * 1024 * 1024 + + base_headers = { + "User-Agent": "MicroMessenger Client", + "Accept": "*/*", + "Accept-Language": "zh-CN,zh;q=0.9", + # Avoid brotli dependency issues; images are already compressed anyway. + "Accept-Encoding": "identity", + "Connection": "keep-alive", + } + + # Some CDN endpoints return a small placeholder image for certain UA/Referer + # combinations but still respond 200. Try the simplest (base headers only) + # first to maximize the chance of getting the real media in one request. + header_variants = [ + {}, + # WeFlow/Electron: MicroMessenger UA + servicewechat.com referer passes some CDN anti-hotlink checks. + { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63090719) XWEB/8351", + "Referer": "https://servicewechat.com/", + "Origin": "https://servicewechat.com", + }, + {"Referer": "https://wx.qq.com/", "Origin": "https://wx.qq.com"}, + {"Referer": "https://mp.weixin.qq.com/", "Origin": "https://mp.weixin.qq.com"}, + ] + + last_err: Exception | None = None + async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client: + for extra in header_variants: + headers = dict(base_headers) + headers.update(extra) + try: + resp = await client.get(u, headers=headers) + resp.raise_for_status() + payload = bytes(resp.content or b"") + if len(payload) > max_bytes: + raise HTTPException(status_code=400, detail="SNS media too large (>25MB).") + content_type = str(resp.headers.get("Content-Type") or "").strip() + x_enc = str(resp.headers.get("x-enc") or "").strip() + return payload, content_type, x_enc + except HTTPException: + raise + except Exception as e: + last_err = e + continue + + raise last_err or RuntimeError("sns remote download failed") + + +@dataclass(frozen=True) +class SnsRemoteImageResult: + payload: bytes + media_type: str + source: str + x_enc: str = "" + cache_path: Optional[Path] = None + + +async def try_fetch_and_decrypt_sns_image_remote( + *, + account_dir: Path, + url: str, + key: str, + token: str, + use_cache: bool, +) -> Optional[SnsRemoteImageResult]: + """Try WeFlow-style: download from CDN -> decrypt via wcdb_decrypt_sns_image -> return bytes. + + Returns a SnsRemoteImageResult on success, or None on failure so caller can fall back to + local cache matching logic. + """ + u_fixed = fix_sns_cdn_url(url, token=token, is_video=False) + if not u_fixed: + return None + + try: + p = urlparse(u_fixed) + host = str(p.hostname or "").strip().lower() + except Exception: + return None + if not is_allowed_sns_media_host(host): + return None + + cache_dir, cache_stem = _sns_remote_cache_dir_and_stem(account_dir, url=u_fixed, key=str(key or "")) + + cache_path: Optional[Path] = None + if use_cache: + try: + existing = _sns_remote_cache_existing_path(cache_dir, cache_stem) + if existing is not None: + mt = _ext_to_mime(existing.suffix) + + # Upgrade legacy `.bin` cache to a proper image extension once. + if (existing.suffix or "").lower() == ".bin" or (not mt): + mt2 = _sniff_image_mime_from_file(existing) + if not mt2: + try: + existing.unlink(missing_ok=True) + except Exception: + pass + existing = None + else: + ext2 = _mime_to_ext(mt2) + if ext2 != ".bin": + try: + cache_dir.mkdir(parents=True, exist_ok=True) + desired = cache_dir / f"{cache_stem}{ext2}" + if desired.exists(): + # Another process/version already wrote the real file; drop legacy bin. + existing.unlink(missing_ok=True) + existing = desired + else: + os.replace(str(existing), str(desired)) + existing = desired + except Exception: + pass + mt = mt2 + + if existing is not None and mt: + try: + payload = existing.read_bytes() + except Exception: + payload = b"" + if payload: + return SnsRemoteImageResult( + payload=payload, + media_type=mt, + source="remote-cache", + x_enc="", + cache_path=existing, + ) + except Exception: + pass + + try: + raw, _content_type, x_enc = await _download_sns_remote_bytes(u_fixed) + except Exception as e: + logger.info("[sns_media] remote download failed: %s", e) + return None + + if not raw: + return None + + # First, validate whether the CDN already returned a real image. + mt_raw = detect_image_mime(raw) + + decoded = raw + mt = mt_raw + decrypted = False + k = str(key or "").strip() + + # Only attempt decryption when bytes do NOT look like an image, or when CDN explicitly + # signals encryption (x-enc). Some endpoints return already-decoded PNG/JPEG even when + # urlAttrs.enc_idx == 1, and decrypting those would corrupt the bytes. + need_decrypt = bool(k) and (not mt_raw) and bool(raw) + if k and x_enc and str(x_enc).strip() not in ("0", "false", "False"): + need_decrypt = True + + if need_decrypt: + try: + decoded2 = _wcdb_decrypt_sns_image(raw, k) + mt2 = detect_image_mime(decoded2) + if mt2: + decoded = decoded2 + mt = mt2 + decrypted = decoded2 != raw + else: + # Decrypt failed; if raw is a real image, keep it. Otherwise treat as failure. + if mt_raw: + decoded = raw + mt = mt_raw + decrypted = False + else: + return None + except Exception as e: + logger.info("[sns_media] remote decrypt failed: %s", e) + if not mt_raw: + return None + decoded = raw + mt = mt_raw + decrypted = False + + if not mt: + return None + + if use_cache: + try: + ext = _mime_to_ext(mt) + cache_dir.mkdir(parents=True, exist_ok=True) + cache_path = cache_dir / f"{cache_stem}{ext}" + + tmp = cache_path.with_suffix(cache_path.suffix + f".{time.time_ns()}.tmp") + tmp.write_bytes(decoded) + os.replace(str(tmp), str(cache_path)) + + # Remove other extensions for the same cache key to avoid stale duplicates. + for other_ext in _SNS_REMOTE_CACHE_EXTS: + if other_ext.lower() == ext.lower(): + continue + other = cache_dir / f"{cache_stem}{other_ext}" + try: + if other.exists() and other.is_file(): + other.unlink(missing_ok=True) + except Exception: + continue + except Exception: + cache_path = None + + return SnsRemoteImageResult( + payload=decoded, + media_type=mt, + source="remote-decrypt" if decrypted else "remote", + x_enc=str(x_enc or "").strip(), + cache_path=cache_path, + ) + diff --git a/tests/test_sns_media.py b/tests/test_sns_media.py new file mode 100644 index 0000000..6bc2d38 --- /dev/null +++ b/tests/test_sns_media.py @@ -0,0 +1,180 @@ +import asyncio +import hashlib +import sys +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest import mock + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +from wechat_decrypt_tool import sns_media # noqa: E402 pylint: disable=wrong-import-position + + +class TestSnsMedia(unittest.TestCase): + def test_fix_sns_cdn_url_image_rewrites_150_and_appends_token(self): + u = "http://mmsns.qpic.cn/sns/abc/150" + out = sns_media.fix_sns_cdn_url(u, token="tkn", is_video=False) + self.assertEqual(out, "https://mmsns.qpic.cn/sns/abc/0?token=tkn&idx=1") + + u2 = "https://mmsns.qpic.cn/sns/abc/150?foo=bar" + out2 = sns_media.fix_sns_cdn_url(u2, token="tkn", is_video=False) + self.assertEqual(out2, "https://mmsns.qpic.cn/sns/abc/0?foo=bar&token=tkn&idx=1") + + def test_fix_sns_cdn_url_video_places_token_first(self): + u = "https://snsvideodownload.video.qq.com/abc.mp4?foo=1&bar=2" + out = sns_media.fix_sns_cdn_url(u, token="tkn", is_video=True) + self.assertEqual(out, "https://snsvideodownload.video.qq.com/abc.mp4?token=tkn&idx=1&foo=1&bar=2") + + def test_fix_sns_cdn_url_non_tencent_host_passthrough(self): + u = "http://example.com/a/150?x=1" + out = sns_media.fix_sns_cdn_url(u, token="tkn", is_video=False) + self.assertEqual(out, u) + + def test_maybe_decrypt_sns_video_file_xors_inplace(self): + # Build a fake MP4 header (ftyp at offset 4) and encrypt it by XORing with a keystream. + plain = b"\x00\x00\x00\x20ftypisom" + b"\x00" * 48 + ks = bytes(range(len(plain))) + enc = bytes([plain[i] ^ ks[i] for i in range(len(plain))]) + + with TemporaryDirectory() as td: + p = Path(td) / "v.mp4" + p.write_bytes(enc) + + with mock.patch("wechat_decrypt_tool.sns_media.weflow_wxisaac64_keystream", return_value=ks): + did = sns_media.maybe_decrypt_sns_video_file(p, key="1") + self.assertTrue(did) + self.assertEqual(p.read_bytes(), plain) + + # Second run should be a no-op because it already looks like a MP4. + did2 = sns_media.maybe_decrypt_sns_video_file(p, key="1") + self.assertFalse(did2) + + def test_try_fetch_and_decrypt_sns_image_remote_cache_hit(self): + with TemporaryDirectory() as td: + account_dir = Path(td) / "acc" + account_dir.mkdir(parents=True, exist_ok=True) + + url = "https://mmsns.qpic.cn/sns/test/0?token=tkn&idx=1" + key = "123" + fixed = sns_media.fix_sns_cdn_url(url, token="tkn", is_video=False) + digest = hashlib.md5(f"{fixed}|{key}".encode("utf-8", errors="ignore")).hexdigest() + + cache_dir = account_dir / "sns_remote_cache" / digest[:2] + cache_dir.mkdir(parents=True, exist_ok=True) + cache_path = cache_dir / f"{digest}.jpg" + + payload = b"\xff\xd8\xff\x00fakejpeg" + cache_path.write_bytes(payload) + + res = asyncio.run( + sns_media.try_fetch_and_decrypt_sns_image_remote( + account_dir=account_dir, + url=url, + key=key, + token="tkn", + use_cache=True, + ) + ) + self.assertIsNotNone(res) + assert res is not None + self.assertEqual(res.source, "remote-cache") + self.assertEqual(res.media_type, "image/jpeg") + self.assertEqual(res.payload, payload) + self.assertTrue(res.cache_path and res.cache_path.exists()) + + def test_try_fetch_and_decrypt_sns_image_remote_cache_upgrades_bin_extension(self): + with TemporaryDirectory() as td: + account_dir = Path(td) / "acc" + account_dir.mkdir(parents=True, exist_ok=True) + + url = "https://mmsns.qpic.cn/sns/test/0?token=tkn&idx=1" + key = "123" + fixed = sns_media.fix_sns_cdn_url(url, token="tkn", is_video=False) + digest = hashlib.md5(f"{fixed}|{key}".encode("utf-8", errors="ignore")).hexdigest() + + cache_dir = account_dir / "sns_remote_cache" / digest[:2] + cache_dir.mkdir(parents=True, exist_ok=True) + bin_path = cache_dir / f"{digest}.bin" + png_payload = b"\x89PNG\r\n\x1a\n" + b"fakepng" + bin_path.write_bytes(png_payload) + + res = asyncio.run( + sns_media.try_fetch_and_decrypt_sns_image_remote( + account_dir=account_dir, + url=url, + key=key, + token="tkn", + use_cache=True, + ) + ) + self.assertIsNotNone(res) + assert res is not None + self.assertEqual(res.source, "remote-cache") + self.assertEqual(res.media_type, "image/png") + self.assertTrue(res.cache_path and res.cache_path.suffix.lower() == ".png") + self.assertTrue(res.cache_path and res.cache_path.exists()) + self.assertFalse(bin_path.exists()) + + def test_try_fetch_and_decrypt_sns_image_remote_decrypts_when_needed(self): + raw = b"\x01\x02\x03\x04not_an_image" + decoded = b"\x89PNG\r\n\x1a\n" + b"decoded" + + async def fake_download(_url: str): + return raw, "image/jpeg", "1" + + with TemporaryDirectory() as td: + account_dir = Path(td) / "acc" + account_dir.mkdir(parents=True, exist_ok=True) + + with mock.patch("wechat_decrypt_tool.sns_media._download_sns_remote_bytes", side_effect=fake_download): + with mock.patch("wechat_decrypt_tool.sns_media._wcdb_decrypt_sns_image", return_value=decoded): + res = asyncio.run( + sns_media.try_fetch_and_decrypt_sns_image_remote( + account_dir=account_dir, + url="https://mmsns.qpic.cn/sns/test/0", + key="123", + token="tkn", + use_cache=False, + ) + ) + + self.assertIsNotNone(res) + assert res is not None + self.assertEqual(res.media_type, "image/png") + self.assertEqual(res.source, "remote-decrypt") + self.assertEqual(res.x_enc, "1") + self.assertEqual(res.payload, decoded) + + def test_try_fetch_and_decrypt_sns_image_remote_decrypt_failure_returns_none(self): + raw = b"\x01\x02\x03\x04not_an_image" + decoded_bad = b"\x00\x00\x00\x00still_bad" + + async def fake_download(_url: str): + return raw, "image/jpeg", "1" + + with TemporaryDirectory() as td: + account_dir = Path(td) / "acc" + account_dir.mkdir(parents=True, exist_ok=True) + + with mock.patch("wechat_decrypt_tool.sns_media._download_sns_remote_bytes", side_effect=fake_download): + with mock.patch("wechat_decrypt_tool.sns_media._wcdb_decrypt_sns_image", return_value=decoded_bad): + res = asyncio.run( + sns_media.try_fetch_and_decrypt_sns_image_remote( + account_dir=account_dir, + url="https://mmsns.qpic.cn/sns/test/0", + key="123", + token="tkn", + use_cache=False, + ) + ) + + self.assertIsNone(res) + + +if __name__ == "__main__": + unittest.main() +