diff --git a/src/wechat_decrypt_tool/routers/chat.py b/src/wechat_decrypt_tool/routers/chat.py index 89b7dc9..e165d0a 100644 --- a/src/wechat_decrypt_tool/routers/chat.py +++ b/src/wechat_decrypt_tool/routers/chat.py @@ -186,6 +186,178 @@ def _sql_literal(value: Any) -> str: return "'" + s.replace("'", "''") + "'" +def _pick_case_insensitive_value(item: Any, *keys: str) -> Any: + if not isinstance(item, dict): + return None + for key in keys: + if key in item and item[key] is not None: + return item[key] + key_lc = str(key or "").strip().lower() + for actual_key, actual_value in item.items(): + if str(actual_key or "").strip().lower() == key_lc and actual_value is not None: + return actual_value + return None + + +def _table_exists_case_insensitive(conn: sqlite3.Connection, table_name: str) -> bool: + try: + row = conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower(?) LIMIT 1", + (str(table_name or "").strip(),), + ).fetchone() + return bool(row) + except Exception: + return False + + +def _ensure_output_name2id_table(conn: sqlite3.Connection) -> bool: + if _table_exists_case_insensitive(conn, "Name2Id"): + return True + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS Name2Id ( + user_name TEXT, + is_session INTEGER DEFAULT 1 + ) + """ + ) + conn.commit() + return True + except Exception: + return False + + +def _best_effort_upsert_output_name2id_rows( + conn: sqlite3.Connection, + *, + account_name: str, + rows: list[dict[str, Any]], +) -> bool: + if not rows: + return _table_exists_case_insensitive(conn, "Name2Id") + if not _ensure_output_name2id_table(conn): + return False + try: + conn.execute( + "INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)", + (str(account_name or "").strip(), 1), + ) + except Exception: + pass + + wrote = False + for row in rows: + try: + rid = int(row.get("real_sender_id") or 0) + except Exception: + rid = 0 + username = str(row.get("sender_username") or "").strip() + if rid <= 0 or not username: + continue + try: + conn.execute( + "INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)", + (rid, username, 1), + ) + wrote = True + except Exception: + continue + + if wrote: + try: + conn.commit() + except Exception: + return False + return True + + +def _sync_output_name2id_from_live( + conn: sqlite3.Connection, + *, + rt_conn: Any, + msg_db_path_real: Path, +) -> dict[str, Any]: + if not _ensure_output_name2id_table(conn): + return {"status": "missing_local_table", "rows": 0} + + local_row = conn.execute("SELECT COUNT(1) AS c, COALESCE(MAX(rowid), 0) AS mx FROM Name2Id").fetchone() + try: + local_count = int((local_row["c"] if isinstance(local_row, sqlite3.Row) else local_row[0]) or 0) + except Exception: + local_count = 0 + try: + local_max = int((local_row["mx"] if isinstance(local_row, sqlite3.Row) else local_row[1]) or 0) + except Exception: + local_max = 0 + + sql_stats = "SELECT COUNT(1) AS c, COALESCE(MAX(rowid), 0) AS mx FROM Name2Id" + with rt_conn.lock: + live_stats_rows = _wcdb_exec_query(rt_conn.handle, kind="message", path=str(msg_db_path_real), sql=sql_stats) + + live_stats = live_stats_rows[0] if live_stats_rows and isinstance(live_stats_rows[0], dict) else {} + try: + live_count = int(_pick_case_insensitive_value(live_stats, "c", "count") or 0) + except Exception: + live_count = 0 + try: + live_max = int(_pick_case_insensitive_value(live_stats, "mx", "max_rowid", "max") or 0) + except Exception: + live_max = 0 + + if local_count == live_count and local_max == live_max: + return { + "status": "up_to_date", + "rows": int(local_count), + "localCount": int(local_count), + "liveCount": int(live_count), + "localMax": int(local_max), + "liveMax": int(live_max), + } + + sql_rows = "SELECT rowid AS rowid, user_name AS user_name, COALESCE(is_session, 1) AS is_session FROM Name2Id ORDER BY rowid ASC" + with rt_conn.lock: + live_rows = _wcdb_exec_query(rt_conn.handle, kind="message", path=str(msg_db_path_real), sql=sql_rows) + + values: list[tuple[int, str, int]] = [] + seen_rowids: set[int] = set() + for item in live_rows: + if not isinstance(item, dict): + continue + try: + rid = int(_pick_case_insensitive_value(item, "rowid") or 0) + except Exception: + rid = 0 + username = str(_pick_case_insensitive_value(item, "user_name", "username") or "").strip() + try: + is_session = int(_pick_case_insensitive_value(item, "is_session") or 0) + except Exception: + is_session = 0 + if rid <= 0 or not username or rid in seen_rowids: + continue + seen_rowids.add(rid) + values.append((rid, username, is_session)) + + if live_count > 0 and not values: + raise ValueError("Live Name2Id rows could not be decoded.") + + conn.execute("DELETE FROM Name2Id") + if values: + conn.executemany( + "INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)", + values, + ) + conn.commit() + return { + "status": "refreshed", + "rows": int(len(values)), + "localCount": int(local_count), + "liveCount": int(live_count), + "localMax": int(local_max), + "liveMax": int(live_max), + } + + def _normalize_edit_value(col: str, value: Any, *, from_snapshot: bool = False) -> Any: c = str(col or "").strip().lower() if value is None: @@ -1271,6 +1443,7 @@ def sync_chat_realtime_messages( # Some sessions may not exist in the decrypted snapshot yet; create the missing Msg_ table # so we can insert the realtime rows and make `/api/chat/messages` work after switching off realtime. msg_db_path, table_name = _ensure_decrypted_message_table(account_dir, username) + msg_db_path_real, _res_db_path_real = _resolve_db_storage_message_paths(account_dir, msg_db_path.stem) logger.info( "[%s] resolved decrypted table account=%s username=%s db=%s table=%s", trace_id, @@ -1283,6 +1456,34 @@ def sync_chat_realtime_messages( msg_conn = sqlite3.connect(str(msg_db_path)) msg_conn.row_factory = sqlite3.Row try: + name2id_synced = False + try: + sync_t0 = time.perf_counter() + name2id_result = _sync_output_name2id_from_live( + msg_conn, + rt_conn=rt_conn, + msg_db_path_real=msg_db_path_real, + ) + sync_ms = (time.perf_counter() - sync_t0) * 1000.0 + name2id_synced = str(name2id_result.get("status") or "") in {"up_to_date", "refreshed"} + logger.info( + "[%s] Name2Id sync account=%s db=%s status=%s rows=%s ms=%.1f", + trace_id, + account_dir.name, + msg_db_path.stem, + str(name2id_result.get("status") or ""), + int(name2id_result.get("rows") or 0), + sync_ms, + ) + except Exception as e: + logger.warning( + "[%s] Name2Id sync failed account=%s db=%s error=%s", + trace_id, + account_dir.name, + msg_db_path.stem, + str(e), + ) + quoted_table = _quote_ident(table_name) row = msg_conn.execute(f"SELECT MAX(local_id) AS mx FROM {quoted_table}").fetchone() try: @@ -1425,42 +1626,12 @@ def sync_chat_realtime_messages( inserted = 0 backfilled = 0 - if new_rows: - # Best-effort: keep Name2Id updated so decrypted queries can resolve sender usernames. - # Rowid mapping is important (message.real_sender_id joins Name2Id.rowid). - try: - has_name2id = bool( - msg_conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower('Name2Id') LIMIT 1" - ).fetchone() - ) - except Exception: - has_name2id = False - - if has_name2id: - try: - msg_conn.execute( - "INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)", - (str(account_dir.name), 1), - ) - except Exception: - pass - - for r in new_rows: - try: - rid = int(r.get("real_sender_id") or 0) - except Exception: - rid = 0 - su = str(r.get("sender_username") or "").strip() - if rid <= 0 or not su: - continue - try: - msg_conn.execute( - "INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)", - (rid, su, 1), - ) - except Exception: - continue + if new_rows and (not name2id_synced): + _best_effort_upsert_output_name2id_rows( + msg_conn, + account_name=account_dir.name, + rows=new_rows, + ) # Insert older -> newer to keep sqlite btree locality similar to existing data. values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)] @@ -1658,6 +1829,30 @@ def _sync_chat_realtime_messages_for_table( msg_conn = sqlite3.connect(str(msg_db_path)) msg_conn.row_factory = sqlite3.Row try: + msg_db_path_real, _res_db_path_real = _resolve_db_storage_message_paths(account_dir, msg_db_path.stem) + name2id_synced = False + try: + name2id_result = _sync_output_name2id_from_live( + msg_conn, + rt_conn=rt_conn, + msg_db_path_real=msg_db_path_real, + ) + name2id_synced = str(name2id_result.get("status") or "") in {"up_to_date", "refreshed"} + logger.info( + "[realtime] Name2Id sync account=%s db=%s status=%s rows=%s", + account_dir.name, + msg_db_path.stem, + str(name2id_result.get("status") or ""), + int(name2id_result.get("rows") or 0), + ) + except Exception as e: + logger.warning( + "[realtime] Name2Id sync failed account=%s db=%s error=%s", + account_dir.name, + msg_db_path.stem, + str(e), + ) + quoted_table = _quote_ident(table_name) row = msg_conn.execute(f"SELECT MAX(local_id) AS mx FROM {quoted_table}").fetchone() try: @@ -1796,40 +1991,12 @@ def _sync_chat_realtime_messages_for_table( inserted = 0 backfilled = 0 - if new_rows: - try: - has_name2id = bool( - msg_conn.execute( - "SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower('Name2Id') LIMIT 1" - ).fetchone() - ) - except Exception: - has_name2id = False - - if has_name2id: - try: - msg_conn.execute( - "INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)", - (str(account_dir.name), 1), - ) - except Exception: - pass - - for r in new_rows: - try: - rid = int(r.get("real_sender_id") or 0) - except Exception: - rid = 0 - su = str(r.get("sender_username") or "").strip() - if rid <= 0 or not su: - continue - try: - msg_conn.execute( - "INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)", - (rid, su, 1), - ) - except Exception: - continue + if new_rows and (not name2id_synced): + _best_effort_upsert_output_name2id_rows( + msg_conn, + account_name=account_dir.name, + rows=new_rows, + ) values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)] insert_t0 = time.perf_counter() diff --git a/tests/test_chat_realtime_name2id_sync.py b/tests/test_chat_realtime_name2id_sync.py new file mode 100644 index 0000000..3d8fd72 --- /dev/null +++ b/tests/test_chat_realtime_name2id_sync.py @@ -0,0 +1,117 @@ +import hashlib +import sqlite3 +import sys +import threading +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory +from unittest.mock import patch + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + +from wechat_decrypt_tool.routers import chat as chat_router + + +class _DummyConn: + def __init__(self) -> None: + self.handle = 1 + self.lock = threading.Lock() + + +class TestChatRealtimeName2IdSync(unittest.TestCase): + def test_sync_repairs_name2id_even_without_new_messages(self): + with TemporaryDirectory() as td: + account_dir = Path(td) / "acc" + account_dir.mkdir(parents=True, exist_ok=True) + + username = "wxid_friend" + table_name = f"Msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}" + msg_db_path = account_dir / "message_0.db" + + conn = sqlite3.connect(str(msg_db_path)) + try: + conn.execute("CREATE TABLE Name2Id (user_name TEXT, is_session INTEGER DEFAULT 1)") + conn.execute( + """ + CREATE TABLE "{table_name}" ( + local_id INTEGER PRIMARY KEY, + server_id INTEGER, + local_type INTEGER, + sort_seq INTEGER, + real_sender_id INTEGER, + create_time INTEGER, + message_content TEXT, + compress_content BLOB, + packed_info_data BLOB + ) + """.format(table_name=table_name) + ) + conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (1, ?, 1)", ("acc",)) + conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (2, ?, 1)", ("wxid_old",)) + conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (5, ?, 1)", ("wxid_gap_tail",)) + conn.execute( + f'INSERT INTO "{table_name}" ' + "(local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content, packed_info_data) " + "VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)", + (10, 10010, 1, 10, 3, 1710000010, "hello", None, None), + ) + conn.commit() + finally: + conn.close() + + live_rows = [ + {"rowid": 1, "user_name": "acc", "is_session": 1}, + {"rowid": 2, "user_name": "wxid_old", "is_session": 1}, + {"rowid": 3, "user_name": "wxid_missing_a", "is_session": 1}, + {"rowid": 4, "user_name": "wxid_missing_b", "is_session": 1}, + {"rowid": 5, "user_name": "wxid_gap_tail", "is_session": 1}, + ] + + def _fake_exec_query(_handle, *, kind, path, sql): + self.assertEqual(kind, "message") + self.assertTrue(str(path).endswith("message_0.db")) + if "COUNT(1)" in sql: + return [{"c": len(live_rows), "mx": 5}] + if "ORDER BY rowid ASC" in sql: + return list(live_rows) + raise AssertionError(f"Unexpected SQL: {sql}") + + with ( + patch.object(chat_router, "_resolve_db_storage_message_paths", return_value=(Path(td) / "live_message_0.db", Path(td) / "message_resource.db")), + patch.object(chat_router, "_wcdb_exec_query", side_effect=_fake_exec_query), + patch.object(chat_router, "_wcdb_get_messages", return_value=[]), + ): + result = chat_router._sync_chat_realtime_messages_for_table( + account_dir=account_dir, + rt_conn=_DummyConn(), + username=username, + msg_db_path=msg_db_path, + table_name=table_name, + max_scan=50, + backfill_limit=0, + ) + + self.assertEqual(result.get("inserted"), 0) + + conn = sqlite3.connect(str(msg_db_path)) + try: + rows = conn.execute("SELECT rowid, user_name FROM Name2Id ORDER BY rowid ASC").fetchall() + finally: + conn.close() + + self.assertEqual( + rows, + [ + (1, "acc"), + (2, "wxid_old"), + (3, "wxid_missing_a"), + (4, "wxid_missing_b"), + (5, "wxid_gap_tail"), + ], + ) + + +if __name__ == "__main__": + unittest.main()