mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
fix(chat-export): 补充导出会话列表外的有消息聊天
批量导出全部、群聊、单聊时,不再只依赖 SessionTable。 导出目标会补充 contact/stranger 与消息库 Name2Id 中存在消息表的联系人或群聊,避免微信不显示会话从左侧列表消失后漏导。 同时新增自定义范围,保留当前会话列表手动勾选导出的语义,并补充对应回归测试。
This commit is contained in:
@@ -1308,7 +1308,7 @@
|
|||||||
<button
|
<button
|
||||||
type="button"
|
type="button"
|
||||||
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
|
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
|
||||||
:class="exportScope === 'selected' && exportListTab === 'all' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
|
:class="exportScope === 'all' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
|
||||||
@click="onExportBatchScopeClick('all')"
|
@click="onExportBatchScopeClick('all')"
|
||||||
>
|
>
|
||||||
全部 {{ exportContactCounts.total }}
|
全部 {{ exportContactCounts.total }}
|
||||||
@@ -1316,7 +1316,7 @@
|
|||||||
<button
|
<button
|
||||||
type="button"
|
type="button"
|
||||||
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
|
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
|
||||||
:class="exportScope === 'selected' && exportListTab === 'groups' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
|
:class="exportScope === 'groups' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
|
||||||
@click="onExportBatchScopeClick('groups')"
|
@click="onExportBatchScopeClick('groups')"
|
||||||
>
|
>
|
||||||
群聊 {{ exportContactCounts.groups }}
|
群聊 {{ exportContactCounts.groups }}
|
||||||
@@ -1324,11 +1324,19 @@
|
|||||||
<button
|
<button
|
||||||
type="button"
|
type="button"
|
||||||
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
|
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
|
||||||
:class="exportScope === 'selected' && exportListTab === 'singles' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
|
:class="exportScope === 'singles' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
|
||||||
@click="onExportBatchScopeClick('singles')"
|
@click="onExportBatchScopeClick('singles')"
|
||||||
>
|
>
|
||||||
单聊 {{ exportContactCounts.singles }}
|
单聊 {{ exportContactCounts.singles }}
|
||||||
</button>
|
</button>
|
||||||
|
<button
|
||||||
|
type="button"
|
||||||
|
class="px-2.5 py-1 text-xs rounded-md border transition-colors"
|
||||||
|
:class="exportScope === 'selected' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'"
|
||||||
|
@click="onExportCustomScopeClick"
|
||||||
|
>
|
||||||
|
自定义
|
||||||
|
</button>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
@@ -1395,7 +1403,7 @@
|
|||||||
|
|
||||||
<div v-if="exportScope === 'selected'" class="mt-3">
|
<div v-if="exportScope === 'selected'" class="mt-3">
|
||||||
<div class="mb-2 text-xs text-gray-500">
|
<div class="mb-2 text-xs text-gray-500">
|
||||||
点击上方范围可筛选并默认全选当前结果,再次点击可取消全选;下方整行可点选会话
|
下方整行可点选会话;搜索只影响当前自定义列表
|
||||||
</div>
|
</div>
|
||||||
<div class="flex items-center gap-2 mb-2">
|
<div class="flex items-center gap-2 mb-2">
|
||||||
<input
|
<input
|
||||||
|
|||||||
@@ -198,8 +198,17 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
|
|||||||
}
|
}
|
||||||
|
|
||||||
const onExportBatchScopeClick = (tab) => {
|
const onExportBatchScopeClick = (tab) => {
|
||||||
|
const nextTab = String(tab || 'all')
|
||||||
|
exportListTab.value = nextTab
|
||||||
|
exportScope.value = nextTab === 'groups' || nextTab === 'singles' ? nextTab : 'all'
|
||||||
|
selectExportFilteredContacts(nextTab)
|
||||||
|
}
|
||||||
|
|
||||||
|
const onExportCustomScopeClick = () => {
|
||||||
exportScope.value = 'selected'
|
exportScope.value = 'selected'
|
||||||
onExportListTabClick(tab)
|
if (exportSelectedUsernames.value.length === 0) {
|
||||||
|
selectExportFilteredContacts(exportListTab.value)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const isDesktopExportRuntime = () => {
|
const isDesktopExportRuntime = () => {
|
||||||
@@ -488,6 +497,9 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
|
|||||||
}
|
}
|
||||||
} else if (scope === 'selected') {
|
} else if (scope === 'selected') {
|
||||||
usernames = Array.isArray(exportSelectedUsernames.value) ? exportSelectedUsernames.value.filter(Boolean) : []
|
usernames = Array.isArray(exportSelectedUsernames.value) ? exportSelectedUsernames.value.filter(Boolean) : []
|
||||||
|
} else if (scope !== 'all' && scope !== 'groups' && scope !== 'singles') {
|
||||||
|
scope = 'selected'
|
||||||
|
usernames = Array.isArray(exportSelectedUsernames.value) ? exportSelectedUsernames.value.filter(Boolean) : []
|
||||||
}
|
}
|
||||||
|
|
||||||
if (scope === 'selected' && (!usernames || usernames.length === 0)) {
|
if (scope === 'selected' && (!usernames || usernames.length === 0)) {
|
||||||
@@ -547,7 +559,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
|
|||||||
format: exportFormat.value,
|
format: exportFormat.value,
|
||||||
start_time: startTime,
|
start_time: startTime,
|
||||||
end_time: endTime,
|
end_time: endTime,
|
||||||
include_hidden: false,
|
include_hidden: scope === 'all' || scope === 'groups' || scope === 'singles',
|
||||||
include_official: false,
|
include_official: false,
|
||||||
message_types: messageTypes,
|
message_types: messageTypes,
|
||||||
include_media: includeMedia,
|
include_media: includeMedia,
|
||||||
@@ -618,6 +630,7 @@ export const useChatExport = ({ api, apiBase, contacts, selectedAccount, selecte
|
|||||||
exportFilteredContacts,
|
exportFilteredContacts,
|
||||||
exportContactCounts,
|
exportContactCounts,
|
||||||
onExportBatchScopeClick,
|
onExportBatchScopeClick,
|
||||||
|
onExportCustomScopeClick,
|
||||||
onExportListTabClick,
|
onExportListTabClick,
|
||||||
isExportContactSelected,
|
isExportContactSelected,
|
||||||
hasWebExportFolder,
|
hasWebExportFolder,
|
||||||
|
|||||||
@@ -50,6 +50,7 @@ from .chat_helpers import (
|
|||||||
_resource_lookup_chat_id,
|
_resource_lookup_chat_id,
|
||||||
_should_keep_session,
|
_should_keep_session,
|
||||||
_split_group_sender_prefix,
|
_split_group_sender_prefix,
|
||||||
|
_resolve_msg_table_name_by_map,
|
||||||
)
|
)
|
||||||
from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC
|
from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC
|
||||||
from .logging_config import get_logger
|
from .logging_config import get_logger
|
||||||
@@ -3527,13 +3528,66 @@ def _resolve_export_targets(
|
|||||||
uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()]))
|
uniq = list(dict.fromkeys([str(u or "").strip() for u in usernames if str(u or "").strip()]))
|
||||||
return uniq
|
return uniq
|
||||||
|
|
||||||
|
session_rows, session_hidden_by_username = _load_export_session_targets(account_dir)
|
||||||
|
contact_usernames = _load_export_contact_usernames(account_dir)
|
||||||
|
discovered_message_targets = _load_message_backed_export_targets(
|
||||||
|
account_dir=account_dir,
|
||||||
|
seed_usernames=contact_usernames,
|
||||||
|
)
|
||||||
|
|
||||||
|
def should_include(u: str) -> bool:
|
||||||
|
if not u or u == account_dir.name:
|
||||||
|
return False
|
||||||
|
if not include_hidden and int(session_hidden_by_username.get(u) or 0) == 1:
|
||||||
|
return False
|
||||||
|
if not _should_keep_session(u, include_official=include_official):
|
||||||
|
return False
|
||||||
|
if scope == "groups" and (not u.endswith("@chatroom")):
|
||||||
|
return False
|
||||||
|
if scope == "singles" and u.endswith("@chatroom"):
|
||||||
|
return False
|
||||||
|
return True
|
||||||
|
|
||||||
|
out: list[str] = []
|
||||||
|
seen: set[str] = set()
|
||||||
|
for u, _sort_ts in session_rows:
|
||||||
|
if u in seen or (not should_include(u)):
|
||||||
|
continue
|
||||||
|
seen.add(u)
|
||||||
|
out.append(u)
|
||||||
|
|
||||||
|
for u, _sort_ts in sorted(discovered_message_targets.items(), key=lambda item: (-int(item[1] or 0), item[0])):
|
||||||
|
if u in seen or (not should_include(u)):
|
||||||
|
continue
|
||||||
|
seen.add(u)
|
||||||
|
out.append(u)
|
||||||
|
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _load_export_session_targets(account_dir: Path) -> tuple[list[tuple[str, int]], dict[str, int]]:
|
||||||
session_db_path = account_dir / "session.db"
|
session_db_path = account_dir / "session.db"
|
||||||
|
if not session_db_path.exists():
|
||||||
|
return [], {}
|
||||||
|
|
||||||
conn = sqlite3.connect(str(session_db_path))
|
conn = sqlite3.connect(str(session_db_path))
|
||||||
conn.row_factory = sqlite3.Row
|
conn.row_factory = sqlite3.Row
|
||||||
try:
|
try:
|
||||||
|
columns = _sqlite_table_columns(conn, "SessionTable")
|
||||||
|
if "username" not in columns:
|
||||||
|
return [], {}
|
||||||
|
|
||||||
|
hidden_expr = "is_hidden" if "is_hidden" in columns else "0"
|
||||||
|
if "sort_timestamp" in columns:
|
||||||
|
sort_expr = "sort_timestamp"
|
||||||
|
elif "last_timestamp" in columns:
|
||||||
|
sort_expr = "last_timestamp"
|
||||||
|
else:
|
||||||
|
sort_expr = "0"
|
||||||
|
|
||||||
rows = conn.execute(
|
rows = conn.execute(
|
||||||
"""
|
f"""
|
||||||
SELECT username, is_hidden
|
SELECT username, {hidden_expr} AS is_hidden, {sort_expr} AS sort_timestamp
|
||||||
FROM SessionTable
|
FROM SessionTable
|
||||||
ORDER BY sort_timestamp DESC
|
ORDER BY sort_timestamp DESC
|
||||||
""",
|
""",
|
||||||
@@ -3541,20 +3595,153 @@ def _resolve_export_targets(
|
|||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
out: list[str] = []
|
out: list[tuple[str, int]] = []
|
||||||
|
hidden_by_username: dict[str, int] = {}
|
||||||
|
seen: set[str] = set()
|
||||||
for r in rows:
|
for r in rows:
|
||||||
u = str(r["username"] or "").strip()
|
u = str(r["username"] or "").strip()
|
||||||
if not u:
|
if not u:
|
||||||
continue
|
continue
|
||||||
if not include_hidden and int(r["is_hidden"] or 0) == 1:
|
try:
|
||||||
|
hidden = int(r["is_hidden"] or 0)
|
||||||
|
except Exception:
|
||||||
|
hidden = 0
|
||||||
|
if hidden:
|
||||||
|
hidden_by_username[u] = 1
|
||||||
|
else:
|
||||||
|
hidden_by_username.setdefault(u, 0)
|
||||||
|
if u in seen:
|
||||||
continue
|
continue
|
||||||
if not _should_keep_session(u, include_official=include_official):
|
seen.add(u)
|
||||||
|
try:
|
||||||
|
sort_ts = int(r["sort_timestamp"] or 0)
|
||||||
|
except Exception:
|
||||||
|
sort_ts = 0
|
||||||
|
out.append((u, sort_ts))
|
||||||
|
return out, hidden_by_username
|
||||||
|
|
||||||
|
|
||||||
|
def _sqlite_table_columns(conn: sqlite3.Connection, table_name: str) -> set[str]:
|
||||||
|
try:
|
||||||
|
rows = conn.execute(f"PRAGMA table_info({_quote_ident(table_name)})").fetchall()
|
||||||
|
except Exception:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
columns: set[str] = set()
|
||||||
|
for row in rows:
|
||||||
|
try:
|
||||||
|
name = str(row["name"] if isinstance(row, sqlite3.Row) else row[1] or "").strip().lower()
|
||||||
|
except Exception:
|
||||||
|
name = ""
|
||||||
|
if name:
|
||||||
|
columns.add(name)
|
||||||
|
return columns
|
||||||
|
|
||||||
|
|
||||||
|
def _load_export_contact_usernames(account_dir: Path) -> set[str]:
|
||||||
|
contact_db_path = account_dir / "contact.db"
|
||||||
|
if not contact_db_path.exists():
|
||||||
|
return set()
|
||||||
|
|
||||||
|
out: set[str] = set()
|
||||||
|
conn = sqlite3.connect(str(contact_db_path))
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
try:
|
||||||
|
for table in ("contact", "stranger"):
|
||||||
|
columns = _sqlite_table_columns(conn, table)
|
||||||
|
if "username" not in columns:
|
||||||
|
continue
|
||||||
|
try:
|
||||||
|
rows = conn.execute(f"SELECT username FROM {_quote_ident(table)}").fetchall()
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
for row in rows:
|
||||||
|
try:
|
||||||
|
username = str(row["username"] or "").strip()
|
||||||
|
except Exception:
|
||||||
|
username = ""
|
||||||
|
if username:
|
||||||
|
out.add(username)
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _load_name2id_usernames(conn: sqlite3.Connection) -> set[str]:
|
||||||
|
columns = _sqlite_table_columns(conn, "Name2Id")
|
||||||
|
username_col = "user_name" if "user_name" in columns else ("username" if "username" in columns else "")
|
||||||
|
if not username_col:
|
||||||
|
return set()
|
||||||
|
|
||||||
|
out: set[str] = set()
|
||||||
|
try:
|
||||||
|
rows = conn.execute(f"SELECT {_quote_ident(username_col)} AS username FROM Name2Id").fetchall()
|
||||||
|
except Exception:
|
||||||
|
return out
|
||||||
|
for row in rows:
|
||||||
|
try:
|
||||||
|
username = str(row["username"] if isinstance(row, sqlite3.Row) else row[0] or "").strip()
|
||||||
|
except Exception:
|
||||||
|
username = ""
|
||||||
|
if username:
|
||||||
|
out.add(username)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def _message_table_latest_timestamp(conn: sqlite3.Connection, table_name: str) -> Optional[int]:
|
||||||
|
quoted = _quote_ident(table_name)
|
||||||
|
try:
|
||||||
|
row = conn.execute(f"SELECT MAX(create_time) FROM {quoted}").fetchone()
|
||||||
|
if row is not None and row[0] is not None:
|
||||||
|
return int(row[0] or 0)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
row = conn.execute(f"SELECT 1 FROM {quoted} LIMIT 1").fetchone()
|
||||||
|
if row is not None:
|
||||||
|
return 0
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _load_message_backed_export_targets(*, account_dir: Path, seed_usernames: set[str]) -> dict[str, int]:
|
||||||
|
out: dict[str, int] = {}
|
||||||
|
for db_path in _iter_message_db_paths(account_dir):
|
||||||
|
conn: Optional[sqlite3.Connection] = None
|
||||||
|
try:
|
||||||
|
conn = sqlite3.connect(str(db_path))
|
||||||
|
conn.row_factory = sqlite3.Row
|
||||||
|
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
|
||||||
|
table_names = [str(r["name"] if isinstance(r, sqlite3.Row) else r[0] or "") for r in rows]
|
||||||
|
lower_to_actual = {name.lower(): name for name in table_names if name}
|
||||||
|
if not lower_to_actual:
|
||||||
|
continue
|
||||||
|
|
||||||
|
candidates = set(seed_usernames)
|
||||||
|
candidates.update(_load_name2id_usernames(conn))
|
||||||
|
for username in candidates:
|
||||||
|
u = str(username or "").strip()
|
||||||
|
if not u or u == account_dir.name:
|
||||||
|
continue
|
||||||
|
table_name = _resolve_msg_table_name_by_map(lower_to_actual, u)
|
||||||
|
if not table_name:
|
||||||
|
continue
|
||||||
|
latest_ts = _message_table_latest_timestamp(conn, table_name)
|
||||||
|
if latest_ts is None:
|
||||||
|
continue
|
||||||
|
previous_ts = out.get(u)
|
||||||
|
if previous_ts is None or int(latest_ts or 0) > int(previous_ts or 0):
|
||||||
|
out[u] = int(latest_ts or 0)
|
||||||
|
except Exception:
|
||||||
continue
|
continue
|
||||||
if scope == "groups" and (not u.endswith("@chatroom")):
|
finally:
|
||||||
continue
|
if conn is not None:
|
||||||
if scope == "singles" and u.endswith("@chatroom"):
|
try:
|
||||||
continue
|
conn.close()
|
||||||
out.append(u)
|
except Exception:
|
||||||
|
pass
|
||||||
return out
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,189 @@
|
|||||||
|
import hashlib
|
||||||
|
import sqlite3
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
from pathlib import Path
|
||||||
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
|
|
||||||
|
ROOT = Path(__file__).resolve().parents[1]
|
||||||
|
sys.path.insert(0, str(ROOT / "src"))
|
||||||
|
|
||||||
|
|
||||||
|
class TestChatExportTargets(unittest.TestCase):
|
||||||
|
def _seed_contact_db(self, path: Path, *, account: str) -> None:
|
||||||
|
conn = sqlite3.connect(str(path))
|
||||||
|
try:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE contact (
|
||||||
|
username TEXT,
|
||||||
|
remark TEXT,
|
||||||
|
nick_name TEXT,
|
||||||
|
alias TEXT,
|
||||||
|
local_type INTEGER,
|
||||||
|
verify_flag INTEGER,
|
||||||
|
big_head_url TEXT,
|
||||||
|
small_head_url TEXT
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE stranger (
|
||||||
|
username TEXT,
|
||||||
|
remark TEXT,
|
||||||
|
nick_name TEXT,
|
||||||
|
alias TEXT,
|
||||||
|
local_type INTEGER,
|
||||||
|
verify_flag INTEGER,
|
||||||
|
big_head_url TEXT,
|
||||||
|
small_head_url TEXT
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
rows = [
|
||||||
|
(account, "", "Me", "", 1, 0, "", ""),
|
||||||
|
("wxid_visible", "", "Visible friend", "", 1, 0, "", ""),
|
||||||
|
("wxid_no_session", "", "No session friend", "", 1, 0, "", ""),
|
||||||
|
("wxid_session_hidden", "", "Hidden session friend", "", 1, 0, "", ""),
|
||||||
|
("room_no_session@chatroom", "", "No session group", "", 1, 0, "", ""),
|
||||||
|
("gh_official_no_session", "", "Official account", "", 1, 24, "", ""),
|
||||||
|
("wxid_no_messages", "", "No messages friend", "", 1, 0, "", ""),
|
||||||
|
]
|
||||||
|
conn.executemany("INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)
|
||||||
|
conn.commit()
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def _seed_session_db(self, path: Path) -> None:
|
||||||
|
conn = sqlite3.connect(str(path))
|
||||||
|
try:
|
||||||
|
conn.execute(
|
||||||
|
"""
|
||||||
|
CREATE TABLE SessionTable (
|
||||||
|
username TEXT,
|
||||||
|
is_hidden INTEGER,
|
||||||
|
sort_timestamp INTEGER
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_visible", 0, 100))
|
||||||
|
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_session_hidden", 1, 200))
|
||||||
|
conn.commit()
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def _seed_message_db(self, path: Path, *, account: str) -> None:
|
||||||
|
conn = sqlite3.connect(str(path))
|
||||||
|
try:
|
||||||
|
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
|
||||||
|
usernames = [
|
||||||
|
account,
|
||||||
|
"wxid_visible",
|
||||||
|
"wxid_no_session",
|
||||||
|
"wxid_session_hidden",
|
||||||
|
"room_no_session@chatroom",
|
||||||
|
"gh_official_no_session",
|
||||||
|
"wxid_no_messages",
|
||||||
|
]
|
||||||
|
for idx, username in enumerate(usernames, start=1):
|
||||||
|
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (idx, username))
|
||||||
|
|
||||||
|
message_usernames = {
|
||||||
|
"wxid_visible": 100,
|
||||||
|
"wxid_no_session": 300,
|
||||||
|
"wxid_session_hidden": 400,
|
||||||
|
"room_no_session@chatroom": 350,
|
||||||
|
"gh_official_no_session": 360,
|
||||||
|
}
|
||||||
|
for username, create_time in message_usernames.items():
|
||||||
|
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
|
||||||
|
conn.execute(
|
||||||
|
f"""
|
||||||
|
CREATE TABLE {table_name} (
|
||||||
|
local_id INTEGER,
|
||||||
|
server_id INTEGER,
|
||||||
|
local_type INTEGER,
|
||||||
|
sort_seq INTEGER,
|
||||||
|
real_sender_id INTEGER,
|
||||||
|
create_time INTEGER,
|
||||||
|
message_content TEXT,
|
||||||
|
compress_content BLOB
|
||||||
|
)
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
conn.execute(
|
||||||
|
f"INSERT INTO {table_name} VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||||
|
(1, 1001, 1, 1, 2, create_time, f"message for {username}", None),
|
||||||
|
)
|
||||||
|
conn.commit()
|
||||||
|
finally:
|
||||||
|
conn.close()
|
||||||
|
|
||||||
|
def _prepare_account(self, root: Path) -> Path:
|
||||||
|
account = "wxid_account"
|
||||||
|
account_dir = root / account
|
||||||
|
account_dir.mkdir(parents=True, exist_ok=True)
|
||||||
|
self._seed_contact_db(account_dir / "contact.db", account=account)
|
||||||
|
self._seed_session_db(account_dir / "session.db")
|
||||||
|
self._seed_message_db(account_dir / "message_0.db", account=account)
|
||||||
|
return account_dir
|
||||||
|
|
||||||
|
def test_all_scope_includes_contacts_with_messages_missing_from_session_list(self):
|
||||||
|
import wechat_decrypt_tool.chat_export_service as svc
|
||||||
|
|
||||||
|
with TemporaryDirectory() as td:
|
||||||
|
account_dir = self._prepare_account(Path(td))
|
||||||
|
|
||||||
|
targets = svc._resolve_export_targets(
|
||||||
|
account_dir=account_dir,
|
||||||
|
scope="all",
|
||||||
|
usernames=[],
|
||||||
|
include_hidden=False,
|
||||||
|
include_official=False,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertIn("wxid_visible", targets)
|
||||||
|
self.assertIn("wxid_no_session", targets)
|
||||||
|
self.assertIn("room_no_session@chatroom", targets)
|
||||||
|
self.assertNotIn("wxid_session_hidden", targets)
|
||||||
|
self.assertNotIn("gh_official_no_session", targets)
|
||||||
|
self.assertNotIn("wxid_no_messages", targets)
|
||||||
|
|
||||||
|
def test_group_single_and_official_filters_apply_to_message_discovered_targets(self):
|
||||||
|
import wechat_decrypt_tool.chat_export_service as svc
|
||||||
|
|
||||||
|
with TemporaryDirectory() as td:
|
||||||
|
account_dir = self._prepare_account(Path(td))
|
||||||
|
|
||||||
|
groups = svc._resolve_export_targets(
|
||||||
|
account_dir=account_dir,
|
||||||
|
scope="groups",
|
||||||
|
usernames=[],
|
||||||
|
include_hidden=False,
|
||||||
|
include_official=False,
|
||||||
|
)
|
||||||
|
singles = svc._resolve_export_targets(
|
||||||
|
account_dir=account_dir,
|
||||||
|
scope="singles",
|
||||||
|
usernames=[],
|
||||||
|
include_hidden=False,
|
||||||
|
include_official=False,
|
||||||
|
)
|
||||||
|
with_official = svc._resolve_export_targets(
|
||||||
|
account_dir=account_dir,
|
||||||
|
scope="all",
|
||||||
|
usernames=[],
|
||||||
|
include_hidden=False,
|
||||||
|
include_official=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertEqual(groups, ["room_no_session@chatroom"])
|
||||||
|
self.assertIn("wxid_no_session", singles)
|
||||||
|
self.assertNotIn("room_no_session@chatroom", singles)
|
||||||
|
self.assertIn("gh_official_no_session", with_official)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user