mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
fix(chat-search): 将会话列表外聊天纳入搜索索引
构建聊天搜索索引时，合并 SessionTable、contact/stranger 与消息库 Name2Id 中存在消息表的联系人或群聊。这样左侧会话列表中不存在、但数据库里仍有消息记录的聊天也可以被全局搜索命中。提升搜索索引 schema 版本，触发旧索引自动重建，并补充默认搜索过滤下的回归测试。
This commit is contained in:
@@ -18,7 +18,7 @@ from .logging_config import get_logger
|
||||
|
||||
logger = get_logger(__name__)
|
||||
|
||||
_SCHEMA_VERSION = 1
|
||||
_SCHEMA_VERSION = 2
|
||||
_INDEX_DB_NAME = "chat_search_index.db"
|
||||
_INDEX_DB_TMP_NAME = "chat_search_index.tmp.db"
|
||||
_LEGACY_INDEX_DB_NAME = "message_fts.db"
|
||||
@@ -188,7 +188,24 @@ def _update_build_state(account_key: str, **kwargs: Any) -> None:
|
||||
st.update(kwargs)
|
||||
|
||||
|
||||
def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
|
||||
def _sqlite_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
    """Return the lower-cased, stripped column names of ``table_name``.

    Best-effort helper: if ``PRAGMA table_info`` cannot be executed for any
    reason (including a missing table or a failure while quoting the
    identifier), an empty set is returned instead of raising.

    Args:
        conn: Open SQLite connection; rows may be ``sqlite3.Row`` objects or
            plain tuples depending on the connection's ``row_factory``.
        table_name: Table to inspect. It is passed through ``_quote_ident``
            before being interpolated into the PRAGMA statement.

    Returns:
        The set of non-empty column names, lower-cased.
    """
    try:
        rows = conn.execute(f"PRAGMA table_info({_quote_ident(table_name)})").fetchall()
    except Exception:
        return set()

    columns: set[str] = set()
    for row in rows:
        try:
            # Pick the raw value first, THEN apply the empty-string fallback.
            # Writing `a if cond else b or ""` only guards the tuple branch,
            # so a NULL name from a sqlite3.Row would become the text "None".
            raw = row["name"] if isinstance(row, sqlite3.Row) else row[1]
            name = str(raw or "").strip().lower()
        except Exception:
            # Malformed row: skip it rather than abort the whole lookup.
            continue
        if name:
            columns.add(name)
    return columns
|
||||
|
||||
|
||||
def _load_session_table_targets(account_dir: Path) -> dict[str, dict[str, Any]]:
|
||||
session_db_path = account_dir / "session.db"
|
||||
if not session_db_path.exists():
|
||||
return {}
|
||||
@@ -196,7 +213,11 @@ def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
|
||||
conn = sqlite3.connect(str(session_db_path))
|
||||
conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
rows = conn.execute("SELECT username, is_hidden FROM SessionTable").fetchall()
|
||||
columns = _sqlite_table_columns(conn, "SessionTable")
|
||||
if "username" not in columns:
|
||||
return {}
|
||||
hidden_expr = "is_hidden" if "is_hidden" in columns else "0"
|
||||
rows = conn.execute(f"SELECT username, {hidden_expr} AS is_hidden FROM SessionTable").fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
@@ -214,6 +235,108 @@ def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
|
||||
return out
|
||||
|
||||
|
||||
def _load_contact_usernames_for_index(account_dir: Path) -> set[str]:
    """Collect every non-empty username stored in ``contact.db``.

    Both the ``contact`` and the ``stranger`` tables are read. A table that
    has no ``username`` column, or whose SELECT fails, is skipped.

    Args:
        account_dir: Account directory expected to contain ``contact.db``.

    Returns:
        The stripped usernames found; an empty set when ``contact.db`` does
        not exist.
    """
    db_file = account_dir / "contact.db"
    if not db_file.exists():
        return set()

    usernames: set[str] = set()
    conn = sqlite3.connect(str(db_file))
    conn.row_factory = sqlite3.Row
    try:
        for table in ("contact", "stranger"):
            if "username" not in _sqlite_table_columns(conn, table):
                continue
            try:
                rows = conn.execute(f"SELECT username FROM {_quote_ident(table)}").fetchall()
            except Exception:
                # Unreadable table: carry on with the next one.
                continue
            decoded = (_decode_sqlite_text(row["username"]).strip() for row in rows)
            usernames.update(name for name in decoded if name)
    finally:
        conn.close()
    return usernames
|
||||
|
||||
|
||||
def _load_name2id_usernames_for_index(conn: sqlite3.Connection) -> set[str]:
    """Read every non-empty username listed in a message DB's ``Name2Id`` table.

    The username column is looked up as ``user_name`` first and ``username``
    second. If neither column exists, or the SELECT fails, the result is an
    empty set.

    Args:
        conn: Open connection to a database that may contain ``Name2Id``.

    Returns:
        The stripped usernames found.
    """
    available = _sqlite_table_columns(conn, "Name2Id")
    # First matching candidate wins, so `user_name` takes priority.
    column = next((c for c in ("user_name", "username") if c in available), "")
    if not column:
        return set()

    try:
        rows = conn.execute(f"SELECT {_quote_ident(column)} AS username FROM Name2Id").fetchall()
    except Exception:
        return set()

    usernames: set[str] = set()
    for row in rows:
        try:
            value = row["username"] if isinstance(row, sqlite3.Row) else row[0]
        except Exception:
            # Unreadable row: treat it as empty so it is filtered out below.
            value = ""
        text = _decode_sqlite_text(value).strip()
        if text:
            usernames.add(text)
    return usernames
|
||||
|
||||
|
||||
def _load_message_backed_index_targets(*, account_dir: Path, seed_usernames: set[str]) -> set[str]:
    """Find usernames that have a message table in any message database.

    For every path yielded by ``_iter_message_db_paths(account_dir)`` the
    candidates are ``seed_usernames`` plus the usernames in that database's
    ``Name2Id`` table. A candidate is kept when it is non-empty, differs from
    ``account_dir.name``, passes ``_should_keep_session(...,
    include_official=True)``, and ``_resolve_msg_table_name_by_map`` returns
    a truthy value for it against the database's table names.

    Any exception while processing one database abandons that database only;
    matches collected before the failure are kept.

    Args:
        account_dir: Account directory; its ``name`` is excluded from results.
        seed_usernames: Usernames to probe in addition to ``Name2Id`` entries.

    Returns:
        The usernames for which a message table was resolved.
    """
    found: set[str] = set()
    for db_path in _iter_message_db_paths(account_dir):
        conn: Optional[sqlite3.Connection] = None
        try:
            conn = sqlite3.connect(str(db_path))
            conn.row_factory = sqlite3.Row

            # Map lower-cased table name -> actual name (later duplicates win).
            table_by_lower: dict[str, str] = {}
            for rec in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall():
                actual = _decode_sqlite_text(rec["name"] if isinstance(rec, sqlite3.Row) else rec[0]).strip()
                if actual:
                    table_by_lower[actual.lower()] = actual

            if table_by_lower:
                pool = set(seed_usernames)
                pool.update(_load_name2id_usernames_for_index(conn))
                for candidate in pool:
                    name = str(candidate or "").strip()
                    if not name or name == account_dir.name:
                        continue
                    if not _should_keep_session(name, include_official=True):
                        continue
                    if _resolve_msg_table_name_by_map(table_by_lower, name):
                        found.add(name)
        except Exception:
            # Best-effort scan: an unreadable database must not stop the others.
            continue
        finally:
            if conn is not None:
                try:
                    conn.close()
                except Exception:
                    pass
    return found
|
||||
|
||||
|
||||
def _load_sessions_for_index(account_dir: Path) -> dict[str, dict[str, Any]]:
    """Build the username -> flags mapping of chats to put in the search index.

    Starts from ``_load_session_table_targets(account_dir)`` and then adds
    every username reported by ``_load_message_backed_index_targets`` (seeded
    with the ``contact.db`` usernames) that is not already present. Added
    entries get ``is_hidden`` 0 and ``is_official`` 1 when the username
    starts with ``gh_``, otherwise 0. Existing entries are left untouched.

    Args:
        account_dir: Account directory holding the source databases.

    Returns:
        The merged mapping.
    """
    targets = _load_session_table_targets(account_dir)
    extra_usernames = _load_message_backed_index_targets(
        account_dir=account_dir,
        seed_usernames=_load_contact_usernames_for_index(account_dir),
    )

    # Sorted so new entries are inserted in a deterministic order.
    for username in sorted(extra_usernames):
        targets.setdefault(
            username,
            {
                "is_hidden": 0,
                "is_official": int(username.startswith("gh_")),
            },
        )

    return targets
|
||||
|
||||
|
||||
def _init_index_db(conn: sqlite3.Connection) -> None:
|
||||
# NOTE: This index DB is built as a temporary file and then atomically swapped in.
|
||||
# Using WAL here would create `-wal/-shm` side files that are *not* swapped together,
|
||||
|
||||
@@ -0,0 +1,174 @@
|
||||
import hashlib
|
||||
import sqlite3
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
|
||||
# Directory one level above the directory containing this file
# (presumably the repository root — confirm against the project layout).
ROOT = Path(__file__).resolve().parents[1]
|
||||
# Put <ROOT>/src at the front of the import path so the package imported by
# the tests below is resolved from the source tree.
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
|
||||
class TestChatSearchIndexTargets(unittest.TestCase):
    """Checks which chats end up in the search index built from fixture DBs.

    The fixtures contain one visible session, one hidden session, and two
    chats (a friend and a ``gh_`` account) that have message tables but no
    ``SessionTable`` row.
    """

    def _seed_contact_db(self, path: Path, *, account: str) -> None:
        """Create ``contact`` and ``stranger`` tables; populate ``contact`` only."""
        contact_rows = [
            (account, "", "Me", "", 1, 0, "", ""),
            ("wxid_visible", "", "Visible friend", "", 1, 0, "", ""),
            ("wxid_no_session", "", "No session friend", "", 1, 0, "", ""),
            ("wxid_session_hidden", "", "Hidden session friend", "", 1, 0, "", ""),
            ("gh_official_no_session", "", "Official account", "", 1, 24, "", ""),
        ]
        db = sqlite3.connect(str(path))
        try:
            db.execute(
                """
                CREATE TABLE contact (
                    username TEXT,
                    remark TEXT,
                    nick_name TEXT,
                    alias TEXT,
                    local_type INTEGER,
                    verify_flag INTEGER,
                    big_head_url TEXT,
                    small_head_url TEXT
                )
                """
            )
            db.execute(
                """
                CREATE TABLE stranger (
                    username TEXT,
                    remark TEXT,
                    nick_name TEXT,
                    alias TEXT,
                    local_type INTEGER,
                    verify_flag INTEGER,
                    big_head_url TEXT,
                    small_head_url TEXT
                )
                """
            )
            db.executemany("INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", contact_rows)
            db.commit()
        finally:
            db.close()

    def _seed_session_db(self, path: Path) -> None:
        """Create ``SessionTable`` with one visible and one hidden session."""
        session_rows = [
            ("wxid_visible", 0, 100),
            ("wxid_session_hidden", 1, 200),
        ]
        db = sqlite3.connect(str(path))
        try:
            db.execute(
                """
                CREATE TABLE SessionTable (
                    username TEXT,
                    is_hidden INTEGER,
                    sort_timestamp INTEGER
                )
                """
            )
            db.executemany("INSERT INTO SessionTable VALUES (?, ?, ?)", session_rows)
            db.commit()
        finally:
            db.close()

    def _seed_message_db(self, path: Path, *, account: str) -> None:
        """Create ``Name2Id`` plus one ``msg_<md5(username)>`` table per chat."""
        name2id_entries = [
            account,
            "wxid_visible",
            "wxid_no_session",
            "wxid_session_hidden",
            "gh_official_no_session",
        ]
        # Text of the single message stored for each chat; the account itself
        # deliberately gets no message table.
        content_by_username = {
            "wxid_visible": "visible searchable text",
            "wxid_no_session": "missing session searchable text",
            "wxid_session_hidden": "hidden searchable text",
            "gh_official_no_session": "official searchable text",
        }

        db = sqlite3.connect(str(path))
        try:
            db.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
            db.executemany(
                "INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)",
                enumerate(name2id_entries, start=1),
            )

            for username, content in content_by_username.items():
                digest = hashlib.md5(username.encode('utf-8')).hexdigest()
                table_name = f"msg_{digest}"
                db.execute(
                    f"""
                    CREATE TABLE {table_name} (
                        local_id INTEGER,
                        server_id INTEGER,
                        local_type INTEGER,
                        sort_seq INTEGER,
                        real_sender_id INTEGER,
                        create_time INTEGER,
                        message_content TEXT,
                        compress_content BLOB
                    )
                    """
                )
                db.execute(
                    f"INSERT INTO {table_name} VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
                    (1, 1001, 1, 1, 2, 300, content, None),
                )
            db.commit()
        finally:
            db.close()

    def _prepare_account(self, root: Path) -> Path:
        """Create the ``wxid_account`` directory under ``root`` with all three DBs."""
        account = "wxid_account"
        account_dir = root / account
        account_dir.mkdir(parents=True, exist_ok=True)

        self._seed_contact_db(account_dir / "contact.db", account=account)
        self._seed_session_db(account_dir / "session.db")
        self._seed_message_db(account_dir / "message_0.db", account=account)
        return account_dir

    def test_index_includes_message_backed_contacts_missing_from_session_list(self):
        """Chats with messages but no session row are indexed and searchable."""
        # Imported here so the module-level sys.path tweak has already run.
        import wechat_decrypt_tool.chat_search_index as idx
        from wechat_decrypt_tool.chat_helpers import _build_fts_query

        with TemporaryDirectory() as td:
            account_dir = self._prepare_account(Path(td))
            idx._build_worker(account_dir, rebuild=True)

            index_db = sqlite3.connect(str(idx.get_chat_search_index_db_path(account_dir)))
            try:
                indexed_rows = index_db.execute(
                    """
                    SELECT username, is_hidden, is_official
                    FROM message_fts
                    ORDER BY username
                    """
                ).fetchall()
                fts_query = _build_fts_query("missing session")
                # Same filters as a default search: hidden and official chats excluded.
                matched_rows = index_db.execute(
                    """
                    SELECT username
                    FROM message_fts
                    WHERE message_fts MATCH ?
                    AND CAST(is_hidden AS INTEGER) = 0
                    AND CAST(is_official AS INTEGER) = 0
                    """,
                    (fts_query,),
                ).fetchall()
            finally:
                index_db.close()

            flags_by_username = {}
            for record in indexed_rows:
                flags_by_username[str(record[0])] = (int(record[1] or 0), int(record[2] or 0))
            matched_usernames = [str(record[0]) for record in matched_rows]

            for expected_username in (
                "wxid_visible",
                "wxid_no_session",
                "wxid_session_hidden",
                "gh_official_no_session",
            ):
                self.assertIn(expected_username, flags_by_username)

            # (is_hidden, is_official) expected for each chat.
            expected_flags = (
                ("wxid_no_session", (0, 0)),
                ("wxid_session_hidden", (1, 0)),
                ("gh_official_no_session", (0, 1)),
            )
            for username, flags in expected_flags:
                self.assertEqual(flags_by_username[username], flags)

            self.assertEqual(matched_usernames, ["wxid_no_session"])
|
||||
|
||||
|
||||
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user