improvement(chat): 完善会话置顶与消息卡片解析展示

- 后端:会话列表支持置顶识别(isTop)并按置顶优先排序

- 后端:修正群聊 XML 发送者提取,避免 refermsg 嵌套误识别

- 后端:完善转账状态后处理与视频缩略图 MD5 回填(packed_info_data)

- 后端:补充 quoteThumbUrl/linkType/linkStyle 字段链路

- 前端:新增置顶会话背景态、引用链接缩略图预览与 LinkCard cover 样式

- 测试:新增转账、置顶、引用解析与视频缩略图相关回归用例
This commit is contained in:
2977094657
2026-02-11 21:57:43 +08:00
parent 2ce479aefd
commit 548f3cf2c8
11 changed files with 1367 additions and 200 deletions

View File

@@ -0,0 +1,93 @@
import sys
import threading
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class _DummyRequest:
base_url = "http://testserver/"
class _DummyConn:
def __init__(self) -> None:
self.handle = 1
self.lock = threading.Lock()
class TestChatRealtimeVideoThumbMd5FromPackedInfo(unittest.TestCase):
def test_video_thumb_md5_filled_from_packed_info(self):
packed_md5 = "faff984641f9dd174e01c74f0796c9ae"
file_id = "3057020100044b3049020100020445eb9d5102032f54690204749999db0204698c336b0424deadbeef"
video_md5 = "22e6612411898b6d43b7e773e504d506"
xml = (
'<?xml version="1.0"?>\n'
"<msg>\n"
f' <videomsg fromusername="wxid_sender" md5="{video_md5}" cdnthumburl="{file_id}" cdnvideourl="{file_id}" />\n'
"</msg>\n"
)
wcdb_rows = [
{
"localId": 1,
"serverId": 123,
"localType": 43,
"sortSeq": 1700000000000,
"realSenderId": 1,
"createTime": 1700000000,
"messageContent": xml,
"compressContent": None,
"packedInfoData": packed_md5.encode("ascii"),
"senderUsername": "wxid_sender",
}
]
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
conn = _DummyConn()
with (
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
patch.object(chat_router, "_wcdb_get_messages", return_value=wcdb_rows),
patch.object(chat_router, "_load_contact_rows", return_value={}),
patch.object(chat_router, "_query_head_image_usernames", return_value=set()),
patch.object(chat_router, "_wcdb_get_display_names", return_value={}),
patch.object(chat_router, "_wcdb_get_avatar_urls", return_value={}),
patch.object(chat_router, "_load_usernames_by_display_names", return_value={}),
patch.object(chat_router, "_load_group_nickname_map", return_value={}),
):
resp = chat_router.list_chat_messages(
_DummyRequest(),
username="demo@chatroom",
account="acc",
limit=50,
offset=0,
order="asc",
render_types=None,
source="realtime",
)
self.assertEqual(resp.get("status"), "success")
messages = resp.get("messages") or []
self.assertEqual(len(messages), 1)
msg = messages[0]
self.assertEqual(msg.get("renderType"), "video")
self.assertEqual(msg.get("videoThumbMd5"), packed_md5)
thumb_url = str(msg.get("videoThumbUrl") or "")
self.assertIn(f"md5={packed_md5}", thumb_url)
self.assertNotIn("file_id=", thumb_url)
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,211 @@
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class _DummyRequest:
base_url = "http://testserver/"
def _seed_session_db(path: Path, rows: list[tuple[str, int, int, str]]) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable(
username TEXT PRIMARY KEY,
unread_count INTEGER,
is_hidden INTEGER,
summary TEXT,
draft TEXT,
last_timestamp INTEGER,
sort_timestamp INTEGER,
last_msg_type INTEGER,
last_msg_sub_type INTEGER
)
"""
)
for username, sort_timestamp, last_timestamp, summary in rows:
conn.execute(
"""
INSERT INTO SessionTable(
username, unread_count, is_hidden, summary, draft,
last_timestamp, sort_timestamp, last_msg_type, last_msg_sub_type
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
username,
0,
0,
summary,
"",
int(last_timestamp),
int(sort_timestamp),
1,
0,
),
)
conn.commit()
finally:
conn.close()
def _seed_contact_db_with_flag(path: Path, flags: dict[str, int]) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact(
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
big_head_url TEXT,
small_head_url TEXT,
flag INTEGER
)
"""
)
conn.execute(
"""
CREATE TABLE stranger(
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
big_head_url TEXT,
small_head_url TEXT,
flag INTEGER
)
"""
)
for username, flag in flags.items():
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?)",
(username, "", "", "", "", "", int(flag)),
)
conn.commit()
finally:
conn.close()
def _seed_contact_db_without_flag(path: Path, usernames: list[str]) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact(
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger(
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
for username in usernames:
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?)",
(username, "", "", "", "", ""),
)
conn.commit()
finally:
conn.close()
class TestChatSessionsPinning(unittest.TestCase):
def test_pinned_session_is_sorted_first_and_has_is_top(self):
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
_seed_session_db(
account_dir / "session.db",
[
("wxid_new", 200, 200, "new message"),
("wxid_top", 100, 100, "top older message"),
],
)
_seed_contact_db_with_flag(
account_dir / "contact.db",
{
"wxid_new": 0,
"wxid_top": 1 << 11,
},
)
with patch.object(chat_router, "_resolve_account_dir", return_value=account_dir):
resp = chat_router.list_chat_sessions(
_DummyRequest(),
account="acc",
limit=50,
include_hidden=True,
include_official=True,
preview="session",
source="",
)
self.assertEqual(resp.get("status"), "success")
sessions = resp.get("sessions") or []
self.assertEqual(len(sessions), 2)
self.assertEqual(sessions[0].get("username"), "wxid_top")
self.assertTrue(bool(sessions[0].get("isTop")))
self.assertEqual(sessions[1].get("username"), "wxid_new")
self.assertFalse(bool(sessions[1].get("isTop")))
def test_missing_flag_column_does_not_error_and_defaults_false(self):
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
_seed_session_db(
account_dir / "session.db",
[
("wxid_top", 100, 100, "hello"),
],
)
_seed_contact_db_without_flag(account_dir / "contact.db", ["wxid_top"])
with patch.object(chat_router, "_resolve_account_dir", return_value=account_dir):
resp = chat_router.list_chat_sessions(
_DummyRequest(),
account="acc",
limit=50,
include_hidden=True,
include_official=True,
preview="session",
source="",
)
self.assertEqual(resp.get("status"), "success")
sessions = resp.get("sessions") or []
self.assertEqual(len(sessions), 1)
self.assertFalse(bool(sessions[0].get("isTop")))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,23 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _extract_sender_from_group_xml
class TestGroupXmlSenderExtraction(unittest.TestCase):
def test_prefers_outer_fromusername_over_nested_refermsg(self):
xml_text = (
'<msg><appmsg><type>57</type>'
'<refermsg><fromusername>quoted_user@chatroom</fromusername></refermsg>'
'</appmsg><fromusername>actual_sender@chatroom</fromusername></msg>'
)
self.assertEqual(_extract_sender_from_group_xml(xml_text), "actual_sender@chatroom")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,115 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _parse_app_message
class TestParseAppMessage(unittest.TestCase):
def test_quote_type_57_nested_refermsg_uses_inner_title(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
'<title>一松一紧</title><des></des><action></action><type>57</type>'
'<showtype>0</showtype><soundtype>0</soundtype><mediatagname></mediatagname>'
'<messageext></messageext><messageaction></messageaction><content></content>'
'<url></url><appattach><totallen>0</totallen><attachid></attachid><fileext></fileext></appattach>'
'<extinfo></extinfo><sourceusername></sourceusername><sourcedisplayname></sourcedisplayname>'
'<commenturl></commenturl><refermsg>'
'<type>57</type><svrid>1173057991425172913</svrid>'
'<fromusr>44372432598@chatroom</fromusr><chatusr>44372432598@chatroom</chatusr>'
'<displayname><![CDATA[ㅤ磁父]]></displayname>'
'<content><![CDATA[<msg><appmsg appid="" sdkver="0"><title>那里紧?哪里张?</title><des></des>'
'<action></action><type>57</type><showtype>0</showtype><soundtype>0</soundtype>'
'<mediatagname></mediatagname><messageext></messageext><messageaction></messageaction>'
'<content></content><url></url><appattach><totallen>0</totallen><attachid></attachid>'
'<fileext></fileext></appattach><extinfo></extinfo><sourceusername></sourceusername>'
'<sourcedisplayname></sourcedisplayname><commenturl></commenturl></appmsg></msg>]]></content>'
'</refermsg></appmsg></msg>'
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "quote")
self.assertEqual(parsed.get("content"), "一松一紧")
self.assertEqual(parsed.get("quoteType"), "57")
self.assertEqual(parsed.get("quoteContent"), "那里紧?哪里张?")
def test_quote_type_57_plain_text_refermsg_keeps_text(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
'<title>回复</title><type>57</type>'
'<refermsg><type>57</type><content><![CDATA[普通文本引用]]></content></refermsg>'
'</appmsg></msg>'
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "quote")
self.assertEqual(parsed.get("quoteContent"), "普通文本引用")
def test_quote_type_49_nested_xml_refermsg_uses_inner_title(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
'<title>这种傻逼公众号怎么还在看</title><type>57</type>'
'<refermsg><type>49</type><displayname><![CDATA[水豚喧喧]]></displayname>'
'<content><![CDATA[wxid_gryaI8aopjio22: <?xml version="1.0"?><msg><appmsg appid="" sdkver="0">'
'<title>为自己的美丽漂亮善良知性发声😊</title><des></des>'
'<type>5</type><url>https://mp.weixin.qq.com/s/example</url>'
'<thumburl>https://mmbiz.qpic.cn/some-thumb.jpg</thumburl>'
'</appmsg></msg>]]></content></refermsg></appmsg></msg>'
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "quote")
self.assertEqual(parsed.get("quoteType"), "49")
self.assertEqual(parsed.get("quoteTitle"), "水豚喧喧")
self.assertEqual(parsed.get("quoteContent"), "[链接] 为自己的美丽漂亮善良知性发声😊")
self.assertEqual(parsed.get("quoteThumbUrl"), "https://mmbiz.qpic.cn/some-thumb.jpg")
def test_public_account_link_exposes_link_type_and_style(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
'<title>为自己的美丽漂亮善良知性发声😊</title>'
'<des>#日常穿搭灵感 #白色蕾丝裙穿搭 #知性美女</des>'
'<type>5</type>'
'<url>http://mp.weixin.qq.com/s?__biz=xx&mid=1</url>'
'<thumburl>http://mmbiz.qpic.cn/abc/640?wx_fmt=jpeg</thumburl>'
'<sourceusername>gh_0cef8eaa987d</sourceusername>'
'<sourcedisplayname>草莓不甜芒果甜</sourcedisplayname>'
'</appmsg></msg>'
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("linkType"), "official_article")
self.assertEqual(parsed.get("linkStyle"), "cover")
def test_quote_type_5_nested_xml_refermsg_uses_inner_title(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
'<title>这个年龄有点大啊</title><type>57</type>'
'<refermsg><type>5</type><displayname><![CDATA[水豚噜噜]]></displayname>'
'<content><![CDATA[wxid_qrval8aopiio22:\n<?xml version="1.0"?>\n<msg><appmsg appid="" sdkver="0">'
'<title>谁说冬天不能穿裙子?</title><des></des><type>5</type>'
'<thumburl>https://mmbiz.qpic.cn/some-thumb2.jpg</thumburl>'
'<url>https://mp.weixin.qq.com/s/example2</url>'
'</appmsg></msg>]]></content></refermsg></appmsg></msg>'
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "quote")
self.assertEqual(parsed.get("quoteType"), "5")
self.assertEqual(parsed.get("quoteTitle"), "水豚噜噜")
self.assertEqual(parsed.get("quoteContent"), "[链接] 谁说冬天不能穿裙子?")
self.assertEqual(parsed.get("quoteThumbUrl"), "https://mmbiz.qpic.cn/some-thumb2.jpg")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,68 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class TestTransferPostprocess(unittest.TestCase):
def test_backfilled_pending_and_received_confirmation_have_expected_titles(self):
transfer_id = "1000050001202601152035503031545"
merged = [
{
"id": "message_0:Msg_x:60",
"renderType": "transfer",
"paySubType": "1",
"transferId": transfer_id,
"amount": "¥100.00",
"createTime": 1768463200,
"isSent": False,
"transferStatus": "",
},
{
"id": "message_0:Msg_x:65",
"renderType": "transfer",
"paySubType": "3",
"transferId": transfer_id,
"amount": "¥100.00",
"createTime": 1768463246,
"isSent": True,
# Pre-inferred value (may be "已被接收") should be corrected by postprocess.
"transferStatus": "已被接收",
},
]
chat_router._postprocess_transfer_messages(merged)
self.assertEqual(merged[0].get("paySubType"), "3")
self.assertEqual(merged[0].get("transferStatus"), "已被接收")
self.assertEqual(merged[1].get("paySubType"), "3")
self.assertEqual(merged[1].get("transferStatus"), "已收款")
def test_received_message_without_pending_is_left_unchanged(self):
merged = [
{
"id": "message_0:Msg_x:65",
"renderType": "transfer",
"paySubType": "3",
"transferId": "t1",
"amount": "¥100.00",
"createTime": 1,
"isSent": True,
"transferStatus": "已被接收",
}
]
chat_router._postprocess_transfer_messages(merged)
self.assertEqual(merged[0].get("transferStatus"), "已被接收")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,63 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _infer_transfer_status_text
class TestTransferStatusText(unittest.TestCase):
def test_paysubtype_3_sent_side(self):
status = _infer_transfer_status_text(
is_sent=True,
paysubtype="3",
receivestatus="",
sendertitle="",
receivertitle="",
senderdes="",
receiverdes="",
)
self.assertEqual(status, "已被接收")
def test_paysubtype_3_received_side(self):
status = _infer_transfer_status_text(
is_sent=False,
paysubtype="3",
receivestatus="",
sendertitle="",
receivertitle="",
senderdes="",
receiverdes="",
)
self.assertEqual(status, "已收款")
def test_receivestatus_1_sent_side(self):
status = _infer_transfer_status_text(
is_sent=True,
paysubtype="1",
receivestatus="1",
sendertitle="",
receivertitle="",
senderdes="",
receiverdes="",
)
self.assertEqual(status, "已被接收")
def test_receivestatus_1_received_side(self):
status = _infer_transfer_status_text(
is_sent=False,
paysubtype="1",
receivestatus="1",
sendertitle="",
receivertitle="",
senderdes="",
receiverdes="",
)
self.assertEqual(status, "已收款")
if __name__ == "__main__":
unittest.main()