import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class _FakeResponse:
def __init__(self, body: bytes, *, content_type: str) -> None:
self.status_code = 200
self.headers = {
"Content-Type": str(content_type or "").strip(),
"Content-Length": str(len(body)),
}
self._body = body
def iter_content(self, chunk_size=65536):
data = self._body or b""
for i in range(0, len(data), int(chunk_size or 65536)):
yield data[i : i + int(chunk_size or 65536)]
def close(self):
return None
class TestChatExportRemoteThumbOption(unittest.TestCase):
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "我", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> tuple[str, str]:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
link_thumb = "https://1.1.1.1/thumb.png"
quote_thumb = "https://1.1.1.1/quote.png"
link_xml = (
""
"5"
"示例链接"
"这是描述"
"https://example.com/"
f"{link_thumb}"
""
)
quote_xml = (
""
"57"
"回复"
""
"49"
"8888"
"wxid_other"
"对方"
""
"5被引用链接https://example.com/"
f"{quote_thumb}"
""
""
""
""
)
rows = [
(1, 1001, 49, 1, 2, 1735689601, link_xml, None),
(2, 1002, 49, 2, 2, 1735689602, quote_xml, None),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
rows,
)
conn.commit()
return link_thumb, quote_thumb
finally:
conn.close()
def _prepare_account(self, root: Path, *, account: str, username: str) -> tuple[Path, str, str]:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
link_thumb, quote_thumb = self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
return account_dir, link_thumb, quote_thumb
def _create_job(self, manager, *, account: str, username: str, download_remote_media: bool):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image", "emoji", "video", "video_thumb", "voice", "file"],
message_types=["link", "quote", "image"],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=download_remote_media,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_remote_thumb_disabled_does_not_download(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
_, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
with mock.patch.object(
svc.requests,
"get",
side_effect=AssertionError("requests.get should not be called when download_remote_media=False"),
) as m_get:
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
download_remote_media=False,
)
self.assertEqual(job.status, "done", msg=job.error)
self.assertEqual(m_get.call_count, 0)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn(f'src="{link_thumb}"', html_text)
self.assertIn(f'src="{quote_thumb}"', html_text)
self.assertFalse(any(n.startswith("media/remote/") for n in names))
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
def test_remote_thumb_enabled_downloads_and_rewrites(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
_, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
fake_png = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde"
def _fake_get(url, **_kwargs):
return _FakeResponse(fake_png, content_type="image/png")
with mock.patch.object(svc.requests, "get", side_effect=_fake_get) as m_get:
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
download_remote_media=True,
)
self.assertEqual(job.status, "done", msg=job.error)
self.assertGreaterEqual(m_get.call_count, 1)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
h1 = hashlib.sha256(link_thumb.encode("utf-8", errors="ignore")).hexdigest()
arc1 = f"media/remote/{h1[:32]}.png"
self.assertIn(arc1, names)
self.assertIn(f"../../{arc1}", html_text)
self.assertNotIn(f'src="{link_thumb}"', html_text)
h2 = hashlib.sha256(quote_thumb.encode("utf-8", errors="ignore")).hexdigest()
arc2 = f"media/remote/{h2[:32]}.png"
self.assertIn(arc2, names)
self.assertIn(f"../../{arc2}", html_text)
self.assertNotIn(f'src="{quote_thumb}"', html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data