mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
Compare commits
3 Commits
@@ -242,14 +242,20 @@
|
||||
@click="selectContact(contact)">
|
||||
<div class="flex items-center space-x-3 w-full">
|
||||
<!-- 联系人头像 -->
|
||||
<div class="w-[calc(45px/var(--dpr))] h-[calc(45px/var(--dpr))] rounded-md overflow-hidden bg-gray-300 flex-shrink-0" :class="{ 'privacy-blur': privacyMode }">
|
||||
<div v-if="contact.avatar" class="w-full h-full">
|
||||
<img :src="contact.avatar" :alt="contact.name" class="w-full h-full object-cover" referrerpolicy="no-referrer" @error="onAvatarError($event, contact)">
|
||||
</div>
|
||||
<div v-else class="w-full h-full flex items-center justify-center text-white text-xs font-bold"
|
||||
:style="{ backgroundColor: contact.avatarColor || '#4B5563' }">
|
||||
{{ contact.name.charAt(0) }}
|
||||
<div class="relative flex-shrink-0" :class="{ 'privacy-blur': privacyMode }">
|
||||
<div class="w-[calc(45px/var(--dpr))] h-[calc(45px/var(--dpr))] rounded-md overflow-hidden bg-gray-300">
|
||||
<div v-if="contact.avatar" class="w-full h-full">
|
||||
<img :src="contact.avatar" :alt="contact.name" class="w-full h-full object-cover" referrerpolicy="no-referrer" @error="onAvatarError($event, contact)">
|
||||
</div>
|
||||
<div v-else class="w-full h-full flex items-center justify-center text-white text-xs font-bold"
|
||||
:style="{ backgroundColor: contact.avatarColor || '#4B5563' }">
|
||||
{{ contact.name.charAt(0) }}
|
||||
</div>
|
||||
</div>
|
||||
<span
|
||||
v-if="contact.unreadCount > 0"
|
||||
class="absolute z-10 -top-[calc(4px/var(--dpr))] -right-[calc(4px/var(--dpr))] w-[calc(10px/var(--dpr))] h-[calc(10px/var(--dpr))] bg-[#ed4d4d] rounded-full"
|
||||
></span>
|
||||
</div>
|
||||
|
||||
<!-- 联系人信息 -->
|
||||
@@ -257,13 +263,12 @@
|
||||
<div class="flex items-center justify-between">
|
||||
<h3 class="text-sm font-medium text-gray-900 truncate" :class="{ 'privacy-blur': privacyMode }">{{ contact.name }}</h3>
|
||||
<div class="flex items-center flex-shrink-0 ml-2">
|
||||
<span v-if="contact.unreadCount > 0" class="text-[10px] text-white bg-red-500 rounded-full min-w-[18px] h-[18px] px-1 flex items-center justify-center mr-2">
|
||||
{{ contact.unreadCount > 99 ? '99+' : contact.unreadCount }}
|
||||
</span>
|
||||
<span class="text-xs text-gray-500">{{ contact.lastMessageTime }}</span>
|
||||
</div>
|
||||
</div>
|
||||
<p class="text-xs text-gray-500 truncate mt-0.5 leading-tight" :class="{ 'privacy-blur': privacyMode }">{{ contact.lastMessage }}</p>
|
||||
<p class="text-xs text-gray-500 truncate mt-0.5 leading-tight" :class="{ 'privacy-blur': privacyMode }">
|
||||
{{ contact.unreadCount > 0 ? `[${contact.unreadCount > 99 ? '99+' : contact.unreadCount}条] ` : '' }}{{ contact.lastMessage }}
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
@@ -616,15 +621,15 @@
|
||||
<img v-else :src="seg.emojiSrc" :alt="seg.content" class="inline-block w-[1.25em] h-[1.25em] align-text-bottom mx-px">
|
||||
</span>
|
||||
</div>
|
||||
<div
|
||||
v-if="message.quoteTitle || message.quoteContent"
|
||||
class="mt-[5px] px-2 text-xs text-neutral-600 rounded max-w-[404px] max-h-[61px] flex items-center bg-[#e1e1e1]">
|
||||
<div class="py-2 min-w-0 w-full">
|
||||
<div v-if="isQuotedVoice(message)" class="flex items-center gap-1 min-w-0">
|
||||
<span v-if="message.quoteTitle" class="truncate flex-shrink-0">{{ message.quoteTitle }}:</span>
|
||||
<button
|
||||
type="button"
|
||||
class="flex items-center gap-1 min-w-0 hover:opacity-80"
|
||||
<div
|
||||
v-if="message.quoteTitle || message.quoteContent"
|
||||
class="mt-[5px] px-2 text-xs text-neutral-600 rounded max-w-[404px] max-h-[65px] overflow-hidden flex items-start bg-[#e1e1e1]">
|
||||
<div class="py-2 min-w-0 flex-1">
|
||||
<div v-if="isQuotedVoice(message)" class="flex items-center gap-1 min-w-0">
|
||||
<span v-if="message.quoteTitle" class="truncate flex-shrink-0">{{ message.quoteTitle }}:</span>
|
||||
<button
|
||||
type="button"
|
||||
class="flex items-center gap-1 min-w-0 hover:opacity-80"
|
||||
:disabled="!message.quoteVoiceUrl"
|
||||
:class="!message.quoteVoiceUrl ? 'opacity-60 cursor-not-allowed' : ''"
|
||||
@click.stop="message.quoteVoiceUrl && playQuoteVoice(message)"
|
||||
@@ -647,13 +652,35 @@
|
||||
:ref="el => setVoiceRef(getQuoteVoiceId(message), el)"
|
||||
:src="message.quoteVoiceUrl"
|
||||
preload="none"
|
||||
class="hidden"
|
||||
></audio>
|
||||
</div>
|
||||
<div v-else class="line-clamp-2">{{ message.quoteTitle ? (message.quoteTitle + ': ') : '' }}{{ message.quoteContent }}</div>
|
||||
</div>
|
||||
</div>
|
||||
</template>
|
||||
class="hidden"
|
||||
></audio>
|
||||
</div>
|
||||
<div v-else class="line-clamp-2">
|
||||
<span v-if="message.quoteTitle">{{ message.quoteTitle }}:</span>
|
||||
<span
|
||||
v-if="message.quoteContent && !(isQuotedImage(message) && message.quoteTitle && message.quoteImageUrl && !message._quoteImageError)"
|
||||
:class="message.quoteTitle ? 'ml-1' : ''"
|
||||
>
|
||||
{{ message.quoteContent }}
|
||||
</span>
|
||||
</div>
|
||||
</div>
|
||||
<div
|
||||
v-if="isQuotedImage(message) && message.quoteImageUrl && !message._quoteImageError"
|
||||
class="ml-2 my-2 flex-shrink-0 w-[45px] h-[45px] rounded overflow-hidden cursor-pointer"
|
||||
@click.stop="openImagePreview(message.quoteImageUrl)"
|
||||
>
|
||||
<img
|
||||
:src="message.quoteImageUrl"
|
||||
alt="引用图片"
|
||||
class="w-full h-full object-contain"
|
||||
loading="lazy"
|
||||
decoding="async"
|
||||
@error="onQuoteImageError(message)"
|
||||
/>
|
||||
</div>
|
||||
</div>
|
||||
</template>
|
||||
<!-- 合并转发聊天记录(Chat History) -->
|
||||
<div
|
||||
v-else-if="message.renderType === 'chatHistory'"
|
||||
@@ -3464,6 +3491,19 @@ const isQuotedVoice = (message) => {
|
||||
return false
|
||||
}
|
||||
|
||||
const isQuotedImage = (message) => {
|
||||
const t = String(message?.quoteType || '').trim()
|
||||
if (t === '3') return true
|
||||
if (String(message?.quoteContent || '').trim() === '[图片]' && String(message?.quoteServerId || '').trim()) return true
|
||||
return false
|
||||
}
|
||||
|
||||
const onQuoteImageError = (message) => {
|
||||
try {
|
||||
if (message) message._quoteImageError = true
|
||||
} catch {}
|
||||
}
|
||||
|
||||
const playQuoteVoice = (message) => {
|
||||
playVoice({ id: getQuoteVoiceId(message) })
|
||||
}
|
||||
@@ -4665,6 +4705,23 @@ const normalizeMessage = (msg) => {
|
||||
}
|
||||
}
|
||||
|
||||
const quoteServerIdStr = String(msg.quoteServerId || '').trim()
|
||||
const quoteTypeStr = String(msg.quoteType || '').trim()
|
||||
const quoteVoiceUrl = quoteServerIdStr
|
||||
? `${mediaBase}/api/chat/media/voice?account=${encodeURIComponent(selectedAccount.value || '')}&server_id=${encodeURIComponent(quoteServerIdStr)}`
|
||||
: ''
|
||||
const quoteImageUrl = (() => {
|
||||
if (!quoteServerIdStr) return ''
|
||||
if (quoteTypeStr !== '3' && String(msg.quoteContent || '').trim() !== '[图片]') return ''
|
||||
const convUsername = String(selectedContact.value?.username || '').trim()
|
||||
const parts = [
|
||||
`account=${encodeURIComponent(selectedAccount.value || '')}`,
|
||||
`server_id=${encodeURIComponent(quoteServerIdStr)}`,
|
||||
convUsername ? `username=${encodeURIComponent(convUsername)}` : ''
|
||||
].filter(Boolean)
|
||||
return parts.length ? `${mediaBase}/api/chat/media/image?${parts.join('&')}` : ''
|
||||
})()
|
||||
|
||||
return {
|
||||
id: msg.id,
|
||||
serverId: msg.serverId || 0,
|
||||
@@ -4701,12 +4758,12 @@ const normalizeMessage = (msg) => {
|
||||
quoteTitle: msg.quoteTitle || '',
|
||||
quoteContent,
|
||||
quoteUsername: msg.quoteUsername || '',
|
||||
quoteServerId: String(msg.quoteServerId || '').trim(),
|
||||
quoteType: String(msg.quoteType || '').trim(),
|
||||
quoteServerId: quoteServerIdStr,
|
||||
quoteType: quoteTypeStr,
|
||||
quoteVoiceLength: msg.quoteVoiceLength || '',
|
||||
quoteVoiceUrl: String(msg.quoteServerId || '').trim()
|
||||
? `${mediaBase}/api/chat/media/voice?account=${encodeURIComponent(selectedAccount.value || '')}&server_id=${encodeURIComponent(String(msg.quoteServerId || '').trim())}`
|
||||
: '',
|
||||
quoteVoiceUrl,
|
||||
quoteImageUrl: quoteImageUrl || '',
|
||||
_quoteImageError: false,
|
||||
amount: msg.amount || '',
|
||||
coverUrl: msg.coverUrl || '',
|
||||
fileSize: msg.fileSize || '',
|
||||
|
||||
@@ -28,6 +28,7 @@ from .chat_helpers import (
|
||||
_load_contact_rows,
|
||||
_lookup_resource_md5,
|
||||
_parse_app_message,
|
||||
_parse_system_message_content,
|
||||
_parse_pat_message,
|
||||
_pick_display_name,
|
||||
_quote_ident,
|
||||
@@ -954,13 +955,7 @@ def _parse_message_for_export(
|
||||
|
||||
if local_type == 10000:
|
||||
render_type = "system"
|
||||
if "revokemsg" in raw_text:
|
||||
content_text = "撤回了一条消息"
|
||||
else:
|
||||
import re as _re
|
||||
|
||||
content_text = _re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
|
||||
content_text = _re.sub(r"\\s+", " ", content_text).strip() or "[系统消息]"
|
||||
content_text = _parse_system_message_content(raw_text)
|
||||
elif local_type == 49:
|
||||
parsed = _parse_app_message(raw_text)
|
||||
render_type = str(parsed.get("renderType") or "text")
|
||||
|
||||
@@ -645,6 +645,43 @@ def _extract_xml_tag_or_attr(xml_text: str, name: str) -> str:
|
||||
return _extract_xml_attr(xml_text, name)
|
||||
|
||||
|
||||
def _parse_system_message_content(raw_text: str) -> str:
|
||||
text = str(raw_text or "").strip()
|
||||
if not text:
|
||||
return "[系统消息]"
|
||||
|
||||
def _clean_system_text(value: str) -> str:
|
||||
candidate = str(value or "").strip()
|
||||
if not candidate:
|
||||
return ""
|
||||
|
||||
nested_content = _extract_xml_tag_text(candidate, "content")
|
||||
if nested_content:
|
||||
candidate = nested_content
|
||||
|
||||
candidate = re.sub(r"<!\[CDATA\[", "", candidate, flags=re.IGNORECASE)
|
||||
candidate = re.sub(r"\]\]>", "", candidate)
|
||||
candidate = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", candidate)
|
||||
candidate = re.sub(r"\s+", " ", candidate).strip()
|
||||
return candidate
|
||||
|
||||
if "revokemsg" in text.lower():
|
||||
replace_msg = _extract_xml_tag_text(text, "replacemsg")
|
||||
cleaned_replace_msg = _clean_system_text(replace_msg)
|
||||
if cleaned_replace_msg:
|
||||
return cleaned_replace_msg
|
||||
|
||||
revoke_msg = _extract_xml_tag_text(text, "revokemsg")
|
||||
cleaned_revoke_msg = _clean_system_text(revoke_msg)
|
||||
if cleaned_revoke_msg:
|
||||
return cleaned_revoke_msg
|
||||
|
||||
return "撤回了一条消息"
|
||||
|
||||
content_text = _clean_system_text(text)
|
||||
return content_text or "[系统消息]"
|
||||
|
||||
|
||||
def _extract_refermsg_block(xml_text: str) -> str:
|
||||
if not xml_text:
|
||||
return ""
|
||||
@@ -1053,11 +1090,7 @@ def _build_latest_message_preview(
|
||||
|
||||
content_text = ""
|
||||
if local_type == 10000:
|
||||
if "revokemsg" in raw_text:
|
||||
content_text = "撤回了一条消息"
|
||||
else:
|
||||
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
|
||||
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
|
||||
content_text = _parse_system_message_content(raw_text)
|
||||
elif local_type == 244813135921:
|
||||
parsed = _parse_app_message(raw_text)
|
||||
qt = str(parsed.get("quoteTitle") or "").strip()
|
||||
@@ -1093,7 +1126,7 @@ def _build_latest_message_preview(
|
||||
elif local_type == 43 or local_type == 62:
|
||||
content_text = "[视频]"
|
||||
elif local_type == 47:
|
||||
content_text = "[表情]"
|
||||
content_text = "[动画表情]"
|
||||
else:
|
||||
if raw_text and (not raw_text.startswith("<")) and (not raw_text.startswith('"<')):
|
||||
content_text = raw_text
|
||||
@@ -1107,6 +1140,101 @@ def _build_latest_message_preview(
|
||||
return content_text
|
||||
|
||||
|
||||
def _extract_group_preview_sender_username(preview_text: str) -> str:
|
||||
text = str(preview_text or "").strip()
|
||||
if not text:
|
||||
return ""
|
||||
|
||||
match = re.match(r"^([^:\s]{1,128}):\s*.+$", text)
|
||||
if not match:
|
||||
return ""
|
||||
|
||||
sender = str(match.group(1) or "").strip()
|
||||
if not sender:
|
||||
return ""
|
||||
|
||||
if sender.startswith("wxid_") or sender.endswith("@chatroom") or ("@" in sender):
|
||||
return sender
|
||||
if re.fullmatch(r"[A-Za-z][A-Za-z0-9_-]{1,127}", sender):
|
||||
return sender
|
||||
return ""
|
||||
|
||||
|
||||
def _normalize_session_preview_text(
|
||||
preview_text: str,
|
||||
*,
|
||||
is_group: bool,
|
||||
sender_display_names: Optional[dict[str, str]] = None,
|
||||
) -> str:
|
||||
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
|
||||
if not text:
|
||||
return ""
|
||||
|
||||
text = text.replace("[表情]", "[动画表情]")
|
||||
if (not is_group) or text.startswith("[草稿]"):
|
||||
return text
|
||||
|
||||
match = re.match(r"^([^:\s]{1,128}):\s*(.+)$", text)
|
||||
if not match:
|
||||
return text
|
||||
|
||||
sender_username = str(match.group(1) or "").strip()
|
||||
body = str(match.group(2) or "").strip()
|
||||
if (not sender_username) or (not body):
|
||||
return text
|
||||
|
||||
display_name = str((sender_display_names or {}).get(sender_username) or "").strip()
|
||||
if display_name and display_name != sender_username:
|
||||
return f"{display_name}: {body}"
|
||||
return text
|
||||
|
||||
|
||||
def _replace_preview_sender_prefix(preview_text: str, sender_display_name: str) -> str:
|
||||
text = re.sub(r"\s+", " ", str(preview_text or "").strip()).strip()
|
||||
if not text:
|
||||
return ""
|
||||
|
||||
display_name = str(sender_display_name or "").strip()
|
||||
if (not display_name) or text.startswith("[草稿]"):
|
||||
return text
|
||||
|
||||
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
|
||||
if not match:
|
||||
return text
|
||||
|
||||
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
|
||||
if not body:
|
||||
return text
|
||||
return f"{display_name}: {body}"
|
||||
|
||||
|
||||
def _build_group_sender_display_name_map(
|
||||
contact_db_path: Path,
|
||||
previews: dict[str, str],
|
||||
) -> dict[str, str]:
|
||||
group_sender_usernames: set[str] = set()
|
||||
for conv_username, preview_text in previews.items():
|
||||
if not str(conv_username or "").endswith("@chatroom"):
|
||||
continue
|
||||
sender_username = _extract_group_preview_sender_username(preview_text)
|
||||
if sender_username:
|
||||
group_sender_usernames.add(sender_username)
|
||||
|
||||
if not group_sender_usernames:
|
||||
return {}
|
||||
|
||||
display_names: dict[str, str] = {}
|
||||
sender_contact_rows = _load_contact_rows(contact_db_path, list(group_sender_usernames))
|
||||
for sender_username in group_sender_usernames:
|
||||
row = sender_contact_rows.get(sender_username)
|
||||
if row is None:
|
||||
continue
|
||||
display_name = _pick_display_name(row, sender_username)
|
||||
if display_name and display_name != sender_username:
|
||||
display_names[sender_username] = display_name
|
||||
return display_names
|
||||
|
||||
|
||||
def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> dict[str, str]:
|
||||
if not usernames:
|
||||
return {}
|
||||
@@ -1338,6 +1466,208 @@ def _load_contact_rows(contact_db_path: Path, usernames: list[str]) -> dict[str,
|
||||
conn.close()
|
||||
|
||||
|
||||
def _load_group_nickname_map_from_contact_db(
|
||||
contact_db_path: Path,
|
||||
chatroom_id: str,
|
||||
sender_usernames: list[str],
|
||||
) -> dict[str, str]:
|
||||
"""Best-effort mapping for group member nickname (aka group card) from contact.db.
|
||||
|
||||
WeChat stores per-chatroom member nicknames in `contact.db.chat_room.ext_buffer` as a protobuf-like blob.
|
||||
This helper parses that blob and returns { sender_username -> group_nickname } for the requested senders.
|
||||
|
||||
Notes:
|
||||
- Best-effort: never raises; returns {} on any failure.
|
||||
- Only resolves usernames included in `sender_usernames` to keep parsing cheap.
|
||||
"""
|
||||
|
||||
chatroom = str(chatroom_id or "").strip()
|
||||
if not chatroom.endswith("@chatroom"):
|
||||
return {}
|
||||
|
||||
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
|
||||
if not targets:
|
||||
return {}
|
||||
target_set = set(targets)
|
||||
|
||||
def decode_varint(raw: bytes, offset: int) -> tuple[Optional[int], int]:
|
||||
value = 0
|
||||
shift = 0
|
||||
pos = int(offset)
|
||||
n = len(raw)
|
||||
while pos < n:
|
||||
byte = raw[pos]
|
||||
pos += 1
|
||||
value |= (byte & 0x7F) << shift
|
||||
if (byte & 0x80) == 0:
|
||||
return value, pos
|
||||
shift += 7
|
||||
if shift > 63:
|
||||
return None, n
|
||||
return None, n
|
||||
|
||||
def iter_fields(raw: bytes):
|
||||
idx = 0
|
||||
n = len(raw)
|
||||
while idx < n:
|
||||
tag, idx_next = decode_varint(raw, idx)
|
||||
if tag is None or idx_next <= idx:
|
||||
break
|
||||
idx = idx_next
|
||||
field_no = int(tag) >> 3
|
||||
wire_type = int(tag) & 0x7
|
||||
|
||||
if wire_type == 0:
|
||||
_, idx_next = decode_varint(raw, idx)
|
||||
if idx_next <= idx:
|
||||
break
|
||||
idx = idx_next
|
||||
continue
|
||||
|
||||
if wire_type == 2:
|
||||
size, idx_next = decode_varint(raw, idx)
|
||||
if size is None or idx_next <= idx:
|
||||
break
|
||||
idx = idx_next
|
||||
end = idx + int(size)
|
||||
if end > n:
|
||||
break
|
||||
chunk = raw[idx:end]
|
||||
idx = end
|
||||
yield field_no, wire_type, chunk
|
||||
continue
|
||||
|
||||
if wire_type == 1:
|
||||
idx += 8
|
||||
continue
|
||||
if wire_type == 5:
|
||||
idx += 4
|
||||
continue
|
||||
break
|
||||
|
||||
def is_strong_username_hint(s: str) -> bool:
|
||||
v = str(s or "").strip()
|
||||
return v.startswith("wxid_") or v.endswith("@chatroom") or v.startswith("gh_") or ("@" in v)
|
||||
|
||||
def looks_like_username(s: str) -> bool:
|
||||
v = str(s or "").strip()
|
||||
if not v:
|
||||
return False
|
||||
if is_strong_username_hint(v):
|
||||
return True
|
||||
# Common alias-style WeChat IDs are ASCII-ish and do not contain whitespace.
|
||||
if len(v) < 6 or len(v) > 32:
|
||||
return False
|
||||
if re.search(r"\s", v):
|
||||
return False
|
||||
if not re.match(r"^[A-Za-z][A-Za-z0-9_-]+$", v):
|
||||
return False
|
||||
if v.isdigit():
|
||||
return False
|
||||
return True
|
||||
|
||||
def pick_display(strings: list[tuple[int, str]], target: str) -> str:
|
||||
best_score = -1
|
||||
best = ""
|
||||
for i, (fno, value) in enumerate(strings):
|
||||
v = str(value or "").strip()
|
||||
if (not v) or v == target:
|
||||
continue
|
||||
if is_strong_username_hint(v):
|
||||
continue
|
||||
if "\n" in v or "\r" in v:
|
||||
continue
|
||||
if len(v) > 64:
|
||||
continue
|
||||
|
||||
score = 0
|
||||
if int(fno) == 2:
|
||||
score += 100
|
||||
if not looks_like_username(v):
|
||||
score += 20
|
||||
score += max(0, 32 - len(v))
|
||||
# Stable tie-breaker: prefer earlier appearance.
|
||||
score = score * 1000 - i
|
||||
if score > best_score:
|
||||
best_score = score
|
||||
best = v
|
||||
return best
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(contact_db_path))
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT ext_buffer FROM chat_room WHERE username = ? LIMIT 1",
|
||||
(chatroom,),
|
||||
).fetchone()
|
||||
if row is None:
|
||||
return {}
|
||||
|
||||
ext = row[0]
|
||||
if ext is None:
|
||||
return {}
|
||||
if isinstance(ext, memoryview):
|
||||
ext_buf = ext.tobytes()
|
||||
elif isinstance(ext, (bytes, bytearray)):
|
||||
ext_buf = bytes(ext)
|
||||
else:
|
||||
return {}
|
||||
if not ext_buf:
|
||||
return {}
|
||||
|
||||
out: dict[str, str] = {}
|
||||
for _, wire_type, chunk in iter_fields(ext_buf):
|
||||
if wire_type != 2 or (not chunk):
|
||||
continue
|
||||
|
||||
# Parse submessage and collect UTF-8 strings.
|
||||
strings: list[tuple[int, str]] = []
|
||||
try:
|
||||
for sfno, swire, sval in iter_fields(chunk):
|
||||
if swire != 2:
|
||||
continue
|
||||
if not sval:
|
||||
continue
|
||||
if len(sval) > 256:
|
||||
continue
|
||||
try:
|
||||
txt = bytes(sval).decode("utf-8", errors="strict")
|
||||
except Exception:
|
||||
continue
|
||||
txt = txt.strip()
|
||||
if not txt:
|
||||
continue
|
||||
strings.append((int(sfno), txt))
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if not strings:
|
||||
continue
|
||||
|
||||
present = [v for _, v in strings if v in target_set and v not in out]
|
||||
if not present:
|
||||
continue
|
||||
|
||||
for target in present:
|
||||
disp = pick_display(strings, target)
|
||||
if disp:
|
||||
out[target] = disp
|
||||
if len(out) >= len(target_set):
|
||||
break
|
||||
|
||||
return out
|
||||
except Exception:
|
||||
return {}
|
||||
finally:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def _load_usernames_by_display_names(contact_db_path: Path, names: list[str]) -> dict[str, str]:
|
||||
"""Best-effort mapping from display name -> username using contact.db.
|
||||
|
||||
@@ -1515,11 +1845,7 @@ def _row_to_search_hit(
|
||||
|
||||
if local_type == 10000:
|
||||
render_type = "system"
|
||||
if "revokemsg" in raw_text:
|
||||
content_text = "撤回了一条消息"
|
||||
else:
|
||||
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
|
||||
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
|
||||
content_text = _parse_system_message_content(raw_text)
|
||||
elif local_type == 49:
|
||||
parsed = _parse_app_message(raw_text)
|
||||
render_type = str(parsed.get("renderType") or "text")
|
||||
|
||||
@@ -23,17 +23,17 @@ logger = get_logger(__name__)
|
||||
|
||||
|
||||
# 运行时输出目录(桌面端可通过 WECHAT_TOOL_DATA_DIR 指向可写目录)
|
||||
_OUTPUT_DATABASES_DIR = get_output_databases_dir()
|
||||
_PACKAGE_ROOT = Path(__file__).resolve().parent
|
||||
|
||||
|
||||
def _list_decrypted_accounts() -> list[str]:
|
||||
"""列出已解密输出的账号目录名(仅保留包含 session.db + contact.db 的账号)"""
|
||||
if not _OUTPUT_DATABASES_DIR.exists():
|
||||
output_db_dir = get_output_databases_dir()
|
||||
if not output_db_dir.exists():
|
||||
return []
|
||||
|
||||
accounts: list[str] = []
|
||||
for p in _OUTPUT_DATABASES_DIR.iterdir():
|
||||
for p in output_db_dir.iterdir():
|
||||
if not p.is_dir():
|
||||
continue
|
||||
if (p / "session.db").exists() and (p / "contact.db").exists():
|
||||
@@ -45,6 +45,7 @@ def _list_decrypted_accounts() -> list[str]:
|
||||
|
||||
def _resolve_account_dir(account: Optional[str]) -> Path:
|
||||
"""解析账号目录,并进行路径安全校验(防止路径穿越)"""
|
||||
output_db_dir = get_output_databases_dir()
|
||||
accounts = _list_decrypted_accounts()
|
||||
if not accounts:
|
||||
raise HTTPException(
|
||||
@@ -53,8 +54,8 @@ def _resolve_account_dir(account: Optional[str]) -> Path:
|
||||
)
|
||||
|
||||
selected = account or accounts[0]
|
||||
base = _OUTPUT_DATABASES_DIR.resolve()
|
||||
candidate = (_OUTPUT_DATABASES_DIR / selected).resolve()
|
||||
base = output_db_dir.resolve()
|
||||
candidate = (output_db_dir / selected).resolve()
|
||||
|
||||
if candidate != base and base not in candidate.parents:
|
||||
raise HTTPException(status_code=400, detail="Invalid account path.")
|
||||
|
||||
@@ -39,11 +39,17 @@ from ..chat_helpers import (
|
||||
_make_snippet,
|
||||
_match_tokens,
|
||||
_load_contact_rows,
|
||||
_load_group_nickname_map_from_contact_db,
|
||||
_load_usernames_by_display_names,
|
||||
_load_latest_message_previews,
|
||||
_build_group_sender_display_name_map,
|
||||
_normalize_session_preview_text,
|
||||
_extract_group_preview_sender_username,
|
||||
_replace_preview_sender_prefix,
|
||||
_lookup_resource_md5,
|
||||
_normalize_xml_url,
|
||||
_parse_app_message,
|
||||
_parse_system_message_content,
|
||||
_parse_pat_message,
|
||||
_pick_display_name,
|
||||
_query_head_image_usernames,
|
||||
@@ -69,6 +75,8 @@ from ..wcdb_realtime import (
|
||||
WCDB_REALTIME,
|
||||
get_avatar_urls as _wcdb_get_avatar_urls,
|
||||
get_display_names as _wcdb_get_display_names,
|
||||
get_group_members as _wcdb_get_group_members,
|
||||
get_group_nicknames as _wcdb_get_group_nicknames,
|
||||
get_messages as _wcdb_get_messages,
|
||||
get_sessions as _wcdb_get_sessions,
|
||||
)
|
||||
@@ -97,6 +105,142 @@ def _avatar_url_unified(
|
||||
return _build_avatar_url(str(account_dir.name or ""), u)
|
||||
|
||||
|
||||
def _load_group_nickname_map_from_wcdb(
|
||||
*,
|
||||
account_dir: Path,
|
||||
chatroom_id: str,
|
||||
sender_usernames: list[str],
|
||||
rt_conn=None,
|
||||
) -> dict[str, str]:
|
||||
chatroom = str(chatroom_id or "").strip()
|
||||
if not chatroom.endswith("@chatroom"):
|
||||
return {}
|
||||
|
||||
targets = list(dict.fromkeys([str(x or "").strip() for x in sender_usernames if str(x or "").strip()]))
|
||||
if not targets:
|
||||
return {}
|
||||
|
||||
try:
|
||||
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
target_set = set(targets)
|
||||
out: dict[str, str] = {}
|
||||
|
||||
try:
|
||||
with wcdb_conn.lock:
|
||||
nickname_map = _wcdb_get_group_nicknames(wcdb_conn.handle, chatroom)
|
||||
for username, nickname in (nickname_map or {}).items():
|
||||
su = str(username or "").strip()
|
||||
nn = str(nickname or "").strip()
|
||||
if su and nn and su in target_set:
|
||||
out[su] = nn
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
unresolved = [u for u in targets if u not in out]
|
||||
if not unresolved:
|
||||
return out
|
||||
|
||||
try:
|
||||
with wcdb_conn.lock:
|
||||
members = _wcdb_get_group_members(wcdb_conn.handle, chatroom)
|
||||
except Exception:
|
||||
return out
|
||||
|
||||
if not members:
|
||||
return out
|
||||
|
||||
unresolved_set = set(unresolved)
|
||||
for member in members:
|
||||
try:
|
||||
username = str(member.get("username") or "").strip()
|
||||
except Exception:
|
||||
username = ""
|
||||
if (not username) or (username not in unresolved_set):
|
||||
continue
|
||||
|
||||
nickname = ""
|
||||
for key in ("nickname", "displayName", "remark", "originalName"):
|
||||
try:
|
||||
candidate = str(member.get(key) or "").strip()
|
||||
except Exception:
|
||||
candidate = ""
|
||||
if candidate:
|
||||
nickname = candidate
|
||||
break
|
||||
if nickname:
|
||||
out[username] = nickname
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _load_group_nickname_map(
|
||||
*,
|
||||
account_dir: Path,
|
||||
contact_db_path: Path,
|
||||
chatroom_id: str,
|
||||
sender_usernames: list[str],
|
||||
rt_conn=None,
|
||||
) -> dict[str, str]:
|
||||
"""Resolve group member nickname (group card) via WCDB and contact.db ext_buffer (best-effort)."""
|
||||
|
||||
contact_map: dict[str, str] = {}
|
||||
try:
|
||||
contact_map = _load_group_nickname_map_from_contact_db(
|
||||
contact_db_path,
|
||||
chatroom_id,
|
||||
sender_usernames,
|
||||
)
|
||||
except Exception:
|
||||
contact_map = {}
|
||||
|
||||
wcdb_map: dict[str, str] = {}
|
||||
try:
|
||||
wcdb_map = _load_group_nickname_map_from_wcdb(
|
||||
account_dir=account_dir,
|
||||
chatroom_id=chatroom_id,
|
||||
sender_usernames=sender_usernames,
|
||||
rt_conn=rt_conn,
|
||||
)
|
||||
except Exception:
|
||||
wcdb_map = {}
|
||||
|
||||
if not contact_map and not wcdb_map:
|
||||
return {}
|
||||
|
||||
# Merge: WCDB wins (newer DLLs may provide higher-quality group nicknames).
|
||||
merged: dict[str, str] = {}
|
||||
merged.update(contact_map)
|
||||
merged.update(wcdb_map)
|
||||
return merged
|
||||
|
||||
|
||||
def _resolve_sender_display_name(
|
||||
*,
|
||||
sender_username: str,
|
||||
sender_contact_rows: dict[str, sqlite3.Row],
|
||||
wcdb_display_names: dict[str, str],
|
||||
group_nicknames: Optional[dict[str, str]] = None,
|
||||
) -> str:
|
||||
su = str(sender_username or "").strip()
|
||||
if not su:
|
||||
return ""
|
||||
|
||||
gn = str((group_nicknames or {}).get(su) or "").strip()
|
||||
if gn:
|
||||
return gn
|
||||
|
||||
row = sender_contact_rows.get(su)
|
||||
display_name = _pick_display_name(row, su)
|
||||
if display_name == su:
|
||||
wd = str(wcdb_display_names.get(su) or "").strip()
|
||||
if wd and wd != su:
|
||||
display_name = wd
|
||||
return display_name
|
||||
|
||||
|
||||
def _realtime_sync_lock(account: str, username: str) -> threading.Lock:
|
||||
key = (str(account or "").strip(), str(username or "").strip())
|
||||
with _REALTIME_SYNC_MU:
|
||||
@@ -557,8 +701,11 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
|
||||
"draft",
|
||||
"last_timestamp",
|
||||
"sort_timestamp",
|
||||
"last_msg_locald_id",
|
||||
"last_msg_type",
|
||||
"last_msg_sub_type",
|
||||
"last_msg_sender",
|
||||
"last_sender_display_name",
|
||||
]
|
||||
update_cols = [c for c in desired_cols if c in cols]
|
||||
if not update_cols:
|
||||
@@ -583,7 +730,15 @@ def _upsert_session_table_rows(conn: sqlite3.Connection, rows: list[dict[str, An
|
||||
continue
|
||||
values: list[Any] = []
|
||||
for c in update_cols:
|
||||
if c in {"unread_count", "is_hidden", "last_timestamp", "sort_timestamp", "last_msg_type", "last_msg_sub_type"}:
|
||||
if c in {
|
||||
"unread_count",
|
||||
"is_hidden",
|
||||
"last_timestamp",
|
||||
"sort_timestamp",
|
||||
"last_msg_locald_id",
|
||||
"last_msg_type",
|
||||
"last_msg_sub_type",
|
||||
}:
|
||||
values.append(_int((r or {}).get(c)))
|
||||
else:
|
||||
values.append(_text((r or {}).get(c)))
|
||||
@@ -1510,8 +1665,17 @@ def sync_chat_realtime_messages_all(
|
||||
"sort_timestamp",
|
||||
item.get("sortTimestamp", item.get("last_timestamp", item.get("lastTimestamp", 0))),
|
||||
),
|
||||
"last_msg_locald_id": item.get(
|
||||
"last_msg_locald_id",
|
||||
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
|
||||
),
|
||||
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
|
||||
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
|
||||
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
|
||||
"last_sender_display_name": item.get(
|
||||
"last_sender_display_name",
|
||||
item.get("lastSenderDisplayName", ""),
|
||||
),
|
||||
}
|
||||
# Prefer the row with the newer sort timestamp for the same username.
|
||||
prev = realtime_rows_by_user.get(uname)
|
||||
@@ -2137,11 +2301,7 @@ def _append_full_messages_from_rows(
|
||||
|
||||
if local_type == 10000:
|
||||
render_type = "system"
|
||||
if "revokemsg" in raw_text:
|
||||
content_text = "撤回了一条消息"
|
||||
else:
|
||||
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
|
||||
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
|
||||
content_text = _parse_system_message_content(raw_text)
|
||||
elif local_type == 49:
|
||||
parsed = _parse_app_message(raw_text)
|
||||
render_type = str(parsed.get("renderType") or "text")
|
||||
@@ -2598,6 +2758,13 @@ def _postprocess_full_messages(
|
||||
wcdb_display_names = {}
|
||||
wcdb_avatar_urls = {}
|
||||
|
||||
group_nicknames = _load_group_nickname_map(
|
||||
account_dir=account_dir,
|
||||
contact_db_path=contact_db_path,
|
||||
chatroom_id=username,
|
||||
sender_usernames=uniq_senders,
|
||||
)
|
||||
|
||||
for m in merged:
|
||||
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
|
||||
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
|
||||
@@ -2613,13 +2780,12 @@ def _postprocess_full_messages(
|
||||
su = str(m.get("senderUsername") or "")
|
||||
if not su:
|
||||
continue
|
||||
row = sender_contact_rows.get(su)
|
||||
display_name = _pick_display_name(row, su)
|
||||
if display_name == su:
|
||||
wd = str(wcdb_display_names.get(su) or "").strip()
|
||||
if wd and wd != su:
|
||||
display_name = wd
|
||||
m["senderDisplayName"] = display_name
|
||||
m["senderDisplayName"] = _resolve_sender_display_name(
|
||||
sender_username=su,
|
||||
sender_contact_rows=sender_contact_rows,
|
||||
wcdb_display_names=wcdb_display_names,
|
||||
group_nicknames=group_nicknames,
|
||||
)
|
||||
avatar_url = base_url + _avatar_url_unified(
|
||||
account_dir=account_dir,
|
||||
username=su,
|
||||
@@ -2836,6 +3002,17 @@ def list_chat_sessions(
|
||||
"sort_timestamp": item.get("sort_timestamp", item.get("sortTimestamp", item.get("last_timestamp", 0))),
|
||||
"last_msg_type": item.get("last_msg_type", item.get("lastMsgType", 0)),
|
||||
"last_msg_sub_type": item.get("last_msg_sub_type", item.get("lastMsgSubType", 0)),
|
||||
# Keep these fields so group session previews can render "sender: content" without
|
||||
# crashing (realtime rows are dicts, not sqlite Rows).
|
||||
"last_msg_sender": item.get("last_msg_sender", item.get("lastMsgSender", "")),
|
||||
"last_sender_display_name": item.get(
|
||||
"last_sender_display_name",
|
||||
item.get("lastSenderDisplayName", ""),
|
||||
),
|
||||
"last_msg_locald_id": item.get(
|
||||
"last_msg_locald_id",
|
||||
item.get("lastMsgLocaldId", item.get("lastMsgLocalId", 0)),
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
@@ -2923,12 +3100,16 @@ def list_chat_sessions(
|
||||
try:
|
||||
need_display: list[str] = []
|
||||
need_avatar: list[str] = []
|
||||
if source_norm == "realtime":
|
||||
# In realtime mode, always ask WCDB for display names: decrypted contact.db can be stale.
|
||||
need_display = [str(u or "").strip() for u in usernames if str(u or "").strip()]
|
||||
for u in usernames:
|
||||
if not u:
|
||||
continue
|
||||
row = contact_rows.get(u)
|
||||
if _pick_display_name(row, u) == u:
|
||||
need_display.append(u)
|
||||
if source_norm != "realtime":
|
||||
row = contact_rows.get(u)
|
||||
if _pick_display_name(row, u) == u:
|
||||
need_display.append(u)
|
||||
if source_norm == "realtime":
|
||||
# In realtime mode, prefer WCDB-resolved avatar URLs (contact.db can be stale).
|
||||
if u not in local_avatar_usernames:
|
||||
@@ -2983,14 +3164,40 @@ def list_chat_sessions(
|
||||
if v:
|
||||
last_previews[u] = v
|
||||
|
||||
group_sender_display_names: dict[str, str] = _build_group_sender_display_name_map(
|
||||
contact_db_path,
|
||||
last_previews,
|
||||
)
|
||||
unresolved = []
|
||||
for conv_username, preview_text in last_previews.items():
|
||||
if not str(conv_username or "").endswith("@chatroom"):
|
||||
continue
|
||||
sender_username = _extract_group_preview_sender_username(preview_text)
|
||||
if sender_username and sender_username not in group_sender_display_names:
|
||||
unresolved.append(sender_username)
|
||||
unresolved = list(dict.fromkeys(unresolved))
|
||||
if unresolved:
|
||||
try:
|
||||
wcdb_conn = rt_conn or WCDB_REALTIME.ensure_connected(account_dir)
|
||||
with wcdb_conn.lock:
|
||||
wcdb_names = _wcdb_get_display_names(wcdb_conn.handle, unresolved)
|
||||
for sender_username in unresolved:
|
||||
wcdb_name = str(wcdb_names.get(sender_username) or "").strip()
|
||||
if wcdb_name and wcdb_name != sender_username:
|
||||
group_sender_display_names[sender_username] = wcdb_name
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
sessions: list[dict[str, Any]] = []
|
||||
for r in filtered:
|
||||
username = r["username"]
|
||||
c_row = contact_rows.get(username)
|
||||
|
||||
display_name = _pick_display_name(c_row, username)
|
||||
if display_name == username:
|
||||
wd = str(wcdb_display_names.get(username) or "").strip()
|
||||
wd = str(wcdb_display_names.get(username) or "").strip()
|
||||
if source_norm == "realtime" and wd and wd != username:
|
||||
display_name = wd
|
||||
elif display_name == username:
|
||||
if wd and wd != username:
|
||||
display_name = wd
|
||||
|
||||
@@ -3046,6 +3253,37 @@ def list_chat_sessions(
|
||||
if last_msg_type == 81604378673 or (last_msg_type == 49 and last_msg_sub_type == 19):
|
||||
last_message = "[聊天记录]"
|
||||
|
||||
last_message = _normalize_session_preview_text(
|
||||
last_message,
|
||||
is_group=bool(str(username or "").endswith("@chatroom")),
|
||||
sender_display_names=group_sender_display_names,
|
||||
)
|
||||
if str(username or "").endswith("@chatroom") and str(last_message or "") and not str(last_message).startswith("[草稿]"):
|
||||
# Prefer group card nickname when available. In realtime mode, WCDB session rows can provide
|
||||
# `last_sender_display_name`, but we may still get a summary that doesn't include "sender:".
|
||||
# Also guard against URL schemes like "https://..." being mis-parsed as "https: //...".
|
||||
raw_sender_display = ""
|
||||
try:
|
||||
raw_sender_display = r["last_sender_display_name"]
|
||||
except Exception:
|
||||
try:
|
||||
raw_sender_display = r.get("last_sender_display_name", "")
|
||||
except Exception:
|
||||
raw_sender_display = ""
|
||||
sender_display = _decode_sqlite_text(raw_sender_display).strip()
|
||||
if sender_display:
|
||||
text = re.sub(r"\s+", " ", str(last_message or "").strip()).strip()
|
||||
match = re.match(r"^([^:\n]{1,128}):\s*(.+)$", text)
|
||||
if match:
|
||||
prefix = str(match.group(1) or "").strip()
|
||||
body = re.sub(r"\s+", " ", str(match.group(2) or "").strip()).strip()
|
||||
if prefix.lower() in {"http", "https"} and body.startswith("//"):
|
||||
last_message = f"{sender_display}: {text}"
|
||||
else:
|
||||
last_message = f"{sender_display}: {body}"
|
||||
else:
|
||||
last_message = f"{sender_display}: {text}"
|
||||
|
||||
last_time = _format_session_time(r["sort_timestamp"] or r["last_timestamp"])
|
||||
|
||||
sessions.append(
|
||||
@@ -3248,13 +3486,7 @@ def _collect_chat_messages(
|
||||
|
||||
if local_type == 10000:
|
||||
render_type = "system"
|
||||
if "revokemsg" in raw_text:
|
||||
content_text = "撤回了一条消息"
|
||||
else:
|
||||
import re
|
||||
|
||||
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
|
||||
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
|
||||
content_text = _parse_system_message_content(raw_text)
|
||||
elif local_type == 49:
|
||||
parsed = _parse_app_message(raw_text)
|
||||
render_type = str(parsed.get("renderType") or "text")
|
||||
@@ -3957,13 +4189,7 @@ def list_chat_messages(
|
||||
|
||||
if local_type == 10000:
|
||||
render_type = "system"
|
||||
if "revokemsg" in raw_text:
|
||||
content_text = "撤回了一条消息"
|
||||
else:
|
||||
import re
|
||||
|
||||
content_text = re.sub(r"</?[_a-zA-Z0-9]+[^>]*>", "", raw_text)
|
||||
content_text = re.sub(r"\s+", " ", content_text).strip() or "[系统消息]"
|
||||
content_text = _parse_system_message_content(raw_text)
|
||||
elif local_type == 49:
|
||||
parsed = _parse_app_message(raw_text)
|
||||
render_type = str(parsed.get("renderType") or "text")
|
||||
@@ -4412,6 +4638,13 @@ def list_chat_messages(
|
||||
wcdb_display_names = {}
|
||||
wcdb_avatar_urls = {}
|
||||
|
||||
group_nicknames = _load_group_nickname_map(
|
||||
account_dir=account_dir,
|
||||
contact_db_path=contact_db_path,
|
||||
chatroom_id=username,
|
||||
sender_usernames=uniq_senders,
|
||||
)
|
||||
|
||||
for m in merged:
|
||||
# If appmsg doesn't provide sourcedisplayname, try mapping sourceusername to display name.
|
||||
if (not str(m.get("from") or "").strip()) and str(m.get("fromUsername") or "").strip():
|
||||
@@ -4427,13 +4660,12 @@ def list_chat_messages(
|
||||
su = str(m.get("senderUsername") or "")
|
||||
if not su:
|
||||
continue
|
||||
row = sender_contact_rows.get(su)
|
||||
display_name = _pick_display_name(row, su)
|
||||
if display_name == su:
|
||||
wd = str(wcdb_display_names.get(su) or "").strip()
|
||||
if wd and wd != su:
|
||||
display_name = wd
|
||||
m["senderDisplayName"] = display_name
|
||||
m["senderDisplayName"] = _resolve_sender_display_name(
|
||||
sender_username=su,
|
||||
sender_contact_rows=sender_contact_rows,
|
||||
wcdb_display_names=wcdb_display_names,
|
||||
group_nicknames=group_nicknames,
|
||||
)
|
||||
avatar_url = base_url + _avatar_url_unified(
|
||||
account_dir=account_dir,
|
||||
username=su,
|
||||
@@ -4930,19 +5162,24 @@ async def _search_chat_messages_via_fts(
|
||||
username=username,
|
||||
local_avatar_usernames=local_avatar_usernames,
|
||||
)
|
||||
group_nicknames = _load_group_nickname_map(
|
||||
account_dir=account_dir,
|
||||
contact_db_path=contact_db_path,
|
||||
chatroom_id=username,
|
||||
sender_usernames=[str(x.get("senderUsername") or "") for x in hits],
|
||||
)
|
||||
|
||||
for h in hits:
|
||||
su = str(h.get("senderUsername") or "").strip()
|
||||
h["conversationName"] = conv_name
|
||||
h["conversationAvatar"] = conv_avatar
|
||||
if su:
|
||||
row = contact_rows.get(su)
|
||||
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su)
|
||||
if display_name == su:
|
||||
wd = str(wcdb_display_names.get(su) or "").strip()
|
||||
if wd and wd != su:
|
||||
display_name = wd
|
||||
h["senderDisplayName"] = display_name
|
||||
h["senderDisplayName"] = _resolve_sender_display_name(
|
||||
sender_username=su,
|
||||
sender_contact_rows=contact_rows,
|
||||
wcdb_display_names=wcdb_display_names,
|
||||
group_nicknames=group_nicknames,
|
||||
)
|
||||
avatar_url = base_url + _avatar_url_unified(
|
||||
account_dir=account_dir,
|
||||
username=su,
|
||||
@@ -4986,6 +5223,23 @@ async def _search_chat_messages_via_fts(
|
||||
wcdb_display_names = {}
|
||||
wcdb_avatar_urls = {}
|
||||
|
||||
group_senders_by_room: dict[str, list[str]] = {}
|
||||
for h in hits:
|
||||
cu = str(h.get("username") or "").strip()
|
||||
su = str(h.get("senderUsername") or "").strip()
|
||||
if (not cu.endswith("@chatroom")) or (not su):
|
||||
continue
|
||||
group_senders_by_room.setdefault(cu, []).append(su)
|
||||
|
||||
group_nickname_cache: dict[str, dict[str, str]] = {}
|
||||
for cu, senders in group_senders_by_room.items():
|
||||
group_nickname_cache[cu] = _load_group_nickname_map(
|
||||
account_dir=account_dir,
|
||||
contact_db_path=contact_db_path,
|
||||
chatroom_id=cu,
|
||||
sender_usernames=senders,
|
||||
)
|
||||
|
||||
for h in hits:
|
||||
cu = str(h.get("username") or "").strip()
|
||||
su = str(h.get("senderUsername") or "").strip()
|
||||
@@ -5003,13 +5257,12 @@ async def _search_chat_messages_via_fts(
|
||||
)
|
||||
h["conversationAvatar"] = conv_avatar
|
||||
if su:
|
||||
row = contact_rows.get(su)
|
||||
display_name = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su)
|
||||
if display_name == su:
|
||||
wd = str(wcdb_display_names.get(su) or "").strip()
|
||||
if wd and wd != su:
|
||||
display_name = wd
|
||||
h["senderDisplayName"] = display_name
|
||||
h["senderDisplayName"] = _resolve_sender_display_name(
|
||||
sender_username=su,
|
||||
sender_contact_rows=contact_rows,
|
||||
wcdb_display_names=wcdb_display_names,
|
||||
group_nicknames=group_nickname_cache.get(cu, {}),
|
||||
)
|
||||
avatar_url = base_url + _avatar_url_unified(
|
||||
account_dir=account_dir,
|
||||
username=su,
|
||||
@@ -5272,13 +5525,23 @@ async def search_chat_messages(
|
||||
contact_rows = _load_contact_rows(contact_db_path, uniq_usernames)
|
||||
conv_row = contact_rows.get(username)
|
||||
conv_name = _pick_display_name(conv_row, username)
|
||||
group_nicknames = _load_group_nickname_map(
|
||||
account_dir=account_dir,
|
||||
contact_db_path=contact_db_path,
|
||||
chatroom_id=username,
|
||||
sender_usernames=[str(x.get("senderUsername") or "") for x in page],
|
||||
)
|
||||
|
||||
for h in page:
|
||||
su = str(h.get("senderUsername") or "").strip()
|
||||
h["conversationName"] = conv_name
|
||||
if su:
|
||||
row = contact_rows.get(su)
|
||||
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == username else su)
|
||||
h["senderDisplayName"] = _resolve_sender_display_name(
|
||||
sender_username=su,
|
||||
sender_contact_rows=contact_rows,
|
||||
wcdb_display_names={},
|
||||
group_nicknames=group_nicknames,
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
@@ -5360,6 +5623,23 @@ async def search_chat_messages(
|
||||
)
|
||||
contact_rows = _load_contact_rows(contact_db_path, uniq_contacts)
|
||||
|
||||
group_senders_by_room: dict[str, list[str]] = {}
|
||||
for h in page:
|
||||
cu = str(h.get("username") or "").strip()
|
||||
su = str(h.get("senderUsername") or "").strip()
|
||||
if (not cu.endswith("@chatroom")) or (not su):
|
||||
continue
|
||||
group_senders_by_room.setdefault(cu, []).append(su)
|
||||
|
||||
group_nickname_cache: dict[str, dict[str, str]] = {}
|
||||
for cu, senders in group_senders_by_room.items():
|
||||
group_nickname_cache[cu] = _load_group_nickname_map(
|
||||
account_dir=account_dir,
|
||||
contact_db_path=contact_db_path,
|
||||
chatroom_id=cu,
|
||||
sender_usernames=senders,
|
||||
)
|
||||
|
||||
for h in page:
|
||||
cu = str(h.get("username") or "").strip()
|
||||
su = str(h.get("senderUsername") or "").strip()
|
||||
@@ -5367,8 +5647,12 @@ async def search_chat_messages(
|
||||
conv_name = _pick_display_name(crow, cu) if cu else ""
|
||||
h["conversationName"] = conv_name or cu
|
||||
if su:
|
||||
row = contact_rows.get(su)
|
||||
h["senderDisplayName"] = _pick_display_name(row, su) if row is not None else (conv_name if su == cu else su)
|
||||
h["senderDisplayName"] = _resolve_sender_display_name(
|
||||
sender_username=su,
|
||||
sender_contact_rows=contact_rows,
|
||||
wcdb_display_names={},
|
||||
group_nicknames=group_nickname_cache.get(cu, {}),
|
||||
)
|
||||
|
||||
return {
|
||||
"status": "success",
|
||||
|
||||
@@ -688,6 +688,83 @@ def _lookup_resource_md5_by_server_id(account_dir_str: str, server_id: int, want
|
||||
pass
|
||||
|
||||
|
||||
@lru_cache(maxsize=4096)
|
||||
def _lookup_image_md5_by_server_id_from_messages(account_dir_str: str, server_id: int, username: str) -> str:
|
||||
account_dir_str = str(account_dir_str or "").strip()
|
||||
username = str(username or "").strip()
|
||||
if not account_dir_str or not username:
|
||||
return ""
|
||||
|
||||
try:
|
||||
sid = int(server_id or 0)
|
||||
except Exception:
|
||||
sid = 0
|
||||
if not sid:
|
||||
return ""
|
||||
|
||||
try:
|
||||
chat_hash = hashlib.md5(username.encode()).hexdigest()
|
||||
except Exception:
|
||||
return ""
|
||||
if not chat_hash:
|
||||
return ""
|
||||
|
||||
table_name = f"Msg_{chat_hash}"
|
||||
account_dir = Path(account_dir_str)
|
||||
|
||||
db_paths: list[Path] = []
|
||||
try:
|
||||
for p in account_dir.glob("message_*.db"):
|
||||
try:
|
||||
if p.is_file():
|
||||
db_paths.append(p)
|
||||
except Exception:
|
||||
continue
|
||||
except Exception:
|
||||
db_paths = []
|
||||
|
||||
if not db_paths:
|
||||
return ""
|
||||
db_paths.sort(key=lambda p: p.name)
|
||||
|
||||
for db_path in db_paths:
|
||||
try:
|
||||
conn = sqlite3.connect(str(db_path))
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
try:
|
||||
row = conn.execute(
|
||||
f"SELECT local_type, packed_info_data FROM {table_name} "
|
||||
"WHERE server_id = ? ORDER BY create_time DESC LIMIT 1",
|
||||
(sid,),
|
||||
).fetchone()
|
||||
except Exception:
|
||||
row = None
|
||||
finally:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
if not row:
|
||||
continue
|
||||
|
||||
try:
|
||||
local_type = int(row[0] or 0)
|
||||
except Exception:
|
||||
local_type = 0
|
||||
if local_type != 3:
|
||||
continue
|
||||
|
||||
md5 = _extract_md5_from_packed_info(row[1])
|
||||
md5_norm = str(md5 or "").strip().lower()
|
||||
if _is_valid_md5(md5_norm):
|
||||
return md5_norm
|
||||
|
||||
return ""
|
||||
|
||||
|
||||
def _is_safe_http_url(url: str) -> bool:
|
||||
u = str(url or "").strip()
|
||||
if not u:
|
||||
@@ -1062,6 +1139,12 @@ async def get_chat_image(
|
||||
resource_md5 = _lookup_resource_md5_by_server_id(str(account_dir), int(server_id), want_local_type=3)
|
||||
if resource_md5:
|
||||
md5 = resource_md5
|
||||
elif username:
|
||||
md5_from_msg = _lookup_image_md5_by_server_id_from_messages(
|
||||
str(account_dir), int(server_id), str(username)
|
||||
)
|
||||
if md5_from_msg:
|
||||
md5 = md5_from_msg
|
||||
|
||||
# md5 模式:优先从解密资源目录读取(更快)
|
||||
if md5:
|
||||
|
||||
@@ -102,6 +102,17 @@ def _load_wcdb_lib() -> ctypes.CDLL:
|
||||
lib.wcdb_get_group_members.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.POINTER(ctypes.c_char_p)]
|
||||
lib.wcdb_get_group_members.restype = ctypes.c_int
|
||||
|
||||
# Optional (newer DLLs): wcdb_get_group_nicknames(handle, chatroom_id, out_json)
|
||||
try:
|
||||
lib.wcdb_get_group_nicknames.argtypes = [
|
||||
ctypes.c_int64,
|
||||
ctypes.c_char_p,
|
||||
ctypes.POINTER(ctypes.c_char_p),
|
||||
]
|
||||
lib.wcdb_get_group_nicknames.restype = ctypes.c_int
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Optional: execute arbitrary SQL on a selected database kind/path.
|
||||
# Signature: wcdb_exec_query(handle, kind, path, sql, out_json)
|
||||
try:
|
||||
@@ -355,6 +366,41 @@ def get_avatar_urls(handle: int, usernames: list[str]) -> dict[str, str]:
|
||||
return {}
|
||||
|
||||
|
||||
def get_group_members(handle: int, chatroom_id: str) -> list[dict[str, Any]]:
|
||||
_ensure_initialized()
|
||||
lib = _load_wcdb_lib()
|
||||
cid = str(chatroom_id or "").strip()
|
||||
if not cid:
|
||||
return []
|
||||
out_json = _call_out_json(lib.wcdb_get_group_members, ctypes.c_int64(int(handle)), cid.encode("utf-8"))
|
||||
decoded = _safe_load_json(out_json)
|
||||
if isinstance(decoded, list):
|
||||
out: list[dict[str, Any]] = []
|
||||
for x in decoded:
|
||||
if isinstance(x, dict):
|
||||
out.append(x)
|
||||
return out
|
||||
return []
|
||||
|
||||
|
||||
def get_group_nicknames(handle: int, chatroom_id: str) -> dict[str, str]:
|
||||
_ensure_initialized()
|
||||
lib = _load_wcdb_lib()
|
||||
fn = getattr(lib, "wcdb_get_group_nicknames", None)
|
||||
if not fn:
|
||||
return {}
|
||||
|
||||
cid = str(chatroom_id or "").strip()
|
||||
if not cid:
|
||||
return {}
|
||||
|
||||
out_json = _call_out_json(fn, ctypes.c_int64(int(handle)), cid.encode("utf-8"))
|
||||
decoded = _safe_load_json(out_json)
|
||||
if isinstance(decoded, dict):
|
||||
return {str(k): str(v) for k, v in decoded.items()}
|
||||
return {}
|
||||
|
||||
|
||||
def exec_query(handle: int, *, kind: str, path: Optional[str], sql: str) -> list[dict[str, Any]]:
|
||||
"""Execute raw SQL on a specific db kind/path via WCDB.
|
||||
|
||||
|
||||
@@ -122,6 +122,16 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
|
||||
(3, 1003, 49, 3, 2, 1735689603, '<msg><appmsg><type>2000</type><des>收到转账0.01元</des></appmsg></msg>', None),
|
||||
(4, 1004, 1, 4, 2, 1735689604, '普通文本消息', None),
|
||||
(5, 1005, 10000, 5, 2, 1735689605, '系统提示消息', None),
|
||||
(
|
||||
6,
|
||||
1006,
|
||||
10000,
|
||||
6,
|
||||
2,
|
||||
1735689606,
|
||||
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA[“测试好友”撤回了一条消息]]></replacemsg></revokemsg></sysmsg>',
|
||||
None,
|
||||
),
|
||||
]
|
||||
conn.executemany(
|
||||
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
@@ -413,6 +423,37 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
|
||||
else:
|
||||
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
|
||||
|
||||
def test_system_revoke_exports_readable_revoker_content(self):
|
||||
with TemporaryDirectory() as td:
|
||||
root = Path(td)
|
||||
account = "wxid_test"
|
||||
username = "wxid_friend"
|
||||
self._prepare_account(root, account=account, username=username)
|
||||
|
||||
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
|
||||
try:
|
||||
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
|
||||
svc = self._reload_export_modules()
|
||||
job = self._create_job(
|
||||
svc.CHAT_EXPORT_MANAGER,
|
||||
account=account,
|
||||
username=username,
|
||||
message_types=["system"],
|
||||
include_media=False,
|
||||
)
|
||||
self.assertEqual(job.status, "done", msg=job.error)
|
||||
|
||||
payload, _, _ = self._load_export_payload(job.zip_path)
|
||||
revoke_msg = next((m for m in payload.get("messages", []) if int(m.get("serverId") or 0) == 1006), None)
|
||||
self.assertIsNotNone(revoke_msg)
|
||||
self.assertEqual(str(revoke_msg.get("renderType") or ""), "system")
|
||||
self.assertEqual(str(revoke_msg.get("content") or ""), "“测试好友”撤回了一条消息")
|
||||
finally:
|
||||
if prev_data is None:
|
||||
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
|
||||
else:
|
||||
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -0,0 +1,111 @@
|
||||
import sqlite3
|
||||
import sys
|
||||
import threading
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
from wechat_decrypt_tool.routers import chat as chat_router
|
||||
|
||||
|
||||
class _DummyRequest:
|
||||
base_url = "http://testserver/"
|
||||
|
||||
|
||||
class _DummyConn:
|
||||
def __init__(self) -> None:
|
||||
self.handle = 1
|
||||
self.lock = threading.Lock()
|
||||
|
||||
|
||||
def _seed_session_db(session_db_path: Path) -> None:
|
||||
conn = sqlite3.connect(str(session_db_path))
|
||||
try:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE SessionTable (
|
||||
username TEXT PRIMARY KEY,
|
||||
unread_count INTEGER DEFAULT 0,
|
||||
is_hidden INTEGER DEFAULT 0,
|
||||
summary TEXT DEFAULT '',
|
||||
draft TEXT DEFAULT '',
|
||||
last_timestamp INTEGER DEFAULT 0,
|
||||
sort_timestamp INTEGER DEFAULT 0,
|
||||
last_msg_locald_id INTEGER DEFAULT 0,
|
||||
last_msg_type INTEGER DEFAULT 0,
|
||||
last_msg_sub_type INTEGER DEFAULT 0,
|
||||
last_msg_sender TEXT DEFAULT '',
|
||||
last_sender_display_name TEXT DEFAULT ''
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
|
||||
class TestChatRealtimeSyncAllUpdatesSenderDisplayName(unittest.TestCase):
|
||||
def test_sync_all_upserts_last_sender_display_name(self):
|
||||
with TemporaryDirectory() as td:
|
||||
account_dir = Path(td) / "acc"
|
||||
account_dir.mkdir(parents=True, exist_ok=True)
|
||||
_seed_session_db(account_dir / "session.db")
|
||||
|
||||
conn = _DummyConn()
|
||||
sessions_rows = [
|
||||
{
|
||||
"username": "demo@chatroom",
|
||||
"unread_count": 0,
|
||||
"is_hidden": 0,
|
||||
"summary": "hello",
|
||||
"draft": "",
|
||||
"last_timestamp": 123,
|
||||
"sort_timestamp": 123,
|
||||
"last_msg_type": 1,
|
||||
"last_msg_sub_type": 0,
|
||||
"last_msg_sender": "wxid_demo",
|
||||
"last_sender_display_name": "群名片A",
|
||||
"last_msg_locald_id": 777,
|
||||
}
|
||||
]
|
||||
|
||||
with (
|
||||
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
|
||||
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
|
||||
patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
|
||||
patch.object(chat_router, "_ensure_decrypted_message_tables", return_value={}),
|
||||
patch.object(chat_router, "_should_keep_session", return_value=True),
|
||||
):
|
||||
resp = chat_router.sync_chat_realtime_messages_all(
|
||||
_DummyRequest(),
|
||||
account="acc",
|
||||
max_scan=20,
|
||||
include_hidden=True,
|
||||
include_official=True,
|
||||
)
|
||||
|
||||
self.assertEqual(resp.get("status"), "success")
|
||||
|
||||
db = sqlite3.connect(str(account_dir / "session.db"))
|
||||
try:
|
||||
row = db.execute(
|
||||
"SELECT last_sender_display_name, last_msg_sender, last_msg_locald_id FROM SessionTable WHERE username = ? LIMIT 1",
|
||||
("demo@chatroom",),
|
||||
).fetchone()
|
||||
finally:
|
||||
db.close()
|
||||
|
||||
self.assertIsNotNone(row)
|
||||
self.assertEqual(str(row[0] or ""), "群名片A")
|
||||
self.assertEqual(str(row[1] or ""), "wxid_demo")
|
||||
self.assertEqual(int(row[2] or 0), 777)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
import sqlite3
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
from wechat_decrypt_tool.chat_helpers import (
|
||||
_build_group_sender_display_name_map,
|
||||
_normalize_session_preview_text,
|
||||
_replace_preview_sender_prefix,
|
||||
)
|
||||
|
||||
|
||||
class TestChatSessionPreviewFormatting(unittest.TestCase):
|
||||
def test_normalize_session_preview_emoji_label(self):
|
||||
out = _normalize_session_preview_text("[表情]", is_group=False, sender_display_names={})
|
||||
self.assertEqual(out, "[动画表情]")
|
||||
|
||||
def test_normalize_group_preview_sender_display_name(self):
|
||||
out = _normalize_session_preview_text(
|
||||
"wxid_u3gwceqvne2m22: [表情]",
|
||||
is_group=True,
|
||||
sender_display_names={"wxid_u3gwceqvne2m22": "食神"},
|
||||
)
|
||||
self.assertEqual(out, "食神: [动画表情]")
|
||||
|
||||
def test_build_group_sender_display_name_map_from_contact_db(self):
|
||||
with TemporaryDirectory() as td:
|
||||
contact_db_path = Path(td) / "contact.db"
|
||||
conn = sqlite3.connect(str(contact_db_path))
|
||||
try:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE contact (
|
||||
username TEXT,
|
||||
remark TEXT,
|
||||
nick_name TEXT,
|
||||
alias TEXT,
|
||||
big_head_url TEXT,
|
||||
small_head_url TEXT
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?)",
|
||||
("wxid_u3gwceqvne2m22", "", "食神", "", "", ""),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
mapping = _build_group_sender_display_name_map(
|
||||
contact_db_path,
|
||||
{"demo@chatroom": "wxid_u3gwceqvne2m22: [动画表情]"},
|
||||
)
|
||||
self.assertEqual(mapping.get("wxid_u3gwceqvne2m22"), "食神")
|
||||
|
||||
def test_replace_preview_sender_prefix_uses_group_nickname(self):
|
||||
out = _replace_preview_sender_prefix("去码头整点🍟: [动画表情]", "麻辣香锅")
|
||||
self.assertEqual(out, "麻辣香锅: [动画表情]")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,103 @@
|
||||
import sys
|
||||
import threading
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
|
||||
from wechat_decrypt_tool.routers import chat as chat_router
|
||||
|
||||
|
||||
class _DummyRequest:
|
||||
base_url = "http://testserver/"
|
||||
|
||||
|
||||
class _DummyConn:
|
||||
def __init__(self) -> None:
|
||||
self.handle = 1
|
||||
self.lock = threading.Lock()
|
||||
|
||||
|
||||
class TestChatSessionsRealtimeSenderPreview(unittest.TestCase):
|
||||
def _run(self, sessions_rows: list[dict]) -> dict:
|
||||
with TemporaryDirectory() as td:
|
||||
account_dir = Path(td) / "acc"
|
||||
account_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
conn = _DummyConn()
|
||||
with (
|
||||
patch.object(chat_router, "_resolve_account_dir", return_value=account_dir),
|
||||
patch.object(chat_router.WCDB_REALTIME, "ensure_connected", return_value=conn),
|
||||
patch.object(chat_router, "_wcdb_get_sessions", return_value=sessions_rows),
|
||||
patch.object(chat_router, "_wcdb_get_display_names", return_value={}),
|
||||
patch.object(chat_router, "_wcdb_get_avatar_urls", return_value={}),
|
||||
patch.object(chat_router, "_load_contact_rows", return_value={}),
|
||||
patch.object(chat_router, "_query_head_image_usernames", return_value=set()),
|
||||
patch.object(chat_router, "_should_keep_session", return_value=True),
|
||||
patch.object(chat_router, "_avatar_url_unified", return_value="/avatar"),
|
||||
):
|
||||
return chat_router.list_chat_sessions(
|
||||
_DummyRequest(),
|
||||
account="acc",
|
||||
limit=50,
|
||||
include_hidden=True,
|
||||
include_official=True,
|
||||
preview="latest",
|
||||
source="realtime",
|
||||
)
|
||||
|
||||
def test_realtime_sessions_group_summary_prefixed_by_sender_display_name(self):
|
||||
resp = self._run(
|
||||
[
|
||||
{
|
||||
"username": "demo@chatroom",
|
||||
"summary": "hello",
|
||||
"draft": "",
|
||||
"unread_count": 0,
|
||||
"is_hidden": 0,
|
||||
"last_timestamp": 123,
|
||||
"sort_timestamp": 123,
|
||||
"last_msg_type": 1,
|
||||
"last_msg_sub_type": 0,
|
||||
"last_msg_sender": "wxid_demo",
|
||||
"last_sender_display_name": "群名片A",
|
||||
}
|
||||
]
|
||||
)
|
||||
self.assertEqual(resp.get("status"), "success")
|
||||
sessions = resp.get("sessions") or []
|
||||
self.assertEqual(len(sessions), 1)
|
||||
self.assertEqual(sessions[0].get("lastMessage"), "群名片A: hello")
|
||||
|
||||
def test_realtime_sessions_group_url_summary_keeps_scheme(self):
|
||||
resp = self._run(
|
||||
[
|
||||
{
|
||||
"username": "url@chatroom",
|
||||
"summary": "https://example.com/x",
|
||||
"draft": "",
|
||||
"unread_count": 0,
|
||||
"is_hidden": 0,
|
||||
"last_timestamp": 123,
|
||||
"sort_timestamp": 123,
|
||||
"last_msg_type": 1,
|
||||
"last_msg_sub_type": 0,
|
||||
"last_msg_sender": "wxid_demo",
|
||||
"last_sender_display_name": "群名片B",
|
||||
}
|
||||
]
|
||||
)
|
||||
self.assertEqual(resp.get("status"), "success")
|
||||
sessions = resp.get("sessions") or []
|
||||
self.assertEqual(len(sessions), 1)
|
||||
self.assertEqual(sessions[0].get("lastMessage"), "群名片B: https://example.com/x")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
from wechat_decrypt_tool.chat_helpers import _parse_system_message_content
|
||||
|
||||
|
||||
class TestChatSystemMessageParsing(unittest.TestCase):
|
||||
def test_extract_replacemsg_for_revoke(self):
|
||||
raw_text = (
|
||||
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA[“张三”撤回了一条消息]]>'
|
||||
"</replacemsg></revokemsg></sysmsg>"
|
||||
)
|
||||
self.assertEqual(_parse_system_message_content(raw_text), "“张三”撤回了一条消息")
|
||||
|
||||
def test_extract_nested_content_in_replacemsg(self):
|
||||
raw_text = (
|
||||
'<sysmsg type="revokemsg"><revokemsg><replacemsg><![CDATA['
|
||||
'<content>"黄智欢" 撤回了一条消息</content><revoketime>0</revoketime>'
|
||||
']]></replacemsg></revokemsg></sysmsg>'
|
||||
)
|
||||
self.assertEqual(_parse_system_message_content(raw_text), '"黄智欢" 撤回了一条消息')
|
||||
|
||||
def test_extract_revokemsg_text_when_replacemsg_missing(self):
|
||||
raw_text = "<revokemsg>你撤回了一条消息</revokemsg>"
|
||||
self.assertEqual(_parse_system_message_content(raw_text), "你撤回了一条消息")
|
||||
|
||||
def test_revoke_fallback_when_no_readable_text(self):
|
||||
raw_text = '<sysmsg type="revokemsg"></sysmsg>'
|
||||
self.assertEqual(_parse_system_message_content(raw_text), "撤回了一条消息")
|
||||
|
||||
def test_normal_system_message_still_cleaned(self):
|
||||
raw_text = "<sysmsg><template><![CDATA[ 张三 加入了群聊 ]]></template></sysmsg>"
|
||||
self.assertEqual(_parse_system_message_content(raw_text), "张三 加入了群聊")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -0,0 +1,114 @@
|
||||
import sqlite3
|
||||
import sys
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
from wechat_decrypt_tool.chat_helpers import _load_group_nickname_map_from_contact_db
|
||||
|
||||
|
||||
def _enc_varint(n: int) -> bytes:
|
||||
v = int(n)
|
||||
out = bytearray()
|
||||
while True:
|
||||
b = v & 0x7F
|
||||
v >>= 7
|
||||
if v:
|
||||
out.append(b | 0x80)
|
||||
else:
|
||||
out.append(b)
|
||||
break
|
||||
return bytes(out)
|
||||
|
||||
|
||||
def _enc_tag(field_no: int, wire_type: int) -> bytes:
|
||||
return _enc_varint((int(field_no) << 3) | int(wire_type))
|
||||
|
||||
|
||||
def _enc_len(field_no: int, data: bytes) -> bytes:
|
||||
b = bytes(data or b"")
|
||||
return _enc_tag(field_no, 2) + _enc_varint(len(b)) + b
|
||||
|
||||
|
||||
def _member_entry(*, inner: bytes) -> bytes:
|
||||
# contact.db ext_buffer uses repeated length-delimited submessages; the top-level field number is not important
|
||||
# for our best-effort parser, so we use field 1.
|
||||
return _enc_len(1, inner)
|
||||
|
||||
|
||||
class TestGroupNicknameExtBufferParsing(unittest.TestCase):
|
||||
def test_parse_pattern_a_field1_username_field2_display(self):
|
||||
chatroom = "demo@chatroom"
|
||||
username = "wxid_demo_123456"
|
||||
display = "群名片A"
|
||||
|
||||
inner = _enc_len(1, username.encode("utf-8")) + _enc_len(2, display.encode("utf-8"))
|
||||
ext_buffer = _member_entry(inner=inner)
|
||||
|
||||
with TemporaryDirectory() as td:
|
||||
contact_db_path = Path(td) / "contact.db"
|
||||
conn = sqlite3.connect(str(contact_db_path))
|
||||
try:
|
||||
conn.execute(
|
||||
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
|
||||
(1, chatroom, "", ext_buffer),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
|
||||
self.assertEqual(out.get(username), display)
|
||||
|
||||
def test_parse_pattern_b_field4_username_field1_display(self):
|
||||
chatroom = "demo2@chatroom"
|
||||
username = "wxid_demo_abcdef"
|
||||
display = "hjlbingo"
|
||||
|
||||
inner = _enc_len(4, username.encode("utf-8")) + _enc_len(1, display.encode("utf-8"))
|
||||
ext_buffer = _member_entry(inner=inner)
|
||||
|
||||
with TemporaryDirectory() as td:
|
||||
contact_db_path = Path(td) / "contact.db"
|
||||
conn = sqlite3.connect(str(contact_db_path))
|
||||
try:
|
||||
conn.execute(
|
||||
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
|
||||
)
|
||||
conn.execute(
|
||||
"INSERT INTO chat_room(id, username, owner, ext_buffer) VALUES (?, ?, ?, ?)",
|
||||
(1, chatroom, "", ext_buffer),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
out = _load_group_nickname_map_from_contact_db(contact_db_path, chatroom, [username])
|
||||
self.assertEqual(out.get(username), display)
|
||||
|
||||
def test_non_chatroom_returns_empty(self):
|
||||
with TemporaryDirectory() as td:
|
||||
contact_db_path = Path(td) / "contact.db"
|
||||
conn = sqlite3.connect(str(contact_db_path))
|
||||
try:
|
||||
conn.execute(
|
||||
"CREATE TABLE chat_room(id INTEGER PRIMARY KEY, username TEXT, owner TEXT, ext_buffer BLOB)"
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
out = _load_group_nickname_map_from_contact_db(contact_db_path, "wxid_not_chatroom", ["wxid_xxx"])
|
||||
self.assertEqual(out, {})
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user