feat(chat): 群聊预览补齐群名片并完善系统消息解析

- 新增系统撤回消息解析:优先提取 replacemsg,并统一清洗文本

- 群聊会话预览文本规范化([表情] -> [动画表情]),并支持发送者前缀展示名替换

- 群名片解析来源扩展:contact.db ext_buffer + WCDB realtime(可选新 DLL 接口)

- 图片接口增强:支持 server_id + username 反查消息提取 md5,提升引用图片命中
This commit is contained in:
2977094657
2026-02-09 18:31:00 +08:00
parent c0b76d7a72
commit 814abba2f9
6 changed files with 815 additions and 80 deletions

View File

@@ -28,6 +28,7 @@ from .chat_helpers import (
_load_contact_rows, _load_contact_rows,
_lookup_resource_md5, _lookup_resource_md5,
_parse_app_message, _parse_app_message,
_parse_system_message_content,
_parse_pat_message, _parse_pat_message,
_pick_display_name, _pick_display_name,
_quote_ident, _quote_ident,
@@ -954,13 +955,7 @@ def _parse_message_for_export(
if local_type == 10000: if local_type == 10000:
render_type = "system" render_type = "system"
if "revokemsg" in raw_text: content_text = _parse_system_message_content(raw_text)
content_text = "撤回了一条消息"
else:
import re as _re
content_text = _re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = _re.sub(r"\\s+", " ", content_text).strip() or "[系统消息]"
elif local_type == 49: elif local_type == 49:
parsed = _parse_app_message(raw_text) parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text") render_type = str(parsed.get("renderType") or "text")

View File

@@ -645,6 +645,43 @@ def _extract_xml_tag_or_attr(xml_text: str, name: str) -> str:
return _extract_xml_attr(xml_text, name) return _extract_xml_attr(xml_text, name)
def _parse_system_message_content(raw_text: str) -> str:
text = str(raw_text or "").strip()
if not text:
return "[系统消息]"
def _clean_system_text(value: str) -> str:
candidate = str(value or "").strip()
if not candidate:
return ""
nested_content = _extract_xml_tag_text(candidate, "content")
if nested_content:
candidate = nested_content
candidate = re.sub(r"<!\[CDATA\[", "", candidate, flags=re.IGNORECASE)
candidate = re.sub(r"\]\]>", "", candidate)
candidate = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", candidate)
candidate = re.sub(r"\s+", " ", candidate).strip()
return candidate
if "revokemsg" in text.lower():
replace_msg = _extract_xml_tag_text(text, "replacemsg")
cleaned_replace_msg = _clean_system_text(replace_msg)
if cleaned_replace_msg:
return cleaned_replace_msg
revoke_msg = _extract_xml_tag_text(text, "revokemsg")
cleaned_revoke_msg = _clean_system_text(revoke_msg)
if cleaned_revoke_msg:
return cleaned_revoke_msg
return "撤回了一条消息"
content_text = _clean_system_text(text)
return content_text or "[系统消息]"
def _extract_refermsg_block(xml_text: str) -> str: def _extract_refermsg_block(xml_text: str) -> str:
if not xml_text: if not xml_text:
return "" return ""
@@ -1053,11 +1090,7 @@ def _build_latest_message_preview(
content_text = "" content_text = ""
if local_type == 10000: if local_type == 10000:
if "revokemsg" in raw_text: content_text = _parse_system_message_content(raw_text)
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
elif local_type == 244813135921: elif local_type == 244813135921:
parsed = _parse_app_message(raw_text) parsed = _parse_app_message(raw_text)
qt = str(parsed.get("quoteTitle") or "").strip() qt = str(parsed.get("quoteTitle") or "").strip()
@@ -1093,7 +1126,7 @@ def _build_latest_message_preview(
elif local_type == 43 or local_type == 62: elif local_type == 43 or local_type == 62:
content_text = "[视频]" content_text = "[视频]"
elif local_type == 47: elif local_type == 47:
content_text = "[表情]" content_text = "[动画表情]"
else: else:
if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')): if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')):
content_text = raw_text content_text = raw_text
@@ -1107,6 +1140,101 @@ def _build_latest_message_preview(
return content_text return content_text
def _extract_group_preview_sender_username(preview_text: str) -> str:
text = str(preview_text or "").strip()
if not text:
return ""
match = re.match(r"^([^:\s]{1,128}):\s*.+$", text)
if not match:
return ""
sender = str(match.group(1) or "").strip()
if not sender:
return ""
if sender.startswith("wxid_") or sender.endswith("@chatroom") or ("@" in sender):
return sender
if re.fullmatch(r"[A-Za-z][A-Za-z0-9_-]{1,127}", sender):
return sender
return ""
def _normalize_session_preview_text(
preview_text: str,
*,
is_group: bool,
sender_display_names: Optional[dict[str, str]] = None,
) -> str:
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
if not text:
return ""
text = text.replace("[表情]", "[动画表情]")
if (not is_group) or text.startswith("[草稿]"):
return text
match = re.match(r"^([^:\s]{1,128}):\s*(.+)$", text)
if not match:
return text
sender_username = str(match.group(1) or "").strip()
body = str(match.group(2) or "").strip()
if (not sender_username) or (not body):
return text
display_name = str((sender_display_names or {}).get(sender_username) or "").strip()
if display_name and display_name != sender_username:
return f"{display_name}: {body}"
return text
def _replace_preview_sender_prefix(preview_text: str, sender_display_name: str) -> str:
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
if not text:
return ""
display_name = str(sender_display_name or "").strip()
if (not display_name) or text.startswith("[草稿]"):
return text
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
if not match:
return text
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
if not body:
return text
return f"{display_name}: {body}"
def _build_group_sender_display_name_map(
contact_db_path: Path,
previews: dict[str, str],
) -> dict[str, str]:
group_sender_usernames: set[str] = set()
for conv_username, preview_text in previews.items():
if not str(conv_username or "").endswith("@chatroom"):
continue
sender_username = _extract_group_preview_sender_username(preview_text)
if sender_username:
group_sender_usernames.add(sender_username)
if not group_sender_usernames:
return {}
display_names: dict[str, str] = {}
sender_contact_rows = _load_contact_rows(contact_db_path, list(group_sender_usernames))
for sender_username in group_sender_usernames:
row = sender_contact_rows.get(sender_username)
if row is None:
continue
display_name = _pick_display_name(row, sender_username)
if display_name and display_name != sender_username:
display_names[sender_username] = display_name
return display_names
def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]: def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]:
if not usernames: if not usernames:
return {} return {}
@@ -1338,6 +1466,208 @@ def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str,
conn.close() conn.close()
def _load_group_nickname_map_from_contact_db(
contact_db_path: Path,
chatroom_id: str,
sender_usernames: list[str],
) -> dict[str, str]:
"""Best-effort mapping for group member nickname (aka group card) from contact.db.
WeChat stores per-chatroom member nicknames in `contact.db.chat_room.ext_buffer` as a protobuf-like blob.
This helper parses that blob and returns { sender_username -> group_nickname } for the requested senders.
Notes:
- Best-effort: never raises; returns {} on any failure.
- Only resolves usernames included in `sender_usernames` to keep parsing cheap.
"""
chatroom = str(chatroom_id or "").strip()
if not chatroom.endswith("@chatroom"):
return {}
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
if not targets:
return {}
target_set = set(targets)
def decode_varint(raw: bytes, offset: int) -> tuple[Optional[int], int]:
value = 0
shift = 0
pos = int(offset)
n = len(raw)
while pos < n:
byte = raw[pos]
pos += 1
value |= (byte & 0x7F) << shift
if (byte & 0x80) == 0:
return value, pos
shift += 7
if shift > 63:
return None, n
return None, n
def iter_fields(raw: bytes):
idx = 0
n = len(raw)
while idx < n:
tag, idx_next = decode_varint(raw, idx)
if tag is None or idx_next <= idx:
break
idx = idx_next
field_no = int(tag) >> 3
wire_type = int(tag) & 0x7
if wire_type == 0:
_, idx_next = decode_varint(raw, idx)
if idx_next <= idx:
break
idx = idx_next
continue
if wire_type == 2:
size, idx_next = decode_varint(raw, idx)
if size is None or idx_next <= idx:
break
idx = idx_next
end = idx + int(size)
if end > n:
break
chunk = raw[idx:end]
idx = end
yield field_no, wire_type, chunk
continue
if wire_type == 1:
idx += 8
continue
if wire_type == 5:
idx += 4
continue
break
def is_strong_username_hint(s: str) -> bool:
v = str(s or "").strip()
return v.startswith("wxid_") or v.endswith("@chatroom") or v.startswith("gh_") or ("@" in v)
def looks_like_username(s: str) -> bool:
v = str(s or "").strip()
if not v:
return False
if is_strong_username_hint(v):
return True
# Common alias-style WeChat IDs are ASCII-ish and do not contain whitespace.
if len(v) < 6 or len(v) > 32:
return False
if re.search(r"\s", v):
return False
if not re.match(r"^[A-Za-z][A-Za-z0-9_-]+$", v):
return False
if v.isdigit():
return False
return True
def pick_display(strings: list[tuple[int, str]], target: str) -> str:
best_score = -1
best = ""
for i, (fno, value) in enumerate(strings):
v = str(value or "").strip()
if (not v) or v == target:
continue
if is_strong_username_hint(v):
continue
if "\n" in v or "\r" in v:
continue
if len(v) > 64:
continue
score = 0
if int(fno) == 2:
score += 100
if not looks_like_username(v):
score += 20
score += max(0, 32 - len(v))
# Stable tie-breaker: prefer earlier appearance.
score = score * 1000 - i
if score > best_score:
best_score = score
best = v
return best
try:
conn = sqlite3.connect(str(contact_db_path))
except Exception:
return {}
try:
row = conn.execute(
"SELECT ext_buffer FROM chat_room WHERE username = ? LIMIT 1",
(chatroom,),
).fetchone()
if row is None:
return {}
ext = row[0]
if ext is None:
return {}
if isinstance(ext, memoryview):
ext_buf = ext.tobytes()
elif isinstance(ext, (bytes, bytearray)):
ext_buf = bytes(ext)
else:
return {}
if not ext_buf:
return {}
out: dict[str, str] = {}
for _, wire_type, chunk in iter_fields(ext_buf):
if wire_type != 2 or (not chunk):
continue
# Parse submessage and collect UTF-8 strings.
strings: list[tuple[int, str]] = []
try:
for sfno, swire, sval in iter_fields(chunk):
if swire != 2:
continue
if not sval:
continue
if len(sval) > 256:
continue
try:
txt = bytes(sval).decode("utf-8", errors="strict")
except Exception:
continue
txt = txt.strip()
if not txt:
continue
strings.append((int(sfno), txt))
except Exception:
continue
if not strings:
continue
present = [v for _, v in strings if v in target_set and v not in out]
if not present:
continue
for target in present:
disp = pick_display(strings, target)
if disp:
out[target] = disp
if len(out) >= len(target_set):
break
return out
except Exception:
return {}
finally:
try:
conn.close()
except Exception:
pass
def _load_usernames_by_display_names(contact_db_path: Path, names: list[str]) -> dict[str, str]: def _load_usernames_by_display_names(contact_db_path: Path, names: list[str]) -> dict[str, str]:
"""Best-effort mapping from display name -> username using contact.db. """Best-effort mapping from display name -> username using contact.db.
@@ -1515,11 +1845,7 @@ def _row_to_search_hit(
if local_type == 10000: if local_type == 10000:
render_type = "system" render_type = "system"
if "revokemsg" in raw_text: content_text = _parse_system_message_content(raw_text)
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
elif local_type == 49: elif local_type == 49:
parsed = _parse_app_message(raw_text) parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text") render_type = str(parsed.get("renderType") or "text")

View File

@@ -23,17 +23,17 @@ logger = get_logger(__name__)
# 运行时输出目录(桌面端可通过 WECHAT_TOOL_DATA_DIR 指向可写目录) # 运行时输出目录(桌面端可通过 WECHAT_TOOL_DATA_DIR 指向可写目录)
_OUTPUT_DATABASES_DIR = get_output_databases_dir()
_PACKAGE_ROOT = Path(__file__).resolve().parent _PACKAGE_ROOT = Path(__file__).resolve().parent
def _list_decrypted_accounts() -> list[str]: def _list_decrypted_accounts() -> list[str]:
"""列出已解密输出的账号目录名(仅保留包含 session.db + contact.db 的账号)""" """列出已解密输出的账号目录名(仅保留包含 session.db + contact.db 的账号)"""
if not _OUTPUT_DATABASES_DIR.exists(): output_db_dir = get_output_databases_dir()
if not output_db_dir.exists():
return [] return []
accounts: list[str] = [] accounts: list[str] = []
for p in _OUTPUT_DATABASES_DIR.iterdir(): for p in output_db_dir.iterdir():
if not p.is_dir(): if not p.is_dir():
continue continue
if (p / "session.db").exists() and (p / "contact.db").exists(): if (p / "session.db").exists() and (p / "contact.db").exists():
@@ -45,6 +45,7 @@ def _list_decrypted_accounts() -> list[str]:
def _resolve_account_dir(account: Optional[str]) -> Path: def _resolve_account_dir(account: Optional[str]) -> Path:
"""解析账号目录,并进行路径安全校验(防止路径穿越)""" """解析账号目录,并进行路径安全校验(防止路径穿越)"""
output_db_dir = get_output_databases_dir()
accounts = _list_decrypted_accounts() accounts = _list_decrypted_accounts()
if not accounts: if not accounts:
raise HTTPException( raise HTTPException(
@@ -53,8 +54,8 @@ def _resolve_account_dir(account: Optional[str]) -> Path:
) )
selected = account or accounts[0] selected = account or accounts[0]
base = _OUTPUT_DATABASES_DIR.resolve() base = output_db_dir.resolve()
candidate = (_OUTPUT_DATABASES_DIR / selected).resolve() candidate = (output_db_dir / selected).resolve()
if candidate != base and base not in candidate.parents: if candidate != base and base not in candidate.parents:
raise HTTPException(status_code=400, detail="Invalid account path.") raise HTTPException(status_code=400, detail="Invalid account path.")

View File

@@ -39,11 +39,17 @@ from ..chat_helpers import (
_make_snippet, _make_snippet,
_match_tokens, _match_tokens,
_load_contact_rows, _load_contact_rows,
_load_group_nickname_map_from_contact_db,
_load_usernames_by_display_names, _load_usernames_by_display_names,
_load_latest_message_previews, _load_latest_message_previews,
_build_group_sender_display_name_map,
_normalize_session_preview_text,
_extract_group_preview_sender_username,
_replace_preview_sender_prefix,
_lookup_resource_md5, _lookup_resource_md5,
_normalize_xml_url, _normalize_xml_url,
_parse_app_message, _parse_app_message,
_parse_system_message_content,
_parse_pat_message, _parse_pat_message,
_pick_display_name, _pick_display_name,
_query_head_image_usernames, _query_head_image_usernames,
@@ -69,6 +75,8 @@ from ..wcdb_realtime import (
WCDB_REALTIME, WCDB_REALTIME,
get_avatar_urls as _wcdb_get_avatar_urls, get_avatar_urls as _wcdb_get_avatar_urls,
get_display_names as _wcdb_get_display_names, get_display_names as _wcdb_get_display_names,
get_group_members as _wcdb_get_group_members,
get_group_nicknames as _wcdb_get_group_nicknames,
get_messages as _wcdb_get_messages, get_messages as _wcdb_get_messages,
get_sessions as _wcdb_get_sessions, get_sessions as _wcdb_get_sessions,
) )
@@ -97,6 +105,142 @@ def _avatar_url_unified(
return _build_avatar_url(str(account_dir.name or ""), u) return _build_avatar_url(str(account_dir.name or ""), u)
def _load_group_nickname_map_from_wcdb(
*,
account_dir: Path,
chatroom_id: str,
sender_usernames: list[str],
rt_conn=None,
) -> dict[str, str]:
chatroom = str(chatroom_id or "").strip()
if not chatroom.endswith("@chatroom"):
return {}
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
if not targets:
return {}
try:
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
except Exception:
return {}
target_set = set(targets)
out: dict[str, str] = {}
try:
with wcdb_conn.lock:
nickname_map = _wcdb_get_group_nicknames(wcdb_conn.handle, chatroom)
for username, nickname in (nickname_map or {}).items():
su = str(username or "").strip()
nn = str(nickname or "").strip()
if su and nn and su in target_set:
out[su] = nn
except Exception:
pass
unresolved = [u for u in targets if u not in out]
if not unresolved:
return out
try:
with wcdb_conn.lock:
members = _wcdb_get_group_members(wcdb_conn.handle, chatroom)
except Exception:
return out
if not members:
return out
unresolved_set = set(unresolved)
for member in members:
try:
username = str(member.get("username") or "").strip()
except Exception:
username = ""
if (not username) or (username not in unresolved_set):
continue
nickname = ""
for key in ("nickname", "displayName", "remark", "originalName"):
try:
candidate = str(member.get(key) or "").strip()
except Exception:
candidate = ""
if candidate:
nickname = candidate
break
if nickname:
out[username] = nickname
return out
def _load_group_nickname_map(
*,
account_dir: Path,
contact_db_path: Path,
chatroom_id: str,
sender_usernames: list[str],
rt_conn=None,
) -> dict[str, str]:
"""Resolve group member nickname (group card) via WCDB and contact.db ext_buffer (best-effort)."""
contact_map: dict[str, str] = {}
try:
contact_map = _load_group_nickname_map_from_contact_db(
contact_db_path,
chatroom_id,
sender_usernames,
)
except Exception:
contact_map = {}
wcdb_map: dict[str, str] = {}
try:
wcdb_map = _load_group_nickname_map_from_wcdb(
account_dir=account_dir,
chatroom_id=chatroom_id,
sender_usernames=sender_usernames,
rt_conn=rt_conn,
)
except Exception:
wcdb_map = {}
if not contact_map and not wcdb_map:
return {}
# Merge: WCDB wins (newer DLLs may provide higher-quality group nicknames).
merged: dict[str, str] = {}
merged.update(contact_map)
merged.update(wcdb_map)
return merged
def _resolve_sender_display_name(
*,
sender_username: str,
sender_contact_rows: dict[str, sqlite3.Row],
wcdb_display_names: dict[str, str],
group_nicknames: Optional[dict[str, str]] = None,
) -> str:
su = str(sender_username or "").strip()
if not su:
return ""
gn = str((group_nicknames or {}).get(su) or "").strip()
if gn:
return gn
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
return display_name
def _realtime_sync_lock(account: str, username: str) -> threading.Lock: def _realtime_sync_lock(account: str, username: str) -> threading.Lock:
key = (str(account or "").strip(), str(username or "").strip()) key = (str(account or "").strip(), str(username or "").strip())
with _REALTIME_SYNC_MU: with _REALTIME_SYNC_MU:
@@ -557,8 +701,11 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
"draft", "draft",
"last_timestamp", "last_timestamp",
"sort_timestamp", "sort_timestamp",
"last_msg_locald_id",
"last_msg_type", "last_msg_type",
"last_msg_sub_type", "last_msg_sub_type",
"last_msg_sender",
"last_sender_display_name",
] ]
update_cols = [c for c in desired_cols if c in cols] update_cols = [c for c in desired_cols if c in cols]
if not update_cols: if not update_cols:
@@ -583,7 +730,15 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
continue continue
values: list[Any] = [] values: list[Any] = []
for c in update_cols: for c in update_cols:
if c in {"unread_count", "is_hidden", "last_timestamp", "sort_timestamp", "last_msg_type", "last_msg_sub_type"}: if c in {
"unread_count",
"is_hidden",
"last_timestamp",
"sort_timestamp",
"last_msg_locald_id",
"last_msg_type",
"last_msg_sub_type",
}:
values.append(_int((r or {}).get(c))) values.append(_int((r or {}).get(c)))
else: else:
values.append(_text((r or {}).get(c))) values.append(_text((r or {}).get(c)))
@@ -1510,8 +1665,17 @@ def sync_chat_realtime_messages_all(
"sort_timestamp", "sort_timestamp",
item.get("sortTimestamp", item.get("last_timestamp", item.get("lastTimestamp", 0))), item.get("sortTimestamp", item.get("last_timestamp", item.get("lastTimestamp", 0))),
), ),
"last_msg_locald_id": item.get(
"last_msg_locald_id",
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
),
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)), "last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)), "last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
"last_sender_display_name": item.get(
"last_sender_display_name",
item.get("lastSenderDisplayName", ""),
),
} }
# Prefer the row with the newer sort timestamp for the same username. # Prefer the row with the newer sort timestamp for the same username.
prev = realtime_rows_by_user.get(uname) prev = realtime_rows_by_user.get(uname)
@@ -2137,11 +2301,7 @@ def _append_full_messages_from_rows(
if local_type == 10000: if local_type == 10000:
render_type = "system" render_type = "system"
if "revokemsg" in raw_text: content_text = _parse_system_message_content(raw_text)
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
elif local_type == 49: elif local_type == 49:
parsed = _parse_app_message(raw_text) parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text") render_type = str(parsed.get("renderType") or "text")
@@ -2598,6 +2758,13 @@ def _postprocess_full_messages(
wcdb_display_names = {} wcdb_display_names = {}
wcdb_avatar_urls = {} wcdb_avatar_urls = {}
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=uniq_senders,
)
for m in merged: for m in merged:
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name. # If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip(): if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
@@ -2613,13 +2780,12 @@ def _postprocess_full_messages(
su = str(m.get("senderUsername") or "") su = str(m.get("senderUsername") or "")
if not su: if not su:
continue continue
row = sender_contact_rows.get(su) m["senderDisplayName"] = _resolve_sender_display_name(
display_name = _pick_display_name(row, su) sender_username=su,
if display_name == su: sender_contact_rows=sender_contact_rows,
wd = str(wcdb_display_names.get(su) or "").strip() wcdb_display_names=wcdb_display_names,
if wd and wd != su: group_nicknames=group_nicknames,
display_name = wd )
m["senderDisplayName"] = display_name
avatar_url = base_url + _avatar_url_unified( avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir, account_dir=account_dir,
username=su, username=su,
@@ -2836,6 +3002,17 @@ def list_chat_sessions(
"sort_timestamp": item.get("sort_timestamp", item.get("sortTimestamp", item.get("last_timestamp", 0))), "sort_timestamp": item.get("sort_timestamp", item.get("sortTimestamp", item.get("last_timestamp", 0))),
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)), "last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)), "last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
# Keep these fields so group session previews can render "sender: content" without
# crashing (realtime rows are dicts, not sqlite Rows).
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
"last_sender_display_name": item.get(
"last_sender_display_name",
item.get("lastSenderDisplayName", ""),
),
"last_msg_locald_id": item.get(
"last_msg_locald_id",
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
),
} }
) )
@@ -2923,12 +3100,16 @@ def list_chat_sessions(
try: try:
need_display: list[str] = [] need_display: list[str] = []
need_avatar: list[str] = [] need_avatar: list[str] = []
if source_norm == "realtime":
# In realtime mode, always ask WCDB for display names: decrypted contact.db can be stale.
need_display = [str(u or "").strip() for u in usernames if str(u or "").strip()]
for u in usernames: for u in usernames:
if not u: if not u:
continue continue
row = contact_rows.get(u) if source_norm != "realtime":
if _pick_display_name(row, u) == u: row = contact_rows.get(u)
need_display.append(u) if _pick_display_name(row, u) == u:
need_display.append(u)
if source_norm == "realtime": if source_norm == "realtime":
# In realtime mode, prefer WCDB-resolved avatar URLs (contact.db can be stale). # In realtime mode, prefer WCDB-resolved avatar URLs (contact.db can be stale).
if u not in local_avatar_usernames: if u not in local_avatar_usernames:
@@ -2983,14 +3164,40 @@ def list_chat_sessions(
if v: if v:
last_previews[u] = v last_previews[u] = v
group_sender_display_names: dict[str, str] = _build_group_sender_display_name_map(
contact_db_path,
last_previews,
)
unresolved = []
for conv_username, preview_text in last_previews.items():
if not str(conv_username or "").endswith("@chatroom"):
continue
sender_username = _extract_group_preview_sender_username(preview_text)
if sender_username and sender_username not in group_sender_display_names:
unresolved.append(sender_username)
unresolved = list(dict.fromkeys(unresolved))
if unresolved:
try:
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
with wcdb_conn.lock:
wcdb_names = _wcdb_get_display_names(wcdb_conn.handle, unresolved)
for sender_username in unresolved:
wcdb_name = str(wcdb_names.get(sender_username) or "").strip()
if wcdb_name and wcdb_name != sender_username:
group_sender_display_names[sender_username] = wcdb_name
except Exception:
pass
sessions: list[dict[str, Any]] = [] sessions: list[dict[str, Any]] = []
for r in filtered: for r in filtered:
username = r["username"] username = r["username"]
c_row = contact_rows.get(username) c_row = contact_rows.get(username)
display_name = _pick_display_name(c_row, username) display_name = _pick_display_name(c_row, username)
if display_name == username: wd = str(wcdb_display_names.get(username) or "").strip()
wd = str(wcdb_display_names.get(username) or "").strip() if source_norm == "realtime" and wd and wd != username:
display_name = wd
elif display_name == username:
if wd and wd != username: if wd and wd != username:
display_name = wd display_name = wd
@@ -3046,6 +3253,37 @@ def list_chat_sessions(
if last_msg_type == 81604378673 or (last_msg_type == 49 and last_msg_sub_type == 19): if last_msg_type == 81604378673 or (last_msg_type == 49 and last_msg_sub_type == 19):
last_message = "[聊天记录]" last_message = "[聊天记录]"
last_message = _normalize_session_preview_text(
last_message,
is_group=bool(str(username or "").endswith("@chatroom")),
sender_display_names=group_sender_display_names,
)
if str(username or "").endswith("@chatroom") and str(last_message or "") and not str(last_message).startswith("[草稿]"):
# Prefer group card nickname when available. In realtime mode, WCDB session rows can provide
# `last_sender_display_name`, but we may still get a summary that doesn't include "sender:".
# Also guard against URL schemes like "https://..." being mis-parsed as "https: //...".
raw_sender_display = ""
try:
raw_sender_display = r["last_sender_display_name"]
except Exception:
try:
raw_sender_display = r.get("last_sender_display_name", "")
except Exception:
raw_sender_display = ""
sender_display = _decode_sqlite_text(raw_sender_display).strip()
if sender_display:
text = re.sub(r"\s+", " ", str(last_message or "").strip()).strip()
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
if match:
prefix = str(match.group(1) or "").strip()
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
if prefix.lower() in {"http", "https"} and body.startswith("//"):
last_message = f"{sender_display}: {text}"
else:
last_message = f"{sender_display}: {body}"
else:
last_message = f"{sender_display}: {text}"
last_time = _format_session_time(r["sort_timestamp"] or r["last_timestamp"]) last_time = _format_session_time(r["sort_timestamp"] or r["last_timestamp"])
sessions.append( sessions.append(
@@ -3248,13 +3486,7 @@ def _collect_chat_messages(
if local_type == 10000: if local_type == 10000:
render_type = "system" render_type = "system"
if "revokemsg" in raw_text: content_text = _parse_system_message_content(raw_text)
content_text = "撤回了一条消息"
else:
import re
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
elif local_type == 49: elif local_type == 49:
parsed = _parse_app_message(raw_text) parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text") render_type = str(parsed.get("renderType") or "text")
@@ -3957,13 +4189,7 @@ def list_chat_messages(
if local_type == 10000: if local_type == 10000:
render_type = "system" render_type = "system"
if "revokemsg" in raw_text: content_text = _parse_system_message_content(raw_text)
content_text = "撤回了一条消息"
else:
import re
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
elif local_type == 49: elif local_type == 49:
parsed = _parse_app_message(raw_text) parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text") render_type = str(parsed.get("renderType") or "text")
@@ -4412,6 +4638,13 @@ def list_chat_messages(
wcdb_display_names = {} wcdb_display_names = {}
wcdb_avatar_urls = {} wcdb_avatar_urls = {}
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=uniq_senders,
)
for m in merged: for m in merged:
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name. # If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip(): if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
@@ -4427,13 +4660,12 @@ def list_chat_messages(
su = str(m.get("senderUsername") or "") su = str(m.get("senderUsername") or "")
if not su: if not su:
continue continue
row = sender_contact_rows.get(su) m["senderDisplayName"] = _resolve_sender_display_name(
display_name = _pick_display_name(row, su) sender_username=su,
if display_name == su: sender_contact_rows=sender_contact_rows,
wd = str(wcdb_display_names.get(su) or "").strip() wcdb_display_names=wcdb_display_names,
if wd and wd != su: group_nicknames=group_nicknames,
display_name = wd )
m["senderDisplayName"] = display_name
avatar_url = base_url + _avatar_url_unified( avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir, account_dir=account_dir,
username=su, username=su,
@@ -4930,19 +5162,24 @@ async def _search_chat_messages_via_fts(
username=username, username=username,
local_avatar_usernames=local_avatar_usernames, local_avatar_usernames=local_avatar_usernames,
) )
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=[str(x.get("senderUsername") or "") for x in hits],
)
for h in hits: for h in hits:
su = str(h.get("senderUsername") or "").strip() su = str(h.get("senderUsername") or "").strip()
h["conversationName"] = conv_name h["conversationName"] = conv_name
h["conversationAvatar"] = conv_avatar h["conversationAvatar"] = conv_avatar
if su: if su:
row = contact_rows.get(su) h["senderDisplayName"] = _resolve_sender_display_name(
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su) sender_username=su,
if display_name == su: sender_contact_rows=contact_rows,
wd = str(wcdb_display_names.get(su) or "").strip() wcdb_display_names=wcdb_display_names,
if wd and wd != su: group_nicknames=group_nicknames,
display_name = wd )
h["senderDisplayName"] = display_name
avatar_url = base_url + _avatar_url_unified( avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir, account_dir=account_dir,
username=su, username=su,
@@ -4986,6 +5223,23 @@ async def _search_chat_messages_via_fts(
wcdb_display_names = {} wcdb_display_names = {}
wcdb_avatar_urls = {} wcdb_avatar_urls = {}
group_senders_by_room: dict[str, list[str]] = {}
for h in hits:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
if (not cu.endswith("@chatroom")) or (not su):
continue
group_senders_by_room.setdefault(cu, []).append(su)
group_nickname_cache: dict[str, dict[str, str]] = {}
for cu, senders in group_senders_by_room.items():
group_nickname_cache[cu] = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=cu,
sender_usernames=senders,
)
for h in hits: for h in hits:
cu = str(h.get("username") or "").strip() cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip() su = str(h.get("senderUsername") or "").strip()
@@ -5003,13 +5257,12 @@ async def _search_chat_messages_via_fts(
) )
h["conversationAvatar"] = conv_avatar h["conversationAvatar"] = conv_avatar
if su: if su:
row = contact_rows.get(su) h["senderDisplayName"] = _resolve_sender_display_name(
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su) sender_username=su,
if display_name == su: sender_contact_rows=contact_rows,
wd = str(wcdb_display_names.get(su) or "").strip() wcdb_display_names=wcdb_display_names,
if wd and wd != su: group_nicknames=group_nickname_cache.get(cu, {}),
display_name = wd )
h["senderDisplayName"] = display_name
avatar_url = base_url + _avatar_url_unified( avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir, account_dir=account_dir,
username=su, username=su,
@@ -5272,13 +5525,23 @@ async def search_chat_messages(
contact_rows = _load_contact_rows(contact_db_path, uniq_usernames) contact_rows = _load_contact_rows(contact_db_path, uniq_usernames)
conv_row = contact_rows.get(username) conv_row = contact_rows.get(username)
conv_name = _pick_display_name(conv_row, username) conv_name = _pick_display_name(conv_row, username)
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=[str(x.get("senderUsername") or "") for x in page],
)
for h in page: for h in page:
su = str(h.get("senderUsername") or "").strip() su = str(h.get("senderUsername") or "").strip()
h["conversationName"] = conv_name h["conversationName"] = conv_name
if su: if su:
row = contact_rows.get(su) h["senderDisplayName"] = _resolve_sender_display_name(
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su) sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names={},
group_nicknames=group_nicknames,
)
return { return {
"status": "success", "status": "success",
@@ -5360,6 +5623,23 @@ async def search_chat_messages(
) )
contact_rows = _load_contact_rows(contact_db_path, uniq_contacts) contact_rows = _load_contact_rows(contact_db_path, uniq_contacts)
group_senders_by_room: dict[str, list[str]] = {}
for h in page:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
if (not cu.endswith("@chatroom")) or (not su):
continue
group_senders_by_room.setdefault(cu, []).append(su)
group_nickname_cache: dict[str, dict[str, str]] = {}
for cu, senders in group_senders_by_room.items():
group_nickname_cache[cu] = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=cu,
sender_usernames=senders,
)
for h in page: for h in page:
cu = str(h.get("username") or "").strip() cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip() su = str(h.get("senderUsername") or "").strip()
@@ -5367,8 +5647,12 @@ async def search_chat_messages(
conv_name = _pick_display_name(crow, cu) if cu else "" conv_name = _pick_display_name(crow, cu) if cu else ""
h["conversationName"] = conv_name or cu h["conversationName"] = conv_name or cu
if su: if su:
row = contact_rows.get(su) h["senderDisplayName"] = _resolve_sender_display_name(
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su) sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names={},
group_nicknames=group_nickname_cache.get(cu, {}),
)
return { return {
"status": "success", "status": "success",

View File

@@ -688,6 +688,83 @@ def _lookup_resource_md5_by_server_id(account_dir_str: str, server_id: int, want
pass pass
@lru_cache(maxsize=4096)
def _lookup_image_md5_by_server_id_from_messages(account_dir_str: str, server_id: int, username: str) -> str:
    """Find an image message by ``server_id`` in a conversation and return its md5.

    Every ``message_*.db`` file directly under the account directory is
    probed (in file-name order) for the per-conversation table
    ``Msg_<md5(username)>``.  In each database the newest row (by
    ``create_time``) with the given ``server_id`` is inspected; only rows
    whose ``local_type`` is 3 are considered, and the md5 is taken from the
    row's ``packed_info_data`` column.

    Args:
        account_dir_str: Path of the account directory holding the message DBs.
        server_id: Server-side message id; values that are falsy or cannot be
            converted to ``int`` yield an empty result.
        username: Conversation username used to derive the table name.

    Returns:
        The lower-cased md5 when a valid one is found, otherwise ``""``.
        Results (including misses) are memoised by ``lru_cache``.
    """
    root = str(account_dir_str or "").strip()
    talker = str(username or "").strip()
    if not (root and talker):
        return ""

    try:
        sid = int(server_id or 0)
    except Exception:
        return ""
    if sid == 0:
        return ""

    try:
        chat_hash = hashlib.md5(talker.encode()).hexdigest()
    except Exception:
        return ""
    if not chat_hash:
        return ""

    # The table name is built purely from a hex digest, so it is safe to
    # interpolate into the SQL text; the server id stays a bound parameter.
    table_name = f"Msg_{chat_hash}"
    query = (
        f"SELECT local_type, packed_info_data FROM {table_name} "
        "WHERE server_id = ? ORDER BY create_time DESC LIMIT 1"
    )

    base_dir = Path(root)
    candidates: list[Path] = []
    try:
        for entry in base_dir.glob("message_*.db"):
            try:
                is_regular = entry.is_file()
            except Exception:
                continue
            if is_regular:
                candidates.append(entry)
    except Exception:
        # A failure while enumerating discards whatever was collected so far.
        candidates = []
    if not candidates:
        return ""

    for db_file in sorted(candidates, key=lambda item: item.name):
        try:
            conn = sqlite3.connect(str(db_file))
        except Exception:
            continue

        # Best-effort probe: a missing table or unreadable DB counts as "no row".
        try:
            row = conn.execute(query, (sid,)).fetchone()
        except Exception:
            row = None
        finally:
            try:
                conn.close()
            except Exception:
                pass

        if not row:
            continue

        try:
            kind = int(row[0] or 0)
        except Exception:
            kind = 0
        if kind != 3:
            # Not an image message in this DB; keep looking in the others.
            continue

        normalized = str(_extract_md5_from_packed_info(row[1]) or "").strip().lower()
        if _is_valid_md5(normalized):
            return normalized

    return ""
def _is_safe_http_url(url: str) -> bool: def _is_safe_http_url(url: str) -> bool:
u = str(url or "").strip() u = str(url or "").strip()
if not u: if not u:
@@ -1062,6 +1139,12 @@ async def get_chat_image(
resource_md5 = _lookup_resource_md5_by_server_id(str(account_dir), int(server_id), want_local_type=3) resource_md5 = _lookup_resource_md5_by_server_id(str(account_dir), int(server_id), want_local_type=3)
if resource_md5: if resource_md5:
md5 = resource_md5 md5 = resource_md5
elif username:
md5_from_msg = _lookup_image_md5_by_server_id_from_messages(
str(account_dir), int(server_id), str(username)
)
if md5_from_msg:
md5 = md5_from_msg
# md5 模式:优先从解密资源目录读取(更快) # md5 模式:优先从解密资源目录读取(更快)
if md5: if md5:

View File

@@ -102,6 +102,17 @@ def _load_wcdb_lib() -> ctypes.CDLL:
lib.wcdb_get_group_members.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)] lib.wcdb_get_group_members.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_group_members.restype = ctypes.c_int lib.wcdb_get_group_members.restype = ctypes.c_int
# Optional (newer DLLs): wcdb_get_group_nicknames(handle, chatroom_id, out_json)
try:
lib.wcdb_get_group_nicknames.argtypes = [
ctypes.c_int64,
ctypes.c_char_p,
ctypes.POINTER(ctypes.c_char_p),
]
lib.wcdb_get_group_nicknames.restype = ctypes.c_int
except Exception:
pass
# Optional: execute arbitrary SQL on a selected database kind/path. # Optional: execute arbitrary SQL on a selected database kind/path.
# Signature: wcdb_exec_query(handle, kind, path, sql, out_json) # Signature: wcdb_exec_query(handle, kind, path, sql, out_json)
try: try:
@@ -355,6 +366,41 @@ def get_avatar_urls(handle: int, usernames: list[str]) -> dict[str, str]:
return {} return {}
def get_group_members(handle: int, chatroom_id: str) -> list[dict[str, Any]]:
    """Return the member records of a chatroom as reported by the WCDB library.

    Args:
        handle: WCDB handle; converted with ``int()`` and passed as a 64-bit integer.
        chatroom_id: Chatroom identifier; surrounding whitespace is ignored.

    Returns:
        The dict entries of the decoded JSON list.  An empty list is returned
        when ``chatroom_id`` is blank or the decoded payload is not a list;
        non-dict entries inside the list are dropped.
    """
    _ensure_initialized()
    lib = _load_wcdb_lib()

    room = str(chatroom_id or "").strip()
    if not room:
        return []

    payload = _safe_load_json(
        _call_out_json(
            lib.wcdb_get_group_members,
            ctypes.c_int64(int(handle)),
            room.encode("utf-8"),
        )
    )
    if not isinstance(payload, list):
        return []
    return [entry for entry in payload if isinstance(entry, dict)]
def get_group_nicknames(handle: int, chatroom_id: str) -> dict[str, str]:
    """Return the per-member group nicknames of a chatroom via the WCDB library.

    The underlying export ``wcdb_get_group_nicknames`` is optional; when the
    loaded library does not provide it, an empty mapping is returned.

    Args:
        handle: WCDB handle; converted with ``int()`` and passed as a 64-bit integer.
        chatroom_id: Chatroom identifier; surrounding whitespace is ignored.

    Returns:
        A mapping with keys and values coerced to ``str``.  A ``None`` value
        (e.g. a JSON ``null``) becomes ``""`` rather than the text ``"None"``,
        so it can never be mistaken for a real nickname.  An empty dict is
        returned when the export is missing, ``chatroom_id`` is blank, or the
        decoded payload is not a dict.
    """
    _ensure_initialized()
    lib = _load_wcdb_lib()

    # Attribute lookup fails on libraries without this export; treat that as "no data".
    fn = getattr(lib, "wcdb_get_group_nicknames", None)
    if not fn:
        return {}

    cid = str(chatroom_id or "").strip()
    if not cid:
        return {}

    out_json = _call_out_json(fn, ctypes.c_int64(int(handle)), cid.encode("utf-8"))
    decoded = _safe_load_json(out_json)
    if not isinstance(decoded, dict):
        return {}

    # str(None) would yield the literal "None"; keep the key but blank the value.
    return {str(k): ("" if v is None else str(v)) for k, v in decoded.items()}
def exec_query(handle: int, *, kind: str, path: Optional[str], sql: str) -> list[dict[str, Any]]: def exec_query(handle: int, *, kind: str, path: Optional[str], sql: str) -> list[dict[str, Any]]:
"""Execute raw SQL on a specific db kind/path via WCDB. """Execute raw SQL on a specific db kind/path via WCDB.