-
![]()
+
{{ (c.name || c.username || '?').charAt(0) }}
@@ -1779,6 +1808,7 @@ useHead({
const route = useRoute()
const isSnsRoute = computed(() => route.path?.startsWith('/sns'))
+const isWrappedRoute = computed(() => route.path?.startsWith('/wrapped'))
const routeUsername = computed(() => {
const raw = route.params.username
@@ -2098,6 +2128,10 @@ const goSns = async () => {
await navigateTo('/sns')
}
+const goWrapped = async () => {
+ await navigateTo('/wrapped')
+}
+
// 实时更新(WCDB DLL + db_storage watcher)
const realtimeEnabled = ref(false)
const realtimeAvailable = ref(false)
@@ -4319,10 +4353,10 @@ const normalizeMessage = (msg) => {
}
}
-const onMessageAvatarError = (e, message) => {
+const onAvatarError = (e, target) => {
// Make sure we fall back to the initial avatar if the URL 404s/blocks.
try { e?.target && (e.target.style.display = 'none') } catch {}
- try { if (message) message.avatar = null } catch {}
+ try { if (target) target.avatar = null } catch {}
}
const shouldShowEmojiDownload = (message) => {
@@ -5118,14 +5152,16 @@ const toggleRealtime = async (opts = {}) => {
try {
const api = useApi()
const u = String(selectedContact.value?.username || '').trim()
- if (u) {
- // Use a larger scan window on shutdown to reduce the chance of missing a backlog.
- await api.syncChatRealtimeMessages({
- account: selectedAccount.value,
- username: u,
- max_scan: 5000
- })
- }
+ // Sync all sessions once before falling back to the decrypted snapshot.
+ // This keeps the sidebar session list consistent (e.g. new friends) after a refresh.
+ await api.syncChatRealtimeAll({
+ account: selectedAccount.value,
+ max_scan: 200,
+ priority_username: u,
+ priority_max_scan: 5000,
+ include_hidden: true,
+ include_official: true
+ })
} catch {}
await refreshSessionsForSelectedAccount({ sourceOverride: '' })
if (selectedContact.value?.username) {
diff --git a/src/wechat_decrypt_tool/api.py b/src/wechat_decrypt_tool/api.py
index 14d400e..5eeb36a 100644
--- a/src/wechat_decrypt_tool/api.py
+++ b/src/wechat_decrypt_tool/api.py
@@ -11,6 +11,7 @@ from starlette.staticfiles import StaticFiles
from .logging_config import setup_logging, get_logger
from .path_fix import PathFixRoute
+from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC
from .routers.chat import router as _chat_router
from .routers.chat_export import router as _chat_export_router
from .routers.chat_media import router as _chat_media_router
@@ -121,16 +122,41 @@ def _maybe_mount_frontend() -> None:
_maybe_mount_frontend()
+@app.on_event("startup")
+async def _startup_background_jobs() -> None:
+ try:
+ CHAT_REALTIME_AUTOSYNC.start()
+ except Exception:
+ logger.exception("Failed to start realtime autosync service")
+
+
@app.on_event("shutdown")
async def _shutdown_wcdb_realtime() -> None:
try:
- WCDB_REALTIME.close_all()
+ CHAT_REALTIME_AUTOSYNC.stop()
except Exception:
pass
+ close_ok = False
+ lock_timeout_s: float | None = 0.2
try:
- _wcdb_shutdown()
+ raw = str(os.environ.get("WECHAT_TOOL_WCDB_SHUTDOWN_LOCK_TIMEOUT_S", "0.2") or "").strip()
+ lock_timeout_s = float(raw) if raw else 0.2
+ if lock_timeout_s <= 0:
+ lock_timeout_s = None
except Exception:
- pass
+ lock_timeout_s = 0.2
+ try:
+ close_ok = WCDB_REALTIME.close_all(lock_timeout_s=lock_timeout_s)
+ except Exception:
+ close_ok = False
+ if close_ok:
+ try:
+ _wcdb_shutdown()
+ except Exception:
+ pass
+ else:
+ # If some conn locks were busy, other threads may still be running WCDB calls; avoid shutting down the lib.
+ logger.warning("[wcdb] close_all not fully completed; skip wcdb_shutdown")
if __name__ == "__main__":
diff --git a/src/wechat_decrypt_tool/chat_realtime_autosync.py b/src/wechat_decrypt_tool/chat_realtime_autosync.py
new file mode 100644
index 0000000..55021b8
--- /dev/null
+++ b/src/wechat_decrypt_tool/chat_realtime_autosync.py
@@ -0,0 +1,331 @@
+"""Background auto-sync from WCDB realtime (db_storage) into decrypted sqlite.
+
+Why:
+- The UI can read "latest" messages from WCDB realtime (`source=realtime`), but most APIs default to the
+ decrypted sqlite snapshot (`source=decrypted`).
+- Previously we only synced realtime -> decrypted when the UI toggled realtime off, which caused `/api/chat/messages`
+ to lag behind while realtime was enabled.
+
+This module runs a lightweight background poller that watches db_storage mtime changes and triggers an incremental
+sync_all into decrypted sqlite. It is intentionally conservative (debounced + rate-limited) to avoid hammering the
+backend or the sqlite files.
+"""
+
+from __future__ import annotations
+
+import os
+import threading
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any, Optional
+
+from fastapi import HTTPException
+
+from .chat_helpers import _list_decrypted_accounts, _resolve_account_dir
+from .logging_config import get_logger
+from .wcdb_realtime import WCDB_REALTIME
+
+logger = get_logger(__name__)
+
+
+def _env_bool(name: str, default: bool) -> bool:
+ raw = str(os.environ.get(name, "") or "").strip().lower()
+ if not raw:
+ return default
+ return raw not in {"0", "false", "no", "off"}
+
+
+def _env_int(name: str, default: int, *, min_v: int, max_v: int) -> int:
+ raw = str(os.environ.get(name, "") or "").strip()
+ try:
+ v = int(raw)
+ except Exception:
+ v = int(default)
+ if v < min_v:
+ v = min_v
+ if v > max_v:
+ v = max_v
+ return v
+
+
+def _scan_db_storage_mtime_ns(db_storage_dir: Path) -> int:
+ """Best-effort scan of db_storage for a "latest mtime" signal.
+
+ We intentionally restrict to common database buckets to reduce walk cost.
+ """
+
+ try:
+ base = str(db_storage_dir)
+ except Exception:
+ return 0
+
+ max_ns = 0
+ try:
+ for root, dirs, files in os.walk(base):
+ if root == base:
+ allow = {"message", "session", "contact", "head_image", "bizchat", "sns", "general", "favorite"}
+ dirs[:] = [d for d in dirs if str(d or "").lower() in allow]
+
+ for fn in files:
+ name = str(fn or "").lower()
+ if not name.endswith((".db", ".db-wal", ".db-shm")):
+ continue
+ if not (
+ ("message" in name)
+ or ("session" in name)
+ or ("contact" in name)
+ or ("name2id" in name)
+ or ("head_image" in name)
+ ):
+ continue
+
+ try:
+ st = os.stat(os.path.join(root, fn))
+ m_ns = int(getattr(st, "st_mtime_ns", 0) or 0)
+ if m_ns <= 0:
+ m_ns = int(float(getattr(st, "st_mtime", 0.0) or 0.0) * 1_000_000_000)
+ if m_ns > max_ns:
+ max_ns = m_ns
+ except Exception:
+ continue
+ except Exception:
+ return 0
+
+ return max_ns
+
+
+@dataclass
+class _AccountState:
+ last_mtime_ns: int = 0
+ due_at: float = 0.0
+ last_sync_end_at: float = 0.0
+ thread: Optional[threading.Thread] = None
+
+
+class ChatRealtimeAutoSyncService:
+ def __init__(self) -> None:
+ self._enabled = _env_bool("WECHAT_TOOL_REALTIME_AUTOSYNC", True)
+ self._interval_ms = _env_int("WECHAT_TOOL_REALTIME_AUTOSYNC_INTERVAL_MS", 1000, min_v=200, max_v=10_000)
+ self._debounce_ms = _env_int("WECHAT_TOOL_REALTIME_AUTOSYNC_DEBOUNCE_MS", 600, min_v=0, max_v=10_000)
+ self._min_sync_interval_ms = _env_int(
+ "WECHAT_TOOL_REALTIME_AUTOSYNC_MIN_SYNC_INTERVAL_MS", 800, min_v=0, max_v=60_000
+ )
+ self._workers = _env_int("WECHAT_TOOL_REALTIME_AUTOSYNC_WORKERS", 1, min_v=1, max_v=4)
+
+ # Sync strategy defaults: cheap incremental write into decrypted sqlite.
+ self._sync_max_scan = _env_int("WECHAT_TOOL_REALTIME_AUTOSYNC_MAX_SCAN", 200, min_v=20, max_v=5000)
+ self._priority_max_scan = _env_int("WECHAT_TOOL_REALTIME_AUTOSYNC_PRIORITY_MAX_SCAN", 600, min_v=20, max_v=5000)
+ self._backfill_limit = _env_int("WECHAT_TOOL_REALTIME_AUTOSYNC_BACKFILL_LIMIT", 0, min_v=0, max_v=5000)
+ # Default to the same conservative filtering as the chat UI sidebar (avoid hammering gh_/hidden sessions).
+ self._include_hidden = _env_bool("WECHAT_TOOL_REALTIME_AUTOSYNC_INCLUDE_HIDDEN", False)
+ self._include_official = _env_bool("WECHAT_TOOL_REALTIME_AUTOSYNC_INCLUDE_OFFICIAL", False)
+
+ self._mu = threading.Lock()
+ self._states: dict[str, _AccountState] = {}
+ self._stop = threading.Event()
+ self._thread: Optional[threading.Thread] = None
+
+ def start(self) -> None:
+ if not self._enabled:
+ logger.info("[realtime-autosync] disabled by env WECHAT_TOOL_REALTIME_AUTOSYNC=0")
+ return
+
+ with self._mu:
+ if self._thread is not None and self._thread.is_alive():
+ return
+ self._stop.clear()
+ self._thread = threading.Thread(target=self._run, name="realtime-autosync", daemon=True)
+ self._thread.start()
+
+ logger.info(
+ "[realtime-autosync] started interval_ms=%s debounce_ms=%s min_sync_interval_ms=%s max_scan=%s backfill_limit=%s workers=%s",
+ int(self._interval_ms),
+ int(self._debounce_ms),
+ int(self._min_sync_interval_ms),
+ int(self._sync_max_scan),
+ int(self._backfill_limit),
+ int(self._workers),
+ )
+
+ def stop(self) -> None:
+ with self._mu:
+ th = self._thread
+ self._thread = None
+
+ if th is None:
+ return
+
+ self._stop.set()
+ try:
+ th.join(timeout=5.0)
+ except Exception:
+ pass
+
+ logger.info("[realtime-autosync] stopped")
+
+ def _run(self) -> None:
+ while not self._stop.is_set():
+ tick_t0 = time.perf_counter()
+ try:
+ self._tick()
+ except Exception:
+ logger.exception("[realtime-autosync] tick failed")
+
+ # Avoid busy looping on exceptions; keep a minimum sleep.
+ elapsed_ms = (time.perf_counter() - tick_t0) * 1000.0
+ sleep_ms = max(100.0, float(self._interval_ms) - elapsed_ms)
+ self._stop.wait(timeout=sleep_ms / 1000.0)
+
+ def _tick(self) -> None:
+ accounts = _list_decrypted_accounts()
+ now = time.time()
+
+ if not accounts:
+ return
+
+ for acc in accounts:
+ if self._stop.is_set():
+ break
+
+ try:
+ account_dir = _resolve_account_dir(acc)
+ except HTTPException:
+ continue
+ except Exception:
+ continue
+
+ info = WCDB_REALTIME.get_status(account_dir)
+ available = bool(info.get("dll_present") and info.get("key_present") and info.get("db_storage_dir"))
+ if not available:
+ continue
+
+ db_storage_dir = Path(str(info.get("db_storage_dir") or "").strip())
+ if not db_storage_dir.exists() or not db_storage_dir.is_dir():
+ continue
+
+ scan_t0 = time.perf_counter()
+ mtime_ns = _scan_db_storage_mtime_ns(db_storage_dir)
+ scan_ms = (time.perf_counter() - scan_t0) * 1000.0
+ if scan_ms > 2000:
+ logger.warning("[realtime-autosync] scan slow account=%s ms=%.1f", acc, scan_ms)
+
+ with self._mu:
+ st = self._states.setdefault(acc, _AccountState())
+ if mtime_ns and mtime_ns != st.last_mtime_ns:
+ st.last_mtime_ns = int(mtime_ns)
+ st.due_at = now + (float(self._debounce_ms) / 1000.0)
+
+ # Schedule daemon threads. (Important: do NOT use ThreadPoolExecutor here; its threads are non-daemon on
+ # Windows/Python 3.12 and can prevent Ctrl+C from stopping the process.)
+ to_start: list[threading.Thread] = []
+ with self._mu:
+ # Drop state for removed accounts to keep memory bounded.
+ keep = set(accounts)
+ for acc in list(self._states.keys()):
+ if acc not in keep:
+ self._states.pop(acc, None)
+
+ # Clean up finished threads and compute current concurrency.
+ running = 0
+ for st in self._states.values():
+ th = st.thread
+ if th is not None and th.is_alive():
+ running += 1
+ elif th is not None and (not th.is_alive()):
+ st.thread = None
+
+ for acc, st in self._states.items():
+ if running >= int(self._workers):
+ break
+ if st.due_at <= 0 or st.due_at > now:
+ continue
+ if st.thread is not None and st.thread.is_alive():
+ continue
+
+ since = now - float(st.last_sync_end_at or 0.0)
+ min_interval = float(self._min_sync_interval_ms) / 1000.0
+ if min_interval > 0 and since < min_interval:
+ st.due_at = now + (min_interval - since)
+ continue
+
+ st.due_at = 0.0
+ th = threading.Thread(
+ target=self._sync_account_runner,
+ args=(acc,),
+ name=f"realtime-autosync-{acc}",
+ daemon=True,
+ )
+ st.thread = th
+ to_start.append(th)
+ running += 1
+
+ for th in to_start:
+ if self._stop.is_set():
+ break
+ try:
+ th.start()
+ except Exception:
+ # Best-effort: if a thread fails to start, clear the state so we can retry later.
+ with self._mu:
+ for acc, st in self._states.items():
+ if st.thread is th:
+ st.thread = None
+ break
+
+ def _sync_account_runner(self, account: str) -> None:
+ account = str(account or "").strip()
+ try:
+ if self._stop.is_set() or (not account):
+ return
+ res = self._sync_account(account)
+ inserted = int((res or {}).get("inserted_total") or (res or {}).get("insertedTotal") or 0)
+ synced = int((res or {}).get("synced") or (res or {}).get("sessionsSynced") or 0)
+ logger.info("[realtime-autosync] sync done account=%s synced=%s inserted=%s", account, synced, inserted)
+ except Exception:
+ logger.exception("[realtime-autosync] sync failed account=%s", account)
+ finally:
+ with self._mu:
+ st = self._states.get(account)
+ if st is not None:
+ st.thread = None
+ st.last_sync_end_at = time.time()
+
+ def _sync_account(self, account: str) -> dict[str, Any]:
+ """Run a cheap incremental sync_all for one account."""
+
+ account = str(account or "").strip()
+ if not account:
+ return {"status": "skipped", "reason": "missing account"}
+
+ try:
+ account_dir = _resolve_account_dir(account)
+ except Exception as e:
+ return {"status": "skipped", "reason": f"resolve account failed: {e}"}
+
+ info = WCDB_REALTIME.get_status(account_dir)
+ available = bool(info.get("dll_present") and info.get("key_present") and info.get("db_storage_dir"))
+ if not available:
+ return {"status": "skipped", "reason": "realtime not available"}
+
+ # Import lazily to avoid any startup import ordering issues.
+ from .routers.chat import sync_chat_realtime_messages_all
+
+ try:
+ return sync_chat_realtime_messages_all(
+ request=None, # not used by the handler logic; we run it as an internal job
+ account=account,
+ max_scan=int(self._sync_max_scan),
+ priority_username=None,
+ priority_max_scan=int(self._priority_max_scan),
+ include_hidden=bool(self._include_hidden),
+ include_official=bool(self._include_official),
+ backfill_limit=int(self._backfill_limit),
+ )
+ except HTTPException as e:
+ return {"status": "error", "error": str(e.detail or "")}
+ except Exception as e:
+ return {"status": "error", "error": str(e)}
+
+
+CHAT_REALTIME_AUTOSYNC = ChatRealtimeAutoSyncService()
diff --git a/src/wechat_decrypt_tool/routers/chat.py b/src/wechat_decrypt_tool/routers/chat.py
index 116ab0d..7c02107 100644
--- a/src/wechat_decrypt_tool/routers/chat.py
+++ b/src/wechat_decrypt_tool/routers/chat.py
@@ -291,6 +291,130 @@ def _resolve_decrypted_message_table(account_dir: Path, username: str) -> Option
return None
+def _pick_message_db_for_new_table(account_dir: Path, username: str) -> Optional[Path]:
+ """Pick a target decrypted sqlite db to place a new Msg_
table.
+
+ Some accounts have both `message_*.db` and `biz_message_*.db`. For normal users we prefer
+ `message*.db`; for official accounts (`gh_`) we prefer `biz_message*.db`.
+ """
+
+ db_paths = _iter_message_db_paths(account_dir)
+ if not db_paths:
+ return None
+
+ uname = str(username or "").strip()
+ want_biz = bool(uname and uname.startswith("gh_"))
+
+ msg_paths: list[Path] = []
+ biz_paths: list[Path] = []
+ other_paths: list[Path] = []
+ for p in db_paths:
+ ln = p.name.lower()
+ if re.match(r"^message(_\d+)?\.db$", ln):
+ msg_paths.append(p)
+ elif re.match(r"^biz_message(_\d+)?\.db$", ln):
+ biz_paths.append(p)
+ else:
+ other_paths.append(p)
+
+ if want_biz and biz_paths:
+ return biz_paths[0]
+ if msg_paths:
+ return msg_paths[0]
+ if biz_paths:
+ return biz_paths[0]
+ return other_paths[0] if other_paths else db_paths[0]
+
+
+def _ensure_decrypted_message_table(account_dir: Path, username: str) -> tuple[Path, str]:
+ """Ensure the decrypted sqlite has a Msg_ table for this conversation.
+
+ Why:
+ - The decrypted snapshot can miss newly created sessions, so WCDB realtime can show messages
+ while the decrypted message_*.db has no table -> `/api/chat/messages` returns empty.
+ - Realtime sync should be able to create the missing conversation table and then insert rows.
+ """
+
+ uname = str(username or "").strip()
+ if not uname:
+ raise HTTPException(status_code=400, detail="Missing username.")
+
+ resolved = _resolve_decrypted_message_table(account_dir, uname)
+ if resolved:
+ return resolved
+
+ target_db = _pick_message_db_for_new_table(account_dir, uname)
+ if target_db is None:
+ raise HTTPException(status_code=404, detail="No message databases found for this account.")
+
+ # Use the conventional WeChat naming (`Msg_`). Resolution is case-insensitive.
+ import hashlib
+
+ md5_hex = hashlib.md5(uname.encode("utf-8")).hexdigest()
+ table_name = f"Msg_{md5_hex}"
+ quoted_table = _quote_ident(table_name)
+
+ conn = sqlite3.connect(str(target_db))
+ try:
+ conn.execute(
+ f"""
+ CREATE TABLE IF NOT EXISTS {quoted_table}(
+ local_id INTEGER PRIMARY KEY AUTOINCREMENT,
+ server_id INTEGER,
+ local_type INTEGER,
+ sort_seq INTEGER,
+ real_sender_id INTEGER,
+ create_time INTEGER,
+ status INTEGER,
+ upload_status INTEGER,
+ download_status INTEGER,
+ server_seq INTEGER,
+ origin_source INTEGER,
+ source TEXT,
+ message_content TEXT,
+ compress_content TEXT,
+ packed_info_data BLOB,
+ WCDB_CT_message_content INTEGER DEFAULT NULL,
+ WCDB_CT_source INTEGER DEFAULT NULL
+ )
+ """
+ )
+
+ # Match the common indexes we observe on existing Msg_* tables for query performance.
+ idx_sender = _quote_ident(f"{table_name}_SENDERID")
+ idx_server = _quote_ident(f"{table_name}_SERVERID")
+ idx_sort = _quote_ident(f"{table_name}_SORTSEQ")
+ idx_type_seq = _quote_ident(f"{table_name}_TYPE_SEQ")
+ conn.execute(f"CREATE INDEX IF NOT EXISTS {idx_sender} ON {quoted_table}(real_sender_id)")
+ conn.execute(f"CREATE INDEX IF NOT EXISTS {idx_server} ON {quoted_table}(server_id)")
+ conn.execute(f"CREATE INDEX IF NOT EXISTS {idx_sort} ON {quoted_table}(sort_seq)")
+ conn.execute(f"CREATE INDEX IF NOT EXISTS {idx_type_seq} ON {quoted_table}(local_type, sort_seq)")
+
+ conn.commit()
+ finally:
+ conn.close()
+
+ return target_db, table_name
+
+
+def _ensure_decrypted_message_tables(
+ account_dir: Path, usernames: list[str]
+) -> dict[str, tuple[Path, str]]:
+ """Bulk resolver that also creates missing Msg_ tables when needed."""
+
+ table_map = _resolve_decrypted_message_tables(account_dir, usernames)
+ for u in usernames:
+ uname = str(u or "").strip()
+ if not uname or uname in table_map:
+ continue
+ try:
+ table_map[uname] = _ensure_decrypted_message_table(account_dir, uname)
+ except Exception:
+ # Best-effort: if we can't create the table, keep it missing and let callers skip.
+ continue
+ return table_map
+
+
def _resolve_decrypted_message_tables(
account_dir: Path, usernames: list[str]
) -> dict[str, tuple[Path, str]]:
@@ -358,18 +482,160 @@ def _ensure_session_last_message_table(conn: sqlite3.Connection) -> None:
)
+def _get_session_table_columns(conn: sqlite3.Connection) -> set[str]:
+ try:
+ rows = conn.execute("PRAGMA table_info(SessionTable)").fetchall()
+ # PRAGMA table_info returns: cid, name, type, notnull, dflt_value, pk
+ cols = {str(r[1]) for r in rows if r and r[1]}
+ return cols
+ except Exception:
+ return set()
+
+
+def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, Any]]) -> None:
+ """Best-effort upsert of WCDB Session rows into decrypted session.db::SessionTable.
+
+ Why:
+ - WCDB realtime can observe newly created sessions (e.g., new friends) immediately.
+ - The decrypted snapshot's session.db can become stale and miss those sessions entirely, causing
+ the left sidebar list to differ after a refresh (when the UI falls back to decrypted).
+
+ This upsert intentionally avoids depending on message tables; it only keeps SessionTable fresh.
+ """
+
+ if not rows:
+ return
+
+ # Ensure SessionTable exists; if not, silently skip (older/partial accounts).
+ try:
+ conn.execute("SELECT 1 FROM SessionTable LIMIT 1").fetchone()
+ except Exception:
+ return
+
+ cols = _get_session_table_columns(conn)
+ if "username" not in cols:
+ return
+
+ uniq_usernames: list[str] = []
+ for r in rows:
+ u = str((r or {}).get("username") or "").strip()
+ if not u:
+ continue
+ uniq_usernames.append(u)
+ uniq_usernames = list(dict.fromkeys(uniq_usernames))
+ if not uniq_usernames:
+ return
+
+ # Insert missing rows first so UPDATE always has a target.
+ try:
+ conn.executemany(
+ "INSERT OR IGNORE INTO SessionTable(username) VALUES (?)",
+ [(u,) for u in uniq_usernames],
+ )
+ except Exception:
+ # If the schema is unusual, don't fail the whole sync.
+ return
+
+ # Only update columns that exist in this account's schema.
+ # Keep the order stable so executemany parameters line up.
+ desired_cols = [
+ "unread_count",
+ "is_hidden",
+ "summary",
+ "draft",
+ "last_timestamp",
+ "sort_timestamp",
+ "last_msg_type",
+ "last_msg_sub_type",
+ ]
+ update_cols = [c for c in desired_cols if c in cols]
+ if not update_cols:
+ return
+
+ def _int(v: Any) -> int:
+ try:
+ return int(v or 0)
+ except Exception:
+ return 0
+
+ def _text(v: Any) -> str:
+ try:
+ return str(v or "")
+ except Exception:
+ return ""
+
+ params: list[tuple[Any, ...]] = []
+ for r in rows:
+ u = str((r or {}).get("username") or "").strip()
+ if not u:
+ continue
+ values: list[Any] = []
+ for c in update_cols:
+ if c in {"unread_count", "is_hidden", "last_timestamp", "sort_timestamp", "last_msg_type", "last_msg_sub_type"}:
+ values.append(_int((r or {}).get(c)))
+ else:
+ values.append(_text((r or {}).get(c)))
+ values.append(u)
+ params.append(tuple(values))
+
+ if not params:
+ return
+
+ set_expr = ", ".join([f"{c} = ?" for c in update_cols])
+ conn.executemany(f"UPDATE SessionTable SET {set_expr} WHERE username = ?", params)
+
+
+def _load_session_last_message_times(conn: sqlite3.Connection, usernames: list[str]) -> dict[str, int]:
+ """Load last synced message create_time per conversation from session.db::session_last_message.
+
+ Note: This is used as the *sync watermark* for realtime -> decrypted, because SessionTable timestamps may be
+ updated from WCDB session rows for UI consistency.
+ """
+
+ uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()]))
+ if not uniq:
+ return {}
+
+ out: dict[str, int] = {}
+ chunk_size = 900
+ for i in range(0, len(uniq), chunk_size):
+ chunk = uniq[i : i + chunk_size]
+ placeholders = ",".join(["?"] * len(chunk))
+ try:
+ rows = conn.execute(
+ f"SELECT username, create_time FROM session_last_message WHERE username IN ({placeholders})",
+ chunk,
+ ).fetchall()
+ except Exception:
+ continue
+ for r in rows:
+ u = str((r["username"] if isinstance(r, sqlite3.Row) else r[0]) or "").strip()
+ if not u:
+ continue
+ try:
+ ts = int((r["create_time"] if isinstance(r, sqlite3.Row) else r[1]) or 0)
+ except Exception:
+ ts = 0
+ out[u] = int(ts or 0)
+ return out
+
+
@router.post("/api/chat/realtime/sync", summary="实时消息同步到解密库(按会话增量)")
def sync_chat_realtime_messages(
request: Request,
username: str,
account: Optional[str] = None,
max_scan: int = 600,
+ backfill_limit: int = 200,
):
"""
设计目的:实时模式只用来“同步增量”到 output/databases 下的解密库,前端始终从解密库读取显示,
避免 WCDB realtime 返回格式差异(如 compress_content/message_content 的 hex 编码)直接影响渲染。
同步策略:从 WCDB 获取最新消息(从新到旧),直到遇到解密库中已存在的最大 local_id 为止。
+
+ backfill_limit:同步过程中额外“回填”旧消息的 packed_info_data 的最大行数(用于修复旧库缺失字段)。
+ - 设为 0 可显著降低每次同步的扫描/写入开销(更适合前端实时轮询/推送触发的高频增量同步)。
"""
if not username:
raise HTTPException(status_code=400, detail="Missing username.")
@@ -377,6 +643,10 @@ def sync_chat_realtime_messages(
max_scan = 50
if max_scan > 5000:
max_scan = 5000
+ if backfill_limit < 0:
+ backfill_limit = 0
+ if backfill_limit > 5000:
+ backfill_limit = 5000
account_dir = _resolve_account_dir(account)
trace_id = f"rt-sync-{int(time.time() * 1000)}-{threading.get_ident()}"
@@ -399,10 +669,9 @@ def sync_chat_realtime_messages(
except WCDBRealtimeError as e:
raise HTTPException(status_code=400, detail=str(e))
- resolved = _resolve_decrypted_message_table(account_dir, username)
- if not resolved:
- raise HTTPException(status_code=404, detail="Conversation table not found in decrypted databases.")
- msg_db_path, table_name = resolved
+ # Some sessions may not exist in the decrypted snapshot yet; create the missing Msg_ table
+ # so we can insert the realtime rows and make `/api/chat/messages` work after switching off realtime.
+ msg_db_path, table_name = _ensure_decrypted_message_table(account_dir, username)
logger.info(
"[%s] resolved decrypted table account=%s username=%s db=%s table=%s",
trace_id,
@@ -493,7 +762,6 @@ def sync_chat_realtime_messages(
offset = 0
new_rows: list[dict[str, Any]] = []
backfill_rows: list[dict[str, Any]] = []
- backfill_limit = min(200, int(max_scan))
reached_existing = False
stop = False
@@ -545,8 +813,11 @@ def sync_chat_realtime_messages(
continue
reached_existing = True
+ if int(backfill_limit) <= 0:
+ stop = True
+ break
backfill_rows.append(norm)
- if len(backfill_rows) >= backfill_limit:
+ if len(backfill_rows) >= int(backfill_limit):
stop = True
break
@@ -772,11 +1043,18 @@ def _sync_chat_realtime_messages_for_table(
msg_db_path: Path,
table_name: str,
max_scan: int,
+ backfill_limit: int = 200,
) -> dict[str, Any]:
if max_scan < 50:
max_scan = 50
if max_scan > 5000:
max_scan = 5000
+ if backfill_limit < 0:
+ backfill_limit = 0
+ if backfill_limit > 5000:
+ backfill_limit = 5000
+ if backfill_limit > max_scan:
+ backfill_limit = max_scan
msg_conn = sqlite3.connect(str(msg_db_path))
msg_conn.row_factory = sqlite3.Row
@@ -858,13 +1136,12 @@ def _sync_chat_realtime_messages_for_table(
offset = 0
new_rows: list[dict[str, Any]] = []
backfill_rows: list[dict[str, Any]] = []
- backfill_limit = min(200, int(max_scan))
reached_existing = False
stop = False
while scanned < int(max_scan):
take = min(batch_size, int(max_scan) - scanned)
- logger.info(
+ logger.debug(
"[realtime] wcdb_get_messages account=%s username=%s take=%s offset=%s",
account_dir.name,
username,
@@ -875,7 +1152,7 @@ def _sync_chat_realtime_messages_for_table(
with rt_conn.lock:
raw_rows = _wcdb_get_messages(rt_conn.handle, username, limit=take, offset=offset)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
- logger.info(
+ logger.debug(
"[realtime] wcdb_get_messages done account=%s username=%s rows=%s ms=%.1f",
account_dir.name,
username,
@@ -907,8 +1184,11 @@ def _sync_chat_realtime_messages_for_table(
continue
reached_existing = True
+ if int(backfill_limit) <= 0:
+ stop = True
+ break
backfill_rows.append(norm)
- if len(backfill_rows) >= backfill_limit:
+ if len(backfill_rows) >= int(backfill_limit):
stop = True
break
@@ -1115,6 +1395,7 @@ def sync_chat_realtime_messages_all(
priority_max_scan: int = 600,
include_hidden: bool = True,
include_official: bool = True,
+ backfill_limit: int = 200,
):
"""
全量会话同步(增量):遍历会话列表,对每个会话调用与 /realtime/sync 相同的“遇到已同步 local_id 即停止”逻辑。
@@ -1141,6 +1422,12 @@ def sync_chat_realtime_messages_all(
priority_max_scan = max_scan
if priority_max_scan > 5000:
priority_max_scan = 5000
+ if backfill_limit < 0:
+ backfill_limit = 0
+ if backfill_limit > 5000:
+ backfill_limit = 5000
+ if backfill_limit > max_scan:
+ backfill_limit = max_scan
priority = str(priority_username or "").strip()
started = time.time()
@@ -1172,6 +1459,7 @@ def sync_chat_realtime_messages_all(
raw_sessions = []
sessions: list[tuple[int, str]] = []
+ realtime_rows_by_user: dict[str, dict[str, Any]] = {}
for item in raw_sessions:
if not isinstance(item, dict):
continue
@@ -1198,6 +1486,34 @@ def sync_chat_realtime_messages_all(
break
sessions.append((ts, uname))
+ # Keep a normalized SessionTable row for upserting into decrypted session.db.
+ norm_row = {
+ "username": uname,
+ "unread_count": item.get("unread_count", item.get("unreadCount", 0)),
+ "is_hidden": item.get("is_hidden", item.get("isHidden", 0)),
+ "summary": item.get("summary", ""),
+ "draft": item.get("draft", ""),
+ "last_timestamp": item.get("last_timestamp", item.get("lastTimestamp", 0)),
+ "sort_timestamp": item.get(
+ "sort_timestamp",
+ item.get("sortTimestamp", item.get("last_timestamp", item.get("lastTimestamp", 0))),
+ ),
+ "last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
+ "last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
+ }
+ # Prefer the row with the newer sort timestamp for the same username.
+ prev = realtime_rows_by_user.get(uname)
+ try:
+ prev_sort = int((prev or {}).get("sort_timestamp") or 0)
+ except Exception:
+ prev_sort = 0
+ try:
+ cur_sort = int(norm_row.get("sort_timestamp") or 0)
+ except Exception:
+ cur_sort = 0
+ if prev is None or cur_sort >= prev_sort:
+ realtime_rows_by_user[uname] = norm_row
+
def _dedupe(items: list[tuple[int, str]]) -> list[tuple[int, str]]:
seen = set()
out: list[tuple[int, str]] = []
@@ -1219,7 +1535,8 @@ def sync_chat_realtime_messages_all(
len(all_usernames),
)
- # Skip sessions whose decrypted session.db already has a newer/equal sort_timestamp.
+ # Keep SessionTable fresh for UI consistency, and use session_last_message.create_time as the
+ # "sync watermark" (instead of SessionTable timestamps) to decide whether a session needs syncing.
decrypted_ts_by_user: dict[str, int] = {}
if all_usernames:
try:
@@ -1227,45 +1544,49 @@ def sync_chat_realtime_messages_all(
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
try:
- uniq = list(dict.fromkeys([u for u in all_usernames if u]))
- chunk_size = 900
- for i in range(0, len(uniq), chunk_size):
- chunk = uniq[i : i + chunk_size]
- placeholders = ",".join(["?"] * len(chunk))
+ _ensure_session_last_message_table(sconn)
+
+ # If the cache table exists but is empty (older accounts), attempt a one-time build so we
+ # don't keep treating every session as "needs_sync".
+ try:
+ cnt = int(sconn.execute("SELECT COUNT(1) FROM session_last_message").fetchone()[0] or 0)
+ except Exception:
+ cnt = 0
+ if cnt <= 0:
try:
- rows = sconn.execute(
- f"SELECT username, sort_timestamp, last_timestamp FROM SessionTable WHERE username IN ({placeholders})",
- chunk,
- ).fetchall()
- for r in rows:
- u = str(r["username"] or "").strip()
- if not u:
- continue
- try:
- ts = int(r["sort_timestamp"] or 0)
- except Exception:
- ts = 0
- if ts <= 0:
- try:
- ts = int(r["last_timestamp"] or 0)
- except Exception:
- ts = 0
- decrypted_ts_by_user[u] = int(ts or 0)
- except sqlite3.OperationalError:
- rows = sconn.execute(
- f"SELECT username, last_timestamp FROM SessionTable WHERE username IN ({placeholders})",
- chunk,
- ).fetchall()
- for r in rows:
- u = str(r["username"] or "").strip()
- if not u:
- continue
- try:
- decrypted_ts_by_user[u] = int(r["last_timestamp"] or 0)
- except Exception:
- decrypted_ts_by_user[u] = 0
+ sconn.close()
+ except Exception:
+ pass
+ try:
+ build_session_last_message_table(
+ account_dir,
+ rebuild=False,
+ include_hidden=True,
+ include_official=True,
+ )
+ except Exception:
+ pass
+ sconn = sqlite3.connect(str(session_db_path))
+ sconn.row_factory = sqlite3.Row
+ _ensure_session_last_message_table(sconn)
+
+ # Upsert latest WCDB sessions into decrypted SessionTable so the sidebar list remains stable
+ # after switching off realtime (or refreshing the page).
+ try:
+ _upsert_session_table_rows(sconn, list(realtime_rows_by_user.values()))
+ sconn.commit()
+ except Exception:
+ try:
+ sconn.rollback()
+ except Exception:
+ pass
+
+ decrypted_ts_by_user = _load_session_last_message_times(sconn, all_usernames)
finally:
- sconn.close()
+ try:
+ sconn.close()
+ except Exception:
+ pass
except Exception:
decrypted_ts_by_user = {}
@@ -1291,7 +1612,7 @@ def sync_chat_realtime_messages_all(
if priority and priority in sync_usernames:
sync_usernames = [priority] + [u for u in sync_usernames if u != priority]
- table_map = _resolve_decrypted_message_tables(account_dir, sync_usernames)
+ table_map = _ensure_decrypted_message_tables(account_dir, sync_usernames)
logger.info(
"[%s] resolved decrypted tables account=%s resolved=%s need_sync=%s",
trace_id,
@@ -1324,6 +1645,7 @@ def sync_chat_realtime_messages_all(
msg_db_path=msg_db_path,
table_name=table_name,
max_scan=int(cur_scan),
+ backfill_limit=int(backfill_limit),
)
synced += 1
scanned_total += int(result.get("scanned") or 0)
@@ -2595,8 +2917,13 @@ def list_chat_sessions(
row = contact_rows.get(u)
if _pick_display_name(row, u) == u:
need_display.append(u)
- if (not _pick_avatar_url(row)) and (u not in local_avatar_usernames):
- need_avatar.append(u)
+ if source_norm == "realtime":
+ # In realtime mode, prefer WCDB-resolved avatar URLs (contact.db can be stale).
+ if u not in local_avatar_usernames:
+ need_avatar.append(u)
+ else:
+ if (not _pick_avatar_url(row)) and (u not in local_avatar_usernames):
+ need_avatar.append(u)
need_display = list(dict.fromkeys(need_display))
need_avatar = list(dict.fromkeys(need_avatar))
@@ -2655,13 +2982,17 @@ def list_chat_sessions(
if wd and wd != username:
display_name = wd
- avatar_url = _pick_avatar_url(c_row)
- if not avatar_url and username in local_avatar_usernames:
+ # Prefer local head_image avatars when available: decrypted contact.db URLs can be stale
+ # (or hotlink-protected for browsers). WCDB realtime (when available) is the next best.
+ avatar_url = ""
+ if username in local_avatar_usernames:
avatar_url = base_url + _build_avatar_url(account_dir.name, username)
if not avatar_url:
wa = str(wcdb_avatar_urls.get(username) or "").strip()
if wa.lower().startswith(("http://", "https://")):
avatar_url = wa
+ if not avatar_url:
+ avatar_url = _pick_avatar_url(c_row) or ""
last_message = ""
if preview_mode == "session":
@@ -3426,6 +3757,63 @@ def list_chat_messages(
break
scan_take = next_take
+ # Self-heal (default source only): if the decrypted snapshot has no conversation table yet (new session),
+ # do a one-shot realtime->decrypted sync and re-query once. This avoids "暂无聊天记录" after turning off realtime.
+ if (
+ source_norm != "realtime"
+ and (source is None or not str(source).strip())
+ and (not merged)
+ and int(offset) == 0
+ ):
+ missing_table = False
+ try:
+ missing_table = _resolve_decrypted_message_table(account_dir, username) is None
+ except Exception:
+ missing_table = True
+
+ if missing_table:
+ rt_conn2 = None
+ try:
+ rt_conn2 = WCDB_REALTIME.ensure_connected(account_dir)
+ except WCDBRealtimeError:
+ rt_conn2 = None
+ except Exception:
+ rt_conn2 = None
+
+ if rt_conn2 is not None:
+ try:
+ with _realtime_sync_lock(account_dir.name, username):
+ msg_db_path2, table_name2 = _ensure_decrypted_message_table(account_dir, username)
+ _sync_chat_realtime_messages_for_table(
+ account_dir=account_dir,
+ rt_conn=rt_conn2,
+ username=username,
+ msg_db_path=msg_db_path2,
+ table_name=table_name2,
+ max_scan=max(200, int(limit) + 50),
+ backfill_limit=0,
+ )
+ except Exception:
+ pass
+
+ (
+ merged,
+ has_more_any,
+ sender_usernames,
+ quote_usernames,
+ pat_usernames,
+ ) = _collect_chat_messages(
+ username=username,
+ account_dir=account_dir,
+ db_paths=db_paths,
+ resource_conn=resource_conn,
+ resource_chat_id=resource_chat_id,
+ take=scan_take,
+ want_types=want_types,
+ )
+ if want_types is not None:
+ merged = [m for m in merged if _normalize_render_type_key(m.get("renderType")) in want_types]
+
r"""
take = int(limit) + int(offset)
take_probe = take + 1
diff --git a/src/wechat_decrypt_tool/wcdb_realtime.py b/src/wechat_decrypt_tool/wcdb_realtime.py
index 53d9423..ce954d6 100644
--- a/src/wechat_decrypt_tool/wcdb_realtime.py
+++ b/src/wechat_decrypt_tool/wcdb_realtime.py
@@ -615,16 +615,36 @@ class WCDBRealtimeManager:
except Exception:
pass
- def close_all(self) -> None:
+ def close_all(self, *, lock_timeout_s: float | None = None) -> bool:
+ """Close all known WCDB realtime connections.
+
+ When `lock_timeout_s` is None, this waits indefinitely for per-connection locks.
+ When provided, this will skip busy connections after the timeout and return False.
+ """
with self._mu:
conns = list(self._conns.values())
self._conns.clear()
+ ok = True
for conn in conns:
try:
- with conn.lock:
+ if lock_timeout_s is None:
+ with conn.lock:
+ close_account(conn.handle)
+ continue
+
+ acquired = conn.lock.acquire(timeout=float(lock_timeout_s))
+ if not acquired:
+ ok = False
+ logger.warning("[wcdb] close_all skip busy conn account=%s", conn.account)
+ continue
+ try:
close_account(conn.handle)
+ finally:
+ conn.lock.release()
except Exception:
+ ok = False
continue
+ return ok
WCDB_REALTIME = WCDBRealtimeManager()
diff --git a/tests/test_realtime_sync_table_creation.py b/tests/test_realtime_sync_table_creation.py
new file mode 100644
index 0000000..3c33836
--- /dev/null
+++ b/tests/test_realtime_sync_table_creation.py
@@ -0,0 +1,102 @@
+import hashlib
+import sqlite3
+import sys
+import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+
+# Ensure "src/" is importable when running tests from repo root.
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+
+class TestRealtimeSyncTableCreation(unittest.TestCase):
+ def _touch_sqlite(self, path: Path) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ conn = sqlite3.connect(str(path))
+ try:
+ # Ensure a valid sqlite file is created.
+ conn.execute("PRAGMA user_version = 1")
+ conn.commit()
+ finally:
+ conn.close()
+
+ def test_ensure_creates_msg_table_and_indexes_in_message_db(self):
+ from wechat_decrypt_tool.routers import chat as chat_router
+
+ with TemporaryDirectory() as td:
+ account_dir = Path(td)
+ self._touch_sqlite(account_dir / "message_0.db")
+
+ username = "wxid_foo"
+ md5_hex = hashlib.md5(username.encode("utf-8")).hexdigest()
+ expected_table = f"Msg_{md5_hex}"
+
+ db_path, table_name = chat_router._ensure_decrypted_message_table(account_dir, username)
+ self.assertEqual(table_name, expected_table)
+ self.assertEqual(db_path.name, "message_0.db")
+
+ conn = sqlite3.connect(str(db_path))
+ try:
+ r = conn.execute(
+ "SELECT 1 FROM sqlite_master WHERE type='table' AND lower(name)=lower(?)",
+ (expected_table,),
+ ).fetchone()
+ self.assertIsNotNone(r, "Msg_ table should be created")
+
+ idx_names = [
+ f"{expected_table}_SENDERID",
+ f"{expected_table}_SERVERID",
+ f"{expected_table}_SORTSEQ",
+ f"{expected_table}_TYPE_SEQ",
+ ]
+ for idx in idx_names:
+ r = conn.execute(
+ "SELECT 1 FROM sqlite_master WHERE type='index' AND lower(name)=lower(?)",
+ (idx,),
+ ).fetchone()
+ self.assertIsNotNone(r, f"Index {idx} should be created")
+ finally:
+ conn.close()
+
+ def test_ensure_prefers_biz_message_for_official_accounts(self):
+ from wechat_decrypt_tool.routers import chat as chat_router
+
+ with TemporaryDirectory() as td:
+ account_dir = Path(td)
+ self._touch_sqlite(account_dir / "message_0.db")
+ self._touch_sqlite(account_dir / "biz_message_0.db")
+
+ username = "gh_12345"
+ db_path, _ = chat_router._ensure_decrypted_message_table(account_dir, username)
+ self.assertEqual(db_path.name, "biz_message_0.db")
+
+ def test_bulk_ensure_creates_missing_tables(self):
+ from wechat_decrypt_tool.routers import chat as chat_router
+
+ with TemporaryDirectory() as td:
+ account_dir = Path(td)
+ self._touch_sqlite(account_dir / "message_0.db")
+
+ usernames = ["wxid_a", "wxid_b"]
+ table_map = chat_router._ensure_decrypted_message_tables(account_dir, usernames)
+ self.assertEqual(set(table_map.keys()), set(usernames))
+
+ conn = sqlite3.connect(str(account_dir / "message_0.db"))
+ try:
+ for u in usernames:
+ md5_hex = hashlib.md5(u.encode("utf-8")).hexdigest()
+ expected_table = f"Msg_{md5_hex}"
+ r = conn.execute(
+ "SELECT 1 FROM sqlite_master WHERE type='table' AND lower(name)=lower(?)",
+ (expected_table,),
+ ).fetchone()
+ self.assertIsNotNone(r, f"{expected_table} should be created for {u}")
+ finally:
+ conn.close()
+
+
+if __name__ == "__main__":
+ unittest.main()
+