Compare commits

...

3 Commits

  • test(chat): 覆盖系统撤回/群名片/实时会话同步相关用例
    - 新增系统撤回消息 replacemsg 解析与导出语义测试
    
    - 新增群聊会话预览格式化与群名片 ext_buffer 解析测试
    
    - 新增 realtime 会话列表与 sync_all 落库 last_sender_display_name 测试
  • feat(chat-ui): 会话列表未读提示与引用图片预览优化
    - 未读展示改为头像红点,并在 lastMessage 前缀展示未读条数
    
    - 引用消息支持图片缩略图预览,失败自动降级为纯文本引用
    
    - 规范化 quoteVoiceUrl/quoteImageUrl 生成,与后端 media 接口对齐
  • feat(chat): 群聊预览补齐群名片并完善系统消息解析
    - 新增系统撤回消息解析:优先提取 replacemsg,并统一清洗文本
    
    - 群聊会话预览文本规范化([表情] -> [动画表情]),并支持发送者前缀展示名替换
    
    - 群名片解析来源扩展:contact.db ext_buffer + WCDB realtime(可选新 DLL 接口)
    
    - 图片接口增强:支持 server_id + username 反查消息提取 md5,提升引用图片命中
13 changed files with 1383 additions and 112 deletions
+89 -32
View File
@@ -242,14 +242,20 @@
@click="selectContact(contact)">
<div class="flex items-center space-x-3 w-full">
<!-- 联系人头像 -->
<div class="w-[calc(45px/var(--dpr))] h-[calc(45px/var(--dpr))] rounded-md overflow-hidden bg-gray-300 flex-shrink-0" :class="{ 'privacy-blur': privacyMode }">
<div v-if="contact.avatar" class="w-full h-full">
<img :src="contact.avatar" :alt="contact.name" class="w-full h-full object-cover" referrerpolicy="no-referrer" @error="onAvatarError($event, contact)">
</div>
<div v-else class="w-full h-full flex items-center justify-center text-white text-xs font-bold"
:style="{ backgroundColor: contact.avatarColor || '#4B5563' }">
{{ contact.name.charAt(0) }}
<div class="relative flex-shrink-0" :class="{ 'privacy-blur': privacyMode }">
<div class="w-[calc(45px/var(--dpr))] h-[calc(45px/var(--dpr))] rounded-md overflow-hidden bg-gray-300">
<div v-if="contact.avatar" class="w-full h-full">
<img :src="contact.avatar" :alt="contact.name" class="w-full h-full object-cover" referrerpolicy="no-referrer" @error="onAvatarError($event, contact)">
</div>
<div v-else class="w-full h-full flex items-center justify-center text-white text-xs font-bold"
:style="{ backgroundColor: contact.avatarColor || '#4B5563' }">
{{ contact.name.charAt(0) }}
</div>
</div>
<span
v-if="contact.unreadCount > 0"
class="absolute z-10 -top-[calc(4px/var(--dpr))] -right-[calc(4px/var(--dpr))] w-[calc(10px/var(--dpr))] h-[calc(10px/var(--dpr))] bg-[#ed4d4d] rounded-full"
></span>
</div>
<!-- 联系人信息 -->
@@ -257,13 +263,12 @@
<div class="flex items-center justify-between">
<h3 class="text-sm font-medium text-gray-900 truncate" :class="{ 'privacy-blur': privacyMode }">{{ contact.name }}</h3>
<div class="flex items-center flex-shrink-0 ml-2">
<span v-if="contact.unreadCount > 0" class="text-[10px] text-white bg-red-500 rounded-full min-w-[18px] h-[18px] px-1 flex items-center justify-center mr-2">
{{ contact.unreadCount > 99 ? '99+' : contact.unreadCount }}
</span>
<span class="text-xs text-gray-500">{{ contact.lastMessageTime }}</span>
</div>
</div>
<p class="text-xs text-gray-500 truncate mt-0.5 leading-tight" :class="{ 'privacy-blur': privacyMode }">{{ contact.lastMessage }}</p>
<p class="text-xs text-gray-500 truncate mt-0.5 leading-tight" :class="{ 'privacy-blur': privacyMode }">
{{ contact.unreadCount > 0 ? `[${contact.unreadCount > 99 ? '99+' : contact.unreadCount}条] ` : '' }}{{ contact.lastMessage }}
</p>
</div>
</div>
</div>
@@ -616,15 +621,15 @@
<img v-else :src="seg.emojiSrc" :alt="seg.content" class="inline-block w-[1.25em] h-[1.25em] align-text-bottom mx-px">
</span>
</div>
<div
v-if="message.quoteTitle || message.quoteContent"
class="mt-[5px] px-2 text-xs text-neutral-600 rounded max-w-[404px] max-h-[61px] flex items-center bg-[#e1e1e1]">
<div class="py-2 min-w-0 w-full">
<div v-if="isQuotedVoice(message)" class="flex items-center gap-1 min-w-0">
<span v-if="message.quoteTitle" class="truncate flex-shrink-0">{{ message.quoteTitle }}:</span>
<button
type="button"
class="flex items-center gap-1 min-w-0 hover:opacity-80"
<div
v-if="message.quoteTitle || message.quoteContent"
class="mt-[5px] px-2 text-xs text-neutral-600 rounded max-w-[404px] max-h-[65px] overflow-hidden flex items-start bg-[#e1e1e1]">
<div class="py-2 min-w-0 flex-1">
<div v-if="isQuotedVoice(message)" class="flex items-center gap-1 min-w-0">
<span v-if="message.quoteTitle" class="truncate flex-shrink-0">{{ message.quoteTitle }}:</span>
<button
type="button"
class="flex items-center gap-1 min-w-0 hover:opacity-80"
:disabled="!message.quoteVoiceUrl"
:class="!message.quoteVoiceUrl ? 'opacity-60 cursor-not-allowed' : ''"
@click.stop="message.quoteVoiceUrl && playQuoteVoice(message)"
@@ -647,13 +652,35 @@
:ref="el => setVoiceRef(getQuoteVoiceId(message), el)"
:src="message.quoteVoiceUrl"
preload="none"
class="hidden"
></audio>
</div>
<div v-else class="line-clamp-2">{{ message.quoteTitle ? (message.quoteTitle + ': ') : '' }}{{ message.quoteContent }}</div>
</div>
</div>
</template>
class="hidden"
></audio>
</div>
<div v-else class="line-clamp-2">
<span v-if="message.quoteTitle">{{ message.quoteTitle }}:</span>
<span
v-if="message.quoteContent && !(isQuotedImage(message) && message.quoteTitle && message.quoteImageUrl && !message._quoteImageError)"
:class="message.quoteTitle ? 'ml-1' : ''"
>
{{ message.quoteContent }}
</span>
</div>
</div>
<div
v-if="isQuotedImage(message) && message.quoteImageUrl && !message._quoteImageError"
class="ml-2 my-2 flex-shrink-0 w-[45px] h-[45px] rounded overflow-hidden cursor-pointer"
@click.stop="openImagePreview(message.quoteImageUrl)"
>
<img
:src="message.quoteImageUrl"
alt="引用图片"
class="w-full h-full object-contain"
loading="lazy"
decoding="async"
@error="onQuoteImageError(message)"
/>
</div>
</div>
</template>
<!-- 合并转发聊天记录Chat History -->
<div
v-else-if="message.renderType === 'chatHistory'"
@@ -3464,6 +3491,19 @@ const isQuotedVoice = (message) => {
return false
}
const isQuotedImage = (message) => {
const t = String(message?.quoteType || '').trim()
if (t === '3') return true
if (String(message?.quoteContent || '').trim() === '[图片]' && String(message?.quoteServerId || '').trim()) return true
return false
}
const onQuoteImageError = (message) => {
try {
if (message) message._quoteImageError = true
} catch {}
}
const playQuoteVoice = (message) => {
playVoice({ id: getQuoteVoiceId(message) })
}
@@ -4665,6 +4705,23 @@ const normalizeMessage = (msg) => {
}
}
const quoteServerIdStr = String(msg.quoteServerId || '').trim()
const quoteTypeStr = String(msg.quoteType || '').trim()
const quoteVoiceUrl = quoteServerIdStr
? `${mediaBase}/api/chat/media/voice?account=${encodeURIComponent(selectedAccount.value || '')}&server_id=${encodeURIComponent(quoteServerIdStr)}`
: ''
const quoteImageUrl = (() => {
if (!quoteServerIdStr) return ''
if (quoteTypeStr !== '3' && String(msg.quoteContent || '').trim() !== '[图片]') return ''
const convUsername = String(selectedContact.value?.username || '').trim()
const parts = [
`account=${encodeURIComponent(selectedAccount.value || '')}`,
`server_id=${encodeURIComponent(quoteServerIdStr)}`,
convUsername ? `username=${encodeURIComponent(convUsername)}` : ''
].filter(Boolean)
return parts.length ? `${mediaBase}/api/chat/media/image?${parts.join('&')}` : ''
})()
return {
id: msg.id,
serverId: msg.serverId || 0,
@@ -4701,12 +4758,12 @@ const normalizeMessage = (msg) => {
quoteTitle: msg.quoteTitle || '',
quoteContent,
quoteUsername: msg.quoteUsername || '',
quoteServerId: String(msg.quoteServerId || '').trim(),
quoteType: String(msg.quoteType || '').trim(),
quoteServerId: quoteServerIdStr,
quoteType: quoteTypeStr,
quoteVoiceLength: msg.quoteVoiceLength || '',
quoteVoiceUrl: String(msg.quoteServerId || '').trim()
? `${mediaBase}/api/chat/media/voice?account=${encodeURIComponent(selectedAccount.value || '')}&server_id=${encodeURIComponent(String(msg.quoteServerId || '').trim())}`
: '',
quoteVoiceUrl,
quoteImageUrl: quoteImageUrl || '',
_quoteImageError: false,
amount: msg.amount || '',
coverUrl: msg.coverUrl || '',
fileSize: msg.fileSize || '',
@@ -28,6 +28,7 @@ from .chat_helpers import (
_load_contact_rows,
_lookup_resource_md5,
_parse_app_message,
_parse_system_message_content,
_parse_pat_message,
_pick_display_name,
_quote_ident,
@@ -954,13 +955,7 @@ def _parse_message_for_export(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
import re as _re
content_text = _re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = _re.sub(r"\\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
+337 -11
View File
@@ -645,6 +645,43 @@ def _extract_xml_tag_or_attr(xml_text: str, name: str) -> str:
return _extract_xml_attr(xml_text, name)
def _parse_system_message_content(raw_text: str) -> str:
text = str(raw_text or "").strip()
if not text:
return "[系统消息]"
def _clean_system_text(value: str) -> str:
candidate = str(value or "").strip()
if not candidate:
return ""
nested_content = _extract_xml_tag_text(candidate, "content")
if nested_content:
candidate = nested_content
candidate = re.sub(r"<!\[CDATA\[", "", candidate, flags=re.IGNORECASE)
candidate = re.sub(r"\]\]>", "", candidate)
candidate = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", candidate)
candidate = re.sub(r"\s+", " ", candidate).strip()
return candidate
if "revokemsg" in text.lower():
replace_msg = _extract_xml_tag_text(text, "replacemsg")
cleaned_replace_msg = _clean_system_text(replace_msg)
if cleaned_replace_msg:
return cleaned_replace_msg
revoke_msg = _extract_xml_tag_text(text, "revokemsg")
cleaned_revoke_msg = _clean_system_text(revoke_msg)
if cleaned_revoke_msg:
return cleaned_revoke_msg
return "撤回了一条消息"
content_text = _clean_system_text(text)
return content_text or "[系统消息]"
def _extract_refermsg_block(xml_text: str) -> str:
if not xml_text:
return ""
@@ -1053,11 +1090,7 @@ def _build_latest_message_preview(
content_text = ""
if local_type == 10000:
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 244813135921:
parsed = _parse_app_message(raw_text)
qt = str(parsed.get("quoteTitle") or "").strip()
@@ -1093,7 +1126,7 @@ def _build_latest_message_preview(
elif local_type == 43 or local_type == 62:
content_text = "[视频]"
elif local_type == 47:
content_text = "[表情]"
content_text = "[动画表情]"
else:
if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')):
content_text = raw_text
@@ -1107,6 +1140,101 @@ def _build_latest_message_preview(
return content_text
def _extract_group_preview_sender_username(preview_text: str) -> str:
text = str(preview_text or "").strip()
if not text:
return ""
match = re.match(r"^([^:\s]{1,128}):\s*.+$", text)
if not match:
return ""
sender = str(match.group(1) or "").strip()
if not sender:
return ""
if sender.startswith("wxid_") or sender.endswith("@chatroom") or ("@" in sender):
return sender
if re.fullmatch(r"[A-Za-z][A-Za-z0-9_-]{1,127}", sender):
return sender
return ""
def _normalize_session_preview_text(
preview_text: str,
*,
is_group: bool,
sender_display_names: Optional[dict[str, str]] = None,
) -> str:
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
if not text:
return ""
text = text.replace("[表情]", "[动画表情]")
if (not is_group) or text.startswith("[草稿]"):
return text
match = re.match(r"^([^:\s]{1,128}):\s*(.+)$", text)
if not match:
return text
sender_username = str(match.group(1) or "").strip()
body = str(match.group(2) or "").strip()
if (not sender_username) or (not body):
return text
display_name = str((sender_display_names or {}).get(sender_username) or "").strip()
if display_name and display_name != sender_username:
return f"{display_name}: {body}"
return text
def _replace_preview_sender_prefix(preview_text: str, sender_display_name: str) -> str:
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
if not text:
return ""
display_name = str(sender_display_name or "").strip()
if (not display_name) or text.startswith("[草稿]"):
return text
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
if not match:
return text
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
if not body:
return text
return f"{display_name}: {body}"
def _build_group_sender_display_name_map(
contact_db_path: Path,
previews: dict[str, str],
) -> dict[str, str]:
group_sender_usernames: set[str] = set()
for conv_username, preview_text in previews.items():
if not str(conv_username or "").endswith("@chatroom"):
continue
sender_username = _extract_group_preview_sender_username(preview_text)
if sender_username:
group_sender_usernames.add(sender_username)
if not group_sender_usernames:
return {}
display_names: dict[str, str] = {}
sender_contact_rows = _load_contact_rows(contact_db_path, list(group_sender_usernames))
for sender_username in group_sender_usernames:
row = sender_contact_rows.get(sender_username)
if row is None:
continue
display_name = _pick_display_name(row, sender_username)
if display_name and display_name != sender_username:
display_names[sender_username] = display_name
return display_names
def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]:
if not usernames:
return {}
@@ -1338,6 +1466,208 @@ def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str,
conn.close()
def _load_group_nickname_map_from_contact_db(
contact_db_path: Path,
chatroom_id: str,
sender_usernames: list[str],
) -> dict[str, str]:
"""Best-effort mapping for group member nickname (aka group card) from contact.db.
WeChat stores per-chatroom member nicknames in `contact.db.chat_room.ext_buffer` as a protobuf-like blob.
This helper parses that blob and returns { sender_username -> group_nickname } for the requested senders.
Notes:
- Best-effort: never raises; returns {} on any failure.
- Only resolves usernames included in `sender_usernames` to keep parsing cheap.
"""
chatroom = str(chatroom_id or "").strip()
if not chatroom.endswith("@chatroom"):
return {}
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
if not targets:
return {}
target_set = set(targets)
def decode_varint(raw: bytes, offset: int) -> tuple[Optional[int], int]:
value = 0
shift = 0
pos = int(offset)
n = len(raw)
while pos < n:
byte = raw[pos]
pos += 1
value |= (byte & 0x7F) << shift
if (byte & 0x80) == 0:
return value, pos
shift += 7
if shift > 63:
return None, n
return None, n
def iter_fields(raw: bytes):
idx = 0
n = len(raw)
while idx < n:
tag, idx_next = decode_varint(raw, idx)
if tag is None or idx_next <= idx:
break
idx = idx_next
field_no = int(tag) >> 3
wire_type = int(tag) & 0x7
if wire_type == 0:
_, idx_next = decode_varint(raw, idx)
if idx_next <= idx:
break
idx = idx_next
continue
if wire_type == 2:
size, idx_next = decode_varint(raw, idx)
if size is None or idx_next <= idx:
break
idx = idx_next
end = idx + int(size)
if end > n:
break
chunk = raw[idx:end]
idx = end
yield field_no, wire_type, chunk
continue
if wire_type == 1:
idx += 8
continue
if wire_type == 5:
idx += 4
continue
break
def is_strong_username_hint(s: str) -> bool:
v = str(s or "").strip()
return v.startswith("wxid_") or v.endswith("@chatroom") or v.startswith("gh_") or ("@" in v)
def looks_like_username(s: str) -> bool:
v = str(s or "").strip()
if not v:
return False
if is_strong_username_hint(v):
return True
# Common alias-style WeChat IDs are ASCII-ish and do not contain whitespace.
if len(v) < 6 or len(v) > 32:
return False
if re.search(r"\s", v):
return False
if not re.match(r"^[A-Za-z][A-Za-z0-9_-]+$", v):
return False
if v.isdigit():
return False
return True
def pick_display(strings: list[tuple[int, str]], target: str) -> str:
best_score = -1
best = ""
for i, (fno, value) in enumerate(strings):
v = str(value or "").strip()
if (not v) or v == target:
continue
if is_strong_username_hint(v):
continue
if "\n" in v or "\r" in v:
continue
if len(v) > 64:
continue
score = 0
if int(fno) == 2:
score += 100
if not looks_like_username(v):
score += 20
score += max(0, 32 - len(v))
# Stable tie-breaker: prefer earlier appearance.
score = score * 1000 - i
if score > best_score:
best_score = score
best = v
return best
try:
conn = sqlite3.connect(str(contact_db_path))
except Exception:
return {}
try:
row = conn.execute(
"SELECT ext_buffer FROM chat_room WHERE username = ? LIMIT 1",
(chatroom,),
).fetchone()
if row is None:
return {}
ext = row[0]
if ext is None:
return {}
if isinstance(ext, memoryview):
ext_buf = ext.tobytes()
elif isinstance(ext, (bytes, bytearray)):
ext_buf = bytes(ext)
else:
return {}
if not ext_buf:
return {}
out: dict[str, str] = {}
for _, wire_type, chunk in iter_fields(ext_buf):
if wire_type != 2 or (not chunk):
continue
# Parse submessage and collect UTF-8 strings.
strings: list[tuple[int, str]] = []
try:
for sfno, swire, sval in iter_fields(chunk):
if swire != 2:
continue
if not sval:
continue
if len(sval) > 256:
continue
try:
txt = bytes(sval).decode("utf-8", errors="strict")
except Exception:
continue
txt = txt.strip()
if not txt:
continue
strings.append((int(sfno), txt))
except Exception:
continue
if not strings:
continue
present = [v for _, v in strings if v in target_set and v not in out]
if not present:
continue
for target in present:
disp = pick_display(strings, target)
if disp:
out[target] = disp
if len(out) >= len(target_set):
break
return out
except Exception:
return {}
finally:
try:
conn.close()
except Exception:
pass
def _load_usernames_by_display_names(contact_db_path: Path, names: list[str]) -> dict[str, str]:
"""Best-effort mapping from display name -> username using contact.db.
@@ -1515,11 +1845,7 @@ def _row_to_search_hit(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
+6 -5
View File
@@ -23,17 +23,17 @@ logger = get_logger(__name__)
# 运行时输出目录(桌面端可通过 WECHAT_TOOL_DATA_DIR 指向可写目录)
_OUTPUT_DATABASES_DIR = get_output_databases_dir()
_PACKAGE_ROOT = Path(__file__).resolve().parent
def _list_decrypted_accounts() -> list[str]:
"""列出已解密输出的账号目录名(仅保留包含 session.db + contact.db 的账号)"""
if not _OUTPUT_DATABASES_DIR.exists():
output_db_dir = get_output_databases_dir()
if not output_db_dir.exists():
return []
accounts: list[str] = []
for p in _OUTPUT_DATABASES_DIR.iterdir():
for p in output_db_dir.iterdir():
if not p.is_dir():
continue
if (p / "session.db").exists() and (p / "contact.db").exists():
@@ -45,6 +45,7 @@ def _list_decrypted_accounts() -> list[str]:
def _resolve_account_dir(account: Optional[str]) -> Path:
"""解析账号目录,并进行路径安全校验(防止路径穿越)"""
output_db_dir = get_output_databases_dir()
accounts = _list_decrypted_accounts()
if not accounts:
raise HTTPException(
@@ -53,8 +54,8 @@ def _resolve_account_dir(account: Optional[str]) -> Path:
)
selected = account or accounts[0]
base = _OUTPUT_DATABASES_DIR.resolve()
candidate = (_OUTPUT_DATABASES_DIR / selected).resolve()
base = output_db_dir.resolve()
candidate = (output_db_dir / selected).resolve()
if candidate != base and base not in candidate.parents:
raise HTTPException(status_code=400, detail="Invalid account path.")
+341 -57
View File
@@ -39,11 +39,17 @@ from ..chat_helpers import (
_make_snippet,
_match_tokens,
_load_contact_rows,
_load_group_nickname_map_from_contact_db,
_load_usernames_by_display_names,
_load_latest_message_previews,
_build_group_sender_display_name_map,
_normalize_session_preview_text,
_extract_group_preview_sender_username,
_replace_preview_sender_prefix,
_lookup_resource_md5,
_normalize_xml_url,
_parse_app_message,
_parse_system_message_content,
_parse_pat_message,
_pick_display_name,
_query_head_image_usernames,
@@ -69,6 +75,8 @@ from ..wcdb_realtime import (
WCDB_REALTIME,
get_avatar_urls as _wcdb_get_avatar_urls,
get_display_names as _wcdb_get_display_names,
get_group_members as _wcdb_get_group_members,
get_group_nicknames as _wcdb_get_group_nicknames,
get_messages as _wcdb_get_messages,
get_sessions as _wcdb_get_sessions,
)
@@ -97,6 +105,142 @@ def _avatar_url_unified(
return _build_avatar_url(str(account_dir.name or ""), u)
def _load_group_nickname_map_from_wcdb(
*,
account_dir: Path,
chatroom_id: str,
sender_usernames: list[str],
rt_conn=None,
) -> dict[str, str]:
chatroom = str(chatroom_id or "").strip()
if not chatroom.endswith("@chatroom"):
return {}
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
if not targets:
return {}
try:
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
except Exception:
return {}
target_set = set(targets)
out: dict[str, str] = {}
try:
with wcdb_conn.lock:
nickname_map = _wcdb_get_group_nicknames(wcdb_conn.handle, chatroom)
for username, nickname in (nickname_map or {}).items():
su = str(username or "").strip()
nn = str(nickname or "").strip()
if su and nn and su in target_set:
out[su] = nn
except Exception:
pass
unresolved = [u for u in targets if u not in out]
if not unresolved:
return out
try:
with wcdb_conn.lock:
members = _wcdb_get_group_members(wcdb_conn.handle, chatroom)
except Exception:
return out
if not members:
return out
unresolved_set = set(unresolved)
for member in members:
try:
username = str(member.get("username") or "").strip()
except Exception:
username = ""
if (not username) or (username not in unresolved_set):
continue
nickname = ""
for key in ("nickname", "displayName", "remark", "originalName"):
try:
candidate = str(member.get(key) or "").strip()
except Exception:
candidate = ""
if candidate:
nickname = candidate
break
if nickname:
out[username] = nickname
return out
def _load_group_nickname_map(
*,
account_dir: Path,
contact_db_path: Path,
chatroom_id: str,
sender_usernames: list[str],
rt_conn=None,
) -> dict[str, str]:
"""Resolve group member nickname (group card) via WCDB and contact.db ext_buffer (best-effort)."""
contact_map: dict[str, str] = {}
try:
contact_map = _load_group_nickname_map_from_contact_db(
contact_db_path,
chatroom_id,
sender_usernames,
)
except Exception:
contact_map = {}
wcdb_map: dict[str, str] = {}
try:
wcdb_map = _load_group_nickname_map_from_wcdb(
account_dir=account_dir,
chatroom_id=chatroom_id,
sender_usernames=sender_usernames,
rt_conn=rt_conn,
)
except Exception:
wcdb_map = {}
if not contact_map and not wcdb_map:
return {}
# Merge: WCDB wins (newer DLLs may provide higher-quality group nicknames).
merged: dict[str, str] = {}
merged.update(contact_map)
merged.update(wcdb_map)
return merged
def _resolve_sender_display_name(
*,
sender_username: str,
sender_contact_rows: dict[str, sqlite3.Row],
wcdb_display_names: dict[str, str],
group_nicknames: Optional[dict[str, str]] = None,
) -> str:
su = str(sender_username or "").strip()
if not su:
return ""
gn = str((group_nicknames or {}).get(su) or "").strip()
if gn:
return gn
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
return display_name
def _realtime_sync_lock(account: str, username: str) -> threading.Lock:
key = (str(account or "").strip(), str(username or "").strip())
with _REALTIME_SYNC_MU:
@@ -557,8 +701,11 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
"draft",
"last_timestamp",
"sort_timestamp",
"last_msg_locald_id",
"last_msg_type",
"last_msg_sub_type",
"last_msg_sender",
"last_sender_display_name",
]
update_cols = [c for c in desired_cols if c in cols]
if not update_cols:
@@ -583,7 +730,15 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
continue
values: list[Any] = []
for c in update_cols:
if c in {"unread_count", "is_hidden", "last_timestamp", "sort_timestamp", "last_msg_type", "last_msg_sub_type"}:
if c in {
"unread_count",
"is_hidden",
"last_timestamp",
"sort_timestamp",
"last_msg_locald_id",
"last_msg_type",
"last_msg_sub_type",
}:
values.append(_int((r or {}).get(c)))
else:
values.append(_text((r or {}).get(c)))
@@ -1510,8 +1665,17 @@ def sync_chat_realtime_messages_all(
"sort_timestamp",
item.get("sortTimestamp", item.get("last_timestamp", item.get("lastTimestamp", 0))),
),
"last_msg_locald_id": item.get(
"last_msg_locald_id",
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
),
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
"last_sender_display_name": item.get(
"last_sender_display_name",
item.get("lastSenderDisplayName", ""),
),
}
# Prefer the row with the newer sort timestamp for the same username.
prev = realtime_rows_by_user.get(uname)
@@ -2137,11 +2301,7 @@ def _append_full_messages_from_rows(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
@@ -2598,6 +2758,13 @@ def _postprocess_full_messages(
wcdb_display_names = {}
wcdb_avatar_urls = {}
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=uniq_senders,
)
for m in merged:
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
@@ -2613,13 +2780,12 @@ def _postprocess_full_messages(
su = str(m.get("senderUsername") or "")
if not su:
continue
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
m["senderDisplayName"] = display_name
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -2836,6 +3002,17 @@ def list_chat_sessions(
"sort_timestamp": item.get("sort_timestamp", item.get("sortTimestamp", item.get("last_timestamp", 0))),
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
# Keep these fields so group session previews can render "sender: content" without
# crashing (realtime rows are dicts, not sqlite Rows).
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
"last_sender_display_name": item.get(
"last_sender_display_name",
item.get("lastSenderDisplayName", ""),
),
"last_msg_locald_id": item.get(
"last_msg_locald_id",
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
),
}
)
@@ -2923,12 +3100,16 @@ def list_chat_sessions(
try:
need_display: list[str] = []
need_avatar: list[str] = []
if source_norm == "realtime":
# In realtime mode, always ask WCDB for display names: decrypted contact.db can be stale.
need_display = [str(u or "").strip() for u in usernames if str(u or "").strip()]
for u in usernames:
if not u:
continue
row = contact_rows.get(u)
if _pick_display_name(row, u) == u:
need_display.append(u)
if source_norm != "realtime":
row = contact_rows.get(u)
if _pick_display_name(row, u) == u:
need_display.append(u)
if source_norm == "realtime":
# In realtime mode, prefer WCDB-resolved avatar URLs (contact.db can be stale).
if u not in local_avatar_usernames:
@@ -2983,14 +3164,40 @@ def list_chat_sessions(
if v:
last_previews[u] = v
group_sender_display_names: dict[str, str] = _build_group_sender_display_name_map(
contact_db_path,
last_previews,
)
unresolved = []
for conv_username, preview_text in last_previews.items():
if not str(conv_username or "").endswith("@chatroom"):
continue
sender_username = _extract_group_preview_sender_username(preview_text)
if sender_username and sender_username not in group_sender_display_names:
unresolved.append(sender_username)
unresolved = list(dict.fromkeys(unresolved))
if unresolved:
try:
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
with wcdb_conn.lock:
wcdb_names = _wcdb_get_display_names(wcdb_conn.handle, unresolved)
for sender_username in unresolved:
wcdb_name = str(wcdb_names.get(sender_username) or "").strip()
if wcdb_name and wcdb_name != sender_username:
group_sender_display_names[sender_username] = wcdb_name
except Exception:
pass
sessions: list[dict[str, Any]] = []
for r in filtered:
username = r["username"]
c_row = contact_rows.get(username)
display_name = _pick_display_name(c_row, username)
if display_name == username:
wd = str(wcdb_display_names.get(username) or "").strip()
wd = str(wcdb_display_names.get(username) or "").strip()
if source_norm == "realtime" and wd and wd != username:
display_name = wd
elif display_name == username:
if wd and wd != username:
display_name = wd
@@ -3046,6 +3253,37 @@ def list_chat_sessions(
if last_msg_type == 81604378673 or (last_msg_type == 49 and last_msg_sub_type == 19):
last_message = "[聊天记录]"
last_message = _normalize_session_preview_text(
last_message,
is_group=bool(str(username or "").endswith("@chatroom")),
sender_display_names=group_sender_display_names,
)
if str(username or "").endswith("@chatroom") and str(last_message or "") and not str(last_message).startswith("[草稿]"):
# Prefer group card nickname when available. In realtime mode, WCDB session rows can provide
# `last_sender_display_name`, but we may still get a summary that doesn't include "sender:".
# Also guard against URL schemes like "https://..." being mis-parsed as "https: //...".
raw_sender_display = ""
try:
raw_sender_display = r["last_sender_display_name"]
except Exception:
try:
raw_sender_display = r.get("last_sender_display_name", "")
except Exception:
raw_sender_display = ""
sender_display = _decode_sqlite_text(raw_sender_display).strip()
if sender_display:
text = re.sub(r"\s+", " ", str(last_message or "").strip()).strip()
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
if match:
prefix = str(match.group(1) or "").strip()
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
if prefix.lower() in {"http", "https"} and body.startswith("//"):
last_message = f"{sender_display}: {text}"
else:
last_message = f"{sender_display}: {body}"
else:
last_message = f"{sender_display}: {text}"
last_time = _format_session_time(r["sort_timestamp"] or r["last_timestamp"])
sessions.append(
@@ -3248,13 +3486,7 @@ def _collect_chat_messages(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
import re
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
@@ -3957,13 +4189,7 @@ def list_chat_messages(
if local_type == 10000:
render_type = "system"
if "revokemsg" in raw_text:
content_text = "撤回了一条消息"
else:
import re
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
content_text = _parse_system_message_content(raw_text)
elif local_type == 49:
parsed = _parse_app_message(raw_text)
render_type = str(parsed.get("renderType") or "text")
@@ -4412,6 +4638,13 @@ def list_chat_messages(
wcdb_display_names = {}
wcdb_avatar_urls = {}
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=uniq_senders,
)
for m in merged:
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
@@ -4427,13 +4660,12 @@ def list_chat_messages(
su = str(m.get("senderUsername") or "")
if not su:
continue
row = sender_contact_rows.get(su)
display_name = _pick_display_name(row, su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
m["senderDisplayName"] = display_name
m["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=sender_contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -4930,19 +5162,24 @@ async def _search_chat_messages_via_fts(
username=username,
local_avatar_usernames=local_avatar_usernames,
)
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=[str(x.get("senderUsername") or "") for x in hits],
)
for h in hits:
su = str(h.get("senderUsername") or "").strip()
h["conversationName"] = conv_name
h["conversationAvatar"] = conv_avatar
if su:
row = contact_rows.get(su)
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
h["senderDisplayName"] = display_name
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nicknames,
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -4986,6 +5223,23 @@ async def _search_chat_messages_via_fts(
wcdb_display_names = {}
wcdb_avatar_urls = {}
group_senders_by_room: dict[str, list[str]] = {}
for h in hits:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
if (not cu.endswith("@chatroom")) or (not su):
continue
group_senders_by_room.setdefault(cu, []).append(su)
group_nickname_cache: dict[str, dict[str, str]] = {}
for cu, senders in group_senders_by_room.items():
group_nickname_cache[cu] = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=cu,
sender_usernames=senders,
)
for h in hits:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
@@ -5003,13 +5257,12 @@ async def _search_chat_messages_via_fts(
)
h["conversationAvatar"] = conv_avatar
if su:
row = contact_rows.get(su)
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su)
if display_name == su:
wd = str(wcdb_display_names.get(su) or "").strip()
if wd and wd != su:
display_name = wd
h["senderDisplayName"] = display_name
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names=wcdb_display_names,
group_nicknames=group_nickname_cache.get(cu, {}),
)
avatar_url = base_url + _avatar_url_unified(
account_dir=account_dir,
username=su,
@@ -5272,13 +5525,23 @@ async def search_chat_messages(
contact_rows = _load_contact_rows(contact_db_path, uniq_usernames)
conv_row = contact_rows.get(username)
conv_name = _pick_display_name(conv_row, username)
group_nicknames = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=username,
sender_usernames=[str(x.get("senderUsername") or "") for x in page],
)
for h in page:
su = str(h.get("senderUsername") or "").strip()
h["conversationName"] = conv_name
if su:
row = contact_rows.get(su)
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su)
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names={},
group_nicknames=group_nicknames,
)
return {
"status": "success",
@@ -5360,6 +5623,23 @@ async def search_chat_messages(
)
contact_rows = _load_contact_rows(contact_db_path, uniq_contacts)
group_senders_by_room: dict[str, list[str]] = {}
for h in page:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
if (not cu.endswith("@chatroom")) or (not su):
continue
group_senders_by_room.setdefault(cu, []).append(su)
group_nickname_cache: dict[str, dict[str, str]] = {}
for cu, senders in group_senders_by_room.items():
group_nickname_cache[cu] = _load_group_nickname_map(
account_dir=account_dir,
contact_db_path=contact_db_path,
chatroom_id=cu,
sender_usernames=senders,
)
for h in page:
cu = str(h.get("username") or "").strip()
su = str(h.get("senderUsername") or "").strip()
@@ -5367,8 +5647,12 @@ async def search_chat_messages(
conv_name = _pick_display_name(crow, cu) if cu else ""
h["conversationName"] = conv_name or cu
if su:
row = contact_rows.get(su)
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su)
h["senderDisplayName"] = _resolve_sender_display_name(
sender_username=su,
sender_contact_rows=contact_rows,
wcdb_display_names={},
group_nicknames=group_nickname_cache.get(cu, {}),
)
return {
"status": "success",
@@ -688,6 +688,83 @@ def _lookup_resource_md5_by_server_id(account_dir_str: str, server_id: int, want
pass
@lru_cache(maxsize=4096)
def _lookup_image_md5_by_server_id_from_messages(account_dir_str: str, server_id: int, username: str) -> str:
account_dir_str = str(account_dir_str or "").strip()
username = str(username or "").strip()
if not account_dir_str or not username:
return ""
try:
sid = int(server_id or 0)
except Exception:
sid = 0
if not sid:
return ""
try:
chat_hash = hashlib.md5(username.encode()).hexdigest()
except Exception:
return ""
if not chat_hash:
return ""
table_name = f"Msg_{chat_hash}"
account_dir = Path(account_dir_str)
db_paths: list[Path] = []
try:
for p in account_dir.glob("message_*.db"):
try:
if p.is_file():
db_paths.append(p)
except Exception:
continue
except Exception:
db_paths = []
if not db_paths:
return ""
db_paths.sort(key=lambda p: p.name)
for db_path in db_paths:
try:
conn = sqlite3.connect(str(db_path))
except Exception:
continue
try:
row = conn.execute(
f"SELECT local_type, packed_info_data FROM {table_name} "
"WHERE server_id = ? ORDER BY create_time DESC LIMIT 1",
(sid,),
).fetchone()
except Exception:
row = None
finally:
try:
conn.close()
except Exception:
pass
if not row:
continue
try:
local_type = int(row[0] or 0)
except Exception:
local_type = 0
if local_type != 3:
continue
md5 = _extract_md5_from_packed_info(row[1])
md5_norm = str(md5 or "").strip().lower()
if _is_valid_md5(md5_norm):
return md5_norm
return ""
def _is_safe_http_url(url: str) -> bool:
u = str(url or "").strip()
if not u:
@@ -1062,6 +1139,12 @@ async def get_chat_image(
resource_md5 = _lookup_resource_md5_by_server_id(str(account_dir), int(server_id), want_local_type=3)
if resource_md5:
md5 = resource_md5
elif username:
md5_from_msg = _lookup_image_md5_by_server_id_from_messages(
str(account_dir), int(server_id), str(username)
)
if md5_from_msg:
md5 = md5_from_msg
# md5 模式:优先从解密资源目录读取(更快)
if md5:
+46
View File
@@ -102,6 +102,17 @@ def _load_wcdb_lib() -> ctypes.CDLL:
lib.wcdb_get_group_members.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)]
lib.wcdb_get_group_members.restype = ctypes.c_int
# Optional (newer DLLs): wcdb_get_group_nicknames(handle, chatroom_id, out_json)
try:
lib.wcdb_get_group_nicknames.argtypes = [
ctypes.c_int64,
ctypes.c_char_p,
ctypes.POINTER(ctypes.c_char_p),
]
lib.wcdb_get_group_nicknames.restype = ctypes.c_int
except Exception:
pass
# Optional: execute arbitrary SQL on a selected database kind/path.
# Signature: wcdb_exec_query(handle, kind, path, sql, out_json)
try:
@@ -355,6 +366,41 @@ def get_avatar_urls(handle: int, usernames: list[str]) -> dict[str, str]:
return {}
def get_group_members(handle: int, chatroom_id: str) -> list[dict[str, Any]]:
_ensure_initialized()
lib = _load_wcdb_lib()
cid = str(chatroom_id or "").strip()
if not cid:
return []
out_json = _call_out_json(lib.wcdb_get_group_members, ctypes.c_int64(int(handle)), cid.encode("utf-8"))
decoded = _safe_load_json(out_json)
if isinstance(decoded, list):
out: list[dict[str, Any]] = []
for x in decoded:
if isinstance(x, dict):
out.append(x)
return out
return []
def get_group_nicknames(handle: int, chatroom_id: str) -> dict[str, str]:
_ensure_initialized()
lib = _load_wcdb_lib()
fn = getattr(lib, "wcdb_get_group_nicknames", None)
if not fn:
return {}
cid = str(chatroom_id or "").strip()
if not cid:
return {}
out_json = _call_out_json(fn, ctypes.c_int64(int(handle)), cid.encode("utf-8"))
decoded = _safe_load_json(out_json)
if isinstance(decoded, dict):
return {str(k): str(v) for k, v in decoded.items()}
return {}
def exec_query(handle: int, *, kind: str, path: Optional[str], sql: str) -> list[dict[str, Any]]:
"""Execute raw SQL on a specific db kind/path via WCDB.
@@ -122,6 +122,16 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
(3, 1003, 49, 3, 2, 1735689603, '<msg><appmsg><type>2000</type><des>收到转账0.01元</des></appmsg></msg>', None),
(4, 1004, 1, 4, 2, 1735689604, '普通文本消息', None),
(5, 1005, 10000, 5, 2, 1735689605, '系统提示消息', None),
(
6,
1006,
10000,
6,
2,
1735689606,
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA[“测试好友”撤回了一条消息]]></replacemsg></revokemsg></sysmsg>',
None,
),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
@@ -413,6 +423,37 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
def test_system_revoke_exports_readable_revoker_content(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
message_types=["system"],
include_media=False,
)
self.assertEqual(job.status, "done", msg=job.error)
payload, _, _ = self._load_export_payload(job.zip_path)
revoke_msg = next((m for m in payload.get("messages", []) if int(m.get("serverId") or 0) == 1006), None)
self.assertIsNotNone(revoke_msg)
self.assertEqual(str(revoke_msg.get("renderType") or ""), "system")
self.assertEqual(str(revoke_msg.get("content") or ""), "“测试好友”撤回了一条消息")
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,111 @@
import sqlite3
import sys
import threading
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class _DummyRequest:
base_url = "http://testserver/"
class _DummyConn:
def __init__(self) -> None:
self.handle = 1
self.lock = threading.Lock()
def _seed_session_db(session_db_path: Path) -> None:
conn = sqlite3.connect(str(session_db_path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT PRIMARY KEY,
unread_count INTEGER DEFAULT 0,
is_hidden INTEGER DEFAULT 0,
summary TEXT DEFAULT '',
draft TEXT DEFAULT '',
last_timestamp INTEGER DEFAULT 0,
sort_timestamp INTEGER DEFAULT 0,
last_msg_locald_id INTEGER DEFAULT 0,
last_msg_type INTEGER DEFAULT 0,
last_msg_sub_type INTEGER DEFAULT 0,
last_msg_sender TEXT DEFAULT '',
last_sender_display_name TEXT DEFAULT ''
)
"""
)
conn.commit()
finally:
conn.close()
class TestChatRealtimeSyncAllUpdatesSenderDisplayName(unittest.TestCase):
def test_sync_all_upserts_last_sender_display_name(self):
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
_seed_session_db(account_dir / "session.db")
conn = _DummyConn()
sessions_rows = [
{
"username": "demo@chatroom",
"unread_count": 0,
"is_hidden": 0,
"summary": "hello",
"draft": "",
"last_timestamp": 123,
"sort_timestamp": 123,
"last_msg_type": 1,
"last_msg_sub_type": 0,
"last_msg_sender": "wxid_demo",
"last_sender_display_name": "群名片A",
"last_msg_locald_id": 777,
}
]
with (
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
patch.object(chat_router, "_ensure_decrypted_message_tables", return_value={}),
patch.object(chat_router, "_should_keep_session", return_value=True),
):
resp = chat_router.sync_chat_realtime_messages_all(
_DummyRequest(),
account="acc",
max_scan=20,
include_hidden=True,
include_official=True,
)
self.assertEqual(resp.get("status"), "success")
db = sqlite3.connect(str(account_dir / "session.db"))
try:
row = db.execute(
"SELECT last_sender_display_name, last_msg_sender, last_msg_locald_id FROM SessionTable WHERE username = ? LIMIT 1",
("demo@chatroom",),
).fetchone()
finally:
db.close()
self.assertIsNotNone(row)
self.assertEqual(str(row[0] or ""), "群名片A")
self.assertEqual(str(row[1] or ""), "wxid_demo")
self.assertEqual(int(row[2] or 0), 777)
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,68 @@
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import (
_build_group_sender_display_name_map,
_normalize_session_preview_text,
_replace_preview_sender_prefix,
)
class TestChatSessionPreviewFormatting(unittest.TestCase):
def test_normalize_session_preview_emoji_label(self):
out = _normalize_session_preview_text("[表情]", is_group=False, sender_display_names={})
self.assertEqual(out, "[动画表情]")
def test_normalize_group_preview_sender_display_name(self):
out = _normalize_session_preview_text(
"wxid_u3gwceqvne2m22: [表情]",
is_group=True,
sender_display_names={"wxid_u3gwceqvne2m22": "食神"},
)
self.assertEqual(out, "食神: [动画表情]")
def test_build_group_sender_display_name_map_from_contact_db(self):
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?)",
("wxid_u3gwceqvne2m22", "", "食神", "", "", ""),
)
conn.commit()
finally:
conn.close()
mapping = _build_group_sender_display_name_map(
contact_db_path,
{"demo@chatroom": "wxid_u3gwceqvne2m22: [动画表情]"},
)
self.assertEqual(mapping.get("wxid_u3gwceqvne2m22"), "食神")
def test_replace_preview_sender_prefix_uses_group_nickname(self):
out = _replace_preview_sender_prefix("去码头整点🍟: [动画表情]", "麻辣香锅")
self.assertEqual(out, "麻辣香锅: [动画表情]")
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,103 @@
import sys
import threading
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import chat as chat_router
class _DummyRequest:
base_url = "http://testserver/"
class _DummyConn:
def __init__(self) -> None:
self.handle = 1
self.lock = threading.Lock()
class TestChatSessionsRealtimeSenderPreview(unittest.TestCase):
def _run(self, sessions_rows: list[dict]) -> dict:
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
conn = _DummyConn()
with (
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
patch.object(chat_router, "_wcdb_get_display_names", return_value={}),
patch.object(chat_router, "_wcdb_get_avatar_urls", return_value={}),
patch.object(chat_router, "_load_contact_rows", return_value={}),
patch.object(chat_router, "_query_head_image_usernames", return_value=set()),
patch.object(chat_router, "_should_keep_session", return_value=True),
patch.object(chat_router, "_avatar_url_unified", return_value="/avatar"),
):
return chat_router.list_chat_sessions(
_DummyRequest(),
account="acc",
limit=50,
include_hidden=True,
include_official=True,
preview="latest",
source="realtime",
)
def test_realtime_sessions_group_summary_prefixed_by_sender_display_name(self):
resp = self._run(
[
{
"username": "demo@chatroom",
"summary": "hello",
"draft": "",
"unread_count": 0,
"is_hidden": 0,
"last_timestamp": 123,
"sort_timestamp": 123,
"last_msg_type": 1,
"last_msg_sub_type": 0,
"last_msg_sender": "wxid_demo",
"last_sender_display_name": "群名片A",
}
]
)
self.assertEqual(resp.get("status"), "success")
sessions = resp.get("sessions") or []
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0].get("lastMessage"), "群名片A: hello")
def test_realtime_sessions_group_url_summary_keeps_scheme(self):
resp = self._run(
[
{
"username": "url@chatroom",
"summary": "https://example.com/x",
"draft": "",
"unread_count": 0,
"is_hidden": 0,
"last_timestamp": 123,
"sort_timestamp": 123,
"last_msg_type": 1,
"last_msg_sub_type": 0,
"last_msg_sender": "wxid_demo",
"last_sender_display_name": "群名片B",
}
]
)
self.assertEqual(resp.get("status"), "success")
sessions = resp.get("sessions") or []
self.assertEqual(len(sessions), 1)
self.assertEqual(sessions[0].get("lastMessage"), "群名片B: https://example.com/x")
if __name__ == "__main__":
unittest.main()
+42
View File
@@ -0,0 +1,42 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _parse_system_message_content
class TestChatSystemMessageParsing(unittest.TestCase):
def test_extract_replacemsg_for_revoke(self):
raw_text = (
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA[“张三”撤回了一条消息]]>'
"</replacemsg></revokemsg></sysmsg>"
)
self.assertEqual(_parse_system_message_content(raw_text), "“张三”撤回了一条消息")
def test_extract_nested_content_in_replacemsg(self):
raw_text = (
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA['
'<content>"黄智欢" 撤回了一条消息</content><revoketime>0</revoketime>'
']]></replacemsg></revokemsg></sysmsg>'
)
self.assertEqual(_parse_system_message_content(raw_text), '"黄智欢" 撤回了一条消息')
def test_extract_revokemsg_text_when_replacemsg_missing(self):
raw_text = "<revokemsg>你撤回了一条消息</revokemsg>"
self.assertEqual(_parse_system_message_content(raw_text), "你撤回了一条消息")
def test_revoke_fallback_when_no_readable_text(self):
raw_text = '<sysmsg type="revokemsg"></sysmsg>'
self.assertEqual(_parse_system_message_content(raw_text), "撤回了一条消息")
def test_normal_system_message_still_cleaned(self):
raw_text = "<sysmsg><template><![CDATA[ 张三 加入了群聊 ]]></template></sysmsg>"
self.assertEqual(_parse_system_message_content(raw_text), "张三 加入了群聊")
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,114 @@
import sqlite3
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _load_group_nickname_map_from_contact_db
def _enc_varint(n: int) -> bytes:
v = int(n)
out = bytearray()
while True:
b = v & 0x7F
v >>= 7
if v:
out.append(b | 0x80)
else:
out.append(b)
break
return bytes(out)
def _enc_tag(field_no: int, wire_type: int) -> bytes:
return _enc_varint((int(field_no) << 3) | int(wire_type))
def _enc_len(field_no: int, data: bytes) -> bytes:
b = bytes(data or b"")
return _enc_tag(field_no, 2) + _enc_varint(len(b)) + b
def _member_entry(*, inner: bytes) -> bytes:
# contact.db ext_buffer uses repeated length-delimited submessages; the top-level field number is not important
# for our best-effort parser, so we use field 1.
return _enc_len(1, inner)
class TestGroupNicknameExtBufferParsing(unittest.TestCase):
def test_parse_pattern_a_field1_username_field2_display(self):
chatroom = "demo@chatroom"
username = "wxid_demo_123456"
display = "群名片A"
inner = _enc_len(1, username.encode("utf-8")) + _enc_len(2, display.encode("utf-8"))
ext_buffer = _member_entry(inner=inner)
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
)
conn.execute(
"INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
(1, chatroom, "", ext_buffer),
)
conn.commit()
finally:
conn.close()
out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
self.assertEqual(out.get(username), display)
def test_parse_pattern_b_field4_username_field1_display(self):
chatroom = "demo2@chatroom"
username = "wxid_demo_abcdef"
display = "hjlbingo"
inner = _enc_len(4, username.encode("utf-8")) + _enc_len(1, display.encode("utf-8"))
ext_buffer = _member_entry(inner=inner)
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
)
conn.execute(
"INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
(1, chatroom, "", ext_buffer),
)
conn.commit()
finally:
conn.close()
out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
self.assertEqual(out.get(username), display)
def test_non_chatroom_returns_empty(self):
with TemporaryDirectory() as td:
contact_db_path = Path(td) / "contact.db"
conn = sqlite3.connect(str(contact_db_path))
try:
conn.execute(
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
)
conn.commit()
finally:
conn.close()
out = _load_group_nickname_map_from_contact_db(contact_db_path, "wxid_not_chatroom", ["wxid_xxx"])
self.assertEqual(out, {})
if __name__ == "__main__":
unittest.main()