From 6e127b2e3290b381d25552914b01c21ec5d49f29 Mon Sep 17 00:00:00 2001
From: 2977094657 <2977094657@qq.com>
Date: Tue, 17 Feb 2026 23:40:10 +0800
Subject: [PATCH] feat(sns): enhance Moments timeline/media fetching and
 realtime sync
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

- Add /api/sns/users: list Moments contacts ranked by post count (supports keyword/limit)
- Add /api/sns/realtime/sync_latest: incremental sync from WCDB realtime into the decrypted database (append-only), with persisted sync state
- Moments media now prefers "remote download + decrypt": images via wcdb_decrypt_sns_image, videos/live photos via ISAAC64 (WeFlow logic)
- Prefer the WeFlow WASM keystream (Node), with a pure-Python ISAAC64 fallback for better compatibility
- wcdb_api.dll: auto-discovery across multiple paths / env-var override, and report the actual path in use via status info
---
 src/wechat_decrypt_tool/isaac64.py       |  169 ++
 src/wechat_decrypt_tool/routers/sns.py   | 2114 ++++++++++++++++-
 .../sns_realtime_autosync.py             |  274 +++
 src/wechat_decrypt_tool/wcdb_realtime.py |  135 +-
 tools/weflow_wasm_keystream.js           |  122 +
 5 files changed, 2706 insertions(+), 108 deletions(-)
 create mode 100644 src/wechat_decrypt_tool/isaac64.py
 create mode 100644 src/wechat_decrypt_tool/sns_realtime_autosync.py
 create mode 100644 tools/weflow_wasm_keystream.js

diff --git a/src/wechat_decrypt_tool/isaac64.py b/src/wechat_decrypt_tool/isaac64.py
new file mode 100644
index 0000000..c1a8236
--- /dev/null
+++ b/src/wechat_decrypt_tool/isaac64.py
@@ -0,0 +1,169 @@
+"""ISAAC-64 PRNG (WeFlow compatible).
+
+WeChat SNS live photo/video decryption uses a keystream generated by ISAAC-64 and
+XORs the first 128KB of the mp4 file. WeFlow's implementation reverses the
+generated byte array, so we mirror that behavior for compatibility.
+"""
+
+from __future__ import annotations
+
+from typing import Any
+
+_MASK_64 = 0xFFFFFFFFFFFFFFFF
+
+
+def _u64(v: int) -> int:
+    return int(v) & _MASK_64
+
+
+class Isaac64:
+    def __init__(self, seed: Any):
+        seed_text = str(seed).strip()
+        if not seed_text:
+            seed_val = 0
+        else:
+            try:
+                # WeFlow seeds with BigInt(seed), where seed is usually a decimal string.
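+                # Note: int(x, 0) auto-detects "0x"/"0o"/"0b" prefixes as well as plain
+                # decimal; anything unparsable (even "0123") raises and falls through
+                # to seed_val = 0 below, so malformed seeds degrade to 0 instead of
+                # raising.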
+ seed_val = int(seed_text, 0) + except Exception: + seed_val = 0 + + self.mm = [_u64(0) for _ in range(256)] + self.aa = _u64(0) + self.bb = _u64(0) + self.cc = _u64(0) + self.randrsl = [_u64(0) for _ in range(256)] + self.randrsl[0] = _u64(seed_val) + self.randcnt = 0 + self._init(True) + + def _init(self, flag: bool) -> None: + a = b = c = d = e = f = g = h = _u64(0x9E3779B97F4A7C15) + + def mix() -> tuple[int, int, int, int, int, int, int, int]: + nonlocal a, b, c, d, e, f, g, h + a = _u64(a - e) + f = _u64(f ^ (h >> 9)) + h = _u64(h + a) + + b = _u64(b - f) + g = _u64(g ^ _u64(a << 9)) + a = _u64(a + b) + + c = _u64(c - g) + h = _u64(h ^ (b >> 23)) + b = _u64(b + c) + + d = _u64(d - h) + a = _u64(a ^ _u64(c << 15)) + c = _u64(c + d) + + e = _u64(e - a) + b = _u64(b ^ (d >> 14)) + d = _u64(d + e) + + f = _u64(f - b) + c = _u64(c ^ _u64(e << 20)) + e = _u64(e + f) + + g = _u64(g - c) + d = _u64(d ^ (f >> 17)) + f = _u64(f + g) + + h = _u64(h - d) + e = _u64(e ^ _u64(g << 14)) + g = _u64(g + h) + return a, b, c, d, e, f, g, h + + for _ in range(4): + mix() + + for i in range(0, 256, 8): + if flag: + a = _u64(a + self.randrsl[i]) + b = _u64(b + self.randrsl[i + 1]) + c = _u64(c + self.randrsl[i + 2]) + d = _u64(d + self.randrsl[i + 3]) + e = _u64(e + self.randrsl[i + 4]) + f = _u64(f + self.randrsl[i + 5]) + g = _u64(g + self.randrsl[i + 6]) + h = _u64(h + self.randrsl[i + 7]) + mix() + self.mm[i] = a + self.mm[i + 1] = b + self.mm[i + 2] = c + self.mm[i + 3] = d + self.mm[i + 4] = e + self.mm[i + 5] = f + self.mm[i + 6] = g + self.mm[i + 7] = h + + if flag: + for i in range(0, 256, 8): + a = _u64(a + self.mm[i]) + b = _u64(b + self.mm[i + 1]) + c = _u64(c + self.mm[i + 2]) + d = _u64(d + self.mm[i + 3]) + e = _u64(e + self.mm[i + 4]) + f = _u64(f + self.mm[i + 5]) + g = _u64(g + self.mm[i + 6]) + h = _u64(h + self.mm[i + 7]) + mix() + self.mm[i] = a + self.mm[i + 1] = b + self.mm[i + 2] = c + self.mm[i + 3] = d + self.mm[i + 4] = e + self.mm[i + 5] = f + self.mm[i + 6] = g + self.mm[i + 7] = h + + self._isaac64() + self.randcnt = 256 + + def _isaac64(self) -> None: + self.cc = _u64(self.cc + 1) + self.bb = _u64(self.bb + self.cc) + + for i in range(256): + x = self.mm[i] + if (i & 3) == 0: + # aa ^= ~(aa << 21) + self.aa = _u64(self.aa ^ (_u64(self.aa << 21) ^ _MASK_64)) + elif (i & 3) == 1: + self.aa = _u64(self.aa ^ (self.aa >> 5)) + elif (i & 3) == 2: + self.aa = _u64(self.aa ^ _u64(self.aa << 12)) + else: + self.aa = _u64(self.aa ^ (self.aa >> 33)) + + self.aa = _u64(self.mm[(i + 128) & 255] + self.aa) + y = _u64(self.mm[(x >> 3) & 255] + self.aa + self.bb) + self.mm[i] = y + self.bb = _u64(self.mm[(y >> 11) & 255] + x) + self.randrsl[i] = self.bb + + def get_next(self) -> int: + if self.randcnt == 0: + self._isaac64() + self.randcnt = 256 + idx = 256 - self.randcnt + self.randcnt -= 1 + return _u64(self.randrsl[idx]) + + def generate_keystream(self, size: int) -> bytes: + """Generate a keystream of `size` bytes (must be multiple of 8).""" + if size <= 0: + return b"" + if size % 8 != 0: + raise ValueError("ISAAC64 keystream size must be multiple of 8 bytes.") + + out = bytearray() + count = size // 8 + for _ in range(count): + out.extend(int(self.get_next()).to_bytes(8, "little", signed=False)) + + # WeFlow reverses the entire byte array (Uint8Array.reverse()). 
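+        # Illustration: for two generated words [w0, w1] the stream is LE(w0)+LE(w1);
+        # reversing the whole buffer yields BE(w1)+BE(w0), i.e. big-endian words in
+        # reverse order, exactly what Uint8Array.reverse() produces over the same bytes.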
+ out.reverse() + return bytes(out) + diff --git a/src/wechat_decrypt_tool/routers/sns.py b/src/wechat_decrypt_tool/routers/sns.py index 59254bb..0127cef 100644 --- a/src/wechat_decrypt_tool/routers/sns.py +++ b/src/wechat_decrypt_tool/routers/sns.py @@ -1,15 +1,22 @@ from bisect import bisect_left, bisect_right from functools import lru_cache from pathlib import Path +import os +import base64 import hashlib import json import re import httpx import html # 修复&转义的问题!!! import sqlite3 +import subprocess +import threading import time import xml.etree.ElementTree as ET from typing import Any, Optional +from urllib.parse import urlparse + +from starlette.background import BackgroundTask from fastapi import APIRouter, HTTPException from fastapi.responses import Response, FileResponse # 返回视频文件 @@ -22,6 +29,7 @@ from ..path_fix import PathFixRoute from ..wcdb_realtime import ( WCDBRealtimeError, WCDB_REALTIME, + decrypt_sns_image as _wcdb_decrypt_sns_image, exec_query as _wcdb_exec_query, get_sns_timeline as _wcdb_get_sns_timeline, ) @@ -32,6 +40,68 @@ router = APIRouter(route_class=PathFixRoute) SNS_MEDIA_PICKS_FILE = "_sns_media_picks.json" +_SNS_VIDEO_KEY_RE = re.compile(r' (expires_at_ts, force_sqlite) +_SNS_TIMELINE_AUTO_CACHE: dict[tuple[str, tuple[str, ...], str], tuple[float, bool]] = {} +_SNS_TIMELINE_AUTO_CACHE_MU = threading.Lock() + + +def _sns_timeline_auto_cache_key(account_dir: Path, users: list[str], kw: str) -> tuple[str, tuple[str, ...], str]: + # Normalize so different param orders map to the same key. + a = str(Path(account_dir).name) + u = tuple(sorted([str(x or "").strip() for x in (users or []) if str(x or "").strip()])) + k = str(kw or "").strip() + return (a, u, k) + + +def _sns_timeline_auto_cache_get(key: tuple[str, tuple[str, ...], str]) -> Optional[bool]: + now = time.time() + with _SNS_TIMELINE_AUTO_CACHE_MU: + rec = _SNS_TIMELINE_AUTO_CACHE.get(key) + if not rec: + return None + exp_ts, val = rec + if exp_ts <= now: + try: + del _SNS_TIMELINE_AUTO_CACHE[key] + except Exception: + pass + return None + return bool(val) + + +def _sns_timeline_auto_cache_set( + key: tuple[str, tuple[str, ...], str], + val: bool, + *, + ttl_seconds: int = _SNS_TIMELINE_AUTO_CACHE_TTL_SECONDS, +) -> None: + ttl = int(ttl_seconds or _SNS_TIMELINE_AUTO_CACHE_TTL_SECONDS) + if ttl <= 0: + ttl = _SNS_TIMELINE_AUTO_CACHE_TTL_SECONDS + exp_ts = time.time() + float(ttl) + with _SNS_TIMELINE_AUTO_CACHE_MU: + _SNS_TIMELINE_AUTO_CACHE[key] = (exp_ts, bool(val)) + + +def _sns_decrypted_db_lock(account: str) -> threading.Lock: + key = str(account or "").strip() + if not key: + key = "_" + with _SNS_DECRYPTED_DB_LOCKS_MU: + lock = _SNS_DECRYPTED_DB_LOCKS.get(key) + if lock is None: + lock = threading.Lock() + _SNS_DECRYPTED_DB_LOCKS[key] = lock + return lock + def _parse_csv_list(raw: Optional[str]) -> list[str]: if raw is None: @@ -50,6 +120,351 @@ def _safe_int(v: Any) -> int: return 0 +def _count_sns_timeline_rows_in_decrypted_sqlite( + sns_db_path: Path, + *, + users: list[str], + kw: str, +) -> int: + """Count rows in decrypted `sns.db` for a given query (raw rows, not timeline-filtered).""" + sns_db_path = Path(sns_db_path) + try: + if (not sns_db_path.exists()) or (not sns_db_path.is_file()): + return 0 + except Exception: + return 0 + + filters: list[str] = [] + params: list[Any] = [] + + if users: + placeholders = ",".join(["?"] * len(users)) + filters.append(f"user_name IN ({placeholders})") + params.extend(users) + + if kw: + filters.append("content LIKE ?") + params.append(f"%{kw}%") + + 
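+    # Illustrative note: with users=["wxid_a", "wxid_b"] and kw="trip" this builds
+    #   WHERE user_name IN (?,?) AND content LIKE ?
+    # bound to ["wxid_a", "wxid_b", "%trip%"]; values stay parameterized, never inlined.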
where_sql = f"WHERE {' AND '.join(filters)}" if filters else "" + sql = f"SELECT COUNT(*) AS c FROM SnsTimeLine {where_sql}" + + try: + conn = sqlite3.connect(str(sns_db_path), timeout=2.0) + try: + conn.execute("PRAGMA busy_timeout=2000") + row = conn.execute(sql, params).fetchone() + return int((row[0] if row else 0) or 0) + finally: + try: + conn.close() + except Exception: + pass + except Exception: + return 0 + + +def _count_sns_timeline_posts_in_decrypted_sqlite( + sns_db_path: Path, + *, + users: list[str], + kw: str, +) -> int: + """Count visible-post rows in decrypted `sns.db` for a given query. + + This matches `/api/sns/users`'s `postCount` definition: + - content not null/empty + - exclude cover rows: `7` + """ + sns_db_path = Path(sns_db_path) + try: + if (not sns_db_path.exists()) or (not sns_db_path.is_file()): + return 0 + except Exception: + return 0 + + filters: list[str] = [] + params: list[Any] = [] + + # Base filter: align with list_sns_users() postCount. + filters.append("content IS NOT NULL") + filters.append("content != ?") + params.append("") + filters.append("content NOT LIKE ?") + params.append("%7%") + + if users: + placeholders = ",".join(["?"] * len(users)) + filters.append(f"user_name IN ({placeholders})") + params.extend(users) + + if kw: + filters.append("content LIKE ?") + params.append(f"%{kw}%") + + where_sql = f"WHERE {' AND '.join(filters)}" if filters else "" + sql = f"SELECT COUNT(*) AS c FROM SnsTimeLine {where_sql}" + + try: + conn = sqlite3.connect(str(sns_db_path), timeout=2.0) + try: + conn.execute("PRAGMA busy_timeout=2000") + row = conn.execute(sql, params).fetchone() + return int((row[0] if row else 0) or 0) + finally: + try: + conn.close() + except Exception: + pass + except Exception: + return 0 + + +def _to_signed_i64(v: int) -> int: + x = int(v) & 0xFFFFFFFFFFFFFFFF + if x >= 0x8000000000000000: + x -= 0x10000000000000000 + return int(x) + +def _to_unsigned_i64_str(v: Any) -> str: + """Return unsigned decimal string for a signed/unsigned 64-bit integer-ish value. + + Moments `tid/id` is often an unsigned u64 stored as signed i64 (negative) in sqlite/WCDB. + Frontend cache-key formulas expect the *unsigned* decimal string. + """ + try: + x = int(v) + except Exception: + return str(v or "").strip() + return str(x & 0xFFFFFFFFFFFFFFFF) + + +def _read_sns_realtime_sync_state(account_dir: Path) -> dict[str, Any]: + p = Path(account_dir) / _SNS_REALTIME_SYNC_STATE_FILE + try: + if not p.exists() or (not p.is_file()): + return {} + except Exception: + return {} + + try: + data = json.loads(p.read_text(encoding="utf-8")) + except Exception: + return {} + + return data if isinstance(data, dict) else {} + + +def _write_sns_realtime_sync_state(account_dir: Path, data: dict[str, Any]) -> None: + p = Path(account_dir) / _SNS_REALTIME_SYNC_STATE_FILE + try: + p.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + except Exception: + pass + + +def _ensure_decrypted_sns_db(account_dir: Path) -> Path: + """Ensure `{account}/sns.db` exists with at least a minimal `SnsTimeLine` table. + + We keep it minimal (tid/user_name/content) so it stays compatible with older schema + while enabling incremental cache/writeback from WCDB realtime. + """ + account_dir = Path(account_dir) + sns_db_path = account_dir / "sns.db" + + # If something weird exists at that path, bail out. 
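+    # e.g. a directory accidentally named "sns.db" would otherwise surface later as an
+    # opaque sqlite3.OperationalError ("unable to open database file") from connect().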
+ try: + if sns_db_path.exists() and (not sns_db_path.is_file()): + raise RuntimeError("sns.db path is not a file") + except Exception as e: + raise RuntimeError(f"Invalid sns.db path: {e}") from e + + conn = sqlite3.connect(str(sns_db_path)) + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS SnsTimeLine( + tid INTEGER PRIMARY KEY, + user_name TEXT, + content TEXT + ) + """ + ) + conn.commit() + finally: + try: + conn.close() + except Exception: + pass + + return sns_db_path + + +def _upsert_sns_timeline_rows_to_decrypted_db( + account_dir: Path, + rows: list[tuple[int, str, str, Optional[str]]], + *, + source: str, +) -> int: + """Upsert rows into decrypted `{account}/sns.db` to avoid local missing data. + + rows: [(tid_signed, user_name, content_xml, pack_info_buf_or_none)] + """ + if not rows: + return 0 + + sns_db_path = _ensure_decrypted_sns_db(account_dir) + + # Serialize writes per-account to avoid sqlite "database is locked" errors under concurrency. + with _sns_decrypted_db_lock(Path(account_dir).name): + conn = sqlite3.connect(str(sns_db_path), timeout=2.0) + try: + conn.execute("PRAGMA busy_timeout=2000") + cols: set[str] = set() + try: + info_rows = conn.execute("PRAGMA table_info(SnsTimeLine)").fetchall() + for r in info_rows or []: + try: + cols.add(str(r[1] or "").strip()) + except Exception: + continue + except Exception: + cols = set() + + has_pack = "pack_info_buf" in cols + + if has_pack: + sql = """ + INSERT INTO SnsTimeLine (tid, user_name, content, pack_info_buf) + VALUES (?, ?, ?, ?) + ON CONFLICT(tid) DO UPDATE SET + user_name=excluded.user_name, + content=COALESCE(NULLIF(excluded.content, ''), SnsTimeLine.content), + pack_info_buf=COALESCE(excluded.pack_info_buf, SnsTimeLine.pack_info_buf) + """ + data = [(int(tid), str(u or "").strip(), str(c or ""), p) for tid, u, c, p in rows] + else: + sql = """ + INSERT INTO SnsTimeLine (tid, user_name, content) + VALUES (?, ?, ?) 
+ ON CONFLICT(tid) DO UPDATE SET + user_name=excluded.user_name, + content=COALESCE(NULLIF(excluded.content, ''), SnsTimeLine.content) + """ + data = [(int(tid), str(u or "").strip(), str(c or "")) for tid, u, c, _p in rows] + + conn.executemany(sql, data) + conn.commit() + return len(rows) + except Exception as e: + logger.debug("[sns] decrypted sns.db upsert failed source=%s err=%s", source, e) + try: + conn.rollback() + except Exception: + pass + return 0 + finally: + try: + conn.close() + except Exception: + pass + +def _extract_mp_biz_from_url(url: str) -> str: + """Extract `__biz` from mp.weixin.qq.com URLs (best-effort).""" + u = html.unescape(str(url or "")).replace("&", "&").strip() + if not u: + return "" + m = _MP_BIZ_RE.search(u) + if not m: + return "" + return str(m.group(1) or "").strip() + + +@lru_cache(maxsize=16) +def _build_biz_to_official_index(contact_db_path: str, mtime_ns: int, size: int) -> dict[str, dict[str, Any]]: + """Build mapping: __biz -> { username, serviceType } from contact.db.biz_info.""" + out: dict[str, dict[str, Any]] = {} + if not contact_db_path: + return out + + conn = sqlite3.connect(str(contact_db_path)) + conn.row_factory = sqlite3.Row + try: + try: + rows = conn.execute( + "SELECT username, brand_info, external_info, home_url FROM biz_info" + ).fetchall() + except Exception: + rows = [] + + for r in rows: + try: + uname = str(r["username"] or "").strip() + except Exception: + uname = "" + if not uname: + continue + + try: + brand_info = str(r["brand_info"] or "") + except Exception: + brand_info = "" + try: + external_info = str(r["external_info"] or "") + except Exception: + external_info = "" + try: + home_url = str(r["home_url"] or "") + except Exception: + home_url = "" + + service_type: Optional[int] = None + if external_info: + try: + j = json.loads(external_info) + st = j.get("ServiceType") + if st is not None: + service_type = int(st) + except Exception: + service_type = None + + blob = " ".join([brand_info, external_info, home_url]) + for biz in _MP_BIZ_RE.findall(blob): + b = str(biz or "").strip() + if not b: + continue + prev = out.get(b) + if prev is None: + out[b] = {"username": uname, "serviceType": service_type} + else: + if prev.get("serviceType") is None and service_type is not None: + prev["serviceType"] = service_type + finally: + conn.close() + + return out + + +def _get_biz_to_official_index(contact_db_path: Path) -> dict[str, dict[str, Any]]: + if not contact_db_path.exists(): + return {} + st = contact_db_path.stat() + mtime_ns = int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1e9))) + return _build_biz_to_official_index(str(contact_db_path), mtime_ns, int(st.st_size)) + + +def _extract_sns_video_key(raw_xml: Any) -> str: + """Extract Isaac64 video key from raw XML, e.g. 
``.""" + try: + text = str(raw_xml or "") + except Exception: + return "" + if not text: + return "" + m = _SNS_VIDEO_KEY_RE.search(text) + return str(m.group(1) or "").strip() if m else "" + + def _build_location_text(node: Optional[ET.Element]) -> str: if node is None: return "" @@ -763,12 +1178,36 @@ def _list_sns_cached_image_candidate_keys( return tuple(out) -def _get_sns_cover(account_dir: Path, target_wxid: str) -> Optional[dict[str, Any]]: - """无论多古老,强行揪出用户最近的一次朋友圈封面 (type=7)""" - cover_sql = f"SELECT tid, content FROM SnsTimeLine WHERE user_name = '{target_wxid}' AND content LIKE '%7%' ORDER BY tid DESC LIMIT 1" - cover_xml = None - cover_tid = None +def _get_sns_covers(account_dir: Path, target_wxid: str, limit: int = 20) -> list[dict[str, Any]]: + """无论多古老,强行揪出用户的朋友圈封面历史 (type=7)。 + 返回倒序(最新在前)的列表,包含 createTime 便于前端叠加显示。 + """ + wxid = str(target_wxid or "").strip() + if not wxid: + return [] + + try: + lim = int(limit or 20) + except Exception: + lim = 20 + if lim <= 0: + lim = 1 + # Keep payload bounded; cover history isn't worth huge queries. + if lim > 50: + lim = 50 + + wxid_esc = wxid.replace("'", "''") + cover_sql = ( + "SELECT tid, content FROM SnsTimeLine " + f"WHERE user_name = '{wxid_esc}' AND content LIKE '%7%' " + "ORDER BY tid DESC " + f"LIMIT {lim}" + ) + + rows: list[dict[str, Any]] = [] + + # 1) Prefer real-time WCDB if available (reads db_storage/sns/sns.db). try: if WCDB_REALTIME.is_connected(account_dir.name): conn = WCDB_REALTIME.ensure_connected(account_dir) @@ -777,37 +1216,65 @@ def _get_sns_cover(account_dir: Path, target_wxid: str) -> Optional[dict[str, An if not sns_db_path.exists(): sns_db_path = conn.db_storage_dir / "sns.db" # 利用 exec_query 强行查 - rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=cover_sql) - if rows: - cover_xml = rows[0].get("content") - cover_tid = rows[0].get("tid") + rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=cover_sql) or [] except Exception as e: - logger.warning(f"[sns] WCDB cover fetch failed: {e}") + logger.warning("[sns] WCDB cover fetch failed: %s", e) - # 2. 如果没查到,降级从本地解密的 sns.db 查 - if not cover_xml: + # 2) Fallback to local decrypted snapshot sns.db. 
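+    # Note: the fallback below opens the snapshot with sqlite's URI form
+    # ("file:{path}?mode=ro", uri=True); a read-only connection can never hold a
+    # write lock, so it cannot wedge the realtime writeback path.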
+ if not rows: sns_db_path = account_dir / "sns.db" if sns_db_path.exists(): try: # 只读模式防止锁死 conn_sq = sqlite3.connect(f"file:{sns_db_path}?mode=ro", uri=True) conn_sq.row_factory = sqlite3.Row - row = conn_sq.execute(cover_sql).fetchone() - if row: - cover_xml = str(row["content"] or "") - cover_tid = row["tid"] + rows_sq = conn_sq.execute(cover_sql).fetchall() conn_sq.close() + rows = [{"tid": r["tid"], "content": r["content"]} for r in (rows_sq or [])] except Exception as e: - logger.warning(f"[sns] SQLite cover fetch failed: {e}") + logger.warning("[sns] SQLite cover fetch failed: %s", e) - if cover_xml: - parsed = _parse_timeline_xml(cover_xml, target_wxid) - return { - "id": str(cover_tid or ""), - "media": parsed.get("media", []), - "type": 7 - } - return None + out: list[dict[str, Any]] = [] + seen: set[str] = set() + for rr in rows: + if not isinstance(rr, dict): + continue + cover_xml = rr.get("content") + if not cover_xml: + continue + + try: + cover_tid = int(rr.get("tid") or 0) + except Exception: + cover_tid = 0 + + parsed = _parse_timeline_xml(str(cover_xml or ""), wxid) + media = parsed.get("media") or [] + if not isinstance(media, list) or not media: + continue + + cid = _to_unsigned_i64_str(cover_tid or "") + if cid in seen: + continue + seen.add(cid) + + out.append( + { + "id": cid, + "tid": cover_tid, + "username": wxid, + "createTime": int(parsed.get("createTime") or 0), + "media": media, + "type": 7, + } + ) + return out + + +def _get_sns_cover(account_dir: Path, target_wxid: str) -> Optional[dict[str, Any]]: + """兼容旧逻辑:返回最近的一张朋友圈封面 (type=7)""" + covers = _get_sns_covers(account_dir, target_wxid, limit=1) + return covers[0] if covers else None @@ -891,6 +1358,188 @@ def api_sns_self_info(account: Optional[str] = None): "source": source } + +@router.post("/api/sns/realtime/sync_latest", summary="实时朋友圈同步到解密库(增量)") +def sync_sns_realtime_timeline_latest( + account: Optional[str] = None, + max_scan: int = 200, + force: int = 0, +): + """Sync latest visible Moments from WCDB realtime into decrypted `{account}/sns.db`. + + This is best-effort and intentionally **append-only**: we never delete rows from the decrypted snapshot + even if the post is deleted/hidden later, so users can still browse/export historical cached content. + """ + try: + lim = int(max_scan or 200) + except Exception: + lim = 200 + if lim <= 0: + lim = 200 + if lim > 2000: + lim = 2000 + + try: + force_flag = bool(int(force or 0)) + except Exception: + force_flag = False + + account_dir = _resolve_account_dir(account) + + # If there is no local decrypted sns.db yet, force a first-time materialization. 
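+    # Usage sketch (account value hypothetical):
+    #   POST /api/sns/realtime/sync_latest?account=wxid_abc&max_scan=500&force=1
+    # max_scan caps at 2000 (non-positive values fall back to the 200 default);
+    # force=1 bypasses the persisted maxId shortcut below.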
+ try: + if not (account_dir / "sns.db").exists(): + force_flag = True + except Exception: + force_flag = True + + info = WCDB_REALTIME.get_status(account_dir) + available = bool(info.get("dll_present") and info.get("key_present") and info.get("db_storage_dir")) + if not available: + raise HTTPException(status_code=404, detail="WCDB realtime not available.") + + st = _read_sns_realtime_sync_state(account_dir) + last_max_id_u = 0 + try: + last_max_id_u = int(str(st.get("maxId") or st.get("max_id") or "0").strip() or "0") + except Exception: + last_max_id_u = 0 + + conn = WCDB_REALTIME.ensure_connected(account_dir) + + t0 = time.perf_counter() + rows: list[dict[str, Any]] = [] + max_id_u = 0 + upsert_rows: list[tuple[int, str, str, Optional[str]]] = [] + + with conn.lock: + rows = _wcdb_get_sns_timeline( + conn.handle, + limit=lim, + offset=0, + usernames=[], + keyword="", + ) + + if not rows: + return { + "status": "ok", + "scanned": 0, + "upserted": 0, + "maxId": str(last_max_id_u or 0), + "elapsedMs": int((time.perf_counter() - t0) * 1000.0), + } + + # Compute the newest unsigned tid/id from WCDB rows. + for r in rows: + if not isinstance(r, dict): + continue + try: + tid_u = int(r.get("id") or 0) + except Exception: + continue + if tid_u > max_id_u: + max_id_u = tid_u + + if (not force_flag) and max_id_u and (max_id_u <= last_max_id_u): + # No new top item; skip heavy exec_query + sqlite writes. + return { + "status": "noop", + "scanned": len(rows), + "upserted": 0, + "maxId": str(max_id_u), + "lastMaxId": str(last_max_id_u), + "elapsedMs": int((time.perf_counter() - t0) * 1000.0), + } + + username_by_tid: dict[int, str] = {} + rawxml_by_tid: dict[int, str] = {} + tids: list[int] = [] + for r in rows: + if not isinstance(r, dict): + continue + uname = str(r.get("username") or "").strip() + try: + tid_u = int(r.get("id") or 0) + except Exception: + continue + tid_s = _to_signed_i64(tid_u) + tids.append(tid_s) + if uname: + username_by_tid[tid_s] = uname + raw_xml = str(r.get("rawXml") or "") + if raw_xml: + rawxml_by_tid[tid_s] = raw_xml + + tids = [t for t in list(dict.fromkeys(tids)) if isinstance(t, int)] + + sql_rows: list[dict[str, Any]] = [] + try: + sns_db_path = conn.db_storage_dir / "sns" / "sns.db" + if not sns_db_path.exists(): + sns_db_path = conn.db_storage_dir / "sns.db" + + if tids and sns_db_path.exists(): + in_sql = ",".join([str(x) for x in tids]) + # Newer schema may have pack_info_buf; try it first, then fall back. + sql = f"SELECT tid, user_name, content, pack_info_buf FROM SnsTimeLine WHERE tid IN ({in_sql})" + try: + sql_rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=sql) + except Exception: + sql = f"SELECT tid, user_name, content FROM SnsTimeLine WHERE tid IN ({in_sql})" + sql_rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=sql) + except Exception: + sql_rows = [] + + if sql_rows: + for rr in sql_rows: + if not isinstance(rr, dict): + continue + try: + tid_val = int(rr.get("tid") or 0) + except Exception: + continue + content_xml = str(rr.get("content") or "") + if not content_xml: + continue + uname = str(rr.get("user_name") or rr.get("username") or "").strip() + if not uname: + uname = username_by_tid.get(tid_val, "") + if not uname: + continue + pack = rr.get("pack_info_buf") + pack_text = None if pack is None else str(pack) + upsert_rows.append((tid_val, uname, content_xml, pack_text)) + else: + # Fallback: store rawXml from WCDB rows (may be enough for parsing/export). 
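+            # These rawXml rows carry no pack_info_buf, hence the trailing None; the
+            # upsert's COALESCE keeps any pack_info_buf already stored in the snapshot.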
+ for tid_val, uname in username_by_tid.items(): + raw_xml = rawxml_by_tid.get(tid_val) or "" + if not raw_xml: + continue + upsert_rows.append((int(tid_val), str(uname), str(raw_xml), None)) + + upserted = _upsert_sns_timeline_rows_to_decrypted_db( + account_dir, + upsert_rows, + source="realtime-sync-latest", + ) + + if max_id_u: + st2 = dict(st) + st2["maxId"] = str(max_id_u) + st2["updatedAt"] = int(time.time()) + _write_sns_realtime_sync_state(account_dir, st2) + + return { + "status": "ok", + "scanned": len(rows), + "upserted": int(upserted), + "maxId": str(max_id_u or 0), + "lastMaxId": str(last_max_id_u or 0), + "elapsedMs": int((time.perf_counter() - t0) * 1000.0), + } + + @router.get("/api/sns/timeline", summary="获取朋友圈时间线") def list_sns_timeline( account: Optional[str] = None, @@ -913,72 +1562,493 @@ def list_sns_timeline( kw = str(keyword or "").strip() cover_data = None + covers_data: list[dict[str, Any]] = [] if offset == 0: target_wxid = users[0] if users else account_dir.name - cover_data = _get_sns_cover(account_dir, target_wxid) + covers_data = _get_sns_covers(account_dir, target_wxid, limit=20) + cover_data = covers_data[0] if covers_data else None + + def _list_from_decrypted_sqlite() -> dict[str, Any]: + """Legacy path: query the decrypted sns.db under output/databases/{account}. + + Note: This path may contain historical timeline items that are no longer + visible in WeChat due to privacy settings (e.g. "only last 3 days"). + """ + sns_db_path = account_dir / "sns.db" + if not sns_db_path.exists(): + raise HTTPException(status_code=404, detail="sns.db not found for this account.") + + filters: list[str] = [] + params: list[Any] = [] + + if users: + placeholders = ",".join(["?"] * len(users)) + filters.append(f"user_name IN ({placeholders})") + params.extend(users) + + if kw: + filters.append("content LIKE ?") + params.append(f"%{kw}%") + + where_sql = f"WHERE {' AND '.join(filters)}" if filters else "" + + sql = f""" + SELECT tid, user_name, content + FROM SnsTimeLine + {where_sql} + ORDER BY tid DESC + LIMIT ? OFFSET ? + """ + # Fetch 1 extra row to determine hasMore. + params_with_page = params + [limit + 1, offset] + + conn2 = sqlite3.connect(str(sns_db_path)) + conn2.row_factory = sqlite3.Row + try: + rows2 = conn2.execute(sql, params_with_page).fetchall() + except sqlite3.OperationalError as e: + logger.warning("[sns] query failed: %s", e) + raise HTTPException(status_code=500, detail=f"sns.db query failed: {e}") + finally: + conn2.close() + + has_more2 = len(rows2) > limit + rows2 = rows2[:limit] + + post_usernames2 = [str(r["user_name"] or "").strip() for r in rows2 if str(r["user_name"] or "").strip()] + contact_rows2 = _load_contact_rows(contact_db_path, post_usernames2) if contact_db_path.exists() else {} + biz_index2 = _get_biz_to_official_index(contact_db_path) if contact_db_path.exists() else {} + official_usernames2: set[str] = set() + + timeline2: list[dict[str, Any]] = [] + for r in rows2: + try: + tid2 = r["tid"] + except Exception: + tid2 = None + uname2 = str(r["user_name"] or "").strip() + + content_xml = str(r["content"] or "") + parsed2 = _parse_timeline_xml(content_xml, uname2) + + # Best-effort: attach ISAAC64 video key for SNS videos/live-photos (WeFlow compatible). 
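+            # The key extracted here is later fed to the ISAAC64 keystream that
+            # XOR-decrypts the first 128KB of the downloaded mp4 (see
+            # _weflow_wxisaac64_keystream / _maybe_decrypt_sns_video_file below).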
+ video_key2 = _extract_sns_video_key(content_xml) + if video_key2: + pmedia2 = parsed2.get("media") + if isinstance(pmedia2, list): + for m0 in pmedia2: + if not isinstance(m0, dict): + continue + if "videoKey" not in m0: + m0["videoKey"] = video_key2 + lp = m0.get("livePhoto") + if isinstance(lp, dict): + if not str(lp.get("key") or "").strip(): + lp["key"] = video_key2 + + display2 = _pick_display_name(contact_rows2.get(uname2), uname2) if uname2 else uname2 + post_type2 = int(parsed2.get("type", 1) or 1) + + official2: dict[str, Any] = {} + if post_type2 == 3: + content_url2 = str(parsed2.get("contentUrl") or "") + biz2 = _extract_mp_biz_from_url(content_url2) + info2 = biz_index2.get(biz2) if biz2 else None + off_username2 = str(info2.get("username") or "").strip() if isinstance(info2, dict) else "" + off_service_type2 = info2.get("serviceType") if isinstance(info2, dict) else None + official2 = { + "biz": biz2, + "username": off_username2, + "serviceType": off_service_type2, + "displayName": "", + } + if off_username2: + official_usernames2.add(off_username2) + + timeline2.append( + { + "id": _to_unsigned_i64_str(tid2) if tid2 is not None else (str(parsed2.get("createTime") or "") or uname2), + "tid": tid2, + "username": uname2 or parsed2.get("username") or "", + "displayName": display2, + "createTime": int(parsed2.get("createTime") or 0), + "contentDesc": str(parsed2.get("contentDesc") or ""), + "location": str(parsed2.get("location") or ""), + "media": parsed2.get("media") or [], + "likes": parsed2.get("likes") or [], + "comments": parsed2.get("comments") or [], + "type": post_type2, + "title": parsed2.get("title", ""), + "contentUrl": parsed2.get("contentUrl", ""), + "finderFeed": parsed2.get("finderFeed", {}), + "official": official2, + } + ) + + if official_usernames2 and contact_db_path.exists(): + official_rows2 = _load_contact_rows(contact_db_path, list(official_usernames2)) + for item in timeline2: + off2 = item.get("official") + if not isinstance(off2, dict): + continue + u0_2 = str(off2.get("username") or "").strip() + if not u0_2: + continue + row2 = official_rows2.get(u0_2) + if row2 is None: + continue + off2["displayName"] = str(_pick_display_name(row2, u0_2)).strip() + + return { + "timeline": timeline2, + "hasMore": has_more2, + "limit": limit, + "offset": offset, + "source": "sqlite", + "cover": cover_data, + "covers": covers_data, + } + + auto_cache_key = _sns_timeline_auto_cache_key(account_dir, users, kw) if users else None + # If we previously detected that WCDB only returns a visible subset for this contact (less than + # the local decrypted snapshot), skip WCDB for subsequent pages to keep pagination flowing. + if auto_cache_key is not None and offset > 0: + try: + if _sns_timeline_auto_cache_get(auto_cache_key): + out = _list_from_decrypted_sqlite() + out["source"] = "sqlite-auto" + return out + except Exception: + pass + + def _list_from_wcdb_snstimeline_table(wcdb_conn: Any) -> Optional[dict[str, Any]]: + """Query encrypted `SnsTimeLine` table directly (bypass timeline API filtering). + + In some cases (commonly: contact sets "only show last 3 days"), the WCDB timeline API returns + an empty list even though the encrypted `sns.db` still contains cached historical rows. 
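+
+        Rows served from this path are also written back to the decrypted snapshot
+        (source="timeline-wcdb-direct"), so they stay readable even after WCDB
+        later hides them.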
+ """ + if not users: + return None + + def _q(v: str) -> str: + return "'" + str(v or "").replace("'", "''") + "'" + + try: + sns_db_path = wcdb_conn.db_storage_dir / "sns" / "sns.db" + if not sns_db_path.exists(): + sns_db_path = wcdb_conn.db_storage_dir / "sns.db" + except Exception: + return None + + if not (sns_db_path.exists() and sns_db_path.is_file()): + return None + + filters: list[str] = [ + "content IS NOT NULL", + "content != ''", + # Cover rows are returned separately via `cover`, do not mix into timeline. + "content NOT LIKE '%7%'", + ] + + ulist = [str(u or "").strip() for u in users if str(u or "").strip()] + if ulist: + filters.append(f"user_name IN ({','.join([_q(u) for u in ulist])})") + + if kw: + kw_esc = str(kw).replace("'", "''") + filters.append(f"content LIKE '%{kw_esc}%'") + + where_sql = f"WHERE {' AND '.join(filters)}" if filters else "" + # Fetch 1 extra row to determine hasMore. + sql = f""" + SELECT tid, user_name, content, pack_info_buf + FROM SnsTimeLine + {where_sql} + ORDER BY tid DESC + LIMIT {int(limit) + 1} OFFSET {int(offset)} + """ + + sql_rows: list[dict[str, Any]] = [] + with wcdb_conn.lock: + try: + sql_rows = _wcdb_exec_query(wcdb_conn.handle, kind="media", path=str(sns_db_path), sql=sql) + except Exception: + # Older schema without pack_info_buf. + sql = f""" + SELECT tid, user_name, content + FROM SnsTimeLine + {where_sql} + ORDER BY tid DESC + LIMIT {int(limit) + 1} OFFSET {int(offset)} + """ + sql_rows = _wcdb_exec_query(wcdb_conn.handle, kind="media", path=str(sns_db_path), sql=sql) + + if not sql_rows: + return None + + has_more3 = len(sql_rows) > int(limit) + sql_rows = sql_rows[: int(limit)] + + post_usernames3: list[str] = [] + upsert_rows3: list[tuple[int, str, str, Optional[str]]] = [] + + # Prepare contact/biz mapping (same as other code paths). + for rr in sql_rows: + if not isinstance(rr, dict): + continue + uname3 = str(rr.get("user_name") or rr.get("username") or "").strip() + if uname3: + post_usernames3.append(uname3) + + contact_rows3 = _load_contact_rows(contact_db_path, post_usernames3) if contact_db_path.exists() else {} + biz_index3 = _get_biz_to_official_index(contact_db_path) if contact_db_path.exists() else {} + official_usernames3: set[str] = set() + + timeline3: list[dict[str, Any]] = [] + for rr in sql_rows: + if not isinstance(rr, dict): + continue + + try: + tid3 = int(rr.get("tid") or 0) + except Exception: + continue + + uname3 = str(rr.get("user_name") or rr.get("username") or "").strip() + if not uname3: + continue + + content_xml3 = str(rr.get("content") or "") + if not content_xml3: + continue + + parsed3 = _parse_timeline_xml(content_xml3, uname3) + + # Attach ISAAC64 key for SNS video/live-photo. 
+ video_key3 = _extract_sns_video_key(content_xml3) + if video_key3: + pmedia3 = parsed3.get("media") + if isinstance(pmedia3, list): + for m0 in pmedia3: + if not isinstance(m0, dict): + continue + if "videoKey" not in m0: + m0["videoKey"] = video_key3 + lp = m0.get("livePhoto") + if isinstance(lp, dict): + if not str(lp.get("key") or "").strip(): + lp["key"] = video_key3 + + post_type3 = int(parsed3.get("type", 1) or 1) + if post_type3 == 7: + continue + + display3 = _pick_display_name(contact_rows3.get(uname3), uname3) if uname3 else uname3 + + official3: dict[str, Any] = {} + if post_type3 == 3: + content_url3 = str(parsed3.get("contentUrl") or "") + biz3 = _extract_mp_biz_from_url(content_url3) + info3 = biz_index3.get(biz3) if biz3 else None + off_username3 = str(info3.get("username") or "").strip() if isinstance(info3, dict) else "" + off_service_type3 = info3.get("serviceType") if isinstance(info3, dict) else None + official3 = { + "biz": biz3, + "username": off_username3, + "serviceType": off_service_type3, + "displayName": "", + } + if off_username3: + official_usernames3.add(off_username3) + + timeline3.append( + { + "id": _to_unsigned_i64_str(tid3), + "tid": _to_unsigned_i64_str(tid3), + "username": uname3, + "displayName": str(display3 or "").replace("\xa0", " ").strip() or uname3, + "createTime": int(parsed3.get("createTime") or 0), + "contentDesc": str(parsed3.get("contentDesc") or ""), + "location": str(parsed3.get("location") or ""), + "media": parsed3.get("media") or [], + "likes": parsed3.get("likes") or [], + "comments": parsed3.get("comments") or [], + "type": post_type3, + "title": parsed3.get("title", ""), + "contentUrl": parsed3.get("contentUrl", ""), + "finderFeed": parsed3.get("finderFeed", {}), + "official": official3, + } + ) + + pack3 = rr.get("pack_info_buf") + upsert_rows3.append((int(tid3), uname3, content_xml3, None if pack3 is None else str(pack3))) + + if official_usernames3 and contact_db_path.exists(): + official_rows3 = _load_contact_rows(contact_db_path, list(official_usernames3)) + for item in timeline3: + off3 = item.get("official") + if not isinstance(off3, dict): + continue + u0_3 = str(off3.get("username") or "").strip() + if not u0_3: + continue + row3 = official_rows3.get(u0_3) + if row3 is None: + continue + off3["displayName"] = str(_pick_display_name(row3, u0_3) or "").replace("\xa0", " ").strip() + + # Incremental writeback: cache what we just fetched into decrypted snapshot. + if upsert_rows3: + _upsert_sns_timeline_rows_to_decrypted_db(account_dir, upsert_rows3, source="timeline-wcdb-direct") + + if not timeline3: + return None + + return { + "timeline": timeline3, + "hasMore": has_more3, + "limit": limit, + "offset": offset, + "source": "wcdb-direct", + "cover": cover_data, + "covers": covers_data, + } # Prefer real-time WCDB access (reads the latest encrypted db_storage/sns/sns.db). # Fallback to the decrypted sqlite copy in output/{account}/sns.db. try: conn = WCDB_REALTIME.ensure_connected(account_dir) + writeback_rows: list[tuple[int, str, str, Optional[str]]] = [] + + cached_posts_total = 0 + if users: + # Used to decide whether to auto-switch to the decrypted sqlite snapshot when WCDB only + # returns a small visible subset (privacy settings, etc.). 
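+            # Heuristic sketch: cached_posts_total uses the same "visible post" filter
+            # as /api/sns/users postCount; when it exceeds what WCDB will serve, the
+            # logic further down flips this query over to the snapshot ("sqlite-auto").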
+ try: + with _sns_decrypted_db_lock(Path(account_dir).name): + cached_posts_total = _count_sns_timeline_posts_in_decrypted_sqlite( + account_dir / "sns.db", + users=users, + kw=kw, + ) + except Exception: + cached_posts_total = 0 def _clean_name(v: Any) -> str: return str(v or "").replace("\xa0", " ").strip() # Base timeline (includes likes/comments) from WCDB API. with conn.lock: + wcdb_fetch_limit = limit + 1 + wcdb_probe_total: Optional[int] = None + + # Probe WCDB total when we already have a small (<=200) local cache. + # This lets us switch to sqlite on the *first page* without requiring the user + # to scroll to the end of WCDB's (possibly smaller) visible subset. + if users and offset == 0 and cached_posts_total > int(limit) and cached_posts_total <= 200: + wcdb_fetch_limit = 201 # 200 + 1 sentinel + rows = _wcdb_get_sns_timeline( conn.handle, - limit=limit + 1, + limit=wcdb_fetch_limit, offset=offset, usernames=users, keyword=kw, ) + if wcdb_fetch_limit == 201: + try: + wcdb_probe_total = len(rows) if isinstance(rows, list) else 0 + except Exception: + wcdb_probe_total = None + + # If WCDB ends within 200 and is smaller than the local snapshot, serve snapshot immediately. + if ( + users + and offset == 0 + and isinstance(wcdb_probe_total, int) + and wcdb_probe_total >= 0 + and wcdb_probe_total <= 200 + and cached_posts_total > wcdb_probe_total + ): + try: + if auto_cache_key is None: + auto_cache_key = _sns_timeline_auto_cache_key(account_dir, users, kw) + _sns_timeline_auto_cache_set(auto_cache_key, True) + except Exception: + pass + out = _list_from_decrypted_sqlite() + out["source"] = "sqlite-auto" + return out + # Best-effort: enrich posts with XML-only fields (location + media attrs/size) # by querying SnsTimeLine.content from the encrypted sns.db. 
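+        # tid note: WCDB reports ids as unsigned u64 while sqlite stores signed i64,
+        # so e.g. 18446744073709551615 becomes -1 via _to_signed_i64() before it is
+        # used in the IN (...) filter below.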
+ username_by_tid: dict[int, str] = {} content_by_tid: dict[int, str] = {} try: sns_db_path = conn.db_storage_dir / "sns" / "sns.db" if not sns_db_path.exists(): sns_db_path = conn.db_storage_dir / "sns.db" - def _to_signed_i64(v: int) -> int: - x = int(v) & 0xFFFFFFFFFFFFFFFF - if x >= 0x8000000000000000: - x -= 0x10000000000000000 - return int(x) - tids: list[int] = [] for r in (rows or [])[: int(limit)]: if not isinstance(r, dict): continue + uname0 = str(r.get("username") or "").strip() try: tid_u = int(r.get("id") or 0) except Exception: continue - tids.append(_to_signed_i64(tid_u)) + tid_s = _to_signed_i64(tid_u) + tids.append(tid_s) + if uname0: + username_by_tid[tid_s] = uname0 tids = list(dict.fromkeys(tids)) if tids and sns_db_path.exists(): in_sql = ",".join([str(x) for x in tids]) - sql = f"SELECT tid, content FROM SnsTimeLine WHERE tid IN ({in_sql})" - sql_rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=sql) + sql = f"SELECT tid, user_name, content, pack_info_buf FROM SnsTimeLine WHERE tid IN ({in_sql})" + try: + sql_rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=sql) + except Exception: + sql = f"SELECT tid, user_name, content FROM SnsTimeLine WHERE tid IN ({in_sql})" + sql_rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=sql) for rr in sql_rows: try: tid_val = int(rr.get("tid")) except Exception: continue - content_by_tid[tid_val] = str(rr.get("content") or "") + content_xml = str(rr.get("content") or "") + if content_xml: + content_by_tid[tid_val] = content_xml + uname1 = str(rr.get("user_name") or rr.get("username") or "").strip() + if not uname1: + uname1 = username_by_tid.get(tid_val, "") + if uname1 and content_xml: + pack = rr.get("pack_info_buf") + writeback_rows.append((tid_val, uname1, content_xml, None if pack is None else str(pack))) except Exception: content_by_tid = {} + writeback_rows = [] has_more = len(rows) > limit rows = rows[:limit] + # Incremental writeback: cache what we just saw into the decrypted snapshot, + # so later "not visible" (e.g. only last 3 days) still has local data. + if writeback_rows: + _upsert_sns_timeline_rows_to_decrypted_db( + account_dir, + writeback_rows, + source="timeline-wcdb", + ) + post_usernames = [str((r or {}).get("username") or "").strip() for r in rows if isinstance(r, dict)] post_usernames = [u for u in post_usernames if u] contact_rows = _load_contact_rows(contact_db_path, post_usernames) if contact_db_path.exists() else {} + biz_index = _get_biz_to_official_index(contact_db_path) if contact_db_path.exists() else {} + official_usernames: set[str] = set() timeline: list[dict[str, Any]] = [] for r in rows: @@ -996,6 +2066,20 @@ def list_sns_timeline( likes = [_clean_name(x) for x in likes if _clean_name(x)] comments = r.get("comments") if isinstance(r.get("comments"), list) else [] + # WeFlow: live photo / SNS video decryption key comes from `` in raw XML. + # Keep it local to avoid sending huge rawXml to the frontend. + video_key = _extract_sns_video_key(r.get("rawXml")) + if video_key and isinstance(media, list): + for m0 in media: + if not isinstance(m0, dict): + continue + if "videoKey" not in m0: + m0["videoKey"] = video_key + lp = m0.get("livePhoto") + if isinstance(lp, dict): + if not str(lp.get("key") or "").strip(): + lp["key"] = video_key + # Enrich with parsed XML when available. 
location = str(r.get("location") or "") @@ -1043,9 +2127,39 @@ def list_sns_timeline( mm[k] = v merged.append(mm) media = merged + + # If rawXml didn't contain ``, try extracting from the content XML. + # Some WCDB timeline APIs omit rawXml, but the encrypted sns.db content still has the key. + if isinstance(media, list) and (not video_key): + video_key_xml = _extract_sns_video_key(xml) + if video_key_xml: + for m0 in media: + if not isinstance(m0, dict): + continue + if "videoKey" not in m0: + m0["videoKey"] = video_key_xml + lp = m0.get("livePhoto") + if isinstance(lp, dict): + if not str(lp.get("key") or "").strip(): + lp["key"] = video_key_xml except Exception: pass + official: dict[str, Any] = {} + if post_type == 3: + biz = _extract_mp_biz_from_url(content_url) + info = biz_index.get(biz) if biz else None + off_username = str(info.get("username") or "").strip() if isinstance(info, dict) else "" + off_service_type = info.get("serviceType") if isinstance(info, dict) else None + official = { + "biz": biz, + "username": off_username, + "serviceType": off_service_type, + "displayName": "", + } + if off_username: + official_usernames.add(off_username) + pid = str(r.get("id") or "") or str(create_time or "") or uname timeline.append( { @@ -1063,103 +2177,172 @@ def list_sns_timeline( "title": title, "contentUrl": content_url, "finderFeed": finder_feed, + "official": official, } ) - return { + if official_usernames and contact_db_path.exists(): + official_rows = _load_contact_rows(contact_db_path, list(official_usernames)) + for item in timeline: + off = item.get("official") + if not isinstance(off, dict): + continue + u0 = str(off.get("username") or "").strip() + if not u0: + continue + row = official_rows.get(u0) + if row is None: + continue + off["displayName"] = _clean_name(_pick_display_name(row, u0)) + + wcdb_resp = { "timeline": timeline, "hasMore": has_more, "limit": limit, "offset": offset, "source": "wcdb", "cover": cover_data, + "covers": covers_data, } + + # Some contacts may have Moments cached in the decrypted sqlite, while the WCDB + # real-time API returns empty (commonly caused by privacy settings like + # "only show last 3 days"). In that case, fall back to the decrypted sqlite + # so the UI doesn't show an empty timeline when data exists locally. + if (not timeline) and users: + # 1) Try querying encrypted `SnsTimeLine` table directly (can bypass API filtering). + try: + direct = _list_from_wcdb_snstimeline_table(conn) + except Exception: + direct = None + if isinstance(direct, dict) and direct.get("timeline"): + return direct + + # 2) Fallback to decrypted sqlite snapshot (historical cached content). + try: + legacy = _list_from_decrypted_sqlite() + except HTTPException: + legacy = None + except Exception: + legacy = None + if isinstance(legacy, dict) and legacy.get("timeline"): + return legacy + + # Auto-fallback: if WCDB timeline ends but the local decrypted snapshot has more rows for this + # contact query, switch to the snapshot so the frontend can keep paging. 
+ if users and timeline and (not has_more): + try: + with _sns_decrypted_db_lock(Path(account_dir).name): + cached_total = _count_sns_timeline_posts_in_decrypted_sqlite( + account_dir / "sns.db", + users=users, + kw=kw, + ) + wcdb_total = int(offset) + int(len(timeline)) + if cached_total > wcdb_total: + if auto_cache_key is None: + auto_cache_key = _sns_timeline_auto_cache_key(account_dir, users, kw) + _sns_timeline_auto_cache_set(auto_cache_key, True) + out = _list_from_decrypted_sqlite() + out["source"] = "sqlite-auto" + return out + except Exception: + pass + + return wcdb_resp except WCDBRealtimeError as e: logger.info("[sns] wcdb realtime unavailable: %s", e) except Exception as e: logger.warning("[sns] wcdb realtime failed: %s", e) - # Legacy path: query the decrypted sns.db under output/databases/{account}. + return _list_from_decrypted_sqlite() + + +@router.get("/api/sns/users", summary="列出朋友圈联系人(按发圈数统计)") +def list_sns_users( + account: Optional[str] = None, + keyword: Optional[str] = None, + limit: int = 5000, +): + account_dir = _resolve_account_dir(account) sns_db_path = account_dir / "sns.db" if not sns_db_path.exists(): raise HTTPException(status_code=404, detail="sns.db not found for this account.") - filters: list[str] = [] - params: list[Any] = [] + contact_db_path = account_dir / "contact.db" - if users: - placeholders = ",".join(["?"] * len(users)) - filters.append(f"user_name IN ({placeholders})") - params.extend(users) + try: + lim = int(limit or 5000) + except Exception: + lim = 5000 + if lim <= 0: + lim = 5000 + if lim > 5000: + lim = 5000 - if kw: - filters.append("content LIKE ?") - params.append(f"%{kw}%") - - where_sql = f"WHERE {' AND '.join(filters)}" if filters else "" - - sql = f""" - SELECT tid, user_name, content - FROM SnsTimeLine - {where_sql} - ORDER BY tid DESC - LIMIT ? OFFSET ? - """ - # Fetch 1 extra row to determine hasMore. 
- params_with_page = params + [limit + 1, offset] + kw = str(keyword or "").strip().lower() conn = sqlite3.connect(str(sns_db_path)) conn.row_factory = sqlite3.Row try: - rows = conn.execute(sql, params_with_page).fetchall() - except sqlite3.OperationalError as e: - logger.warning("[sns] query failed: %s", e) - raise HTTPException(status_code=500, detail=f"sns.db query failed: {e}") + cur = conn.cursor() + cur.execute( + """ + SELECT + user_name AS username, + SUM( + CASE + WHEN content IS NOT NULL AND content != '' AND content NOT LIKE '%7%' + THEN 1 ELSE 0 + END + ) AS postCount, + COUNT(*) AS totalCount + FROM SnsTimeLine + GROUP BY user_name + ORDER BY postCount DESC, totalCount DESC + """ + ) + rows = cur.fetchall() or [] finally: - conn.close() + try: + conn.close() + except Exception: + pass - has_more = len(rows) > limit - rows = rows[:limit] + usernames = [str(r["username"] or "").strip() for r in rows if r is not None] + usernames = [u for u in usernames if u] + contact_rows = _load_contact_rows(contact_db_path, usernames) if contact_db_path.exists() else {} - post_usernames = [str(r["user_name"] or "").strip() for r in rows if str(r["user_name"] or "").strip()] - contact_rows = _load_contact_rows(contact_db_path, post_usernames) if contact_db_path.exists() else {} + items: list[dict[str, Any]] = [] + + def _clean_name(v: Any) -> str: + return str(v or "").replace("\xa0", " ").strip() - timeline: list[dict[str, Any]] = [] for r in rows: try: - tid = r["tid"] + uname = str(r["username"] or "").strip() except Exception: - tid = None - uname = str(r["user_name"] or "").strip() - parsed = _parse_timeline_xml(str(r["content"] or ""), uname) - display = _pick_display_name(contact_rows.get(uname), uname) if uname else uname + uname = "" + if not uname: + continue - timeline.append( - { - "id": str(tid if tid is not None else parsed.get("createTime") or "") or uname, - "tid": tid, - "username": uname or parsed.get("username") or "", - "displayName": display, - "createTime": int(parsed.get("createTime") or 0), - "contentDesc": str(parsed.get("contentDesc") or ""), - "location": str(parsed.get("location") or ""), - "media": parsed.get("media") or [], - "likes": parsed.get("likes") or [], - "comments": parsed.get("comments") or [], - "type": parsed.get("type", 1), - "title": parsed.get("title", ""), - "contentUrl": parsed.get("contentUrl", ""), - "finderFeed": parsed.get("finderFeed", {}), - } - ) + try: + post_count = int(r["postCount"] or 0) + except Exception: + post_count = 0 - return { - "timeline": timeline, - "hasMore": has_more, - "limit": limit, - "offset": offset, - "cover": cover_data, - } + row = contact_rows.get(uname) + display = _clean_name(_pick_display_name(row, uname)) or uname + + if kw: + if kw not in uname.lower() and kw not in display.lower(): + continue + + items.append({"username": uname, "displayName": display, "postCount": post_count}) + if len(items) >= lim: + break + + return {"items": items, "count": len(items), "limit": lim} class SnsMediaPicksSaveRequest(BaseModel): @@ -1203,7 +2386,671 @@ def list_sns_media_candidates( return {"count": total, "items": items, "hasMore": end < total, "limit": limit, "offset": offset} -@router.get("/api/sns/media", summary="获取朋友圈图片(本地缓存优先)") +def _is_allowed_sns_media_host(host: str) -> bool: + h = str(host or "").strip().lower() + if not h: + return False + # Images: qpic/qlogo. Thumbs: *.tc.qq.com. Videos/live photos: *.video.qq.com. 
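+    # Callers pass the urlparse() hostname, and matching is suffix-based, so e.g.
+    # "evil-qpic.cn" fails the ".qpic.cn" check; anything outside this Tencent
+    # allow-list is rejected before the download helpers below issue a request
+    # (best-effort SSRF guard).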
+ return ( + h.endswith(".qpic.cn") + or h.endswith(".qlogo.cn") + or h.endswith(".tc.qq.com") + or h.endswith(".video.qq.com") + ) + + +def _fix_sns_cdn_url(url: str, *, token: str = "", is_video: bool = False) -> str: + """WeFlow-compatible SNS CDN URL normalization. + + - Force https for Tencent CDNs. + - For images, replace `/150` with `/0` to request the original. + - If token is provided and url doesn't contain it, append `token=&idx=1`. + """ + u = html.unescape(str(url or "")).strip() + if not u: + return "" + + # Only touch Tencent CDNs; keep other URLs intact. + try: + p = urlparse(u) + host = str(p.hostname or "").lower() + if not _is_allowed_sns_media_host(host): + return u + except Exception: + return u + + # http -> https + u = re.sub(r"^http://", "https://", u, flags=re.I) + + # /150 -> /0 (image only) + if not is_video: + u = re.sub(r"/150(?=($|\\?))", "/0", u) + + tok = str(token or "").strip() + if tok and ("token=" not in u): + if is_video: + # Match WeFlow: place `token&idx=1` in front of existing query params. + base, sep, qs = u.partition("?") + if sep: + qs = qs.lstrip("&") + u = f"{base}?token={tok}&idx=1" + if qs: + u = f"{u}&{qs}" + else: + u = f"{u}?token={tok}&idx=1" + else: + connector = "&" if "?" in u else "?" + u = f"{u}{connector}token={tok}&idx=1" + + return u + + +def _detect_mp4_ftyp(head: bytes) -> bool: + return bool(head) and len(head) >= 8 and head[4:8] == b"ftyp" + + +@lru_cache(maxsize=1) +def _weflow_wxisaac64_script_path() -> str: + """Locate the Node helper that wraps WeFlow's wasm_video_decode.* assets.""" + repo_root = Path(__file__).resolve().parents[3] + script = repo_root / "tools" / "weflow_wasm_keystream.js" + if script.exists() and script.is_file(): + return str(script) + return "" + + +@lru_cache(maxsize=64) +def _weflow_wxisaac64_keystream(key: str, size: int) -> bytes: + """Generate keystream via WeFlow's WASM (preferred; matches real decryption).""" + key_text = str(key or "").strip() + if not key_text or size <= 0: + return b"" + + # WeFlow is the source-of-truth; use its WASM first, then fall back to our pure-python ISAAC64. + script = _weflow_wxisaac64_script_path() + if not script: + script = "" + + if script: + try: + # The JS helper prints ONLY base64 bytes to stdout; keep stderr for debugging. + proc = subprocess.run( + ["node", script, key_text, str(int(size))], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=30, + check=False, + ) + if proc.returncode == 0: + out_b64 = (proc.stdout or b"").strip() + if out_b64: + return base64.b64decode(out_b64, validate=False) + except Exception: + pass + + # Fallback: pure python ISAAC64 (WeFlow-compatible reverse). + from ..isaac64 import Isaac64 # pylint: disable=import-outside-toplevel + + want = int(size) + # ISAAC64 generates 8-byte words; generate enough and slice. 
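+    # e.g. want=131072 is already 8-aligned; want=100 rounds up to size8=104 and is
+    # sliced back to 100, since generate_keystream() raises on non-multiples of 8.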
+ size8 = ((want + 7) // 8) * 8 + return Isaac64(key_text).generate_keystream(size8)[:want] + + +_SNS_REMOTE_VIDEO_CACHE_EXTS = [ + ".mp4", + ".bin", # legacy/unknown +] + + +def _sns_remote_video_cache_dir_and_stem(account_dir: Path, *, url: str, key: str) -> tuple[Path, str]: + digest = hashlib.md5(f"video|{url}|{key}".encode("utf-8", errors="ignore")).hexdigest() + cache_dir = account_dir / "sns_remote_video_cache" / digest[:2] + return cache_dir, digest + + +def _sns_remote_video_cache_existing_path(cache_dir: Path, stem: str) -> Optional[Path]: + for ext in _SNS_REMOTE_VIDEO_CACHE_EXTS: + p = cache_dir / f"{stem}{ext}" + try: + if p.exists() and p.is_file(): + return p + except Exception: + continue + return None + + +async def _download_sns_remote_to_file(url: str, dest_path: Path, *, max_bytes: int) -> tuple[str, str]: + """Download SNS media to file (streaming) from Tencent CDN. + + Returns: (content_type, x_enc) + """ + u = str(url or "").strip() + if not u: + return "", "" + + # Safety: only allow Tencent CDN hosts. + try: + p = urlparse(u) + host = str(p.hostname or "").lower() + if not _is_allowed_sns_media_host(host): + raise HTTPException(status_code=400, detail="SNS media host not allowed.") + except HTTPException: + raise + except Exception: + raise HTTPException(status_code=400, detail="Invalid SNS media URL.") + + base_headers = { + "User-Agent": "MicroMessenger Client", + "Accept": "*/*", + # Do not request compression for video streams. + "Connection": "keep-alive", + } + + header_variants = [ + {}, + { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63090719) XWEB/8351", + "Referer": "https://servicewechat.com/", + "Origin": "https://servicewechat.com", + }, + {"Referer": "https://wx.qq.com/", "Origin": "https://wx.qq.com"}, + {"Referer": "https://mp.weixin.qq.com/", "Origin": "https://mp.weixin.qq.com"}, + ] + + last_err: Exception | None = None + async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client: + for extra in header_variants: + headers = dict(base_headers) + headers.update(extra) + try: + if dest_path.exists(): + try: + dest_path.unlink(missing_ok=True) + except Exception: + pass + + total = 0 + async with client.stream("GET", u, headers=headers) as resp: + resp.raise_for_status() + content_type = str(resp.headers.get("Content-Type") or "").strip() + x_enc = str(resp.headers.get("x-enc") or "").strip() + dest_path.parent.mkdir(parents=True, exist_ok=True) + with dest_path.open("wb") as f: + async for chunk in resp.aiter_bytes(): + if not chunk: + continue + total += len(chunk) + if total > max_bytes: + raise HTTPException(status_code=400, detail="SNS video too large.") + f.write(chunk) + return content_type, x_enc + except HTTPException: + raise + except Exception as e: + last_err = e + continue + + raise last_err or RuntimeError("sns remote download failed") + + +def _maybe_decrypt_sns_video_file(path: Path, key: str) -> bool: + """Decrypt the first 128KB of an encrypted mp4 file in-place (WeFlow/Isaac64). + + Returns True if decryption was performed, False otherwise. 
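+
+    A file whose bytes 4..8 already read b"ftyp" is treated as a plain mp4 and left
+    untouched; otherwise the first min(131072, file_size) bytes are XORed in place.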
+    """
+    key_text = str(key or "").strip()
+    if not key_text:
+        return False
+
+    try:
+        size = int(path.stat().st_size)
+    except Exception:
+        return False
+
+    if size <= 8:
+        return False
+
+    decrypt_size = min(131072, size)
+    if decrypt_size <= 0:
+        return False
+
+    try:
+        with path.open("r+b") as f:
+            head = f.read(8)
+            if _detect_mp4_ftyp(head):
+                return False
+
+            f.seek(0)
+            buf = bytearray(f.read(decrypt_size))
+            if not buf:
+                return False
+
+            # Prefer WeFlow's real keystream generator (WASM) to ensure compatibility.
+            ks = _weflow_wxisaac64_keystream(key_text, decrypt_size)
+            n = min(len(buf), len(ks))
+            for i in range(n):
+                buf[i] ^= ks[i]
+
+            f.seek(0)
+            f.write(buf)
+            f.flush()
+
+            # Bytes were mutated, so report True; the caller re-validates the ftyp box itself.
+            return True
+    except Exception:
+        return False
+
+
+async def _materialize_sns_remote_video(
+    *,
+    account_dir: Path,
+    url: str,
+    key: str,
+    token: str,
+    use_cache: bool,
+) -> Optional[Path]:
+    """Download SNS video from CDN, decrypt (if needed), and return a local mp4 path."""
+    fixed_url = _fix_sns_cdn_url(str(url or ""), token=str(token or ""), is_video=True)
+    if not fixed_url:
+        return None
+
+    cache_dir, cache_stem = _sns_remote_video_cache_dir_and_stem(account_dir, url=fixed_url, key=str(key or ""))
+
+    if use_cache:
+        existing = _sns_remote_video_cache_existing_path(cache_dir, cache_stem)
+        if existing is not None:
+            # Best-effort: migrate legacy `.bin` -> `.mp4` when it's already decrypted.
+            try:
+                if existing.suffix.lower() == ".bin":
+                    with existing.open("rb") as f:
+                        head = f.read(8)
+                    if _detect_mp4_ftyp(head):
+                        target = cache_dir / f"{cache_stem}.mp4"
+                        cache_dir.mkdir(parents=True, exist_ok=True)
+                        os.replace(str(existing), str(target))
+                        existing = target
+            except Exception:
+                pass
+            return existing
+
+    # Download to a temp file first.
+    cache_dir.mkdir(parents=True, exist_ok=True)
+    tmp_path = cache_dir / f"{cache_stem}.mp4.{time.time_ns()}.tmp"
+    try:
+        await _download_sns_remote_to_file(fixed_url, tmp_path, max_bytes=200 * 1024 * 1024)
+    except Exception:
+        try:
+            tmp_path.unlink(missing_ok=True)
+        except Exception:
+            pass
+        return None
+
+    # Decrypt in place (WeFlow ISAAC64) if the file isn't already an mp4.
+    _maybe_decrypt_sns_video_file(tmp_path, str(key or ""))
+
+    # Validate: an mp4 must have `ftyp` at offset 4.
+    ok_mp4 = False
+    try:
+        with tmp_path.open("rb") as f:
+            head = f.read(8)
+        ok_mp4 = _detect_mp4_ftyp(head)
+    except Exception:
+        ok_mp4 = False
+
+    if not ok_mp4:
+        try:
+            tmp_path.unlink(missing_ok=True)
+        except Exception:
+            pass
+        return None
+
+    if use_cache:
+        final_path = cache_dir / f"{cache_stem}.mp4"
+        try:
+            os.replace(str(tmp_path), str(final_path))
+        except Exception:
+            # If the rename fails, fall back to serving tmp_path.
+            final_path = tmp_path
+
+        # Remove other extensions for the same cache key.
+        for other_ext in _SNS_REMOTE_VIDEO_CACHE_EXTS:
+            if other_ext.lower() == ".mp4":
+                continue
+            other = cache_dir / f"{cache_stem}{other_ext}"
+            try:
+                if other.exists() and other.is_file():
+                    other.unlink(missing_ok=True)
+            except Exception:
+                continue
+
+        return final_path
+
+    # Cache disabled: keep the decrypted tmp_path (the caller is responsible for deleting it).
+    return tmp_path
+
+
+def _best_effort_unlink(path: str) -> None:
+    try:
+        Path(path).unlink(missing_ok=True)
+    except Exception:
+        pass
+
+
+def _detect_image_mime(data: bytes) -> str:
+    """Sniff image mime type by magic bytes.
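+
+    Recognized formats: JPEG, PNG, GIF, WEBP, BMP, and ISO-BMFF brands
+    (AVIF/HEIC/HEIF); anything else yields "".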
+
+    IMPORTANT: Do NOT trust HTTP Content-Type as a fallback here. We use this for
+    validating decrypted bytes. If we blindly trust `image/*`, a failed decrypt
+    would poison the disk cache and the frontend would keep showing broken images.
+    """
+    if not data:
+        return ""
+
+    if data.startswith(b"\xFF\xD8\xFF"):
+        return "image/jpeg"
+    if data.startswith(b"\x89PNG\r\n\x1a\n"):
+        return "image/png"
+    if len(data) >= 6 and data[:6] in (b"GIF87a", b"GIF89a"):
+        return "image/gif"
+    if len(data) >= 12 and data[:4] == b"RIFF" and data[8:12] == b"WEBP":
+        return "image/webp"
+    if len(data) >= 12 and data[4:8] == b"ftyp":
+        # ISO BMFF based image formats (HEIF/HEIC/AVIF).
+        brand = data[8:12]
+        if brand == b"avif":
+            return "image/avif"
+        if brand in (b"heic", b"heix", b"hevc", b"hevx"):
+            return "image/heic"
+        if brand in (b"heif", b"mif1", b"msf1"):
+            return "image/heif"
+    if data.startswith(b"BM"):
+        return "image/bmp"
+
+    return ""
+
+
+_SNS_REMOTE_CACHE_EXTS = [
+    ".jpg",
+    ".jpeg",
+    ".png",
+    ".gif",
+    ".webp",
+    ".bmp",
+    ".avif",
+    ".heic",
+    ".heif",
+    ".bin",  # legacy/unknown
+]
+
+
+def _mime_to_ext(mt: str) -> str:
+    m = str(mt or "").split(";", 1)[0].strip().lower()
+    return {
+        "image/jpeg": ".jpg",
+        "image/jpg": ".jpg",
+        "image/png": ".png",
+        "image/gif": ".gif",
+        "image/webp": ".webp",
+        "image/bmp": ".bmp",
+        "image/avif": ".avif",
+        "image/heic": ".heic",
+        "image/heif": ".heif",
+    }.get(m, ".bin")
+
+
+def _ext_to_mime(ext: str) -> str:
+    e = str(ext or "").strip().lower().lstrip(".")
+    return {
+        "jpg": "image/jpeg",
+        "jpeg": "image/jpeg",
+        "png": "image/png",
+        "gif": "image/gif",
+        "webp": "image/webp",
+        "bmp": "image/bmp",
+        "avif": "image/avif",
+        "heic": "image/heic",
+        "heif": "image/heif",
+    }.get(e, "")
+
+
+def _sns_remote_cache_dir_and_stem(account_dir: Path, *, url: str, key: str) -> tuple[Path, str]:
+    digest = hashlib.md5(f"{url}|{key}".encode("utf-8", errors="ignore")).hexdigest()
+    cache_dir = account_dir / "sns_remote_cache" / digest[:2]
+    return cache_dir, digest
+
+
+def _sns_remote_cache_existing_path(cache_dir: Path, stem: str) -> Optional[Path]:
+    for ext in _SNS_REMOTE_CACHE_EXTS:
+        p = cache_dir / f"{stem}{ext}"
+        try:
+            if p.exists() and p.is_file():
+                return p
+        except Exception:
+            continue
+    return None
+
+
+def _sniff_image_mime_from_file(path: Path) -> str:
+    try:
+        with path.open("rb") as f:
+            head = f.read(64)
+        return _detect_image_mime(head)
+    except Exception:
+        return ""
+
+
+async def _download_sns_remote_bytes(url: str) -> tuple[bytes, str, str]:
+    """Download SNS media bytes from Tencent CDN with a few safe header variants."""
+    u = str(url or "").strip()
+    if not u:
+        return b"", "", ""
+
+    max_bytes = 25 * 1024 * 1024
+
+    base_headers = {
+        "User-Agent": "MicroMessenger Client",
+        "Accept": "*/*",
+        "Accept-Language": "zh-CN,zh;q=0.9",
+        # Avoid brotli dependency issues; images are already compressed anyway.
+        "Accept-Encoding": "identity",
+        "Connection": "keep-alive",
+    }
+
+    # Some CDN endpoints return a small placeholder image for certain UA/Referer
+    # combinations but still respond 200. Try the simplest variant (base headers only)
+    # first to maximize the chance of getting the real media in one request.
+    header_variants = [
+        {},
+        # WeFlow/Electron: MicroMessenger UA + servicewechat.com referer passes some CDN anti-hotlink checks.
+        {
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/107.0.0.0 Safari/537.36 MicroMessenger/7.0.20.1781(0x6700143B) WindowsWechat(0x63090719) XWEB/8351",
+            "Referer": "https://servicewechat.com/",
+            "Origin": "https://servicewechat.com",
+        },
+        {"Referer": "https://wx.qq.com/", "Origin": "https://wx.qq.com"},
+        {"Referer": "https://mp.weixin.qq.com/", "Origin": "https://mp.weixin.qq.com"},
+    ]
+
+    last_err: Exception | None = None
+    async with httpx.AsyncClient(timeout=20.0, follow_redirects=True) as client:
+        for extra in header_variants:
+            headers = dict(base_headers)
+            headers.update(extra)
+            try:
+                resp = await client.get(u, headers=headers)
+                resp.raise_for_status()
+                payload = bytes(resp.content or b"")
+                if len(payload) > max_bytes:
+                    raise HTTPException(status_code=400, detail="SNS media too large (>25MB).")
+                content_type = str(resp.headers.get("Content-Type") or "").strip()
+                x_enc = str(resp.headers.get("x-enc") or "").strip()
+                return payload, content_type, x_enc
+            except HTTPException:
+                raise
+            except Exception as e:
+                last_err = e
+                continue
+
+    raise last_err or RuntimeError("sns remote download failed")
+
+
+async def _try_fetch_and_decrypt_sns_remote(
+    *,
+    account_dir: Path,
+    url: str,
+    key: str,
+    token: str,
+    use_cache: bool,
+) -> Optional[Response]:
+    """Try WeFlow-style: download from CDN -> decrypt via wcdb_decrypt_sns_image -> return bytes.
+
+    Returns a Response on success, or None on failure so the caller can fall back to local cache matching.
+    """
+    u_fixed = _fix_sns_cdn_url(url, token=token, is_video=False)
+    if not u_fixed:
+        return None
+
+    try:
+        p = urlparse(u_fixed)
+        host = str(p.hostname or "").strip().lower()
+    except Exception:
+        return None
+    if not _is_allowed_sns_media_host(host):
+        return None
+
+    cache_dir, cache_stem = _sns_remote_cache_dir_and_stem(account_dir, url=u_fixed, key=str(key or ""))
+    if use_cache:
+        try:
+            existing = _sns_remote_cache_existing_path(cache_dir, cache_stem)
+            if existing is not None:
+                mt = _ext_to_mime(existing.suffix)
+
+                # Upgrade legacy `.bin` cache to a proper image extension once.
+                if (existing.suffix or "").lower() == ".bin" or (not mt):
+                    mt2 = _sniff_image_mime_from_file(existing)
+                    if not mt2:
+                        try:
+                            existing.unlink(missing_ok=True)
+                        except Exception:
+                            pass
+                        existing = None
+                    else:
+                        ext2 = _mime_to_ext(mt2)
+                        if ext2 != ".bin":
+                            try:
+                                cache_dir.mkdir(parents=True, exist_ok=True)
+                                desired = cache_dir / f"{cache_stem}{ext2}"
+                                if desired.exists():
+                                    # Another process/version already wrote the real file; drop the legacy bin.
+                                    existing.unlink(missing_ok=True)
+                                    existing = desired
+                                else:
+                                    os.replace(str(existing), str(desired))
+                                    existing = desired
+                            except Exception:
+                                pass
+                        mt = mt2
+
+                if existing is not None and mt:
+                    return FileResponse(
+                        existing,
+                        media_type=mt,
+                        headers={
+                            "Cache-Control": "public, max-age=86400",
+                            "X-SNS-Source": "remote-cache",
+                        },
+                    )
+        except Exception:
+            pass
+
+    try:
+        raw, content_type, x_enc = await _download_sns_remote_bytes(u_fixed)
+    except Exception as e:
+        logger.info("[sns] remote download failed: %s", e)
+        return None
+
+    if not raw:
+        return None
+
+    # First, check whether the CDN already returned a real image.
+    mt_raw = _detect_image_mime(raw)
+
+    decoded = raw
+    mt = mt_raw
+    decrypted = False
+    k = str(key or "").strip()
+
+    # Only attempt decryption when the bytes do NOT look like an image, or when the CDN explicitly
Some endpoints return already-decoded PNG/JPEG even when + # urlAttrs.enc_idx == 1, and decrypting those would corrupt the bytes. + need_decrypt = bool(k) and (not mt_raw) and bool(raw) + if k and x_enc and str(x_enc).strip() not in ("0", "false", "False"): + need_decrypt = True + + if need_decrypt: + try: + decoded2 = _wcdb_decrypt_sns_image(raw, k) + mt2 = _detect_image_mime(decoded2) + if mt2: + decoded = decoded2 + mt = mt2 + decrypted = decoded2 != raw + else: + # Decrypt failed; if raw is a real image, keep it. Otherwise treat as failure. + if mt_raw: + decoded = raw + mt = mt_raw + decrypted = False + else: + return None + except Exception as e: + logger.info("[sns] remote decrypt failed: %s", e) + if not mt_raw: + return None + decoded = raw + mt = mt_raw + decrypted = False + + if not mt: + return None + + if use_cache: + try: + ext = _mime_to_ext(mt) + cache_dir.mkdir(parents=True, exist_ok=True) + cache_path = cache_dir / f"{cache_stem}{ext}" + + tmp = cache_path.with_suffix(cache_path.suffix + f".{time.time_ns()}.tmp") + tmp.write_bytes(decoded) + os.replace(str(tmp), str(cache_path)) + + # Remove other extensions for the same cache key to avoid stale duplicates. + for other_ext in _SNS_REMOTE_CACHE_EXTS: + if other_ext.lower() == ext.lower(): + continue + other = cache_dir / f"{cache_stem}{other_ext}" + try: + if other.exists() and other.is_file(): + other.unlink(missing_ok=True) + except Exception: + continue + except Exception: + pass + + resp = Response(content=decoded, media_type=mt) + resp.headers["Cache-Control"] = "public, max-age=86400" if use_cache else "no-store" + resp.headers["X-SNS-Source"] = "remote-decrypt" if decrypted else "remote" + if x_enc: + resp.headers["X-SNS-X-Enc"] = x_enc + return resp + + +@router.get("/api/sns/media", summary="获取朋友圈图片(下载解密优先)") async def get_sns_media( account: Optional[str] = None, create_time: int = 0, @@ -1218,11 +3065,34 @@ async def get_sns_media( media_type: int = 2, pick: Optional[str] = None, md5: Optional[str] = None, + token: Optional[str] = None, + key: Optional[str] = None, + use_cache: int = 1, url: Optional[str] = None, ): account_dir = _resolve_account_dir(account) wxid_dir = _resolve_account_wxid_dir(account_dir) + try: + use_cache_flag = bool(int(use_cache or 1)) + except Exception: + use_cache_flag = True + + # 0) Prefer WeFlow-style remote download + decrypt (accurate, avoids local cache mismatch). + remote_resp = await _try_fetch_and_decrypt_sns_remote( + account_dir=account_dir, + url=str(url or ""), + key=str(key or ""), + token=str(token or ""), + use_cache=use_cache_flag, + ) + if remote_resp is not None: + return remote_resp + + # Cache disabled: do not fall back to local cache heuristics. 
+    if not use_cache_flag:
+        raise HTTPException(status_code=404, detail="SNS media not found (cache disabled).")
+
     if wxid_dir and post_id and media_id:
         if int(post_type) == 7:
             raw_key = f"{post_id}_{media_id}_4"  # hardcoded suffix
@@ -1420,6 +3290,46 @@ async def proxy_article_thumb(url: str):
     raise HTTPException(status_code=404, detail="无法获取文章封面")


+@router.get("/api/sns/video_remote", summary="获取朋友圈远程视频/实况(下载解密优先)")
+async def get_sns_video_remote(
+    account: Optional[str] = None,
+    url: Optional[str] = None,
+    token: Optional[str] = None,
+    key: Optional[str] = None,
+    use_cache: int = 1,
+):
+    account_dir = _resolve_account_dir(account)
+
+    try:
+        use_cache_flag = bool(int(use_cache or 1))
+    except Exception:
+        use_cache_flag = True
+
+    path = await _materialize_sns_remote_video(
+        account_dir=account_dir,
+        url=str(url or ""),
+        key=str(key or ""),
+        token=str(token or ""),
+        use_cache=use_cache_flag,
+    )
+    if path is None:
+        raise HTTPException(status_code=404, detail="SNS remote video not found.")
+
+    headers = {"X-SNS-Source": "remote-video-cache" if use_cache_flag else "remote-video"}
+    headers["Cache-Control"] = "public, max-age=86400" if use_cache_flag else "no-store"
+
+    if use_cache_flag:
+        return FileResponse(str(path), media_type="video/mp4", headers=headers)
+
+    # Cache disabled: delete the temp file after the response is sent.
+    return FileResponse(
+        str(path),
+        media_type="video/mp4",
+        headers=headers,
+        background=BackgroundTask(_best_effort_unlink, str(path)),
+    )
+
+
 @router.get("/api/sns/video", summary="获取朋友圈本地缓存视频")
 async def get_sns_video(
     account: Optional[str] = None,
@@ -1440,4 +3350,4 @@ async def get_sns_video(
     if not video_path:
         raise HTTPException(status_code=404, detail="Local video cache not found")

-    return FileResponse(video_path, media_type="video/mp4")
\ No newline at end of file
+    return FileResponse(video_path, media_type="video/mp4")
diff --git a/src/wechat_decrypt_tool/sns_realtime_autosync.py b/src/wechat_decrypt_tool/sns_realtime_autosync.py
new file mode 100644
index 0000000..478edd1
--- /dev/null
+++ b/src/wechat_decrypt_tool/sns_realtime_autosync.py
@@ -0,0 +1,274 @@
+"""SNS (Moments) realtime -> decrypted sqlite incremental sync.
+
+Why:
+- We can read the latest Moments via WCDB realtime, but the decrypted snapshot
+  (`output/databases/{account}/sns.db`) can lag behind or miss data (e.g. a Moment you viewed
+  while it was visible may later fall under a "last 3 days only" restriction).
+- For export/offline browsing, we want to keep a local append-only cache of Moments that were
+  visible at some point.
+
+This module runs a lightweight background poller that watches db_storage/sns*.db mtime changes
+and triggers a cheap incremental sync of the latest N Moments into the decrypted snapshot.
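+
+Tuning knobs (defaults in SnsRealtimeAutoSyncService.__init__): WECHAT_TOOL_SNS_AUTOSYNC=0
+disables the poller; the *_INTERVAL_MS, *_DEBOUNCE_MS, *_MIN_SYNC_INTERVAL_MS, *_WORKERS and
+*_MAX_SCAN variables bound the poll cadence, debounce window, per-account sync spacing,
+worker concurrency, and scan depth.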
+""" + +from __future__ import annotations + +import os +import threading +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Any, Optional + +from fastapi import HTTPException + +from .chat_helpers import _list_decrypted_accounts, _resolve_account_dir +from .logging_config import get_logger +from .wcdb_realtime import WCDB_REALTIME + +logger = get_logger(__name__) + + +def _env_bool(name: str, default: bool) -> bool: + raw = str(os.environ.get(name, "") or "").strip().lower() + if not raw: + return default + return raw not in {"0", "false", "no", "off"} + + +def _env_int(name: str, default: int, *, min_v: int, max_v: int) -> int: + raw = str(os.environ.get(name, "") or "").strip() + try: + v = int(raw) + except Exception: + v = int(default) + if v < min_v: + v = min_v + if v > max_v: + v = max_v + return v + + +def _mtime_ns(path: Path) -> int: + try: + st = path.stat() + m_ns = int(getattr(st, "st_mtime_ns", 0) or 0) + if m_ns <= 0: + m_ns = int(float(getattr(st, "st_mtime", 0.0) or 0.0) * 1_000_000_000) + return int(m_ns) + except Exception: + return 0 + + +def _scan_sns_db_mtime_ns(db_storage_dir: Path) -> int: + """Best-effort "latest mtime" signal for sns.db buckets.""" + base = Path(db_storage_dir) + candidates: list[Path] = [ + base / "sns" / "sns.db", + base / "sns" / "sns.db-wal", + base / "sns" / "sns.db-shm", + base / "sns.db", + base / "sns.db-wal", + base / "sns.db-shm", + ] + max_ns = 0 + for p in candidates: + v = _mtime_ns(p) + if v > max_ns: + max_ns = v + return int(max_ns) + + +@dataclass +class _AccountState: + last_mtime_ns: int = 0 + due_at: float = 0.0 + last_sync_end_at: float = 0.0 + thread: Optional[threading.Thread] = None + + +class SnsRealtimeAutoSyncService: + def __init__(self) -> None: + self._enabled = _env_bool("WECHAT_TOOL_SNS_AUTOSYNC", True) + self._interval_ms = _env_int("WECHAT_TOOL_SNS_AUTOSYNC_INTERVAL_MS", 2000, min_v=500, max_v=60_000) + self._debounce_ms = _env_int("WECHAT_TOOL_SNS_AUTOSYNC_DEBOUNCE_MS", 800, min_v=0, max_v=60_000) + self._min_sync_interval_ms = _env_int( + "WECHAT_TOOL_SNS_AUTOSYNC_MIN_SYNC_INTERVAL_MS", 5000, min_v=0, max_v=300_000 + ) + self._workers = _env_int("WECHAT_TOOL_SNS_AUTOSYNC_WORKERS", 1, min_v=1, max_v=4) + self._max_scan = _env_int("WECHAT_TOOL_SNS_AUTOSYNC_MAX_SCAN", 200, min_v=20, max_v=2000) + + self._mu = threading.Lock() + self._states: dict[str, _AccountState] = {} + self._stop = threading.Event() + self._thread: Optional[threading.Thread] = None + + def start(self) -> None: + if not self._enabled: + logger.info("[sns-autosync] disabled by env WECHAT_TOOL_SNS_AUTOSYNC=0") + return + with self._mu: + if self._thread is not None and self._thread.is_alive(): + return + self._stop.clear() + th = threading.Thread(target=self._run, name="sns-realtime-autosync", daemon=True) + self._thread = th + th.start() + logger.info( + "[sns-autosync] started interval_ms=%s debounce_ms=%s min_sync_interval_ms=%s max_scan=%s workers=%s", + int(self._interval_ms), + int(self._debounce_ms), + int(self._min_sync_interval_ms), + int(self._max_scan), + int(self._workers), + ) + + def stop(self) -> None: + self._stop.set() + with self._mu: + self._thread = None + + def _run(self) -> None: + while not self._stop.is_set(): + tick_t0 = time.perf_counter() + try: + self._tick() + except Exception: + logger.exception("[sns-autosync] tick failed") + + elapsed_ms = (time.perf_counter() - tick_t0) * 1000.0 + sleep_ms = max(200.0, float(self._interval_ms) - elapsed_ms) + 
+            self._stop.wait(timeout=sleep_ms / 1000.0)
+
+    def _tick(self) -> None:
+        accounts = _list_decrypted_accounts()
+        now = time.time()
+        if not accounts:
+            return
+
+        for acc in accounts:
+            if self._stop.is_set():
+                break
+            try:
+                account_dir = _resolve_account_dir(acc)
+            except HTTPException:
+                continue
+            except Exception:
+                continue
+
+            info = WCDB_REALTIME.get_status(account_dir)
+            available = bool(info.get("dll_present") and info.get("key_present") and info.get("db_storage_dir"))
+            if not available:
+                continue
+
+            db_storage_dir = Path(str(info.get("db_storage_dir") or "").strip())
+            if not db_storage_dir.exists() or not db_storage_dir.is_dir():
+                continue
+
+            mtime_ns = _scan_sns_db_mtime_ns(db_storage_dir)
+            with self._mu:
+                st = self._states.setdefault(acc, _AccountState())
+                if mtime_ns and mtime_ns != st.last_mtime_ns:
+                    st.last_mtime_ns = int(mtime_ns)
+                    st.due_at = now + (float(self._debounce_ms) / 1000.0)
+
+        # Schedule daemon threads.
+        to_start: list[threading.Thread] = []
+        with self._mu:
+            keep = set(accounts)
+            for acc in list(self._states.keys()):
+                if acc not in keep:
+                    self._states.pop(acc, None)
+
+            running = 0
+            for st in self._states.values():
+                th = st.thread
+                if th is not None and th.is_alive():
+                    running += 1
+                elif th is not None and (not th.is_alive()):
+                    st.thread = None
+
+            for acc, st in self._states.items():
+                if running >= int(self._workers):
+                    break
+                if st.due_at <= 0 or st.due_at > now:
+                    continue
+                if st.thread is not None and st.thread.is_alive():
+                    continue
+
+                since = now - float(st.last_sync_end_at or 0.0)
+                min_interval = float(self._min_sync_interval_ms) / 1000.0
+                if min_interval > 0 and since < min_interval:
+                    st.due_at = now + (min_interval - since)
+                    continue
+
+                st.due_at = 0.0
+                th = threading.Thread(
+                    target=self._sync_account_runner,
+                    args=(acc,),
+                    name=f"sns-autosync-{acc}",
+                    daemon=True,
+                )
+                st.thread = th
+                to_start.append(th)
+                running += 1
+
+        for th in to_start:
+            if self._stop.is_set():
+                break
+            try:
+                th.start()
+            except Exception:
+                with self._mu:
+                    for acc, st in self._states.items():
+                        if st.thread is th:
+                            st.thread = None
+                            break
+
+    def _sync_account_runner(self, account: str) -> None:
+        account = str(account or "").strip()
+        try:
+            if self._stop.is_set() or (not account):
+                return
+            res = self._sync_account(account)
+            upserted = int((res or {}).get("upserted") or 0)
+            logger.info("[sns-autosync] sync done account=%s upserted=%s", account, upserted)
+        except Exception:
+            logger.exception("[sns-autosync] sync failed account=%s", account)
+        finally:
+            with self._mu:
+                st = self._states.get(account)
+                if st is not None:
+                    st.thread = None
+                    st.last_sync_end_at = time.time()
+
+    def _sync_account(self, account: str) -> dict[str, Any]:
+        account = str(account or "").strip()
+        if not account:
+            return {"status": "skipped", "reason": "missing account"}
+
+        try:
+            account_dir = _resolve_account_dir(account)
+        except Exception as e:
+            return {"status": "skipped", "reason": f"resolve account failed: {e}"}
+
+        info = WCDB_REALTIME.get_status(account_dir)
+        available = bool(info.get("dll_present") and info.get("key_present") and info.get("db_storage_dir"))
+        if not available:
+            return {"status": "skipped", "reason": "realtime not available"}
+
+        # Import lazily to avoid startup import ordering issues.
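+        # The helper returns a dict; on success it carries an "upserted" count,
+        # which _sync_account_runner logs above.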
+        from .routers.sns import sync_sns_realtime_timeline_latest
+
+        try:
+            return sync_sns_realtime_timeline_latest(
+                account=account,
+                max_scan=int(self._max_scan),
+                force=0,
+            )
+        except HTTPException as e:
+            return {"status": "error", "error": str(e.detail or "")}
+        except Exception as e:
+            return {"status": "error", "error": str(e)}
+
+
+SNS_REALTIME_AUTOSYNC = SnsRealtimeAutoSyncService()
diff --git a/src/wechat_decrypt_tool/wcdb_realtime.py b/src/wechat_decrypt_tool/wcdb_realtime.py
index bc02be9..aeee2b3 100644
--- a/src/wechat_decrypt_tool/wcdb_realtime.py
+++ b/src/wechat_decrypt_tool/wcdb_realtime.py
@@ -1,6 +1,8 @@
 import ctypes
+import binascii
 import json
 import os
+import re
 import sys
 import threading
 import time
@@ -20,7 +22,51 @@ class WCDBRealtimeError(RuntimeError):

 _NATIVE_DIR = Path(__file__).resolve().parent / "native"
-_WCDB_API_DLL = _NATIVE_DIR / "wcdb_api.dll"
+_DEFAULT_WCDB_API_DLL = _NATIVE_DIR / "wcdb_api.dll"
+_WCDB_API_DLL_SELECTED: Optional[Path] = None
+
+
+def _candidate_wcdb_api_dll_paths() -> list[Path]:
+    """Return possible locations for wcdb_api.dll (prefer WeFlow's newer build when present)."""
+    cands: list[Path] = []
+
+    env = str(os.environ.get("WECHAT_TOOL_WCDB_API_DLL_PATH", "") or "").strip()
+    if env:
+        cands.append(Path(env))
+
+    # Repo checkout convenience: reuse bundled WeFlow / echotrace DLLs when available.
+    try:
+        repo_root = Path(__file__).resolve().parents[2]
+    except Exception:
+        repo_root = Path.cwd()
+
+    for p in [
+        repo_root / "WeFlow" / "resources" / "wcdb_api.dll",
+        repo_root / "echotrace" / "assets" / "dll" / "wcdb_api.dll",
+        _DEFAULT_WCDB_API_DLL,
+    ]:
+        if p not in cands:
+            cands.append(p)
+
+    return cands
+
+
+def _resolve_wcdb_api_dll_path() -> Path:
+    global _WCDB_API_DLL_SELECTED
+    if _WCDB_API_DLL_SELECTED is not None:
+        return _WCDB_API_DLL_SELECTED
+
+    for p in _candidate_wcdb_api_dll_paths():
+        try:
+            if p.exists() and p.is_file():
+                _WCDB_API_DLL_SELECTED = p
+                return p
+        except Exception:
+            continue
+
+    # Fall back to the default path even if it doesn't exist; the caller will raise a clear error.
+    _WCDB_API_DLL_SELECTED = _DEFAULT_WCDB_API_DLL
+    return _WCDB_API_DLL_SELECTED

 _lib_lock = threading.Lock()
 _lib: Optional[ctypes.CDLL] = None
@@ -40,16 +86,18 @@ def _load_wcdb_lib() -> ctypes.CDLL:
     if not _is_windows():
         raise WCDBRealtimeError("WCDB realtime mode is only supported on Windows.")

-    if not _WCDB_API_DLL.exists():
-        raise WCDBRealtimeError(f"Missing wcdb_api.dll at: {_WCDB_API_DLL}")
+    wcdb_api_dll = _resolve_wcdb_api_dll_path()
+    if not wcdb_api_dll.exists():
+        raise WCDBRealtimeError(f"Missing wcdb_api.dll at: {wcdb_api_dll}")

     # Ensure dependent DLLs (e.g. WCDB.dll) can be found.
     try:
-        os.add_dll_directory(str(_NATIVE_DIR))
+        os.add_dll_directory(str(wcdb_api_dll.parent))
     except Exception:
         pass

-    lib = ctypes.CDLL(str(_WCDB_API_DLL))
+    lib = ctypes.CDLL(str(wcdb_api_dll))
+    logger.info("[wcdb] using wcdb_api.dll: %s", wcdb_api_dll)

     # Signatures
     lib.wcdb_init.argtypes = []
@@ -144,6 +192,19 @@ def _load_wcdb_lib() -> ctypes.CDLL:
         # Older wcdb_api.dll may not expose this export.
         pass

+    # Optional (newer DLLs): wcdb_decrypt_sns_image(encrypted_data, len, key, out_hex).
+    # WeFlow uses this to decrypt Moments CDN images.
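+    # Presumed C prototype, inferred from the ctypes signature registered below:
+    #   int32_t wcdb_decrypt_sns_image(const void* data, int32_t len,
+    #                                  const char* key, void** out_hex);
+    # `out_hex` receives a hex string that must be released via wcdb_free_string.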
+    try:
+        lib.wcdb_decrypt_sns_image.argtypes = [
+            ctypes.c_void_p,
+            ctypes.c_int32,
+            ctypes.c_char_p,
+            ctypes.POINTER(ctypes.c_void_p),
+        ]
+        lib.wcdb_decrypt_sns_image.restype = ctypes.c_int32
+    except Exception:
+        pass
+
     lib.wcdb_get_logs.argtypes = [ctypes.POINTER(ctypes.c_char_p)]
     lib.wcdb_get_logs.restype = ctypes.c_int
@@ -488,6 +549,63 @@ def get_sns_timeline(
     return []


+def decrypt_sns_image(encrypted_data: bytes, key: str) -> bytes:
+    """Decrypt Moments CDN image bytes using the WCDB DLL (WeFlow compatible).
+
+    Notes:
+    - Requires a newer wcdb_api.dll that exports wcdb_decrypt_sns_image.
+    - On decryption failure, returns the original encrypted_data (best-effort behavior, like WeFlow).
+    """
+    _ensure_initialized()
+    lib = _load_wcdb_lib()
+    fn = getattr(lib, "wcdb_decrypt_sns_image", None)
+    if not fn:
+        raise WCDBRealtimeError("Current wcdb_api.dll does not support sns image decryption.")
+
+    raw = bytes(encrypted_data or b"")
+    if not raw:
+        return b""
+
+    k = str(key or "").strip()
+    if not k:
+        return raw
+
+    out_ptr = ctypes.c_void_p()
+    buf = ctypes.create_string_buffer(raw, len(raw))
+    rc = 0
+    try:
+        rc = int(
+            fn(
+                ctypes.cast(buf, ctypes.c_void_p),
+                ctypes.c_int32(len(raw)),
+                k.encode("utf-8"),
+                ctypes.byref(out_ptr),
+            )
+        )
+
+        if rc != 0 or not out_ptr.value:
+            return raw
+
+        hex_bytes = ctypes.cast(out_ptr, ctypes.c_char_p).value or b""
+        if not hex_bytes:
+            return raw
+
+        # Defensive: keep only hex chars (some builds may include whitespace).
+        hex_clean = re.sub(rb"[^0-9a-fA-F]", b"", hex_bytes)
+        if not hex_clean:
+            return raw
+        try:
+            return binascii.unhexlify(hex_clean)
+        except Exception:
+            return raw
+    finally:
+        try:
+            if out_ptr.value:
+                lib.wcdb_free_string(ctypes.cast(out_ptr, ctypes.c_char_p))
+        except Exception:
+            pass
+
+
 def shutdown() -> None:
     global _initialized
     lib = _load_wcdb_lib()
@@ -573,11 +691,16 @@ class WCDBRealtimeManager:
             except Exception as e:
                 err = str(e)

-        dll_ok = _WCDB_API_DLL.exists()
+        dll_path = _resolve_wcdb_api_dll_path()
+        try:
+            dll_ok = bool(dll_path.exists())
+        except Exception:
+            dll_ok = False
         connected = self.is_connected(account)
         return {
             "account": account,
             "dll_present": bool(dll_ok),
+            "wcdb_api_dll": str(dll_path),
             "key_present": bool(key_ok),
             "db_storage_dir": str(db_storage_dir) if db_storage_dir else "",
             "session_db_path": str(session_db_path) if session_db_path else "",
diff --git a/tools/weflow_wasm_keystream.js b/tools/weflow_wasm_keystream.js
new file mode 100644
index 0000000..9125bb0
--- /dev/null
+++ b/tools/weflow_wasm_keystream.js
@@ -0,0 +1,122 @@
+// Generate a WeChat/WeFlow WxIsaac64 keystream via WeFlow's WASM module.
+//
+// Usage:
+//   node tools/weflow_wasm_keystream.js <key> <size>
+//
+// Prints a base64-encoded keystream to stdout (no extra logs).
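+//
+// Example (hypothetical key; 131072 bytes = the 128KB region WeChat encrypts):
+//   node tools/weflow_wasm_keystream.js 1234567890 131072 > keystream.b64
+// On success the process exits 0 with base64 on stdout; diagnostics go to stderr.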
+
+const fs = require('fs')
+const path = require('path')
+const vm = require('vm')
+
+function usageAndExit() {
+  process.stderr.write('Usage: node tools/weflow_wasm_keystream.js <key> <size>\n')
+  process.exit(2)
+}
+
+const key = String(process.argv[2] || '').trim()
+const size = Number(process.argv[3] || 0)
+
+if (!key || !Number.isFinite(size) || size <= 0) usageAndExit()
+
+const basePath = path.join(__dirname, '..', 'WeFlow', 'electron', 'assets', 'wasm')
+const wasmPath = path.join(basePath, 'wasm_video_decode.wasm')
+const jsPath = path.join(basePath, 'wasm_video_decode.js')
+
+if (!fs.existsSync(wasmPath) || !fs.existsSync(jsPath)) {
+  process.stderr.write(`WeFlow WASM assets not found: ${basePath}\n`)
+  process.exit(1)
+}
+
+const wasmBinary = fs.readFileSync(wasmPath)
+const jsContent = fs.readFileSync(jsPath, 'utf8')
+
+let capturedKeystream = null
+let resolveInit
+let rejectInit
+const initPromise = new Promise((res, rej) => {
+  resolveInit = res
+  rejectInit = rej
+})
+
+const mockGlobal = {
+  console: { log: () => {}, error: () => {} }, // keep stdout clean
+  Buffer,
+  Uint8Array,
+  Int8Array,
+  Uint16Array,
+  Int16Array,
+  Uint32Array,
+  Int32Array,
+  Float32Array,
+  Float64Array,
+  BigInt64Array,
+  BigUint64Array,
+  Array,
+  Object,
+  Function,
+  String,
+  Number,
+  Boolean,
+  Error,
+  Promise,
+  require,
+  process,
+  setTimeout,
+  clearTimeout,
+  setInterval,
+  clearInterval,
+}
+
+mockGlobal.Module = {
+  onRuntimeInitialized: () => resolveInit(),
+  wasmBinary,
+  print: () => {},
+  printErr: () => {},
+}
+
+mockGlobal.self = mockGlobal
+mockGlobal.self.location = { href: jsPath }
+mockGlobal.WorkerGlobalScope = function () {}
+mockGlobal.VTS_WASM_URL = `file://${wasmPath}`
+
+mockGlobal.wasm_isaac_generate = (ptr, n) => {
+  // Copy the keystream out of the WASM heap before it can be freed or overwritten.
+  const buf = new Uint8Array(mockGlobal.Module.HEAPU8.buffer, ptr, n)
+  capturedKeystream = new Uint8Array(buf)
+}
+
+try {
+  const context = vm.createContext(mockGlobal)
+  new vm.Script(jsContent, { filename: jsPath }).runInContext(context)
+} catch (e) {
+  rejectInit(e)
+}
+
+;(async () => {
+  try {
+    await initPromise
+
+    if (!mockGlobal.Module.WxIsaac64 && mockGlobal.Module.asm && mockGlobal.Module.asm.WxIsaac64) {
+      mockGlobal.Module.WxIsaac64 = mockGlobal.Module.asm.WxIsaac64
+    }
+    if (!mockGlobal.Module.WxIsaac64) {
+      throw new Error('WxIsaac64 not found in WASM module')
+    }
+
+    capturedKeystream = null
+    const isaac = new mockGlobal.Module.WxIsaac64(key)
+    isaac.generate(size)
+    if (isaac.delete) isaac.delete()
+
+    if (!capturedKeystream) throw new Error('Failed to capture keystream')
+
+    const out = Buffer.from(capturedKeystream)
+    // Match WeFlow worker logic: reverse the captured Uint8Array.
+    out.reverse()
+    process.stdout.write(out.toString('base64'))
+  } catch (e) {
+    process.stderr.write(String(e && e.stack ? e.stack : e) + '\n')
+    process.exit(1)
+  }
+})()
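+
+// Parity sketch (manual check; seed and module path are hypothetical): the pure-Python
+// fallback in src/wechat_decrypt_tool/isaac64.py is meant to produce identical bytes,
+// since both sides reverse the generated array:
+//   node tools/weflow_wasm_keystream.js 42 64
+//   python -c "from wechat_decrypt_tool.isaac64 import Isaac64; import base64; print(base64.b64encode(Isaac64('42').generate_keystream(64)).decode())"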