improvement(chat): switch the session list to the session_last_message cache table

- Cache each conversation's last-message preview in session.db::session_last_message to cut the cost of building the session list
- Add build/status endpoints for the cache table; the list backfills it automatically when entries are missing (a usage sketch follows right after this list)
- Build the table automatically during the decrypt stage; the toggle environment variable is renamed to WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE
- Remove the session_preview_index implementation
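Usage sketch for the new endpoints referenced in the second bullet: the two routes, their query parameters, and the response keys are taken from this commit's diff; the base URL and account name below are placeholders.

import requests

BASE = "http://127.0.0.1:8000"   # placeholder: host/port of the tool's API server
ACCOUNT = "wxid_example"         # placeholder: name of a decrypted account directory

# Check whether session.db::session_last_message exists and how many rows it holds.
status = requests.get(
    f"{BASE}/api/chat/session-last-message/status",
    params={"account": ACCOUNT},
).json()
print(status.get("table"))

# Force a full rebuild, mirroring the decrypt-stage call (rebuild + hidden + official sessions).
result = requests.post(
    f"{BASE}/api/chat/session-last-message/build",
    params={"account": ACCOUNT, "rebuild": True, "include_hidden": True, "include_official": True},
).json()
print(result.get("built"), result.get("durationSec"))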
2977094657
2025-12-25 23:08:50 +08:00
parent 5a8075ca8a
commit e7d977ae94
5 changed files with 633 additions and 609 deletions

View File

@@ -963,6 +963,45 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
remaining = {u for u in usernames if u}
best: dict[str, tuple[tuple[int, int, int], str]] = {}
expected_ts_by_user: dict[str, int] = {}
session_db_path = Path(account_dir) / "session.db"
if session_db_path.exists() and remaining:
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
try:
uniq = list(dict.fromkeys([u for u in remaining if u]))
chunk_size = 900
for i in range(0, len(uniq), chunk_size):
chunk = uniq[i : i + chunk_size]
placeholders = ",".join(["?"] * len(chunk))
try:
rows = sconn.execute(
f"SELECT username, sort_timestamp, last_timestamp FROM SessionTable WHERE username IN ({placeholders})",
chunk,
).fetchall()
for r in rows:
u = str(r["username"] or "").strip()
if not u:
continue
ts = int(r["sort_timestamp"] or 0)
if ts <= 0:
ts = int(r["last_timestamp"] or 0)
expected_ts_by_user[u] = int(ts or 0)
except sqlite3.OperationalError:
rows = sconn.execute(
f"SELECT username, last_timestamp FROM SessionTable WHERE username IN ({placeholders})",
chunk,
).fetchall()
for r in rows:
u = str(r["username"] or "").strip()
if not u:
continue
expected_ts_by_user[u] = int(r["last_timestamp"] or 0)
except Exception:
expected_ts_by_user = {}
finally:
sconn.close()
if _DEBUG_SESSIONS:
logger.info(
@@ -1024,7 +1063,39 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
create_time = int(r["create_time"] or 0)
sort_seq = int(r["sort_seq"] or 0) if r["sort_seq"] is not None else 0
local_id = int(r["local_id"] or 0)
sort_key = (sort_seq, local_id, create_time)
expected_ts = int(expected_ts_by_user.get(u) or 0)
if expected_ts > 0 and create_time > 0 and create_time < expected_ts:
try:
r2 = conn.execute(
"SELECT "
"m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, "
"n.user_name AS sender_username "
f"FROM {quoted} m "
"LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid "
"ORDER BY COALESCE(m.create_time, 0) DESC, COALESCE(m.sort_seq, 0) DESC, m.local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
try:
r2 = conn.execute(
"SELECT "
"local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username "
f"FROM {quoted} "
"ORDER BY COALESCE(create_time, 0) DESC, COALESCE(sort_seq, 0) DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
r2 = None
if r2 is not None:
r = r2
local_type = int(r["local_type"] or 0)
create_time = int(r["create_time"] or 0)
sort_seq = int(r["sort_seq"] or 0) if r["sort_seq"] is not None else 0
local_id = int(r["local_id"] or 0)
sort_key = (create_time, sort_seq, local_id)
raw_text = _decode_message_content(r["compress_content"], r["message_content"]).strip()
sender_username = _decode_sqlite_text(r["sender_username"]).strip()
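The hunk above cross-checks the fast sort_seq pick against the timestamp session.db expects and re-queries by create_time when the fast pick looks stale. A self-contained sketch of that fallback, using an in-memory database and made-up rows (the table name msg_demo is synthetic):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE msg_demo (local_id INTEGER, sort_seq INTEGER, create_time INTEGER, message_content TEXT)"
)
conn.executemany(
    "INSERT INTO msg_demo VALUES (?, ?, ?, ?)",
    [
        (1, 1700000000000, 1700000000, "older row, sort_seq filled"),
        (2, 0, 1700000500, "newer row, sort_seq never filled"),
    ],
)

# Fast path: order by sort_seq, like the first query in the diff.
fast = conn.execute(
    "SELECT local_id, create_time FROM msg_demo ORDER BY sort_seq DESC, local_id DESC LIMIT 1"
).fetchone()

# session.db says the conversation was active at expected_ts; the fast pick is older,
# so fall back to the slower create_time ordering, like the r2 query above.
expected_ts = 1700000500
if expected_ts > 0 and fast[1] > 0 and fast[1] < expected_ts:
    slow = conn.execute(
        "SELECT local_id, create_time FROM msg_demo "
        "ORDER BY COALESCE(create_time, 0) DESC, COALESCE(sort_seq, 0) DESC, local_id DESC LIMIT 1"
    ).fetchone()
    print("fast pick:", fast, "-> corrected pick:", slow)  # corrected pick is local_id 2
conn.close()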

View File

@@ -49,10 +49,10 @@ from ..chat_helpers import (
)
from ..media_helpers import _try_find_decrypted_resource
from ..path_fix import PathFixRoute
-from ..session_preview_index import (
-build_session_preview_index,
-get_session_preview_index_status,
-load_session_previews,
+from ..session_last_message import (
+build_session_last_message_table,
+get_session_last_message_status,
+load_session_last_messages,
)
logger = get_logger(__name__)
@@ -84,21 +84,21 @@ async def chat_search_index_build(account: Optional[str] = None, rebuild: bool =
return start_chat_search_index_build(account_dir, rebuild=bool(rebuild))
-@router.get("/api/chat/session-preview/status", summary="会话最新消息预览索引状态")
-async def session_preview_status(account: Optional[str] = None):
+@router.get("/api/chat/session-last-message/status", summary="会话最后一条消息缓存表状态")
+async def session_last_message_status(account: Optional[str] = None):
account_dir = _resolve_account_dir(account)
-return get_session_preview_index_status(account_dir)
+return get_session_last_message_status(account_dir)
-@router.post("/api/chat/session-preview/build", summary="构建/重建会话最新消息预览索引")
-async def session_preview_build(
+@router.post("/api/chat/session-last-message/build", summary="构建/重建会话最后一条消息缓存表")
+async def session_last_message_build(
account: Optional[str] = None,
rebuild: bool = False,
include_hidden: bool = True,
include_official: bool = True,
):
account_dir = _resolve_account_dir(account)
-return build_session_preview_index(
+return build_session_last_message_table(
account_dir,
rebuild=bool(rebuild),
include_hidden=bool(include_hidden),
@@ -984,24 +984,32 @@ async def list_chat_sessions(
preview_mode = str(preview or "").strip().lower()
if preview_mode not in {"latest", "index", "session", "db", "none"}:
preview_mode = "latest"
+if preview_mode == "index":
+preview_mode = "latest"
-latest_previews: dict[str, str] = {}
-index_status: Optional[dict[str, Any]] = None
-if preview_mode in {"latest", "index", "db"}:
+last_previews: dict[str, str] = {}
+if preview_mode == "latest":
try:
-index_status = get_session_preview_index_status(account_dir)
-latest_previews = load_session_previews(account_dir, usernames)
+last_previews = load_session_last_messages(account_dir, usernames)
+# Backward-compatible: old decrypted accounts may not have built the cache table yet.
+if (not last_previews) and usernames:
+build_session_last_message_table(
+account_dir,
+rebuild=False,
+include_hidden=True,
+include_official=True,
+)
+last_previews = load_session_last_messages(account_dir, usernames)
except Exception:
-index_status = None
-latest_previews = {}
+last_previews = {}
if preview_mode in {"latest", "db"}:
-missing = [u for u in usernames if u and (u not in latest_previews)]
-if missing:
-legacy = _load_latest_message_previews(account_dir, missing)
+targets = usernames if preview_mode == "db" else [u for u in usernames if u and (u not in last_previews)]
+if targets:
+legacy = _load_latest_message_previews(account_dir, targets)
for u, v in legacy.items():
if v:
-latest_previews[u] = v
+last_previews[u] = v
sessions: list[dict[str, Any]] = [] sessions: list[dict[str, Any]] = []
for r in filtered: for r in filtered:
@@ -1014,12 +1022,28 @@ async def list_chat_sessions(
avatar_url = base_url + _build_avatar_url(account_dir.name, username)
last_message = ""
+if preview_mode == "session":
draft_text = _decode_sqlite_text(r["draft"]).strip()
if draft_text:
draft_text = re.sub(r"\s+", " ", draft_text).strip()
last_message = f"[草稿] {draft_text}" if draft_text else "[草稿]"
-elif preview_mode in {"latest", "db", "index"} and str(latest_previews.get(username) or "").strip():
-last_message = str(latest_previews.get(username) or "").strip()
+else:
+summary_text = _decode_sqlite_text(r["summary"]).strip()
+summary_text = re.sub(r"\s+", " ", summary_text).strip()
+if summary_text:
+last_message = summary_text
+else:
+last_message = _infer_last_message_brief(r["last_msg_type"], r["last_msg_sub_type"])
+elif preview_mode in {"latest", "db"}:
+if str(last_previews.get(username) or "").strip():
+last_message = str(last_previews.get(username) or "").strip()
+elif preview_mode != "none":
+summary_text = _decode_sqlite_text(r["summary"]).strip()
+summary_text = re.sub(r"\s+", " ", summary_text).strip()
+if summary_text:
+last_message = summary_text
+else:
+last_message = _infer_last_message_brief(r["last_msg_type"], r["last_msg_sub_type"])
elif preview_mode != "none":
summary_text = _decode_sqlite_text(r["summary"]).strip()
summary_text = re.sub(r"\s+", " ", summary_text).strip()
@@ -1048,7 +1072,6 @@ async def list_chat_sessions(
"account": account_dir.name, "account": account_dir.name,
"total": len(sessions), "total": len(sessions),
"sessions": sessions, "sessions": sessions,
"previewIndex": index_status.get("index") if isinstance(index_status, dict) else None,
} }
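A rough client-side sketch of the preview modes handled above. Only the preview values (latest, session, db, none) come from this hunk; the session-list route path is not shown in the diff, so /api/chat/sessions below is purely a placeholder.

import requests

BASE = "http://127.0.0.1:8000"   # placeholder

# "latest": read previews from session_last_message and auto-build the table if it is empty.
sessions_fast = requests.get(f"{BASE}/api/chat/sessions", params={"preview": "latest"}).json()

# "db": skip the cache and scan the message_*.db tables directly (slower, always fresh).
sessions_fresh = requests.get(f"{BASE}/api/chat/sessions", params={"preview": "db"}).json()

# "session": use only SessionTable's draft/summary; "none" disables previews entirely.
sessions_light = requests.get(f"{BASE}/api/chat/sessions", params={"preview": "session"}).json()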

View File

@@ -0,0 +1,504 @@
from __future__ import annotations
import hashlib
import re
import sqlite3
import time
from pathlib import Path
from typing import Any, Optional
from .chat_helpers import (
_build_latest_message_preview,
_decode_message_content,
_decode_sqlite_text,
_infer_last_message_brief,
_is_mostly_printable_text,
_iter_message_db_paths,
_quote_ident,
_should_keep_session,
)
from .logging_config import get_logger
logger = get_logger(__name__)
_TABLE_NAME = "session_last_message"
_TABLE_NAME_RE = re.compile(r"^(msg_|chat_)([0-9a-f]{32})", re.IGNORECASE)
_PREVIEW_MAX_LEN = 400
def _session_db_path(account_dir: Path) -> Path:
return Path(account_dir) / "session.db"
def _row_get(row: sqlite3.Row, key: str) -> Any:
try:
return row[key]
except Exception:
return None
def _normalize_preview(text: str) -> str:
s = str(text or "").strip()
if not s:
return ""
s = re.sub(r"\s+", " ", s).strip()
if len(s) > _PREVIEW_MAX_LEN:
return s[:_PREVIEW_MAX_LEN]
return s
def _ensure_table(conn: sqlite3.Connection) -> None:
conn.execute(
f"""
CREATE TABLE IF NOT EXISTS {_TABLE_NAME} (
username TEXT PRIMARY KEY,
sort_seq INTEGER NOT NULL DEFAULT 0,
local_id INTEGER NOT NULL DEFAULT 0,
create_time INTEGER NOT NULL DEFAULT 0,
local_type INTEGER NOT NULL DEFAULT 0,
sender_username TEXT NOT NULL DEFAULT '',
preview TEXT NOT NULL DEFAULT '',
db_stem TEXT NOT NULL DEFAULT '',
table_name TEXT NOT NULL DEFAULT '',
built_at INTEGER NOT NULL DEFAULT 0
)
"""
)
def get_session_last_message_status(account_dir: Path) -> dict[str, Any]:
account_dir = Path(account_dir)
session_db_path = _session_db_path(account_dir)
if not session_db_path.exists():
return {
"status": "error",
"account": account_dir.name,
"message": "session.db not found.",
}
conn = sqlite3.connect(str(session_db_path))
try:
row = conn.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower(?) LIMIT 1",
(_TABLE_NAME,),
).fetchone()
exists = bool(row and row[0])
if not exists:
return {
"status": "success",
"account": account_dir.name,
"table": {
"name": _TABLE_NAME,
"exists": False,
"rowCount": 0,
"builtAt": None,
},
}
count = int(conn.execute(f"SELECT COUNT(1) FROM {_TABLE_NAME}").fetchone()[0] or 0)
built_at = conn.execute(f"SELECT MAX(built_at) FROM {_TABLE_NAME}").fetchone()[0]
try:
built_at_int: Optional[int] = int(built_at) if built_at is not None else None
except Exception:
built_at_int = None
return {
"status": "success",
"account": account_dir.name,
"table": {
"name": _TABLE_NAME,
"exists": True,
"rowCount": count,
"builtAt": built_at_int,
},
}
finally:
conn.close()
def load_session_last_messages(account_dir: Path, usernames: list[str]) -> dict[str, str]:
if not usernames:
return {}
account_dir = Path(account_dir)
session_db_path = _session_db_path(account_dir)
if not session_db_path.exists():
return {}
uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()]))
if not uniq:
return {}
out: dict[str, str] = {}
conn = sqlite3.connect(str(session_db_path))
conn.row_factory = sqlite3.Row
try:
chunk_size = 900
for i in range(0, len(uniq), chunk_size):
chunk = uniq[i : i + chunk_size]
placeholders = ",".join(["?"] * len(chunk))
rows = conn.execute(
f"SELECT username, preview FROM {_TABLE_NAME} WHERE username IN ({placeholders})",
chunk,
).fetchall()
for r in rows:
u = str(r["username"] or "").strip()
if not u:
continue
out[u] = str(r["preview"] or "")
return out
except Exception:
return {}
finally:
conn.close()
def build_session_last_message_table(
account_dir: Path,
*,
rebuild: bool = False,
include_hidden: bool = True,
include_official: bool = True,
) -> dict[str, Any]:
"""
Build a per-account cache table `{account}/session.db::{session_last_message}`.
The UI session list needs "last message preview" per conversation; querying message_*.db on every refresh is slow.
This shifts that work to decrypt-time (or one-time manual rebuild).
"""
account_dir = Path(account_dir)
session_db_path = _session_db_path(account_dir)
if not session_db_path.exists():
return {
"status": "error",
"account": account_dir.name,
"message": "session.db not found.",
}
db_paths = _iter_message_db_paths(account_dir)
if not db_paths:
return {
"status": "error",
"account": account_dir.name,
"message": "No message databases found.",
}
started = time.time()
logger.info(f"[session_last_message] build start account={account_dir.name} dbs={len(db_paths)}")
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
try:
try:
srows = sconn.execute(
"""
SELECT username, is_hidden, summary, draft, last_msg_type, last_msg_sub_type, sort_timestamp, last_timestamp
FROM SessionTable
ORDER BY sort_timestamp DESC
"""
).fetchall()
except sqlite3.OperationalError:
srows = sconn.execute(
"""
SELECT username, is_hidden, summary, draft, sort_timestamp, last_timestamp
FROM SessionTable
ORDER BY sort_timestamp DESC
"""
).fetchall()
finally:
sconn.close()
sessions: list[sqlite3.Row] = []
usernames: list[str] = []
expected_ts_by_user: dict[str, int] = {}
for r in srows:
u = str(_row_get(r, "username") or "").strip()
if not u:
continue
if not include_hidden and int(_row_get(r, "is_hidden") or 0) == 1:
continue
if not _should_keep_session(u, include_official=bool(include_official)):
continue
sessions.append(r)
usernames.append(u)
ts = int(_row_get(r, "sort_timestamp") or 0)
if ts <= 0:
ts = int(_row_get(r, "last_timestamp") or 0)
expected_ts_by_user[u] = int(ts or 0)
if not usernames:
return {
"status": "success",
"account": account_dir.name,
"message": "No sessions to build.",
"built": 0,
"durationSec": 0.0,
}
md5_to_users: dict[str, list[str]] = {}
for u in usernames:
h = hashlib.md5(u.encode("utf-8")).hexdigest()
md5_to_users.setdefault(h, []).append(u)
best: dict[str, tuple[tuple[int, int, int], dict[str, Any]]] = {}
for db_path in db_paths:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
try:
trows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
md5_to_table: dict[str, str] = {}
for tr in trows:
if not tr or tr[0] is None:
continue
name = _decode_sqlite_text(tr[0]).strip()
if not name:
continue
m = _TABLE_NAME_RE.match(name.lower())
if not m:
continue
md5_hex = str(m.group(2) or "").lower()
if md5_hex not in md5_to_users:
continue
prefix = str(m.group(1) or "").lower()
if md5_hex not in md5_to_table or prefix == "msg_":
md5_to_table[md5_hex] = name
if not md5_to_table:
continue
for md5_hex, table_name in md5_to_table.items():
users = md5_to_users.get(md5_hex) or []
if not users:
continue
quoted = _quote_ident(table_name)
row = None
try:
row = conn.execute(
"SELECT "
"m.local_id, m.local_type, m.sort_seq, m.create_time, "
"m.message_content, m.compress_content, n.user_name AS sender_username "
f"FROM {quoted} m "
"LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid "
"ORDER BY m.sort_seq DESC, m.local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
try:
row = conn.execute(
"SELECT "
"local_id, local_type, sort_seq, create_time, "
"message_content, compress_content, '' AS sender_username "
f"FROM {quoted} "
"ORDER BY sort_seq DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
row = None
if row is None:
continue
try:
sort_seq = int(row["sort_seq"] or 0) if row["sort_seq"] is not None else 0
except Exception:
sort_seq = 0
try:
local_id = int(row["local_id"] or 0)
except Exception:
local_id = 0
try:
create_time = int(row["create_time"] or 0)
except Exception:
create_time = 0
# If session.db indicates a newer timestamp, fall back to slower but correct ordering.
need_slow = False
for username in users:
expected_ts = int(expected_ts_by_user.get(username) or 0)
if expected_ts > 0 and int(create_time or 0) > 0 and int(create_time or 0) < expected_ts:
need_slow = True
break
if need_slow:
try:
row2 = conn.execute(
"SELECT "
"m.local_id, m.local_type, m.sort_seq, m.create_time, "
"m.message_content, m.compress_content, n.user_name AS sender_username "
f"FROM {quoted} m "
"LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid "
"ORDER BY COALESCE(m.create_time, 0) DESC, COALESCE(m.sort_seq, 0) DESC, m.local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
try:
row2 = conn.execute(
"SELECT "
"local_id, local_type, sort_seq, create_time, "
"message_content, compress_content, '' AS sender_username "
f"FROM {quoted} "
"ORDER BY COALESCE(create_time, 0) DESC, COALESCE(sort_seq, 0) DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
row2 = None
if row2 is not None:
row = row2
try:
sort_seq = int(row["sort_seq"] or 0) if row["sort_seq"] is not None else 0
except Exception:
sort_seq = 0
try:
local_id = int(row["local_id"] or 0)
except Exception:
local_id = 0
try:
create_time = int(row["create_time"] or 0)
except Exception:
create_time = 0
sort_key = (int(create_time), int(sort_seq), int(local_id))
raw_text = _decode_message_content(row["compress_content"], row["message_content"]).strip()
if raw_text and (not raw_text.lstrip().startswith("<")) and (not raw_text.lstrip().startswith('"<')):
if not _is_mostly_printable_text(raw_text):
raw_text = ""
sender_username = _decode_sqlite_text(row["sender_username"]).strip()
for username in users:
prev = best.get(username)
if prev is not None and sort_key <= prev[0]:
continue
is_group = bool(username.endswith("@chatroom"))
try:
preview = _build_latest_message_preview(
username=username,
local_type=int(row["local_type"] or 0),
raw_text=raw_text,
is_group=is_group,
sender_username=sender_username,
)
except Exception:
preview = ""
if preview and (not _is_mostly_printable_text(preview)):
try:
preview = _build_latest_message_preview(
username=username,
local_type=int(row["local_type"] or 0),
raw_text="",
is_group=is_group,
sender_username=sender_username,
)
except Exception:
preview = ""
preview = _normalize_preview(preview)
if not preview:
continue
best[username] = (
sort_key,
{
"username": username,
"sort_seq": int(sort_seq),
"local_id": int(local_id),
"create_time": int(create_time),
"local_type": int(row["local_type"] or 0),
"sender_username": sender_username,
"preview": preview,
"db_stem": str(db_path.stem),
"table_name": str(table_name),
},
)
finally:
try:
conn.close()
except Exception:
pass
# Fallback: always have a non-empty preview for UI.
for r in sessions:
u = str(_row_get(r, "username") or "").strip()
if not u:
continue
if u in best:
continue
draft_text = _normalize_preview(_decode_sqlite_text(_row_get(r, "draft")).strip())
if draft_text:
preview = f"[草稿] {draft_text}" if draft_text else "[草稿]"
else:
summary_text = _normalize_preview(_decode_sqlite_text(_row_get(r, "summary")).strip())
if summary_text:
preview = summary_text
else:
preview = _infer_last_message_brief(_row_get(r, "last_msg_type"), _row_get(r, "last_msg_sub_type"))
preview = _normalize_preview(preview)
best[u] = (
(0, 0, 0),
{
"username": u,
"sort_seq": 0,
"local_id": 0,
"create_time": 0,
"local_type": 0,
"sender_username": "",
"preview": preview,
"db_stem": "",
"table_name": "",
},
)
built_at = int(time.time())
conn_out = sqlite3.connect(str(session_db_path))
try:
_ensure_table(conn_out)
if rebuild:
try:
conn_out.execute(f"DELETE FROM {_TABLE_NAME}")
except Exception:
pass
rows_to_insert: list[tuple[Any, ...]] = []
for _, rec in best.values():
rows_to_insert.append(
(
rec["username"],
int(rec["sort_seq"] or 0),
int(rec["local_id"] or 0),
int(rec["create_time"] or 0),
int(rec["local_type"] or 0),
str(rec["sender_username"] or ""),
str(rec["preview"] or ""),
str(rec["db_stem"] or ""),
str(rec["table_name"] or ""),
int(built_at),
)
)
conn_out.executemany(
f"INSERT OR REPLACE INTO {_TABLE_NAME}("
"username, sort_seq, local_id, create_time, local_type, sender_username, preview, db_stem, table_name, built_at"
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
rows_to_insert,
)
conn_out.commit()
finally:
conn_out.close()
duration = max(0.0, time.time() - started)
logger.info(
f"[session_last_message] build done account={account_dir.name} sessions={len(best)} "
f"durationSec={round(duration, 3)} table={_TABLE_NAME}"
)
return {
"status": "success",
"account": account_dir.name,
"built": len(best),
"table": _TABLE_NAME,
"durationSec": round(duration, 3),
}
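For a quick sanity check of the table this module writes, a small read-only sketch. The account directory path below is an assumed example; the column names and the {account}/session.db location come from _ensure_table() and _session_db_path() above.

import sqlite3
from pathlib import Path

account_dir = Path("output/databases/wxid_example")  # assumed example of a decrypted account dir
conn = sqlite3.connect(str(account_dir / "session.db"))
conn.row_factory = sqlite3.Row
try:
    rows = conn.execute(
        "SELECT username, preview, create_time, db_stem, table_name "
        "FROM session_last_message ORDER BY create_time DESC LIMIT 10"
    ).fetchall()
    for r in rows:
        # table_name records which msg_<md5(username)>/chat_<md5> table the preview came from.
        print(r["username"], "->", r["preview"][:40], f"({r['db_stem']}::{r['table_name']})")
finally:
    conn.close()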

View File

@@ -1,574 +0,0 @@
from __future__ import annotations
import hashlib
import json
import os
import re
import sqlite3
import time
from pathlib import Path
from typing import Any, Optional
from .chat_helpers import (
_build_latest_message_preview,
_decode_message_content,
_decode_sqlite_text,
_infer_last_message_brief,
_is_mostly_printable_text,
_iter_message_db_paths,
_quote_ident,
_should_keep_session,
)
from .logging_config import get_logger
logger = get_logger(__name__)
_SCHEMA_VERSION = 1
_INDEX_DB_NAME = "session_preview.db"
_INDEX_DB_TMP_NAME = "session_preview.tmp.db"
_TABLE_NAME_RE = re.compile(r"^(msg_|chat_)([0-9a-f]{32})", re.IGNORECASE)
def get_session_preview_index_db_path(account_dir: Path) -> Path:
return account_dir / _INDEX_DB_NAME
def _index_db_tmp_path(account_dir: Path) -> Path:
return account_dir / _INDEX_DB_TMP_NAME
def _file_sig(path: Path) -> tuple[str, int, int]:
st = path.stat()
mtime_ns = getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))
return (path.name, int(st.st_size), int(mtime_ns))
def _compute_source_fingerprint(account_dir: Path) -> dict[str, Any]:
"""Compute a stable fingerprint for the current decrypted data set."""
session_db_path = account_dir / "session.db"
msg_paths = _iter_message_db_paths(account_dir)
items: list[tuple[str, int, int]] = []
try:
if session_db_path.exists():
items.append(_file_sig(session_db_path))
except Exception:
pass
for p in msg_paths:
try:
if p.exists():
items.append(_file_sig(p))
except Exception:
continue
items.sort()
payload = json.dumps(items, ensure_ascii=False, separators=(",", ":")).encode("utf-8", errors="ignore")
return {
"fingerprint": hashlib.sha256(payload).hexdigest(),
"files": items,
"dbCount": len(msg_paths),
}
def _inspect_index(index_path: Path) -> dict[str, Any]:
if not index_path.exists():
return {
"exists": False,
"ready": False,
"schemaVersion": None,
"hasMetaTable": False,
"hasPreviewTable": False,
}
conn = sqlite3.connect(str(index_path))
try:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
names = {str(r[0]).lower() for r in rows if r and r[0]}
has_meta = "meta" in names
has_preview = "session_preview" in names
schema_version: Optional[int] = None
if has_meta:
try:
r = conn.execute("SELECT value FROM meta WHERE key='schema_version' LIMIT 1").fetchone()
if r and r[0] is not None:
schema_version = int(str(r[0]).strip() or "0")
except Exception:
schema_version = None
ready = bool(has_preview and (schema_version is None or schema_version >= _SCHEMA_VERSION))
return {
"exists": True,
"ready": ready,
"schemaVersion": schema_version,
"hasMetaTable": bool(has_meta),
"hasPreviewTable": bool(has_preview),
}
except Exception:
return {
"exists": True,
"ready": False,
"schemaVersion": None,
"hasMetaTable": False,
"hasPreviewTable": False,
}
finally:
conn.close()
def get_session_preview_index_status(account_dir: Path) -> dict[str, Any]:
index_path = get_session_preview_index_db_path(account_dir)
inspect = _inspect_index(index_path)
meta: dict[str, str] = {}
current: dict[str, Any] = {}
stale = False
if bool(inspect.get("ready")):
conn = sqlite3.connect(str(index_path))
try:
rows = conn.execute("SELECT key, value FROM meta").fetchall()
for k, v in rows:
if k is None:
continue
meta[str(k)] = "" if v is None else str(v)
except Exception:
meta = {}
finally:
conn.close()
current = _compute_source_fingerprint(account_dir)
expected = str(meta.get("source_fingerprint") or "").strip()
actual = str(current.get("fingerprint") or "").strip()
if expected and actual and expected != actual:
stale = True
return {
"status": "success",
"account": account_dir.name,
"index": {
"path": str(index_path),
"exists": bool(inspect.get("exists")),
"ready": bool(inspect.get("ready")),
"stale": bool(stale),
"needsRebuild": (not bool(inspect.get("ready"))) or bool(stale),
"schemaVersion": inspect.get("schemaVersion"),
"meta": meta,
"current": current,
},
}
def load_session_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]:
status = get_session_preview_index_status(account_dir)
index = dict(status.get("index") or {})
if not bool(index.get("ready")):
return {}
if bool(index.get("stale")):
return {}
index_path = get_session_preview_index_db_path(account_dir)
uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()]))
if not uniq:
return {}
out: dict[str, str] = {}
conn = sqlite3.connect(str(index_path))
conn.row_factory = sqlite3.Row
try:
chunk_size = 900 # sqlite 默认变量上限常见为 999
for i in range(0, len(uniq), chunk_size):
chunk = uniq[i : i + chunk_size]
placeholders = ",".join(["?"] * len(chunk))
rows = conn.execute(
f"SELECT username, preview FROM session_preview WHERE username IN ({placeholders})",
chunk,
).fetchall()
for r in rows:
u = str(r["username"] or "").strip()
if not u:
continue
out[u] = str(r["preview"] or "")
return out
finally:
conn.close()
def _init_index_db(conn: sqlite3.Connection) -> None:
conn.execute("PRAGMA journal_mode=DELETE")
conn.execute("PRAGMA synchronous=OFF")
conn.execute("PRAGMA temp_store=MEMORY")
conn.execute("CREATE TABLE IF NOT EXISTS meta (key TEXT PRIMARY KEY, value TEXT NOT NULL)")
conn.execute(
"""
CREATE TABLE IF NOT EXISTS session_preview (
username TEXT PRIMARY KEY,
sort_seq INTEGER NOT NULL DEFAULT 0,
local_id INTEGER NOT NULL DEFAULT 0,
create_time INTEGER NOT NULL DEFAULT 0,
local_type INTEGER NOT NULL DEFAULT 0,
sender_username TEXT NOT NULL DEFAULT '',
preview TEXT NOT NULL DEFAULT '',
db_stem TEXT NOT NULL DEFAULT '',
table_name TEXT NOT NULL DEFAULT ''
)
"""
)
conn.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("schema_version", str(_SCHEMA_VERSION)),
)
def build_session_preview_index(
account_dir: Path,
*,
rebuild: bool = False,
include_hidden: bool = True,
include_official: bool = True,
) -> dict[str, Any]:
"""
Build a per-account session preview index as `{account}/session_preview.db`.
Why: `message_*.db` tables do NOT index `create_time`, so `ORDER BY create_time DESC LIMIT 1`
is extremely slow when done per-session at runtime. This index shifts that work to a one-time build.
"""
account_dir = Path(account_dir)
session_db_path = account_dir / "session.db"
if not session_db_path.exists():
return {
"status": "error",
"account": account_dir.name,
"message": "session.db not found.",
}
db_paths = _iter_message_db_paths(account_dir)
if not db_paths:
return {
"status": "error",
"account": account_dir.name,
"message": "No message databases found.",
}
started = time.time()
logger.info(f"[session_preview] build start account={account_dir.name} dbs={len(db_paths)}")
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
try:
srows = sconn.execute(
"""
SELECT username, is_hidden, summary, draft, last_msg_type, last_msg_sub_type, sort_timestamp, last_timestamp
FROM SessionTable
ORDER BY sort_timestamp DESC
"""
).fetchall()
finally:
sconn.close()
sessions: list[sqlite3.Row] = []
usernames: list[str] = []
for r in srows:
u = str(r["username"] or "").strip()
if not u:
continue
if not include_hidden and int(r["is_hidden"] or 0) == 1:
continue
if not _should_keep_session(u, include_official=bool(include_official)):
continue
sessions.append(r)
usernames.append(u)
if not usernames:
return {
"status": "success",
"account": account_dir.name,
"message": "No sessions to index.",
"indexed": 0,
"path": str(get_session_preview_index_db_path(account_dir)),
}
md5_to_users: dict[str, list[str]] = {}
for u in usernames:
h = hashlib.md5(u.encode("utf-8")).hexdigest()
md5_to_users.setdefault(h, []).append(u)
best: dict[str, tuple[tuple[int, int, int], dict[str, Any]]] = {}
for db_path in db_paths:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
try:
trows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
md5_to_table: dict[str, str] = {}
for tr in trows:
if not tr or tr[0] is None:
continue
name = _decode_sqlite_text(tr[0]).strip()
if not name:
continue
m = _TABLE_NAME_RE.match(name.lower())
if not m:
continue
md5_hex = str(m.group(2) or "").lower()
if md5_hex not in md5_to_users:
continue
prefix = str(m.group(1) or "").lower()
if md5_hex not in md5_to_table or prefix == "msg_":
md5_to_table[md5_hex] = name
if not md5_to_table:
continue
for md5_hex, table_name in md5_to_table.items():
users = md5_to_users.get(md5_hex) or []
if not users:
continue
quoted = _quote_ident(table_name)
row = None
try:
row = conn.execute(
"SELECT "
"m.local_id, m.local_type, m.sort_seq, m.create_time, "
"m.message_content, m.compress_content, n.user_name AS sender_username "
f"FROM {quoted} m "
"LEFT JOIN Name2Id n ON m.real_sender_id = n.rowid "
"ORDER BY m.sort_seq DESC, m.local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
try:
row = conn.execute(
"SELECT "
"local_id, local_type, sort_seq, create_time, "
"message_content, compress_content, '' AS sender_username "
f"FROM {quoted} "
"ORDER BY sort_seq DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except Exception:
row = None
if row is None:
continue
try:
sort_seq = int(row["sort_seq"] or 0) if row["sort_seq"] is not None else 0
except Exception:
sort_seq = 0
try:
local_id = int(row["local_id"] or 0)
except Exception:
local_id = 0
try:
create_time = int(row["create_time"] or 0)
except Exception:
create_time = 0
sort_key = (int(sort_seq), int(local_id), int(create_time))
raw_text = _decode_message_content(row["compress_content"], row["message_content"]).strip()
if raw_text and (not raw_text.lstrip().startswith("<")) and (not raw_text.lstrip().startswith('"<')):
# Avoid leaking unreadable compressed/binary payloads into UI.
if not _is_mostly_printable_text(raw_text):
raw_text = ""
sender_username = _decode_sqlite_text(row["sender_username"]).strip()
for username in users:
prev = best.get(username)
if prev is not None and sort_key <= prev[0]:
continue
is_group = bool(username.endswith("@chatroom"))
try:
preview = _build_latest_message_preview(
username=username,
local_type=int(row["local_type"] or 0),
raw_text=raw_text,
is_group=is_group,
sender_username=sender_username,
)
except Exception:
preview = ""
if preview and (not _is_mostly_printable_text(preview)):
try:
preview = _build_latest_message_preview(
username=username,
local_type=int(row["local_type"] or 0),
raw_text="",
is_group=is_group,
sender_username=sender_username,
)
except Exception:
preview = ""
if not preview:
continue
best[username] = (
sort_key,
{
"username": username,
"sort_seq": int(sort_seq),
"local_id": int(local_id),
"create_time": int(create_time),
"local_type": int(row["local_type"] or 0),
"sender_username": sender_username,
"preview": preview,
"db_stem": str(db_path.stem),
"table_name": str(table_name),
},
)
finally:
try:
conn.close()
except Exception:
pass
# Fallback: ensure we always have a non-empty lastMessage for UI (even if message tables missing).
for r in sessions:
u = str(r["username"] or "").strip()
if not u:
continue
if u in best:
continue
draft_text = _decode_sqlite_text(r["draft"]).strip()
if draft_text:
draft_text = re.sub(r"\s+", " ", draft_text).strip()
preview = f"[草稿] {draft_text}" if draft_text else "[草稿]"
else:
summary_text = _decode_sqlite_text(r["summary"]).strip()
summary_text = re.sub(r"\s+", " ", summary_text).strip()
if summary_text:
preview = summary_text
else:
preview = _infer_last_message_brief(r["last_msg_type"], r["last_msg_sub_type"])
best[u] = (
(0, 0, 0),
{
"username": u,
"sort_seq": 0,
"local_id": 0,
"create_time": 0,
"local_type": 0,
"sender_username": "",
"preview": str(preview or ""),
"db_stem": "",
"table_name": "",
},
)
final_path = get_session_preview_index_db_path(account_dir)
tmp_path = _index_db_tmp_path(account_dir)
try:
if tmp_path.exists():
tmp_path.unlink()
except Exception:
pass
try:
conn_out = sqlite3.connect(str(tmp_path))
try:
_init_index_db(conn_out)
try:
conn_out.commit()
except Exception:
pass
conn_out.execute("BEGIN")
rows_to_insert: list[tuple[Any, ...]] = []
for _, rec in best.values():
rows_to_insert.append(
(
rec["username"],
int(rec["sort_seq"] or 0),
int(rec["local_id"] or 0),
int(rec["create_time"] or 0),
int(rec["local_type"] or 0),
str(rec["sender_username"] or ""),
str(rec["preview"] or ""),
str(rec["db_stem"] or ""),
str(rec["table_name"] or ""),
)
)
conn_out.executemany(
"INSERT OR REPLACE INTO session_preview("
"username, sort_seq, local_id, create_time, local_type, sender_username, preview, db_stem, table_name"
") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
rows_to_insert,
)
conn_out.commit()
built_at = int(time.time())
conn_out.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("built_at", str(built_at)),
)
conn_out.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("session_count", str(len(best))),
)
conn_out.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("db_count", str(len(db_paths))),
)
src = _compute_source_fingerprint(account_dir)
conn_out.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("source_fingerprint", str(src.get("fingerprint") or "")),
)
conn_out.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("source_files", json.dumps(src.get("files") or [], ensure_ascii=False)),
)
conn_out.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("built_include_hidden", "1" if include_hidden else "0"),
)
conn_out.execute(
"INSERT INTO meta(key, value) VALUES(?, ?) "
"ON CONFLICT(key) DO UPDATE SET value = excluded.value",
("built_include_official", "1" if include_official else "0"),
)
conn_out.commit()
finally:
conn_out.close()
os.replace(str(tmp_path), str(final_path))
except Exception as e:
logger.exception(f"[session_preview] build failed: {e}")
try:
if tmp_path.exists():
tmp_path.unlink()
except Exception:
pass
return {
"status": "error",
"account": account_dir.name,
"message": str(e),
}
duration = max(0.0, time.time() - started)
logger.info(
f"[session_preview] build done account={account_dir.name} indexed={len(best)} "
f"durationSec={round(duration, 3)} path={final_path}"
)
return {
"status": "success",
"account": account_dir.name,
"indexed": len(best),
"path": str(final_path),
"durationSec": round(duration, 3),
}

View File

@@ -441,20 +441,20 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
"failed_files": account_failed "failed_files": account_failed
} }
# 构建“会话最后一条消息”索引:把耗时挪到解密阶段,一劳永逸 # 构建“会话最后一条消息”缓存表:把耗时挪到解密阶段,后续会话列表直接查表
if os.environ.get("WECHAT_TOOL_BUILD_SESSION_PREVIEW", "1") != "0": if os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0":
try: try:
from .session_preview_index import build_session_preview_index from .session_last_message import build_session_last_message_table
account_results[account_name]["session_preview_index"] = build_session_preview_index( account_results[account_name]["session_last_message"] = build_session_last_message_table(
account_output_dir, account_output_dir,
rebuild=True, rebuild=True,
include_hidden=True, include_hidden=True,
include_official=True, include_official=True,
) )
except Exception as e: except Exception as e:
logger.warning(f"构建会话预览索引失败: {account_name}: {e}") logger.warning(f"构建会话最后一条消息缓存表失败: {account_name}: {e}")
account_results[account_name]["session_preview_index"] = { account_results[account_name]["session_last_message"] = {
"status": "error", "status": "error",
"message": str(e), "message": str(e),
} }
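To round this off, a short sketch of the decrypt-stage toggle shown above. The environment variable name and its default come from this hunk; how the decrypt entry point is invoked is not shown here and is referenced only in comments.

import os

def should_build_session_last_message() -> bool:
    # Mirrors the check in decrypt_wechat_databases above: build unless explicitly set to "0".
    return os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0"

# Opt out of the automatic build before running decryption, then build on demand later
# via POST /api/chat/session-last-message/build if the session list needs previews.
os.environ["WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE"] = "0"
assert should_build_session_last_message() is False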