diff --git a/src/wechat_decrypt_tool/chat_helpers.py b/src/wechat_decrypt_tool/chat_helpers.py index 196ae75..d3a2767 100644 --- a/src/wechat_decrypt_tool/chat_helpers.py +++ b/src/wechat_decrypt_tool/chat_helpers.py @@ -963,6 +963,45 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di remaining = {u for u in usernames if u} best: dict[str, tuple[tuple[int, int, int], str]] = {} + expected_ts_by_user: dict[str, int] = {} + + session_db_path = Path(account_dir) / "session.db" + if session_db_path.exists() and remaining: + sconn = sqlite3.connect(str(session_db_path)) + sconn.row_factory = sqlite3.Row + try: + uniq = list(dict.fromkeys([u for u in remaining if u])) + chunk_size = 900 + for i in range(0, len(uniq), chunk_size): + chunk = uniq[i : i + chunk_size] + placeholders = ",".join(["?"] * len(chunk)) + try: + rows = sconn.execute( + f"SELECT username, sort_timestamp, last_timestamp FROM SessionTable WHERE username IN ({placeholders})", + chunk, + ).fetchall() + for r in rows: + u = str(r["username"] or "").strip() + if not u: + continue + ts = int(r["sort_timestamp"] or 0) + if ts <= 0: + ts = int(r["last_timestamp"] or 0) + expected_ts_by_user[u] = int(ts or 0) + except sqlite3.OperationalError: + rows = sconn.execute( + f"SELECT username, last_timestamp FROM SessionTable WHERE username IN ({placeholders})", + chunk, + ).fetchall() + for r in rows: + u = str(r["username"] or "").strip() + if not u: + continue + expected_ts_by_user[u] = int(r["last_timestamp"] or 0) + except Exception: + expected_ts_by_user = {} + finally: + sconn.close() if _DEBUG_SESSIONS: logger.info( @@ -1024,7 +1063,39 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di create_time = int(r["create_time"] or 0) sort_seq = int(r["sort_seq"] or 0) if r["sort_seq"] is not None else 0 local_id = int(r["local_id"] or 0) - sort_key = (sort_seq, local_id, create_time) + + expected_ts = int(expected_ts_by_user.get(u) or 0) + if expected_ts > 0 and create_time > 0 and create_time < expected_ts: + try: + r2 = conn.execute( + "SELECT " + "m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, " + "n.user_name AS sender_username " + f"FROM {quoted} m " + "LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid " + "ORDER BY COALESCE(m.create_time, 0) DESC, COALESCE(m.sort_seq, 0) DESC, m.local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + try: + r2 = conn.execute( + "SELECT " + "local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username " + f"FROM {quoted} " + "ORDER BY COALESCE(create_time, 0) DESC, COALESCE(sort_seq, 0) DESC, local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + r2 = None + + if r2 is not None: + r = r2 + local_type = int(r["local_type"] or 0) + create_time = int(r["create_time"] or 0) + sort_seq = int(r["sort_seq"] or 0) if r["sort_seq"] is not None else 0 + local_id = int(r["local_id"] or 0) + + sort_key = (create_time, sort_seq, local_id) raw_text = _decode_message_content(r["compress_content"], r["message_content"]).strip() sender_username = _decode_sqlite_text(r["sender_username"]).strip() diff --git a/src/wechat_decrypt_tool/routers/chat.py b/src/wechat_decrypt_tool/routers/chat.py index d9b0efc..0ca83b6 100644 --- a/src/wechat_decrypt_tool/routers/chat.py +++ b/src/wechat_decrypt_tool/routers/chat.py @@ -49,10 +49,10 @@ from ..chat_helpers import ( ) from ..media_helpers import _try_find_decrypted_resource from ..path_fix import PathFixRoute -from ..session_preview_index import ( - build_session_preview_index, - get_session_preview_index_status, - load_session_previews, +from ..session_last_message import ( + build_session_last_message_table, + get_session_last_message_status, + load_session_last_messages, ) logger = get_logger(__name__) @@ -84,21 +84,21 @@ async def chat_search_index_build(account: Optional[str] = None, rebuild: bool = return start_chat_search_index_build(account_dir, rebuild=bool(rebuild)) -@router.get("/api/chat/session-preview/status", summary="会话最新消息预览索引状态") -async def session_preview_status(account: Optional[str] = None): +@router.get("/api/chat/session-last-message/status", summary="会话最后一条消息缓存表状态") +async def session_last_message_status(account: Optional[str] = None): account_dir = _resolve_account_dir(account) - return get_session_preview_index_status(account_dir) + return get_session_last_message_status(account_dir) -@router.post("/api/chat/session-preview/build", summary="构建/重建会话最新消息预览索引") -async def session_preview_build( +@router.post("/api/chat/session-last-message/build", summary="构建/重建会话最后一条消息缓存表") +async def session_last_message_build( account: Optional[str] = None, rebuild: bool = False, include_hidden: bool = True, include_official: bool = True, ): account_dir = _resolve_account_dir(account) - return build_session_preview_index( + return build_session_last_message_table( account_dir, rebuild=bool(rebuild), include_hidden=bool(include_hidden), @@ -984,24 +984,32 @@ async def list_chat_sessions( preview_mode = str(preview or "").strip().lower() if preview_mode not in {"latest", "index", "session", "db", "none"}: preview_mode = "latest" + if preview_mode == "index": + preview_mode = "latest" - latest_previews: dict[str, str] = {} - index_status: Optional[dict[str, Any]] = None - if preview_mode in {"latest", "index", "db"}: + last_previews: dict[str, str] = {} + if preview_mode == "latest": try: - index_status = get_session_preview_index_status(account_dir) - latest_previews = load_session_previews(account_dir, usernames) + last_previews = load_session_last_messages(account_dir, usernames) + # Backward-compatible: old decrypted accounts may not have built the cache table yet. + if (not last_previews) and usernames: + build_session_last_message_table( + account_dir, + rebuild=False, + include_hidden=True, + include_official=True, + ) + last_previews = load_session_last_messages(account_dir, usernames) except Exception: - index_status = None - latest_previews = {} + last_previews = {} if preview_mode in {"latest", "db"}: - missing = [u for u in usernames if u and (u not in latest_previews)] - if missing: - legacy = _load_latest_message_previews(account_dir, missing) + targets = usernames if preview_mode == "db" else [u for u in usernames if u and (u not in last_previews)] + if targets: + legacy = _load_latest_message_previews(account_dir, targets) for u, v in legacy.items(): if v: - latest_previews[u] = v + last_previews[u] = v sessions: list[dict[str, Any]] = [] for r in filtered: @@ -1014,12 +1022,28 @@ async def list_chat_sessions( avatar_url = base_url + _build_avatar_url(account_dir.name, username) last_message = "" - draft_text = _decode_sqlite_text(r["draft"]).strip() - if draft_text: - draft_text = re.sub(r"\s+", " ", draft_text).strip() - last_message = f"[草稿] {draft_text}" if draft_text else "[草稿]" - elif preview_mode in {"latest", "db", "index"} and str(latest_previews.get(username) or "").strip(): - last_message = str(latest_previews.get(username) or "").strip() + if preview_mode == "session": + draft_text = _decode_sqlite_text(r["draft"]).strip() + if draft_text: + draft_text = re.sub(r"\s+", " ", draft_text).strip() + last_message = f"[草稿] {draft_text}" if draft_text else "[草稿]" + else: + summary_text = _decode_sqlite_text(r["summary"]).strip() + summary_text = re.sub(r"\s+", " ", summary_text).strip() + if summary_text: + last_message = summary_text + else: + last_message = _infer_last_message_brief(r["last_msg_type"], r["last_msg_sub_type"]) + elif preview_mode in {"latest", "db"}: + if str(last_previews.get(username) or "").strip(): + last_message = str(last_previews.get(username) or "").strip() + elif preview_mode != "none": + summary_text = _decode_sqlite_text(r["summary"]).strip() + summary_text = re.sub(r"\s+", " ", summary_text).strip() + if summary_text: + last_message = summary_text + else: + last_message = _infer_last_message_brief(r["last_msg_type"], r["last_msg_sub_type"]) elif preview_mode != "none": summary_text = _decode_sqlite_text(r["summary"]).strip() summary_text = re.sub(r"\s+", " ", summary_text).strip() @@ -1048,7 +1072,6 @@ async def list_chat_sessions( "account": account_dir.name, "total": len(sessions), "sessions": sessions, - "previewIndex": index_status.get("index") if isinstance(index_status, dict) else None, } diff --git a/src/wechat_decrypt_tool/session_last_message.py b/src/wechat_decrypt_tool/session_last_message.py new file mode 100644 index 0000000..45a0c22 --- /dev/null +++ b/src/wechat_decrypt_tool/session_last_message.py @@ -0,0 +1,504 @@ +from __future__ import annotations + +import hashlib +import re +import sqlite3 +import time +from pathlib import Path +from typing import Any, Optional + +from .chat_helpers import ( + _build_latest_message_preview, + _decode_message_content, + _decode_sqlite_text, + _infer_last_message_brief, + _is_mostly_printable_text, + _iter_message_db_paths, + _quote_ident, + _should_keep_session, +) +from .logging_config import get_logger + +logger = get_logger(__name__) + +_TABLE_NAME = "session_last_message" +_TABLE_NAME_RE = re.compile(r"^(msg_|chat_)([0-9a-f]{32})", re.IGNORECASE) +_PREVIEW_MAX_LEN = 400 + + +def _session_db_path(account_dir: Path) -> Path: + return Path(account_dir) / "session.db" + + +def _row_get(row: sqlite3.Row, key: str) -> Any: + try: + return row[key] + except Exception: + return None + + +def _normalize_preview(text: str) -> str: + s = str(text or "").strip() + if not s: + return "" + s = re.sub(r"\s+", " ", s).strip() + if len(s) > _PREVIEW_MAX_LEN: + return s[:_PREVIEW_MAX_LEN] + return s + + +def _ensure_table(conn: sqlite3.Connection) -> None: + conn.execute( + f""" + CREATE TABLE IF NOT EXISTS {_TABLE_NAME} ( + username TEXT PRIMARY KEY, + sort_seq INTEGER NOT NULL DEFAULT 0, + local_id INTEGER NOT NULL DEFAULT 0, + create_time INTEGER NOT NULL DEFAULT 0, + local_type INTEGER NOT NULL DEFAULT 0, + sender_username TEXT NOT NULL DEFAULT '', + preview TEXT NOT NULL DEFAULT '', + db_stem TEXT NOT NULL DEFAULT '', + table_name TEXT NOT NULL DEFAULT '', + built_at INTEGER NOT NULL DEFAULT 0 + ) + """ + ) + + +def get_session_last_message_status(account_dir: Path) -> dict[str, Any]: + account_dir = Path(account_dir) + session_db_path = _session_db_path(account_dir) + if not session_db_path.exists(): + return { + "status": "error", + "account": account_dir.name, + "message": "session.db not found.", + } + + conn = sqlite3.connect(str(session_db_path)) + try: + row = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower(?) LIMIT 1", + (_TABLE_NAME,), + ).fetchone() + exists = bool(row and row[0]) + if not exists: + return { + "status": "success", + "account": account_dir.name, + "table": { + "name": _TABLE_NAME, + "exists": False, + "rowCount": 0, + "builtAt": None, + }, + } + count = int(conn.execute(f"SELECT COUNT(1) FROM {_TABLE_NAME}").fetchone()[0] or 0) + built_at = conn.execute(f"SELECT MAX(built_at) FROM {_TABLE_NAME}").fetchone()[0] + try: + built_at_int: Optional[int] = int(built_at) if built_at is not None else None + except Exception: + built_at_int = None + return { + "status": "success", + "account": account_dir.name, + "table": { + "name": _TABLE_NAME, + "exists": True, + "rowCount": count, + "builtAt": built_at_int, + }, + } + finally: + conn.close() + + +def load_session_last_messages(account_dir: Path, usernames: list[str]) -> dict[str, str]: + if not usernames: + return {} + + account_dir = Path(account_dir) + session_db_path = _session_db_path(account_dir) + if not session_db_path.exists(): + return {} + + uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()])) + if not uniq: + return {} + + out: dict[str, str] = {} + conn = sqlite3.connect(str(session_db_path)) + conn.row_factory = sqlite3.Row + try: + chunk_size = 900 + for i in range(0, len(uniq), chunk_size): + chunk = uniq[i : i + chunk_size] + placeholders = ",".join(["?"] * len(chunk)) + rows = conn.execute( + f"SELECT username, preview FROM {_TABLE_NAME} WHERE username IN ({placeholders})", + chunk, + ).fetchall() + for r in rows: + u = str(r["username"] or "").strip() + if not u: + continue + out[u] = str(r["preview"] or "") + return out + except Exception: + return {} + finally: + conn.close() + + +def build_session_last_message_table( + account_dir: Path, + *, + rebuild: bool = False, + include_hidden: bool = True, + include_official: bool = True, +) -> dict[str, Any]: + """ + Build a per-account cache table `{account}/session.db::{session_last_message}`. + + The UI session list needs "last message preview" per conversation; querying message_*.db on every refresh is slow. + This shifts that work to decrypt-time (or one-time manual rebuild). + """ + + account_dir = Path(account_dir) + session_db_path = _session_db_path(account_dir) + if not session_db_path.exists(): + return { + "status": "error", + "account": account_dir.name, + "message": "session.db not found.", + } + + db_paths = _iter_message_db_paths(account_dir) + if not db_paths: + return { + "status": "error", + "account": account_dir.name, + "message": "No message databases found.", + } + + started = time.time() + logger.info(f"[session_last_message] build start account={account_dir.name} dbs={len(db_paths)}") + + sconn = sqlite3.connect(str(session_db_path)) + sconn.row_factory = sqlite3.Row + try: + try: + srows = sconn.execute( + """ + SELECT username, is_hidden, summary, draft, last_msg_type, last_msg_sub_type, sort_timestamp, last_timestamp + FROM SessionTable + ORDER BY sort_timestamp DESC + """ + ).fetchall() + except sqlite3.OperationalError: + srows = sconn.execute( + """ + SELECT username, is_hidden, summary, draft, sort_timestamp, last_timestamp + FROM SessionTable + ORDER BY sort_timestamp DESC + """ + ).fetchall() + finally: + sconn.close() + + sessions: list[sqlite3.Row] = [] + usernames: list[str] = [] + expected_ts_by_user: dict[str, int] = {} + for r in srows: + u = str(_row_get(r, "username") or "").strip() + if not u: + continue + if not include_hidden and int(_row_get(r, "is_hidden") or 0) == 1: + continue + if not _should_keep_session(u, include_official=bool(include_official)): + continue + sessions.append(r) + usernames.append(u) + ts = int(_row_get(r, "sort_timestamp") or 0) + if ts <= 0: + ts = int(_row_get(r, "last_timestamp") or 0) + expected_ts_by_user[u] = int(ts or 0) + + if not usernames: + return { + "status": "success", + "account": account_dir.name, + "message": "No sessions to build.", + "built": 0, + "durationSec": 0.0, + } + + md5_to_users: dict[str, list[str]] = {} + for u in usernames: + h = hashlib.md5(u.encode("utf-8")).hexdigest() + md5_to_users.setdefault(h, []).append(u) + + best: dict[str, tuple[tuple[int, int, int], dict[str, Any]]] = {} + + for db_path in db_paths: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + conn.text_factory = bytes + try: + trows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + md5_to_table: dict[str, str] = {} + for tr in trows: + if not tr or tr[0] is None: + continue + name = _decode_sqlite_text(tr[0]).strip() + if not name: + continue + m = _TABLE_NAME_RE.match(name.lower()) + if not m: + continue + md5_hex = str(m.group(2) or "").lower() + if md5_hex not in md5_to_users: + continue + prefix = str(m.group(1) or "").lower() + if md5_hex not in md5_to_table or prefix == "msg_": + md5_to_table[md5_hex] = name + + if not md5_to_table: + continue + + for md5_hex, table_name in md5_to_table.items(): + users = md5_to_users.get(md5_hex) or [] + if not users: + continue + + quoted = _quote_ident(table_name) + + row = None + try: + row = conn.execute( + "SELECT " + "m.local_id, m.local_type, m.sort_seq, m.create_time, " + "m.message_content, m.compress_content, n.user_name AS sender_username " + f"FROM {quoted} m " + "LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid " + "ORDER BY m.sort_seq DESC, m.local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + try: + row = conn.execute( + "SELECT " + "local_id, local_type, sort_seq, create_time, " + "message_content, compress_content, '' AS sender_username " + f"FROM {quoted} " + "ORDER BY sort_seq DESC, local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + row = None + + if row is None: + continue + + try: + sort_seq = int(row["sort_seq"] or 0) if row["sort_seq"] is not None else 0 + except Exception: + sort_seq = 0 + try: + local_id = int(row["local_id"] or 0) + except Exception: + local_id = 0 + try: + create_time = int(row["create_time"] or 0) + except Exception: + create_time = 0 + + # If session.db indicates a newer timestamp, fall back to slower but correct ordering. + need_slow = False + for username in users: + expected_ts = int(expected_ts_by_user.get(username) or 0) + if expected_ts > 0 and int(create_time or 0) > 0 and int(create_time or 0) < expected_ts: + need_slow = True + break + + if need_slow: + try: + row2 = conn.execute( + "SELECT " + "m.local_id, m.local_type, m.sort_seq, m.create_time, " + "m.message_content, m.compress_content, n.user_name AS sender_username " + f"FROM {quoted} m " + "LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid " + "ORDER BY COALESCE(m.create_time, 0) DESC, COALESCE(m.sort_seq, 0) DESC, m.local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + try: + row2 = conn.execute( + "SELECT " + "local_id, local_type, sort_seq, create_time, " + "message_content, compress_content, '' AS sender_username " + f"FROM {quoted} " + "ORDER BY COALESCE(create_time, 0) DESC, COALESCE(sort_seq, 0) DESC, local_id DESC " + "LIMIT 1" + ).fetchone() + except Exception: + row2 = None + if row2 is not None: + row = row2 + try: + sort_seq = int(row["sort_seq"] or 0) if row["sort_seq"] is not None else 0 + except Exception: + sort_seq = 0 + try: + local_id = int(row["local_id"] or 0) + except Exception: + local_id = 0 + try: + create_time = int(row["create_time"] or 0) + except Exception: + create_time = 0 + + sort_key = (int(create_time), int(sort_seq), int(local_id)) + + raw_text = _decode_message_content(row["compress_content"], row["message_content"]).strip() + if raw_text and (not raw_text.lstrip().startswith("<")) and (not raw_text.lstrip().startswith('"<')): + if not _is_mostly_printable_text(raw_text): + raw_text = "" + sender_username = _decode_sqlite_text(row["sender_username"]).strip() + + for username in users: + prev = best.get(username) + if prev is not None and sort_key <= prev[0]: + continue + + is_group = bool(username.endswith("@chatroom")) + try: + preview = _build_latest_message_preview( + username=username, + local_type=int(row["local_type"] or 0), + raw_text=raw_text, + is_group=is_group, + sender_username=sender_username, + ) + except Exception: + preview = "" + if preview and (not _is_mostly_printable_text(preview)): + try: + preview = _build_latest_message_preview( + username=username, + local_type=int(row["local_type"] or 0), + raw_text="", + is_group=is_group, + sender_username=sender_username, + ) + except Exception: + preview = "" + + preview = _normalize_preview(preview) + if not preview: + continue + + best[username] = ( + sort_key, + { + "username": username, + "sort_seq": int(sort_seq), + "local_id": int(local_id), + "create_time": int(create_time), + "local_type": int(row["local_type"] or 0), + "sender_username": sender_username, + "preview": preview, + "db_stem": str(db_path.stem), + "table_name": str(table_name), + }, + ) + finally: + try: + conn.close() + except Exception: + pass + + # Fallback: always have a non-empty preview for UI. + for r in sessions: + u = str(_row_get(r, "username") or "").strip() + if not u: + continue + if u in best: + continue + + draft_text = _normalize_preview(_decode_sqlite_text(_row_get(r, "draft")).strip()) + if draft_text: + preview = f"[草稿] {draft_text}" if draft_text else "[草稿]" + else: + summary_text = _normalize_preview(_decode_sqlite_text(_row_get(r, "summary")).strip()) + if summary_text: + preview = summary_text + else: + preview = _infer_last_message_brief(_row_get(r, "last_msg_type"), _row_get(r, "last_msg_sub_type")) + preview = _normalize_preview(preview) + best[u] = ( + (0, 0, 0), + { + "username": u, + "sort_seq": 0, + "local_id": 0, + "create_time": 0, + "local_type": 0, + "sender_username": "", + "preview": preview, + "db_stem": "", + "table_name": "", + }, + ) + + built_at = int(time.time()) + conn_out = sqlite3.connect(str(session_db_path)) + try: + _ensure_table(conn_out) + if rebuild: + try: + conn_out.execute(f"DELETE FROM {_TABLE_NAME}") + except Exception: + pass + + rows_to_insert: list[tuple[Any, ...]] = [] + for _, rec in best.values(): + rows_to_insert.append( + ( + rec["username"], + int(rec["sort_seq"] or 0), + int(rec["local_id"] or 0), + int(rec["create_time"] or 0), + int(rec["local_type"] or 0), + str(rec["sender_username"] or ""), + str(rec["preview"] or ""), + str(rec["db_stem"] or ""), + str(rec["table_name"] or ""), + int(built_at), + ) + ) + + conn_out.executemany( + f"INSERT OR REPLACE INTO {_TABLE_NAME}(" + "username, sort_seq, local_id, create_time, local_type, sender_username, preview, db_stem, table_name, built_at" + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + rows_to_insert, + ) + conn_out.commit() + finally: + conn_out.close() + + duration = max(0.0, time.time() - started) + logger.info( + f"[session_last_message] build done account={account_dir.name} sessions={len(best)} " + f"durationSec={round(duration, 3)} table={_TABLE_NAME}" + ) + return { + "status": "success", + "account": account_dir.name, + "built": len(best), + "table": _TABLE_NAME, + "durationSec": round(duration, 3), + } diff --git a/src/wechat_decrypt_tool/session_preview_index.py b/src/wechat_decrypt_tool/session_preview_index.py deleted file mode 100644 index d043047..0000000 --- a/src/wechat_decrypt_tool/session_preview_index.py +++ /dev/null @@ -1,574 +0,0 @@ -from __future__ import annotations - -import hashlib -import json -import os -import re -import sqlite3 -import time -from pathlib import Path -from typing import Any, Optional - -from .chat_helpers import ( - _build_latest_message_preview, - _decode_message_content, - _decode_sqlite_text, - _infer_last_message_brief, - _is_mostly_printable_text, - _iter_message_db_paths, - _quote_ident, - _should_keep_session, -) -from .logging_config import get_logger - -logger = get_logger(__name__) - -_SCHEMA_VERSION = 1 -_INDEX_DB_NAME = "session_preview.db" -_INDEX_DB_TMP_NAME = "session_preview.tmp.db" - -_TABLE_NAME_RE = re.compile(r"^(msg_|chat_)([0-9a-f]{32})", re.IGNORECASE) - - -def get_session_preview_index_db_path(account_dir: Path) -> Path: - return account_dir / _INDEX_DB_NAME - - -def _index_db_tmp_path(account_dir: Path) -> Path: - return account_dir / _INDEX_DB_TMP_NAME - - -def _file_sig(path: Path) -> tuple[str, int, int]: - st = path.stat() - mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000)) - return (path.name, int(st.st_size), int(mtime_ns)) - - -def _compute_source_fingerprint(account_dir: Path) -> dict[str, Any]: - """Compute a stable fingerprint for the current decrypted data set.""" - session_db_path = account_dir / "session.db" - msg_paths = _iter_message_db_paths(account_dir) - - items: list[tuple[str, int, int]] = [] - try: - if session_db_path.exists(): - items.append(_file_sig(session_db_path)) - except Exception: - pass - - for p in msg_paths: - try: - if p.exists(): - items.append(_file_sig(p)) - except Exception: - continue - - items.sort() - payload = json.dumps(items, ensure_ascii=False, separators=(",", ":")).encode("utf-8", errors="ignore") - return { - "fingerprint": hashlib.sha256(payload).hexdigest(), - "files": items, - "dbCount": len(msg_paths), - } - - -def _inspect_index(index_path: Path) -> dict[str, Any]: - if not index_path.exists(): - return { - "exists": False, - "ready": False, - "schemaVersion": None, - "hasMetaTable": False, - "hasPreviewTable": False, - } - - conn = sqlite3.connect(str(index_path)) - try: - rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() - names = {str(r[0]).lower() for r in rows if r and r[0]} - has_meta = "meta" in names - has_preview = "session_preview" in names - - schema_version: Optional[int] = None - if has_meta: - try: - r = conn.execute("SELECT value FROM meta WHERE key='schema_version' LIMIT 1").fetchone() - if r and r[0] is not None: - schema_version = int(str(r[0]).strip() or "0") - except Exception: - schema_version = None - - ready = bool(has_preview and (schema_version is None or schema_version >= _SCHEMA_VERSION)) - - return { - "exists": True, - "ready": ready, - "schemaVersion": schema_version, - "hasMetaTable": bool(has_meta), - "hasPreviewTable": bool(has_preview), - } - except Exception: - return { - "exists": True, - "ready": False, - "schemaVersion": None, - "hasMetaTable": False, - "hasPreviewTable": False, - } - finally: - conn.close() - - -def get_session_preview_index_status(account_dir: Path) -> dict[str, Any]: - index_path = get_session_preview_index_db_path(account_dir) - inspect = _inspect_index(index_path) - meta: dict[str, str] = {} - current: dict[str, Any] = {} - stale = False - - if bool(inspect.get("ready")): - conn = sqlite3.connect(str(index_path)) - try: - rows = conn.execute("SELECT key, value FROM meta").fetchall() - for k, v in rows: - if k is None: - continue - meta[str(k)] = "" if v is None else str(v) - except Exception: - meta = {} - finally: - conn.close() - - current = _compute_source_fingerprint(account_dir) - expected = str(meta.get("source_fingerprint") or "").strip() - actual = str(current.get("fingerprint") or "").strip() - if expected and actual and expected != actual: - stale = True - - return { - "status": "success", - "account": account_dir.name, - "index": { - "path": str(index_path), - "exists": bool(inspect.get("exists")), - "ready": bool(inspect.get("ready")), - "stale": bool(stale), - "needsRebuild": (not bool(inspect.get("ready"))) or bool(stale), - "schemaVersion": inspect.get("schemaVersion"), - "meta": meta, - "current": current, - }, - } - - -def load_session_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]: - status = get_session_preview_index_status(account_dir) - index = dict(status.get("index") or {}) - if not bool(index.get("ready")): - return {} - if bool(index.get("stale")): - return {} - - index_path = get_session_preview_index_db_path(account_dir) - - uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()])) - if not uniq: - return {} - - out: dict[str, str] = {} - conn = sqlite3.connect(str(index_path)) - conn.row_factory = sqlite3.Row - try: - chunk_size = 900 # sqlite 默认变量上限常见为 999 - for i in range(0, len(uniq), chunk_size): - chunk = uniq[i : i + chunk_size] - placeholders = ",".join(["?"] * len(chunk)) - rows = conn.execute( - f"SELECT username, preview FROM session_preview WHERE username IN ({placeholders})", - chunk, - ).fetchall() - for r in rows: - u = str(r["username"] or "").strip() - if not u: - continue - out[u] = str(r["preview"] or "") - return out - finally: - conn.close() - - -def _init_index_db(conn: sqlite3.Connection) -> None: - conn.execute("PRAGMA journal_mode=DELETE") - conn.execute("PRAGMA synchronous=OFF") - conn.execute("PRAGMA temp_store=MEMORY") - - conn.execute("CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value TEXT NOT NULL)") - conn.execute( - """ - CREATE TABLE IF NOT EXISTS session_preview ( - username TEXT PRIMARY KEY, - sort_seq INTEGER NOT NULL DEFAULT 0, - local_id INTEGER NOT NULL DEFAULT 0, - create_time INTEGER NOT NULL DEFAULT 0, - local_type INTEGER NOT NULL DEFAULT 0, - sender_username TEXT NOT NULL DEFAULT '', - preview TEXT NOT NULL DEFAULT '', - db_stem TEXT NOT NULL DEFAULT '', - table_name TEXT NOT NULL DEFAULT '' - ) - """ - ) - conn.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("schema_version", str(_SCHEMA_VERSION)), - ) - - -def build_session_preview_index( - account_dir: Path, - *, - rebuild: bool = False, - include_hidden: bool = True, - include_official: bool = True, -) -> dict[str, Any]: - """ - Build a per-account session preview index as `{account}/session_preview.db`. - - Why: `message_*.db` tables do NOT index `create_time`, so `ORDER BY create_time DESC LIMIT 1` - is extremely slow when done per-session at runtime. This index shifts that work to a one-time build. - """ - - account_dir = Path(account_dir) - session_db_path = account_dir / "session.db" - if not session_db_path.exists(): - return { - "status": "error", - "account": account_dir.name, - "message": "session.db not found.", - } - - db_paths = _iter_message_db_paths(account_dir) - if not db_paths: - return { - "status": "error", - "account": account_dir.name, - "message": "No message databases found.", - } - - started = time.time() - logger.info(f"[session_preview] build start account={account_dir.name} dbs={len(db_paths)}") - - sconn = sqlite3.connect(str(session_db_path)) - sconn.row_factory = sqlite3.Row - try: - srows = sconn.execute( - """ - SELECT username, is_hidden, summary, draft, last_msg_type, last_msg_sub_type, sort_timestamp, last_timestamp - FROM SessionTable - ORDER BY sort_timestamp DESC - """ - ).fetchall() - finally: - sconn.close() - - sessions: list[sqlite3.Row] = [] - usernames: list[str] = [] - for r in srows: - u = str(r["username"] or "").strip() - if not u: - continue - if not include_hidden and int(r["is_hidden"] or 0) == 1: - continue - if not _should_keep_session(u, include_official=bool(include_official)): - continue - sessions.append(r) - usernames.append(u) - - if not usernames: - return { - "status": "success", - "account": account_dir.name, - "message": "No sessions to index.", - "indexed": 0, - "path": str(get_session_preview_index_db_path(account_dir)), - } - - md5_to_users: dict[str, list[str]] = {} - for u in usernames: - h = hashlib.md5(u.encode("utf-8")).hexdigest() - md5_to_users.setdefault(h, []).append(u) - - best: dict[str, tuple[tuple[int, int, int], dict[str, Any]]] = {} - - for db_path in db_paths: - conn = sqlite3.connect(str(db_path)) - conn.row_factory = sqlite3.Row - conn.text_factory = bytes - try: - trows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() - md5_to_table: dict[str, str] = {} - for tr in trows: - if not tr or tr[0] is None: - continue - name = _decode_sqlite_text(tr[0]).strip() - if not name: - continue - m = _TABLE_NAME_RE.match(name.lower()) - if not m: - continue - md5_hex = str(m.group(2) or "").lower() - if md5_hex not in md5_to_users: - continue - prefix = str(m.group(1) or "").lower() - if md5_hex not in md5_to_table or prefix == "msg_": - md5_to_table[md5_hex] = name - - if not md5_to_table: - continue - - for md5_hex, table_name in md5_to_table.items(): - users = md5_to_users.get(md5_hex) or [] - if not users: - continue - - quoted = _quote_ident(table_name) - - row = None - try: - row = conn.execute( - "SELECT " - "m.local_id, m.local_type, m.sort_seq, m.create_time, " - "m.message_content, m.compress_content, n.user_name AS sender_username " - f"FROM {quoted} m " - "LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid " - "ORDER BY m.sort_seq DESC, m.local_id DESC " - "LIMIT 1" - ).fetchone() - except Exception: - try: - row = conn.execute( - "SELECT " - "local_id, local_type, sort_seq, create_time, " - "message_content, compress_content, '' AS sender_username " - f"FROM {quoted} " - "ORDER BY sort_seq DESC, local_id DESC " - "LIMIT 1" - ).fetchone() - except Exception: - row = None - - if row is None: - continue - - try: - sort_seq = int(row["sort_seq"] or 0) if row["sort_seq"] is not None else 0 - except Exception: - sort_seq = 0 - try: - local_id = int(row["local_id"] or 0) - except Exception: - local_id = 0 - try: - create_time = int(row["create_time"] or 0) - except Exception: - create_time = 0 - sort_key = (int(sort_seq), int(local_id), int(create_time)) - - raw_text = _decode_message_content(row["compress_content"], row["message_content"]).strip() - if raw_text and (not raw_text.lstrip().startswith("<")) and (not raw_text.lstrip().startswith('"<')): - # Avoid leaking unreadable compressed/binary payloads into UI. - if not _is_mostly_printable_text(raw_text): - raw_text = "" - sender_username = _decode_sqlite_text(row["sender_username"]).strip() - - for username in users: - prev = best.get(username) - if prev is not None and sort_key <= prev[0]: - continue - - is_group = bool(username.endswith("@chatroom")) - try: - preview = _build_latest_message_preview( - username=username, - local_type=int(row["local_type"] or 0), - raw_text=raw_text, - is_group=is_group, - sender_username=sender_username, - ) - except Exception: - preview = "" - if preview and (not _is_mostly_printable_text(preview)): - try: - preview = _build_latest_message_preview( - username=username, - local_type=int(row["local_type"] or 0), - raw_text="", - is_group=is_group, - sender_username=sender_username, - ) - except Exception: - preview = "" - if not preview: - continue - - best[username] = ( - sort_key, - { - "username": username, - "sort_seq": int(sort_seq), - "local_id": int(local_id), - "create_time": int(create_time), - "local_type": int(row["local_type"] or 0), - "sender_username": sender_username, - "preview": preview, - "db_stem": str(db_path.stem), - "table_name": str(table_name), - }, - ) - finally: - try: - conn.close() - except Exception: - pass - - # Fallback: ensure we always have a non-empty lastMessage for UI (even if message tables missing). - for r in sessions: - u = str(r["username"] or "").strip() - if not u: - continue - if u in best: - continue - draft_text = _decode_sqlite_text(r["draft"]).strip() - if draft_text: - draft_text = re.sub(r"\s+", " ", draft_text).strip() - preview = f"[草稿] {draft_text}" if draft_text else "[草稿]" - else: - summary_text = _decode_sqlite_text(r["summary"]).strip() - summary_text = re.sub(r"\s+", " ", summary_text).strip() - if summary_text: - preview = summary_text - else: - preview = _infer_last_message_brief(r["last_msg_type"], r["last_msg_sub_type"]) - best[u] = ( - (0, 0, 0), - { - "username": u, - "sort_seq": 0, - "local_id": 0, - "create_time": 0, - "local_type": 0, - "sender_username": "", - "preview": str(preview or ""), - "db_stem": "", - "table_name": "", - }, - ) - - final_path = get_session_preview_index_db_path(account_dir) - tmp_path = _index_db_tmp_path(account_dir) - try: - if tmp_path.exists(): - tmp_path.unlink() - except Exception: - pass - - try: - conn_out = sqlite3.connect(str(tmp_path)) - try: - _init_index_db(conn_out) - try: - conn_out.commit() - except Exception: - pass - conn_out.execute("BEGIN") - rows_to_insert: list[tuple[Any, ...]] = [] - for _, rec in best.values(): - rows_to_insert.append( - ( - rec["username"], - int(rec["sort_seq"] or 0), - int(rec["local_id"] or 0), - int(rec["create_time"] or 0), - int(rec["local_type"] or 0), - str(rec["sender_username"] or ""), - str(rec["preview"] or ""), - str(rec["db_stem"] or ""), - str(rec["table_name"] or ""), - ) - ) - conn_out.executemany( - "INSERT OR REPLACE INTO session_preview(" - "username, sort_seq, local_id, create_time, local_type, sender_username, preview, db_stem, table_name" - ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", - rows_to_insert, - ) - conn_out.commit() - - built_at = int(time.time()) - conn_out.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("built_at", str(built_at)), - ) - conn_out.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("session_count", str(len(best))), - ) - conn_out.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("db_count", str(len(db_paths))), - ) - src = _compute_source_fingerprint(account_dir) - conn_out.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("source_fingerprint", str(src.get("fingerprint") or "")), - ) - conn_out.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("source_files", json.dumps(src.get("files") or [], ensure_ascii=False)), - ) - conn_out.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("built_include_hidden", "1" if include_hidden else "0"), - ) - conn_out.execute( - "INSERT INTO meta(key, value) VALUES(?, ?) " - "ON CONFLICT(key) DO UPDATE SET value = excluded.value", - ("built_include_official", "1" if include_official else "0"), - ) - conn_out.commit() - finally: - conn_out.close() - - os.replace(str(tmp_path), str(final_path)) - except Exception as e: - logger.exception(f"[session_preview] build failed: {e}") - try: - if tmp_path.exists(): - tmp_path.unlink() - except Exception: - pass - return { - "status": "error", - "account": account_dir.name, - "message": str(e), - } - - duration = max(0.0, time.time() - started) - logger.info( - f"[session_preview] build done account={account_dir.name} indexed={len(best)} " - f"durationSec={round(duration, 3)} path={final_path}" - ) - return { - "status": "success", - "account": account_dir.name, - "indexed": len(best), - "path": str(final_path), - "durationSec": round(duration, 3), - } diff --git a/src/wechat_decrypt_tool/wechat_decrypt.py b/src/wechat_decrypt_tool/wechat_decrypt.py index 21b2e80..a0d5b5c 100644 --- a/src/wechat_decrypt_tool/wechat_decrypt.py +++ b/src/wechat_decrypt_tool/wechat_decrypt.py @@ -441,20 +441,20 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di "failed_files": account_failed } - # 构建“会话最后一条消息”索引:把耗时挪到解密阶段,一劳永逸 - if os.environ.get("WECHAT_TOOL_BUILD_SESSION_PREVIEW", "1") != "0": + # 构建“会话最后一条消息”缓存表:把耗时挪到解密阶段,后续会话列表直接查表 + if os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0": try: - from .session_preview_index import build_session_preview_index + from .session_last_message import build_session_last_message_table - account_results[account_name]["session_preview_index"] = build_session_preview_index( + account_results[account_name]["session_last_message"] = build_session_last_message_table( account_output_dir, rebuild=True, include_hidden=True, include_official=True, ) except Exception as e: - logger.warning(f"构建会话预览索引失败: {account_name}: {e}") - account_results[account_name]["session_preview_index"] = { + logger.warning(f"构建会话最后一条消息缓存表失败: {account_name}: {e}") + account_results[account_name]["session_last_message"] = { "status": "error", "message": str(e), }