diff --git a/frontend/components/chat/ChatOverlays.vue b/frontend/components/chat/ChatOverlays.vue index e64d7ec..b158bc6 100644 --- a/frontend/components/chat/ChatOverlays.vue +++ b/frontend/components/chat/ChatOverlays.vue @@ -1308,7 +1308,7 @@ + @@ -1395,7 +1403,7 @@
- 点击上方范围可筛选并默认全选当前结果,再次点击可取消全选;下方整行可点选会话 + 下方整行可点选会话;搜索只影响当前自定义列表
{ + const nextTab = String(tab || 'all') + exportListTab.value = nextTab + exportScope.value = nextTab === 'groups' || nextTab === 'singles' ? nextTab : 'all' + selectExportFilteredContacts(nextTab) + } + + const onExportCustomScopeClick = () => { exportScope.value = 'selected' - onExportListTabClick(tab) + if (exportSelectedUsernames.value.length === 0) { + selectExportFilteredContacts(exportListTab.value) + } } const isDesktopExportRuntime = () => { @@ -488,6 +497,9 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte } } else if (scope === 'selected') { usernames = Array.isArray(exportSelectedUsernames.value) ? exportSelectedUsernames.value.filter(Boolean) : [] + } else if (scope !== 'all' && scope !== 'groups' && scope !== 'singles') { + scope = 'selected' + usernames = Array.isArray(exportSelectedUsernames.value) ? exportSelectedUsernames.value.filter(Boolean) : [] } if (scope === 'selected' && (!usernames || usernames.length === 0)) { @@ -547,7 +559,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte format: exportFormat.value, start_time: startTime, end_time: endTime, - include_hidden: false, + include_hidden: scope === 'all' || scope === 'groups' || scope === 'singles', include_official: false, message_types: messageTypes, include_media: includeMedia, @@ -618,6 +630,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte exportFilteredContacts, exportContactCounts, onExportBatchScopeClick, + onExportCustomScopeClick, onExportListTabClick, isExportContactSelected, hasWebExportFolder, diff --git a/src/wechat_decrypt_tool/chat_export_service.py b/src/wechat_decrypt_tool/chat_export_service.py index ed9ce3d..a0b0444 100644 --- a/src/wechat_decrypt_tool/chat_export_service.py +++ b/src/wechat_decrypt_tool/chat_export_service.py @@ -50,6 +50,7 @@ from .chat_helpers import ( _resource_lookup_chat_id, _should_keep_session, _split_group_sender_prefix, + _resolve_msg_table_name_by_map, ) from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC from .logging_config import get_logger @@ -3527,13 +3528,66 @@ def _resolve_export_targets( uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()])) return uniq + session_rows, session_hidden_by_username = _load_export_session_targets(account_dir) + contact_usernames = _load_export_contact_usernames(account_dir) + discovered_message_targets = _load_message_backed_export_targets( + account_dir=account_dir, + seed_usernames=contact_usernames, + ) + + def should_include(u: str) -> bool: + if not u or u == account_dir.name: + return False + if not include_hidden and int(session_hidden_by_username.get(u) or 0) == 1: + return False + if not _should_keep_session(u, include_official=include_official): + return False + if scope == "groups" and (not u.endswith("@chatroom")): + return False + if scope == "singles" and u.endswith("@chatroom"): + return False + return True + + out: list[str] = [] + seen: set[str] = set() + for u, _sort_ts in session_rows: + if u in seen or (not should_include(u)): + continue + seen.add(u) + out.append(u) + + for u, _sort_ts in sorted(discovered_message_targets.items(), key=lambda item: (-int(item[1] or 0), item[0])): + if u in seen or (not should_include(u)): + continue + seen.add(u) + out.append(u) + + return out + + +def _load_export_session_targets(account_dir: Path) -> tuple[list[tuple[str, int]], dict[str, int]]: session_db_path = account_dir / "session.db" + if not session_db_path.exists(): + return [], {} + conn = sqlite3.connect(str(session_db_path)) conn.row_factory = sqlite3.Row try: + columns = _sqlite_table_columns(conn, "SessionTable") + if "username" not in columns: + return [], {} + + hidden_expr = "is_hidden" if "is_hidden" in columns else "0" + if "sort_timestamp" in columns: + sort_expr = "sort_timestamp" + elif "last_timestamp" in columns: + sort_expr = "last_timestamp" + else: + sort_expr = "0" + rows = conn.execute( - """ - SELECT username, is_hidden + f""" + SELECT username, {hidden_expr} AS is_hidden, {sort_expr} AS sort_timestamp FROM SessionTable ORDER BY sort_timestamp DESC """, @@ -3541,20 +3595,153 @@ def _resolve_export_targets( finally: conn.close() - out: list[str] = [] + out: list[tuple[str, int]] = [] + hidden_by_username: dict[str, int] = {} + seen: set[str] = set() for r in rows: u = str(r["username"] or "").strip() if not u: continue - if not include_hidden and int(r["is_hidden"] or 0) == 1: + try: + hidden = int(r["is_hidden"] or 0) + except Exception: + hidden = 0 + if hidden: + hidden_by_username[u] = 1 + else: + hidden_by_username.setdefault(u, 0) + if u in seen: continue - if not _should_keep_session(u, include_official=include_official): + seen.add(u) + try: + sort_ts = int(r["sort_timestamp"] or 0) + except Exception: + sort_ts = 0 + out.append((u, sort_ts)) + return out, hidden_by_username + + +def _sqlite_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]: + try: + rows = conn.execute(f"PRAGMA table_info({_quote_ident(table_name)})").fetchall() + except Exception: + return set() + + columns: set[str] = set() + for row in rows: + try: + name = str(row["name"] if isinstance(row, sqlite3.Row) else row[1] or "").strip().lower() + except Exception: + name = "" + if name: + columns.add(name) + return columns + + +def _load_export_contact_usernames(account_dir: Path) -> set[str]: + contact_db_path = account_dir / "contact.db" + if not contact_db_path.exists(): + return set() + + out: set[str] = set() + conn = sqlite3.connect(str(contact_db_path)) + conn.row_factory = sqlite3.Row + try: + for table in ("contact", "stranger"): + columns = _sqlite_table_columns(conn, table) + if "username" not in columns: + continue + try: + rows = conn.execute(f"SELECT username FROM {_quote_ident(table)}").fetchall() + except Exception: + continue + for row in rows: + try: + username = str(row["username"] or "").strip() + except Exception: + username = "" + if username: + out.add(username) + finally: + conn.close() + return out + + +def _load_name2id_usernames(conn: sqlite3.Connection) -> set[str]: + columns = _sqlite_table_columns(conn, "Name2Id") + username_col = "user_name" if "user_name" in columns else ("username" if "username" in columns else "") + if not username_col: + return set() + + out: set[str] = set() + try: + rows = conn.execute(f"SELECT {_quote_ident(username_col)} AS username FROM Name2Id").fetchall() + except Exception: + return out + for row in rows: + try: + username = str(row["username"] if isinstance(row, sqlite3.Row) else row[0] or "").strip() + except Exception: + username = "" + if username: + out.add(username) + return out + + +def _message_table_latest_timestamp(conn: sqlite3.Connection, table_name: str) -> Optional[int]: + quoted = _quote_ident(table_name) + try: + row = conn.execute(f"SELECT MAX(create_time) FROM {quoted}").fetchone() + if row is not None and row[0] is not None: + return int(row[0] or 0) + except Exception: + pass + + try: + row = conn.execute(f"SELECT 1 FROM {quoted} LIMIT 1").fetchone() + if row is not None: + return 0 + except Exception: + pass + return None + + +def _load_message_backed_export_targets(*, account_dir: Path, seed_usernames: set[str]) -> dict[str, int]: + out: dict[str, int] = {} + for db_path in _iter_message_db_paths(account_dir): + conn: Optional[sqlite3.Connection] = None + try: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + table_names = [str(r["name"] if isinstance(r, sqlite3.Row) else r[0] or "") for r in rows] + lower_to_actual = {name.lower(): name for name in table_names if name} + if not lower_to_actual: + continue + + candidates = set(seed_usernames) + candidates.update(_load_name2id_usernames(conn)) + for username in candidates: + u = str(username or "").strip() + if not u or u == account_dir.name: + continue + table_name = _resolve_msg_table_name_by_map(lower_to_actual, u) + if not table_name: + continue + latest_ts = _message_table_latest_timestamp(conn, table_name) + if latest_ts is None: + continue + previous_ts = out.get(u) + if previous_ts is None or int(latest_ts or 0) > int(previous_ts or 0): + out[u] = int(latest_ts or 0) + except Exception: continue - if scope == "groups" and (not u.endswith("@chatroom")): - continue - if scope == "singles" and u.endswith("@chatroom"): - continue - out.append(u) + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass return out diff --git a/tests/test_chat_export_targets.py b/tests/test_chat_export_targets.py new file mode 100644 index 0000000..4325cc7 --- /dev/null +++ b/tests/test_chat_export_targets.py @@ -0,0 +1,189 @@ +import hashlib +import sqlite3 +import sys +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + + +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestChatExportTargets(unittest.TestCase): + def _seed_contact_db(self, path: Path, *, account: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE contact ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + conn.execute( + """ + CREATE TABLE stranger ( + username TEXT, + remark TEXT, + nick_name TEXT, + alias TEXT, + local_type INTEGER, + verify_flag INTEGER, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + rows = [ + (account, "", "Me", "", 1, 0, "", ""), + ("wxid_visible", "", "Visible friend", "", 1, 0, "", ""), + ("wxid_no_session", "", "No session friend", "", 1, 0, "", ""), + ("wxid_session_hidden", "", "Hidden session friend", "", 1, 0, "", ""), + ("room_no_session@chatroom", "", "No session group", "", 1, 0, "", ""), + ("gh_official_no_session", "", "Official account", "", 1, 24, "", ""), + ("wxid_no_messages", "", "No messages friend", "", 1, 0, "", ""), + ] + conn.executemany("INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows) + conn.commit() + finally: + conn.close() + + def _seed_session_db(self, path: Path) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE SessionTable ( + username TEXT, + is_hidden INTEGER, + sort_timestamp INTEGER + ) + """ + ) + conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_visible", 0, 100)) + conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_session_hidden", 1, 200)) + conn.commit() + finally: + conn.close() + + def _seed_message_db(self, path: Path, *, account: str) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)") + usernames = [ + account, + "wxid_visible", + "wxid_no_session", + "wxid_session_hidden", + "room_no_session@chatroom", + "gh_official_no_session", + "wxid_no_messages", + ] + for idx, username in enumerate(usernames, start=1): + conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (idx, username)) + + message_usernames = { + "wxid_visible": 100, + "wxid_no_session": 300, + "wxid_session_hidden": 400, + "room_no_session@chatroom": 350, + "gh_official_no_session": 360, + } + for username, create_time in message_usernames.items(): + table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}" + conn.execute( + f""" + CREATE TABLE {table_name} ( + local_id INTEGER, + server_id INTEGER, + local_type INTEGER, + sort_seq INTEGER, + real_sender_id INTEGER, + create_time INTEGER, + message_content TEXT, + compress_content BLOB + ) + """ + ) + conn.execute( + f"INSERT INTO {table_name} VALUES (?, ?, ?, ?, ?, ?, ?, ?)", + (1, 1001, 1, 1, 2, create_time, f"message for {username}", None), + ) + conn.commit() + finally: + conn.close() + + def _prepare_account(self, root: Path) -> Path: + account = "wxid_account" + account_dir = root / account + account_dir.mkdir(parents=True, exist_ok=True) + self._seed_contact_db(account_dir / "contact.db", account=account) + self._seed_session_db(account_dir / "session.db") + self._seed_message_db(account_dir / "message_0.db", account=account) + return account_dir + + def test_all_scope_includes_contacts_with_messages_missing_from_session_list(self): + import wechat_decrypt_tool.chat_export_service as svc + + with TemporaryDirectory() as td: + account_dir = self._prepare_account(Path(td)) + + targets = svc._resolve_export_targets( + account_dir=account_dir, + scope="all", + usernames=[], + include_hidden=False, + include_official=False, + ) + + self.assertIn("wxid_visible", targets) + self.assertIn("wxid_no_session", targets) + self.assertIn("room_no_session@chatroom", targets) + self.assertNotIn("wxid_session_hidden", targets) + self.assertNotIn("gh_official_no_session", targets) + self.assertNotIn("wxid_no_messages", targets) + + def test_group_single_and_official_filters_apply_to_message_discovered_targets(self): + import wechat_decrypt_tool.chat_export_service as svc + + with TemporaryDirectory() as td: + account_dir = self._prepare_account(Path(td)) + + groups = svc._resolve_export_targets( + account_dir=account_dir, + scope="groups", + usernames=[], + include_hidden=False, + include_official=False, + ) + singles = svc._resolve_export_targets( + account_dir=account_dir, + scope="singles", + usernames=[], + include_hidden=False, + include_official=False, + ) + with_official = svc._resolve_export_targets( + account_dir=account_dir, + scope="all", + usernames=[], + include_hidden=False, + include_official=True, + ) + + self.assertEqual(groups, ["room_no_session@chatroom"]) + self.assertIn("wxid_no_session", singles) + self.assertNotIn("room_no_session@chatroom", singles) + self.assertIn("gh_official_no_session", with_official) + + +if __name__ == "__main__": + unittest.main()