From cf58d26e6f6003c2a4ffee75f018dc9ef60889bc Mon Sep 17 00:00:00 2001
From: 2977094657 <2977094657@qq.com>
Date: Mon, 9 Feb 2026 18:31:47 +0800
Subject: [PATCH] =?UTF-8?q?test(chat):=20=E8=A6=86=E7=9B=96=E7=B3=BB?=
=?UTF-8?q?=E7=BB=9F=E6=92=A4=E5=9B=9E/=E7=BE=A4=E5=90=8D=E7=89=87/?=
=?UTF-8?q?=E5=AE=9E=E6=97=B6=E4=BC=9A=E8=AF=9D=E5=90=8C=E6=AD=A5=E7=9B=B8?=
=?UTF-8?q?=E5=85=B3=E7=94=A8=E4=BE=8B?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
- 新增系统撤回消息 replacemsg 解析与导出语义测试
- 新增群聊会话预览格式化与群名片 ext_buffer 解析测试
- 新增 realtime 会话列表与 sync_all 落库 last_sender_display_name 测试
---
...est_chat_export_message_types_semantics.py | 41 +++++++
...me_sync_all_updates_sender_display_name.py | 111 +++++++++++++++++
tests/test_chat_session_preview_formatting.py | 68 +++++++++++
...t_chat_sessions_realtime_sender_preview.py | 103 ++++++++++++++++
tests/test_chat_system_message_parsing.py | 42 +++++++
.../test_group_nickname_ext_buffer_parsing.py | 114 ++++++++++++++++++
6 files changed, 479 insertions(+)
create mode 100644 tests/test_chat_realtime_sync_all_updates_sender_display_name.py
create mode 100644 tests/test_chat_session_preview_formatting.py
create mode 100644 tests/test_chat_sessions_realtime_sender_preview.py
create mode 100644 tests/test_chat_system_message_parsing.py
create mode 100644 tests/test_group_nickname_ext_buffer_parsing.py
diff --git a/tests/test_chat_export_message_types_semantics.py b/tests/test_chat_export_message_types_semantics.py
index 42114de..d641bb8 100644
--- a/tests/test_chat_export_message_types_semantics.py
+++ b/tests/test_chat_export_message_types_semantics.py
@@ -122,6 +122,16 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
(3, 1003, 49, 3, 2, 1735689603, '2000收到转账0.01元', None),
(4, 1004, 1, 4, 2, 1735689604, '普通文本消息', None),
(5, 1005, 10000, 5, 2, 1735689605, '系统提示消息', None),
+ (
+ 6,
+ 1006,
+ 10000,
+ 6,
+ 2,
+ 1735689606,
+ '',
+ None,
+ ),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
@@ -413,6 +423,37 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
+ def test_system_revoke_exports_readable_revoker_content(self):
+ with TemporaryDirectory() as td:
+ root = Path(td)
+ account = "wxid_test"
+ username = "wxid_friend"
+ self._prepare_account(root, account=account, username=username)
+
+ prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
+ try:
+ os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
+ svc = self._reload_export_modules()
+ job = self._create_job(
+ svc.CHAT_EXPORT_MANAGER,
+ account=account,
+ username=username,
+ message_types=["system"],
+ include_media=False,
+ )
+ self.assertEqual(job.status, "done", msg=job.error)
+
+ payload, _, _ = self._load_export_payload(job.zip_path)
+ revoke_msg = next((m for m in payload.get("messages", []) if int(m.get("serverId") or 0) == 1006), None)
+ self.assertIsNotNone(revoke_msg)
+ self.assertEqual(str(revoke_msg.get("renderType") or ""), "system")
+ self.assertEqual(str(revoke_msg.get("content") or ""), "“测试好友”撤回了一条消息")
+ finally:
+ if prev_data is None:
+ os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
+ else:
+ os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
+
if __name__ == "__main__":
unittest.main()
diff --git a/tests/test_chat_realtime_sync_all_updates_sender_display_name.py b/tests/test_chat_realtime_sync_all_updates_sender_display_name.py
new file mode 100644
index 0000000..db60770
--- /dev/null
+++ b/tests/test_chat_realtime_sync_all_updates_sender_display_name.py
@@ -0,0 +1,111 @@
+import sqlite3
+import sys
+import threading
+import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest.mock import patch
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+from wechat_decrypt_tool.routers import chat as chat_router
+
+
+class _DummyRequest:
+ base_url = "http://testserver/"
+
+
+class _DummyConn:
+ def __init__(self) -> None:
+ self.handle = 1
+ self.lock = threading.Lock()
+
+
+def _seed_session_db(session_db_path: Path) -> None:
+ conn = sqlite3.connect(str(session_db_path))
+ try:
+ conn.execute(
+ """
+ CREATE TABLE SessionTable (
+ username TEXT PRIMARY KEY,
+ unread_count INTEGER DEFAULT 0,
+ is_hidden INTEGER DEFAULT 0,
+ summary TEXT DEFAULT '',
+ draft TEXT DEFAULT '',
+ last_timestamp INTEGER DEFAULT 0,
+ sort_timestamp INTEGER DEFAULT 0,
+ last_msg_locald_id INTEGER DEFAULT 0,
+ last_msg_type INTEGER DEFAULT 0,
+ last_msg_sub_type INTEGER DEFAULT 0,
+ last_msg_sender TEXT DEFAULT '',
+ last_sender_display_name TEXT DEFAULT ''
+ )
+ """
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+
+class TestChatRealtimeSyncAllUpdatesSenderDisplayName(unittest.TestCase):
+ def test_sync_all_upserts_last_sender_display_name(self):
+ with TemporaryDirectory() as td:
+ account_dir = Path(td) / "acc"
+ account_dir.mkdir(parents=True, exist_ok=True)
+ _seed_session_db(account_dir / "session.db")
+
+ conn = _DummyConn()
+ sessions_rows = [
+ {
+ "username": "demo@chatroom",
+ "unread_count": 0,
+ "is_hidden": 0,
+ "summary": "hello",
+ "draft": "",
+ "last_timestamp": 123,
+ "sort_timestamp": 123,
+ "last_msg_type": 1,
+ "last_msg_sub_type": 0,
+ "last_msg_sender": "wxid_demo",
+ "last_sender_display_name": "群名片A",
+ "last_msg_locald_id": 777,
+ }
+ ]
+
+ with (
+ patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
+ patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
+ patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
+ patch.object(chat_router, "_ensure_decrypted_message_tables", return_value={}),
+ patch.object(chat_router, "_should_keep_session", return_value=True),
+ ):
+ resp = chat_router.sync_chat_realtime_messages_all(
+ _DummyRequest(),
+ account="acc",
+ max_scan=20,
+ include_hidden=True,
+ include_official=True,
+ )
+
+ self.assertEqual(resp.get("status"), "success")
+
+ db = sqlite3.connect(str(account_dir / "session.db"))
+ try:
+ row = db.execute(
+ "SELECT last_sender_display_name, last_msg_sender, last_msg_locald_id FROM SessionTable WHERE username = ? LIMIT 1",
+ ("demo@chatroom",),
+ ).fetchone()
+ finally:
+ db.close()
+
+ self.assertIsNotNone(row)
+ self.assertEqual(str(row[0] or ""), "群名片A")
+ self.assertEqual(str(row[1] or ""), "wxid_demo")
+ self.assertEqual(int(row[2] or 0), 777)
+
+
+if __name__ == "__main__":
+ unittest.main()
+
diff --git a/tests/test_chat_session_preview_formatting.py b/tests/test_chat_session_preview_formatting.py
new file mode 100644
index 0000000..55466a7
--- /dev/null
+++ b/tests/test_chat_session_preview_formatting.py
@@ -0,0 +1,68 @@
+import sqlite3
+import sys
+import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+from wechat_decrypt_tool.chat_helpers import (
+ _build_group_sender_display_name_map,
+ _normalize_session_preview_text,
+ _replace_preview_sender_prefix,
+)
+
+
+class TestChatSessionPreviewFormatting(unittest.TestCase):
+ def test_normalize_session_preview_emoji_label(self):
+ out = _normalize_session_preview_text("[表情]", is_group=False, sender_display_names={})
+ self.assertEqual(out, "[动画表情]")
+
+ def test_normalize_group_preview_sender_display_name(self):
+ out = _normalize_session_preview_text(
+ "wxid_u3gwceqvne2m22: [表情]",
+ is_group=True,
+ sender_display_names={"wxid_u3gwceqvne2m22": "食神"},
+ )
+ self.assertEqual(out, "食神: [动画表情]")
+
+ def test_build_group_sender_display_name_map_from_contact_db(self):
+ with TemporaryDirectory() as td:
+ contact_db_path = Path(td) / "contact.db"
+ conn = sqlite3.connect(str(contact_db_path))
+ try:
+ conn.execute(
+ """
+ CREATE TABLE contact (
+ username TEXT,
+ remark TEXT,
+ nick_name TEXT,
+ alias TEXT,
+ big_head_url TEXT,
+ small_head_url TEXT
+ )
+ """
+ )
+ conn.execute(
+ "INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?)",
+ ("wxid_u3gwceqvne2m22", "", "食神", "", "", ""),
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+ mapping = _build_group_sender_display_name_map(
+ contact_db_path,
+ {"demo@chatroom": "wxid_u3gwceqvne2m22: [动画表情]"},
+ )
+ self.assertEqual(mapping.get("wxid_u3gwceqvne2m22"), "食神")
+
+ def test_replace_preview_sender_prefix_uses_group_nickname(self):
+ out = _replace_preview_sender_prefix("去码头整点🍟: [动画表情]", "麻辣香锅")
+ self.assertEqual(out, "麻辣香锅: [动画表情]")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_chat_sessions_realtime_sender_preview.py b/tests/test_chat_sessions_realtime_sender_preview.py
new file mode 100644
index 0000000..dd92bad
--- /dev/null
+++ b/tests/test_chat_sessions_realtime_sender_preview.py
@@ -0,0 +1,103 @@
+import sys
+import threading
+import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory
+from unittest.mock import patch
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+
+from wechat_decrypt_tool.routers import chat as chat_router
+
+
+class _DummyRequest:
+ base_url = "http://testserver/"
+
+
+class _DummyConn:
+ def __init__(self) -> None:
+ self.handle = 1
+ self.lock = threading.Lock()
+
+
+class TestChatSessionsRealtimeSenderPreview(unittest.TestCase):
+ def _run(self, sessions_rows: list[dict]) -> dict:
+ with TemporaryDirectory() as td:
+ account_dir = Path(td) / "acc"
+ account_dir.mkdir(parents=True, exist_ok=True)
+
+ conn = _DummyConn()
+ with (
+ patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
+ patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
+ patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
+ patch.object(chat_router, "_wcdb_get_display_names", return_value={}),
+ patch.object(chat_router, "_wcdb_get_avatar_urls", return_value={}),
+ patch.object(chat_router, "_load_contact_rows", return_value={}),
+ patch.object(chat_router, "_query_head_image_usernames", return_value=set()),
+ patch.object(chat_router, "_should_keep_session", return_value=True),
+ patch.object(chat_router, "_avatar_url_unified", return_value="/avatar"),
+ ):
+ return chat_router.list_chat_sessions(
+ _DummyRequest(),
+ account="acc",
+ limit=50,
+ include_hidden=True,
+ include_official=True,
+ preview="latest",
+ source="realtime",
+ )
+
+ def test_realtime_sessions_group_summary_prefixed_by_sender_display_name(self):
+ resp = self._run(
+ [
+ {
+ "username": "demo@chatroom",
+ "summary": "hello",
+ "draft": "",
+ "unread_count": 0,
+ "is_hidden": 0,
+ "last_timestamp": 123,
+ "sort_timestamp": 123,
+ "last_msg_type": 1,
+ "last_msg_sub_type": 0,
+ "last_msg_sender": "wxid_demo",
+ "last_sender_display_name": "群名片A",
+ }
+ ]
+ )
+ self.assertEqual(resp.get("status"), "success")
+ sessions = resp.get("sessions") or []
+ self.assertEqual(len(sessions), 1)
+ self.assertEqual(sessions[0].get("lastMessage"), "群名片A: hello")
+
+ def test_realtime_sessions_group_url_summary_keeps_scheme(self):
+ resp = self._run(
+ [
+ {
+ "username": "url@chatroom",
+ "summary": "https://example.com/x",
+ "draft": "",
+ "unread_count": 0,
+ "is_hidden": 0,
+ "last_timestamp": 123,
+ "sort_timestamp": 123,
+ "last_msg_type": 1,
+ "last_msg_sub_type": 0,
+ "last_msg_sender": "wxid_demo",
+ "last_sender_display_name": "群名片B",
+ }
+ ]
+ )
+ self.assertEqual(resp.get("status"), "success")
+ sessions = resp.get("sessions") or []
+ self.assertEqual(len(sessions), 1)
+ self.assertEqual(sessions[0].get("lastMessage"), "群名片B: https://example.com/x")
+
+
+if __name__ == "__main__":
+ unittest.main()
+
diff --git a/tests/test_chat_system_message_parsing.py b/tests/test_chat_system_message_parsing.py
new file mode 100644
index 0000000..7828dfc
--- /dev/null
+++ b/tests/test_chat_system_message_parsing.py
@@ -0,0 +1,42 @@
+import sys
+import unittest
+from pathlib import Path
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+from wechat_decrypt_tool.chat_helpers import _parse_system_message_content
+
+
+class TestChatSystemMessageParsing(unittest.TestCase):
+ def test_extract_replacemsg_for_revoke(self):
+ raw_text = (
+ ''
+ ""
+ )
+ self.assertEqual(_parse_system_message_content(raw_text), "“张三”撤回了一条消息")
+
+ def test_extract_nested_content_in_replacemsg(self):
+ raw_text = (
+ '"黄智欢" 撤回了一条消息0'
+ ']]>'
+ )
+ self.assertEqual(_parse_system_message_content(raw_text), '"黄智欢" 撤回了一条消息')
+
+ def test_extract_revokemsg_text_when_replacemsg_missing(self):
+ raw_text = "你撤回了一条消息"
+ self.assertEqual(_parse_system_message_content(raw_text), "你撤回了一条消息")
+
+ def test_revoke_fallback_when_no_readable_text(self):
+ raw_text = ''
+ self.assertEqual(_parse_system_message_content(raw_text), "撤回了一条消息")
+
+ def test_normal_system_message_still_cleaned(self):
+ raw_text = ""
+ self.assertEqual(_parse_system_message_content(raw_text), "张三 加入了群聊")
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/tests/test_group_nickname_ext_buffer_parsing.py b/tests/test_group_nickname_ext_buffer_parsing.py
new file mode 100644
index 0000000..40558c4
--- /dev/null
+++ b/tests/test_group_nickname_ext_buffer_parsing.py
@@ -0,0 +1,114 @@
+import sqlite3
+import sys
+import unittest
+from pathlib import Path
+from tempfile import TemporaryDirectory
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+from wechat_decrypt_tool.chat_helpers import _load_group_nickname_map_from_contact_db
+
+
+def _enc_varint(n: int) -> bytes:
+ v = int(n)
+ out = bytearray()
+ while True:
+ b = v & 0x7F
+ v >>= 7
+ if v:
+ out.append(b | 0x80)
+ else:
+ out.append(b)
+ break
+ return bytes(out)
+
+
+def _enc_tag(field_no: int, wire_type: int) -> bytes:
+ return _enc_varint((int(field_no) << 3) | int(wire_type))
+
+
+def _enc_len(field_no: int, data: bytes) -> bytes:
+ b = bytes(data or b"")
+ return _enc_tag(field_no, 2) + _enc_varint(len(b)) + b
+
+
+def _member_entry(*, inner: bytes) -> bytes:
+ # contact.db ext_buffer uses repeated length-delimited submessages; the top-level field number is not important
+ # for our best-effort parser, so we use field 1.
+ return _enc_len(1, inner)
+
+
+class TestGroupNicknameExtBufferParsing(unittest.TestCase):
+ def test_parse_pattern_a_field1_username_field2_display(self):
+ chatroom = "demo@chatroom"
+ username = "wxid_demo_123456"
+ display = "群名片A"
+
+ inner = _enc_len(1, username.encode("utf-8")) + _enc_len(2, display.encode("utf-8"))
+ ext_buffer = _member_entry(inner=inner)
+
+ with TemporaryDirectory() as td:
+ contact_db_path = Path(td) / "contact.db"
+ conn = sqlite3.connect(str(contact_db_path))
+ try:
+ conn.execute(
+ "CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
+ )
+ conn.execute(
+ "INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
+ (1, chatroom, "", ext_buffer),
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+ out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
+ self.assertEqual(out.get(username), display)
+
+ def test_parse_pattern_b_field4_username_field1_display(self):
+ chatroom = "demo2@chatroom"
+ username = "wxid_demo_abcdef"
+ display = "hjlbingo"
+
+ inner = _enc_len(4, username.encode("utf-8")) + _enc_len(1, display.encode("utf-8"))
+ ext_buffer = _member_entry(inner=inner)
+
+ with TemporaryDirectory() as td:
+ contact_db_path = Path(td) / "contact.db"
+ conn = sqlite3.connect(str(contact_db_path))
+ try:
+ conn.execute(
+ "CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
+ )
+ conn.execute(
+ "INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
+ (1, chatroom, "", ext_buffer),
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+ out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
+ self.assertEqual(out.get(username), display)
+
+ def test_non_chatroom_returns_empty(self):
+ with TemporaryDirectory() as td:
+ contact_db_path = Path(td) / "contact.db"
+ conn = sqlite3.connect(str(contact_db_path))
+ try:
+ conn.execute(
+ "CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
+ )
+ conn.commit()
+ finally:
+ conn.close()
+
+ out = _load_group_nickname_map_from_contact_db(contact_db_path, "wxid_not_chatroom", ["wxid_xxx"])
+ self.assertEqual(out, {})
+
+
+if __name__ == "__main__":
+ unittest.main()
+