fix(chat): 修复群聊置顶系统消息显示名称与导出文案

- 解析 ChatRoomTopMsg 文本/XML 系统消息,识别置顶与取消置顶操作
- 优先使用备注名/联系人名替换 wxid,避免实时消息、历史消息、搜索结果显示原始账号
- 导出 JSON/TXT/HTML 时复用同一套系统消息名称解析逻辑
- 补充系统消息解析与实时消息展示测试,覆盖多种消息载荷格式
This commit is contained in:
2977094657
2026-03-22 15:34:05 +08:00
Unverified
parent 94ec5c9a1c
commit a01dba4a93
5 changed files with 470 additions and 54 deletions
+17 -2
View File
@@ -19,7 +19,7 @@ import zipfile
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Iterable, Literal, Optional
from typing import Any, Callable, Iterable, Literal, Optional
from urllib.parse import urljoin, urlparse
import requests
@@ -3386,6 +3386,7 @@ def _parse_message_for_export(
resource_conn: Optional[sqlite3.Connection],
resource_chat_id: Optional[int],
sender_alias: str = "",
resolve_display_name: Optional[Callable[[str], str]] = None,
) -> dict[str, Any]:
raw_text = row.raw_text or ""
sender_username = str(row.sender_username or "").strip()
@@ -3449,7 +3450,18 @@ def _parse_message_for_export(
if local_type == 10000:
render_type = "system"
content_text = _parse_system_message_content(raw_text)
system_display_name_resolver = None
if resolve_display_name is not None:
def system_display_name_resolver(username: str, fallback_display_name: str) -> str:
resolved = str(resolve_display_name(username) or "").strip()
if resolved and resolved != username:
return resolved
fallback = str(fallback_display_name or "").strip()
return fallback or resolved or username
content_text = _parse_system_message_content(
raw_text,
resolve_display_name=system_display_name_resolver,
)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
@@ -3923,6 +3935,7 @@ def _write_conversation_json(
resource_conn=resource_conn,
resource_chat_id=resource_chat_id,
sender_alias=sender_alias,
resolve_display_name=resolve_display_name,
)
if not _is_render_type_selected(msg.get("renderType"), want_types):
continue
@@ -4101,6 +4114,7 @@ def _write_conversation_txt(
resource_conn=resource_conn,
resource_chat_id=resource_chat_id,
sender_alias=sender_alias,
resolve_display_name=resolve_display_name,
)
if not _is_render_type_selected(msg.get("renderType"), want_types):
continue
@@ -4859,6 +4873,7 @@ def _write_conversation_html(
resource_conn=resource_conn,
resource_chat_id=resource_chat_id,
sender_alias="",
resolve_display_name=resolve_display_name,
)
if not _is_render_type_selected(msg.get("renderType"), want_types):
continue
+113 -2
View File
@@ -7,7 +7,7 @@ import sqlite3
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from typing import Any, Callable, Optional
from urllib.parse import parse_qs, quote, urlparse
from fastapi import HTTPException
@@ -787,7 +787,112 @@ def _parse_location_message(text: str) -> dict[str, Any]:
}
def _parse_system_message_content(raw_text: str) -> str:
def _extract_chatroom_top_message_metadata(raw_text: str) -> dict[str, str]:
text = str(raw_text or "").strip()
if not text:
return {}
lower_text = text.lower()
if "<mmchatroomtopmsg" in lower_text or "<sysmsg" in lower_text:
chatroom_id = str(_extract_xml_tag_text(text, "chatroomname") or "").strip()
operation = str(_extract_xml_tag_text(text, "op") or "").strip()
operator_username = str(_extract_xml_tag_text(text, "username") or "").strip()
operator_display_name = str(_extract_xml_tag_text(text, "nickname") or "").strip()
if chatroom_id.endswith("@chatroom") and operation in {"1", "2"} and operator_username:
return {
"operation": operation,
"operatorUsername": operator_username,
"operatorDisplayName": operator_display_name,
}
def _is_int_token(value: str) -> bool:
candidate = str(value or "").strip()
if not candidate:
return False
if candidate[0] in {"+", "-"}:
candidate = candidate[1:]
return candidate.isdigit()
normalized = re.sub(r"<!--\s*ChatRoomTopMsgRequest\s*-->", " ", text, flags=re.IGNORECASE)
normalized = re.sub(r"<!--\s*ChatRoomTopMsgResponse\s*-->", " ", normalized, flags=re.IGNORECASE)
normalized = re.sub(r"\s+", " ", normalized).strip()
if not normalized:
return {}
parts = normalized.split(" ")
has_markers = ("chatroomtopmsgrequest" in lower_text) or ("chatroomtopmsgresponse" in lower_text)
if len(parts) < 5:
return {}
chatroom_id = str(parts[0] or "").strip()
operation = str(parts[1] or "").strip()
if not chatroom_id.endswith("@chatroom"):
return {}
if operation not in {"1", "2"}:
return {}
if not has_markers:
if len(parts) < 6:
return {}
if not _is_int_token(parts[2]) or not _is_int_token(parts[3]) or not _is_int_token(parts[5]):
return {}
operator_username = str(parts[4] or "").strip()
if not operator_username:
return {}
operator_display_name = ""
if len(parts) >= 6 and _is_int_token(parts[5]):
response_tokens = parts[6:]
if len(response_tokens) >= 2 and _is_int_token(response_tokens[-1]):
response_tokens = response_tokens[:-1]
operator_display_name = " ".join(response_tokens).strip()
return {
"operation": operation,
"operatorUsername": operator_username,
"operatorDisplayName": operator_display_name,
}
def _parse_chatroom_top_message(
raw_text: str,
resolve_display_name: Optional[Callable[[str, str], str]] = None,
) -> str:
meta = _extract_chatroom_top_message_metadata(raw_text)
if not meta:
return ""
operation = str(meta.get("operation") or "").strip()
operator_username = str(meta.get("operatorUsername") or "").strip()
operator_display_name = str(meta.get("operatorDisplayName") or "").strip()
if resolve_display_name is not None and operator_username:
try:
resolved = str(resolve_display_name(operator_username, operator_display_name) or "").strip()
except Exception:
resolved = ""
if resolved:
operator_display_name = resolved
if not operator_display_name:
operator_display_name = operator_username or "有人"
action_map = {
"1": "置顶了一条消息",
"2": "移除了一条置顶消息",
}
action = action_map.get(operation)
if not action:
return ""
return f"{operator_display_name}{action}"
def _parse_system_message_content(
raw_text: str,
resolve_display_name: Optional[Callable[[str, str], str]] = None,
) -> str:
text = str(raw_text or "").strip()
if not text:
return "[系统消息]"
@@ -801,12 +906,17 @@ def _parse_system_message_content(raw_text: str) -> str:
if nested_content:
candidate = nested_content
candidate = re.sub(r"<!--.*?-->", " ", candidate, flags=re.IGNORECASE | re.DOTALL)
candidate = re.sub(r"<!\[CDATA\[", "", candidate, flags=re.IGNORECASE)
candidate = re.sub(r"\]\]>", "", candidate)
candidate = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", candidate)
candidate = re.sub(r"\s+", " ", candidate).strip()
return candidate
top_message_text = _parse_chatroom_top_message(text, resolve_display_name=resolve_display_name)
if top_message_text:
return top_message_text
if "revokemsg" in text.lower():
replace_msg = _extract_xml_tag_text(text, "replacemsg")
cleaned_replace_msg = _clean_system_text(replace_msg)
@@ -2334,4 +2444,5 @@ def _row_to_search_hit(
"locationLng": location_lng,
"locationPoiname": location_poiname,
"locationLabel": location_label,
"_rawText": raw_text if local_type in (10000, 266287972401) else "",
}
+168 -50
View File
@@ -26,6 +26,7 @@ from ..chat_helpers import (
_build_fts_query,
_decode_message_content,
_decode_sqlite_text,
_extract_chatroom_top_message_metadata,
_extract_md5_from_packed_info,
_extract_sender_from_group_xml,
_extract_xml_attr,
@@ -514,6 +515,61 @@ def _resolve_sender_display_name(
return display_name
def _resolve_system_message_display_name(
*,
sender_username: str,
fallback_display_name: str,
sender_contact_rows: dict[str, sqlite3.Row],
wcdb_display_names: dict[str, str],
) -> str:
su = str(sender_username or "").strip()
fallback = str(fallback_display_name or "").strip()
if not su:
return fallback or "有人"
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name != su:
return display_name
if fallback and fallback != su:
return fallback
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
return wd
return fallback or wd or su
def _postprocess_special_message_content(
*,
message: dict[str, Any],
sender_contact_rows: dict[str, sqlite3.Row],
wcdb_display_names: dict[str, str],
) -> None:
raw = str(message.get("_rawText") or "")
if not raw:
message.pop("_rawText", None)
return
local_type = int(message.get("type") or 0)
if local_type == 266287972401:
message["content"] = _parse_pat_message(raw, sender_contact_rows)
elif local_type == 10000:
message["content"] = _parse_system_message_content(
raw,
resolve_display_name=lambda sender_username, fallback_display_name="": _resolve_system_message_display_name(
sender_username=sender_username,
fallback_display_name=fallback_display_name,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
),
)
message.pop("_rawText", None)
def _realtime_sync_lock(account: str, username: str) -> threading.Lock:
key = (str(account or "").strip(), str(username or "").strip())
with _REALTIME_SYNC_MU:
@@ -3034,7 +3090,7 @@ def _append_full_messages_from_rows(
"locationLng": location_lng,
"locationPoiname": location_poiname,
"locationLabel": location_label,
"_rawText": raw_text if local_type == 266287972401 else "",
"_rawText": raw_text if local_type in (10000, 266287972401) else "",
}
)
@@ -3271,9 +3327,20 @@ def _postprocess_full_messages(
if fn and fn in name_to_username:
m["fromUsername"] = name_to_username[fn]
system_usernames: set[str] = set()
for m in merged:
if int(m.get("type") or 0) != 10000:
continue
meta = _extract_chatroom_top_message_metadata(str(m.get("_rawText") or ""))
operator_username = str(meta.get("operatorUsername") or "").strip()
if operator_username:
system_usernames.add(operator_username)
from_usernames = [str(m.get("fromUsername") or "").strip() for m in merged]
uniq_senders = list(
dict.fromkeys([u for u in (sender_usernames + list(pat_usernames) + quote_usernames + from_usernames) if u])
dict.fromkeys(
[u for u in (sender_usernames + list(pat_usernames) + quote_usernames + from_usernames + list(system_usernames)) if u]
)
)
sender_contact_rows = _load_contact_rows(contact_db_path, uniq_senders)
local_sender_avatars = _query_head_image_usernames(head_image_db_path, uniq_senders)
@@ -3327,20 +3394,19 @@ def _postprocess_full_messages(
m["from"] = wd
su = str(m.get("senderUsername") or "")
if not su:
continue
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
local_avatar_usernames=local_sender_avatars,
)
m["senderAvatar"] = avatar_url
if su:
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
local_avatar_usernames=local_sender_avatars,
)
m["senderAvatar"] = avatar_url
qu = str(m.get("quoteUsername") or "").strip()
if qu:
@@ -3471,13 +3537,11 @@ def _postprocess_full_messages(
except Exception:
pass
if int(m.get("type") or 0) == 266287972401:
raw = str(m.get("_rawText") or "")
if raw:
m["content"] = _parse_pat_message(raw, sender_contact_rows)
if "_rawText" in m:
m.pop("_rawText", None)
_postprocess_special_message_content(
message=m,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
)
@router.get("/api/chat/accounts", summary="列出已解密账号")
@@ -4526,7 +4590,7 @@ def _collect_chat_messages(
"locationLng": location_lng,
"locationPoiname": location_poiname,
"locationLabel": location_label,
"_rawText": raw_text if local_type == 266287972401 else "",
"_rawText": raw_text if local_type in (10000, 266287972401) else "",
}
)
finally:
@@ -5409,7 +5473,7 @@ def list_chat_messages(
"paySubType": pay_sub_type,
"transferStatus": transfer_status,
"transferId": transfer_id,
"_rawText": raw_text if local_type == 266287972401 else "",
"_rawText": raw_text if local_type in (10000, 266287972401) else "",
}
)
finally:
@@ -5498,6 +5562,15 @@ def list_chat_messages(
continue
pat_usernames_in_page.update({mm.group(1) for mm in re.finditer(r"\$\{([^}]+)\}", template) if mm.group(1)})
system_usernames_in_page: set[str] = set()
for m in messages_window:
if int(m.get("type") or 0) != 10000:
continue
meta = _extract_chatroom_top_message_metadata(str(m.get("_rawText") or ""))
operator_username = str(meta.get("operatorUsername") or "").strip()
if operator_username:
system_usernames_in_page.add(operator_username)
from_usernames = [str(m.get("fromUsername") or "").strip() for m in messages_window]
sender_usernames_in_page = [str(m.get("senderUsername") or "").strip() for m in messages_window]
quote_usernames_in_page = [str(m.get("quoteUsername") or "").strip() for m in messages_window]
@@ -5510,6 +5583,7 @@ def list_chat_messages(
+ list(pat_usernames_in_page)
+ quote_usernames_in_page
+ from_usernames
+ list(system_usernames_in_page)
)
if u
]
@@ -5567,20 +5641,19 @@ def list_chat_messages(
m["from"] = wd
su = str(m.get("senderUsername") or "")
if not su:
continue
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
local_avatar_usernames=local_sender_avatars,
)
m["senderAvatar"] = avatar_url
if su:
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
local_avatar_usernames=local_sender_avatars,
)
m["senderAvatar"] = avatar_url
qu = str(m.get("quoteUsername") or "").strip()
if qu:
@@ -5706,13 +5779,11 @@ def list_chat_messages(
except Exception:
pass
if int(m.get("type") or 0) == 266287972401:
raw = str(m.get("_rawText") or "")
if raw:
m["content"] = _parse_pat_message(raw, sender_contact_rows)
if "_rawText" in m:
m.pop("_rawText", None)
_postprocess_special_message_content(
message=m,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
)
return {
"status": "success",
@@ -6032,7 +6103,14 @@ async def _search_chat_messages_via_fts(
scope = "conversation" if username else "global"
if username:
uniq_usernames = list(dict.fromkeys([username] + [str(x.get("senderUsername") or "") for x in hits]))
system_usernames = [
str(_extract_chatroom_top_message_metadata(str(x.get("_rawText") or "")).get("operatorUsername") or "").strip()
for x in hits
if int(x.get("type") or 0) == 10000
]
uniq_usernames = list(
dict.fromkeys([username] + [str(x.get("senderUsername") or "") for x in hits] + system_usernames)
)
contact_rows = _load_contact_rows(contact_db_path, uniq_usernames)
local_avatar_usernames = _query_head_image_usernames(head_image_db_path, uniq_usernames)
@@ -6099,10 +6177,22 @@ async def _search_chat_messages_via_fts(
local_avatar_usernames=local_avatar_usernames,
)
h["senderAvatar"] = avatar_url
_postprocess_special_message_content(
message=h,
sender_contact_rows=contact_rows,
wcdb_display_names=wcdb_display_names,
)
else:
system_usernames = [
str(_extract_chatroom_top_message_metadata(str(x.get("_rawText") or "")).get("operatorUsername") or "").strip()
for x in hits
if int(x.get("type") or 0) == 10000
]
uniq_contacts = list(
dict.fromkeys(
[str(x.get("username") or "") for x in hits] + [str(x.get("senderUsername") or "") for x in hits]
[str(x.get("username") or "") for x in hits]
+ [str(x.get("senderUsername") or "") for x in hits]
+ system_usernames
)
)
contact_rows = _load_contact_rows(contact_db_path, uniq_contacts)
@@ -6182,6 +6272,11 @@ async def _search_chat_messages_via_fts(
local_avatar_usernames=local_avatar_usernames,
)
h["senderAvatar"] = avatar_url
_postprocess_special_message_content(
message=h,
sender_contact_rows=contact_rows,
wcdb_display_names=wcdb_display_names,
)
return {
"status": "success",
@@ -6434,7 +6529,14 @@ async def search_chat_messages(
total_in_scan = len(conv_hits)
page = conv_hits[int(offset) : int(offset) + int(limit)]
uniq_usernames = list(dict.fromkeys([username] + [str(x.get("senderUsername") or "") for x in page]))
system_usernames = [
str(_extract_chatroom_top_message_metadata(str(x.get("_rawText") or "")).get("operatorUsername") or "").strip()
for x in page
if int(x.get("type") or 0) == 10000
]
uniq_usernames = list(
dict.fromkeys([username] + [str(x.get("senderUsername") or "") for x in page] + system_usernames)
)
contact_rows = _load_contact_rows(contact_db_path, uniq_usernames)
conv_row = contact_rows.get(username)
conv_name = _pick_display_name(conv_row, username)
@@ -6455,6 +6557,11 @@ async def search_chat_messages(
wcdb_display_names={},
group_nicknames=group_nicknames,
)
_postprocess_special_message_content(
message=h,
sender_contact_rows=contact_rows,
wcdb_display_names={},
)
return {
"status": "success",
@@ -6531,7 +6638,13 @@ async def search_chat_messages(
uniq_contacts = list(
dict.fromkeys(
[str(x.get("username") or "") for x in page] + [str(x.get("senderUsername") or "") for x in page]
[str(x.get("username") or "") for x in page]
+ [str(x.get("senderUsername") or "") for x in page]
+ [
str(_extract_chatroom_top_message_metadata(str(x.get("_rawText") or "")).get("operatorUsername") or "").strip()
for x in page
if int(x.get("type") or 0) == 10000
]
)
)
contact_rows = _load_contact_rows(contact_db_path, uniq_contacts)
@@ -6566,6 +6679,11 @@ async def search_chat_messages(
wcdb_display_names={},
group_nicknames=group_nickname_cache.get(cu, {}),
)
_postprocess_special_message_content(
message=h,
sender_contact_rows=contact_rows,
wcdb_display_names={},
)
return {
"status": "success",
@@ -0,0 +1,95 @@
import sys
import threading
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class _DummyRequest:
base_url = "http://testserver/"
class _DummyConn:
def __init__(self) -> None:
self.handle = 1
self.lock = threading.Lock()
class TestChatRealtimeSystemMessageDisplayName(unittest.TestCase):
def test_realtime_chatroom_top_message_prefers_remark_name(self):
raw_text = (
"17990148862@chatroom 2 3546361838777087323 0 "
"wxid_k7zhjk9xvzsk22 21 A 69"
)
wcdb_rows = [
{
"localId": 1,
"serverId": 123,
"localType": 10000,
"sortSeq": 1700000000000,
"realSenderId": 0,
"createTime": 1700000000,
"messageContent": raw_text,
"compressContent": None,
"packedInfoData": None,
"senderUsername": "",
"isSent": False,
}
]
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
conn = _DummyConn()
with (
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
patch.object(chat_router, "_wcdb_get_messages", return_value=wcdb_rows),
patch.object(
chat_router,
"_load_contact_rows",
return_value={
"wxid_k7zhjk9xvzsk22": {
"remark": "周鑫",
"nick_name": "A",
"alias": "",
}
},
),
patch.object(chat_router, "_query_head_image_usernames", return_value=set()),
patch.object(chat_router, "_wcdb_get_display_names", return_value={}),
patch.object(chat_router, "_wcdb_get_avatar_urls", return_value={}),
patch.object(chat_router, "_load_usernames_by_display_names", return_value={}),
patch.object(chat_router, "_load_group_nickname_map", return_value={}),
):
resp = chat_router.list_chat_messages(
_DummyRequest(),
username="17990148862@chatroom",
account="acc",
limit=50,
offset=0,
order="asc",
render_types=None,
source="realtime",
)
self.assertEqual(resp.get("status"), "success")
messages = resp.get("messages") or []
self.assertEqual(len(messages), 1)
msg = messages[0]
self.assertEqual(msg.get("renderType"), "system")
self.assertEqual(msg.get("content"), "周鑫移除了一条置顶消息")
self.assertNotIn("_rawText", msg)
if __name__ == "__main__":
unittest.main()
+77
View File
@@ -37,6 +37,83 @@ class TestChatSystemMessageParsing(unittest.TestCase):
raw_text = "<sysmsg><template><![CDATA[ 张三 加入了群聊 ]]></template></sysmsg>"
self.assertEqual(_parse_system_message_content(raw_text), "张三 加入了群聊")
def test_chatroom_top_message_uses_response_name_by_default(self):
raw_text = (
"<!-- ChatRoomTopMsgRequest --> 17990148862@chatroom 1 3546361838777087323 49 "
"wxid_7iazcmpjn90k22 <!-- ChatRoomTopMsgResponse --> 21 新青年 68"
)
self.assertEqual(_parse_system_message_content(raw_text), "新青年置顶了一条消息")
def test_chatroom_top_message_prefers_resolved_display_name(self):
raw_text = (
"<!-- ChatRoomTopMsgRequest --> 17990148862@chatroom 2 3546361838777087323 0 "
"wxid_k7zhjk9xvzsk22 <!-- ChatRoomTopMsgResponse --> 21 A 69"
)
def resolve_display_name(username: str, fallback: str) -> str:
self.assertEqual(username, "wxid_k7zhjk9xvzsk22")
self.assertEqual(fallback, "A")
return "周鑫"
self.assertEqual(
_parse_system_message_content(raw_text, resolve_display_name=resolve_display_name),
"周鑫移除了一条置顶消息",
)
def test_chatroom_top_message_without_comment_markers_still_parses(self):
raw_text = "17990148862@chatroom 1 3546361838777087323 49 wxid_7iazcmpjn90k22 21 新青年 68"
self.assertEqual(_parse_system_message_content(raw_text), "新青年置顶了一条消息")
def test_chatroom_top_message_without_comment_markers_still_prefers_resolved_name(self):
raw_text = "17990148862@chatroom 2 3546361838777087323 0 wxid_k7zhjk9xvzsk22 21 A 69"
def resolve_display_name(username: str, fallback: str) -> str:
self.assertEqual(username, "wxid_k7zhjk9xvzsk22")
self.assertEqual(fallback, "A")
return "周鑫"
self.assertEqual(
_parse_system_message_content(raw_text, resolve_display_name=resolve_display_name),
"周鑫移除了一条置顶消息",
)
def test_chatroom_top_message_xml_payload_still_parses(self):
raw_text = (
'<sysmsg type="mmchatroomtopmsg"><mmchatroomtopmsg>'
'<chatroomname><![CDATA[17990148862@chatroom]]></chatroomname>'
'<op><![CDATA[1]]></op>'
'<newmsgid><![CDATA[3546361838777087323]]></newmsgid>'
'<msgtype><![CDATA[49]]></msgtype>'
'<username><![CDATA[wxid_7iazcmpjn90k22]]></username>'
'<id><![CDATA[21]]></id>'
'<nickname><![CDATA[新青年]]></nickname>'
'</mmchatroomtopmsg><chatroominfoversion><![CDATA[68]]></chatroominfoversion></sysmsg>'
)
self.assertEqual(_parse_system_message_content(raw_text), "新青年置顶了一条消息")
def test_chatroom_top_message_xml_payload_prefers_resolved_name(self):
raw_text = (
'<sysmsg type="mmchatroomtopmsg"><mmchatroomtopmsg>'
'<chatroomname><![CDATA[17990148862@chatroom]]></chatroomname>'
'<op><![CDATA[2]]></op>'
'<newmsgid><![CDATA[3546361838777087323]]></newmsgid>'
'<msgtype><![CDATA[0]]></msgtype>'
'<username><![CDATA[wxid_k7zhjk9xvzsk22]]></username>'
'<id><![CDATA[21]]></id>'
'<nickname><![CDATA[A]]></nickname>'
'</mmchatroomtopmsg><chatroominfoversion><![CDATA[69]]></chatroominfoversion></sysmsg>'
)
def resolve_display_name(username: str, fallback: str) -> str:
self.assertEqual(username, "wxid_k7zhjk9xvzsk22")
self.assertEqual(fallback, "A")
return "周鑫"
self.assertEqual(
_parse_system_message_content(raw_text, resolve_display_name=resolve_display_name),
"周鑫移除了一条置顶消息",
)
if __name__ == "__main__":
unittest.main()