Files
WeChatDataAnalysis/tests/test_chat_export_targets.py

226 lines
8.7 KiB
Python

import hashlib
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportTargets(unittest.TestCase):
def _seed_contact_db(self, path: Path, *, account: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
rows = [
(account, "", "Me", "", 1, 0, "", ""),
("wxid_visible", "", "Visible friend", "", 1, 0, "", ""),
("wxid_no_session", "", "No session friend", "", 1, 0, "", ""),
("wxid_session_hidden", "", "Hidden session friend", "", 1, 0, "", ""),
("room_no_session@chatroom", "", "No session group", "", 1, 0, "", ""),
("room_hidden@chatroom", "", "Hidden session group", "", 1, 0, "", ""),
("gh_official_no_session", "", "Official account", "", 1, 24, "", ""),
("wxid_no_messages", "", "No messages friend", "", 1, 0, "", ""),
]
conn.executemany("INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)", rows)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_visible", 0, 100))
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("wxid_session_hidden", 1, 200))
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", ("room_hidden@chatroom", 1, 250))
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
usernames = [
account,
"wxid_visible",
"wxid_no_session",
"wxid_session_hidden",
"room_no_session@chatroom",
"room_hidden@chatroom",
"gh_official_no_session",
"wxid_no_messages",
]
for idx, username in enumerate(usernames, start=1):
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (idx, username))
message_usernames = {
"wxid_visible": 100,
"wxid_no_session": 300,
"wxid_session_hidden": 400,
"room_no_session@chatroom": 350,
"room_hidden@chatroom": 450,
"gh_official_no_session": 360,
}
for username, create_time in message_usernames.items():
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
conn.execute(
f"INSERT INTO {table_name} VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 1, 1, 2, create_time, f"message for {username}", None),
)
conn.commit()
finally:
conn.close()
def _prepare_account(self, root: Path) -> Path:
account = "wxid_account"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account)
self._seed_session_db(account_dir / "session.db")
self._seed_message_db(account_dir / "message_0.db", account=account)
return account_dir
def test_all_scope_includes_contacts_with_messages_missing_from_session_list(self):
import wechat_decrypt_tool.chat_export_service as svc
with TemporaryDirectory() as td:
account_dir = self._prepare_account(Path(td))
targets = svc._resolve_export_targets(
account_dir=account_dir,
scope="all",
usernames=[],
include_hidden=False,
include_official=False,
)
self.assertIn("wxid_visible", targets)
self.assertIn("wxid_no_session", targets)
self.assertIn("room_no_session@chatroom", targets)
self.assertNotIn("wxid_session_hidden", targets)
self.assertNotIn("room_hidden@chatroom", targets)
self.assertNotIn("gh_official_no_session", targets)
self.assertNotIn("wxid_no_messages", targets)
def test_group_single_and_official_filters_apply_to_message_discovered_targets(self):
import wechat_decrypt_tool.chat_export_service as svc
with TemporaryDirectory() as td:
account_dir = self._prepare_account(Path(td))
groups = svc._resolve_export_targets(
account_dir=account_dir,
scope="groups",
usernames=[],
include_hidden=False,
include_official=False,
)
singles = svc._resolve_export_targets(
account_dir=account_dir,
scope="singles",
usernames=[],
include_hidden=False,
include_official=False,
)
with_official = svc._resolve_export_targets(
account_dir=account_dir,
scope="all",
usernames=[],
include_hidden=False,
include_official=True,
)
self.assertEqual(groups, ["room_no_session@chatroom"])
self.assertIn("wxid_no_session", singles)
self.assertNotIn("room_no_session@chatroom", singles)
self.assertIn("gh_official_no_session", with_official)
def test_preview_counts_match_bulk_export_targets_including_hidden_sessions(self):
import wechat_decrypt_tool.chat_export_service as svc
with TemporaryDirectory() as td:
account_dir = self._prepare_account(Path(td))
preview = svc.build_chat_export_targets_preview(
account_dir=account_dir,
include_hidden=True,
include_official=False,
base_url="http://example.test",
)
actual_targets = svc._resolve_export_targets(
account_dir=account_dir,
scope="all",
usernames=[],
include_hidden=True,
include_official=False,
)
preview_targets = preview["targets"]
preview_usernames = [item["username"] for item in preview_targets]
by_username = {item["username"]: item for item in preview_targets}
self.assertEqual(preview_usernames, actual_targets)
self.assertEqual(preview["counts"], {"total": 5, "groups": 2, "singles": 3})
self.assertTrue(by_username["room_hidden@chatroom"]["isHidden"])
self.assertTrue(by_username["room_hidden@chatroom"]["inSessionList"])
self.assertFalse(by_username["room_no_session@chatroom"]["inSessionList"])
self.assertTrue(by_username["room_no_session@chatroom"]["avatar"].startswith("http://example.test/api/chat/avatar?"))
if __name__ == "__main__":
unittest.main()