Merge branch 'main' into feat/sns-media

This commit is contained in:
H3CoF6
2026-02-14 00:27:52 +08:00
25 changed files with 9077 additions and 86 deletions

View File

@@ -57,7 +57,12 @@ const contentClass = computed(() =>
: 'flex-1 overflow-auto min-h-0'
)
const showSidebar = computed(() => !String(route.path || '').startsWith('/wrapped'))
const showSidebar = computed(() => {
const path = String(route.path || '')
if (path === '/') return false
if (path === '/decrypt' || path === '/detection-result' || path === '/decrypt-result') return false
return !(path === '/wrapped' || path.startsWith('/wrapped/'))
})
</script>
<style>

View File

@@ -0,0 +1,793 @@
<template>
<div ref="cardRoot" class="h-full w-full">
<WrappedCardShell :card-id="card.id" :title="card.title" :narrative="''" :variant="variant">
<template #narrative>
<div class="mt-1 wrapped-body text-sm sm:text-base text-[#6F6F6F] leading-relaxed">
<p class="whitespace-normal">
<template v-for="(seg, idx) in narrativeSegments" :key="`n-${idx}`">
<img
v-if="seg.type === 'emoji'"
:src="seg.src"
class="inline-block align-[-0.18em] rounded-[3px] wx-inline-emoji"
:style="{ width: `${seg.sizeEm}em`, height: `${seg.sizeEm}em` }"
:alt="seg.alt || 'emoji'"
/>
<img
v-else-if="seg.type === 'sticker'"
:src="seg.src"
class="inline-block align-[-0.16em] rounded-[4px] wx-inline-emoji"
:style="{ width: `${seg.sizeEm}em`, height: `${seg.sizeEm}em` }"
:alt="seg.alt || 'sticker'"
/>
<span
v-else-if="seg.type === 'num'"
class="wrapped-number text-[#07C160] font-semibold"
>
{{ seg.content }}
</span>
<span v-else>{{ seg.content }}</span>
</template>
</p>
</div>
</template>
<div class="w-full -mt-1 sm:-mt-2">
<div class="grid grid-cols-1 lg:grid-cols-12 gap-2">
<div class="lg:col-span-7 space-y-2 sm:space-y-2.5">
<div class="rounded-2xl border border-[#EDEDED] bg-white/65 p-2.5 sm:p-3">
<div class="wrapped-label text-xs text-[#00000066]">高频表情卡堆Vue Bits</div>
<div v-if="stackCardsData.length > 0" class="mt-2">
<div class="relative h-[9.4rem] sm:h-[9.8rem] rounded-xl overflow-visible">
<div class="absolute inset-0 flex items-center justify-center">
<Stack
:randomRotation="true"
:sensitivity="180"
:sendToBackOnClick="false"
:cardDimensions="stackCardDimensions"
:cardsData="stackCardsData"
/>
</div>
</div>
<div class="mt-2 flex flex-col items-center justify-center text-center">
<div class="wrapped-number text-base text-[#07C160] font-semibold leading-tight">
{{ formatInt(Number(stackTopCount || 0)) }}
</div>
<div
class="mt-0.5 inline-flex items-center gap-1.5 rounded-md bg-[#00000008] px-1.5 py-1 max-w-full"
:title="heroStickerOwnerName ? `常发送给 ${heroStickerOwnerName}` : '常发送给:未知'"
>
<span class="w-4 h-4 rounded-md overflow-hidden bg-[#0000000d] flex items-center justify-center flex-shrink-0">
<img
v-if="heroStickerOwnerAvatarUrl && avatarOk.topStickerOwner"
:src="heroStickerOwnerAvatarUrl"
class="w-full h-full object-cover"
alt="avatar"
@error="avatarOk.topStickerOwner = false"
/>
<span v-else class="wrapped-number text-[10px] text-[#00000066]">
{{ avatarFallback(heroStickerOwnerName) }}
</span>
</span>
<span class="wrapped-body text-[11px] text-[#00000080] truncate">
常发送给 <span class="text-[#07C160] font-semibold">{{ heroStickerOwnerName || '未知' }}</span>
</span>
</div>
<div class="mt-1 wrapped-label text-[10px] text-[#00000055]">拖动表情卡片翻一翻</div>
</div>
</div>
<div v-else class="mt-2 wrapped-body text-xs text-[#00000055]">
暂无可展示的高频表情图片
</div>
</div>
<div class="rounded-2xl border border-[#EDEDED] bg-white/65 p-2.5 sm:p-3">
<div class="flex items-center justify-between gap-3">
<div class="wrapped-label text-xs text-[#00000066]">Emoji Top小黄脸 + Unicode</div>
<div class="flex items-center gap-2 flex-shrink-0">
<span class="wrapped-label text-[10px] px-2 py-0.5 rounded-md border bg-[#07C160]/10 text-[#07C160] border-[#07C160]/25">
小黄脸
</span>
<span class="wrapped-label text-[10px] px-2 py-0.5 rounded-md border bg-[#0EA5E9]/10 text-[#0EA5E9] border-[#0EA5E9]/25">
Unicode
</span>
</div>
</div>
<div v-if="emojiBubbleRows.length > 0" class="mt-2">
<div class="grid grid-cols-4 sm:grid-cols-8 gap-2 sm:gap-2.5 place-items-end">
<div
v-for="row in emojiBubbleRows"
:key="row.id"
class="min-w-0 flex flex-col items-center gap-1"
:title="`${row.label} · ${formatInt(row.count)}次`"
>
<div
class="flex items-center justify-center rounded-full shadow-sm"
:class="row.kind === 'wechat' ? 'bg-[#07C160]/14' : 'bg-[#0EA5E9]/12'"
:style="{ width: `${row.size}px`, height: `${row.size}px` }"
>
<img
v-if="row.kind === 'wechat' && row.assetPath && emojiAssetOk[row.key] !== false"
:src="resolveEmojiAsset(row.assetPath)"
class="w-3/4 h-3/4 object-contain"
alt="emoji"
@error="emojiAssetOk[row.key] = false"
/>
<span v-else class="text-xl leading-none">
{{ row.kind === 'unicode' ? row.label : '🙂' }}
</span>
</div>
<div
class="wrapped-number text-[10px] font-semibold leading-none"
:class="row.kind === 'wechat' ? 'text-[#07C160]' : 'text-[#0EA5E9]'"
>
{{ formatInt(row.count) }}
</div>
</div>
</div>
</div>
<div v-else class="mt-2 wrapped-body text-xs text-[#00000066]">
今年没有统计到可识别的 Emoji小黄脸/Unicode
</div>
</div>
</div>
<div class="lg:col-span-5 h-full min-h-[20rem] sm:min-h-[21.5rem] rounded-2xl border border-[#EDEDED] bg-white/65 p-2.5 sm:p-3 relative overflow-hidden">
<div class="relative z-[1] h-full flex flex-col">
<div class="wrapped-label text-xs text-[#00000066]">斗图热力时段</div>
<div class="mt-1 wrapped-body text-sm text-[#000000e6]">
<template v-if="peakHour !== null && peakWeekdayName">
高峰在
<span class="wrapped-number text-[#07C160] font-semibold">{{ peakWeekdayName }} {{ peakHour }}:00</span>
</template>
<template v-else>
今年没有明显的斗图高峰时段
</template>
</div>
<div class="mt-2.5 h-16 sm:h-20 flex items-end gap-[2px]">
<div
v-for="item in hourBars"
:key="`hour-${item.hour}`"
class="flex-1 min-w-0 rounded-sm bg-[#07C160]/20"
:style="{ height: `${item.heightPct}%` }"
:title="`${item.hour}:00 · ${formatInt(item.count)}次`"
/>
</div>
<div class="mt-1.5 flex items-center justify-between wrapped-label text-[10px] text-[#00000055]">
<span>00</span>
<span>06</span>
<span>12</span>
<span>18</span>
<span>23</span>
</div>
<div class="mt-3 rounded-xl border border-[#EDEDED] bg-white/60 p-2.5 flex-1 flex flex-col">
<div class="wrapped-label text-[11px] text-[#00000066]">表情新鲜度</div>
<div class="mt-2 grid grid-cols-1 gap-2.5">
<div class="rounded-lg bg-[#07C160]/10 px-3 py-2.5 flex items-center justify-between gap-3">
<div class="min-w-0">
<div class="wrapped-label text-[11px] text-[#00000066]">年度新解锁</div>
<div class="mt-1.5 wrapped-number text-lg text-[#07C160] font-semibold leading-tight">
{{ formatInt(newStickerCountThisYear) }}
</div>
<div class="wrapped-label text-[11px] text-[#00000055]">
占类型 {{ newStickerSharePct }}%
</div>
</div>
<div v-if="newStickerDecorDisplayItems.length > 0" class="grid grid-cols-3 gap-2 flex-shrink-0">
<div
v-for="item in newStickerDecorDisplayItems"
:key="`new-chip-${item.id}`"
class="w-11 h-11 sm:w-12 sm:h-12 rounded-[10px] overflow-hidden ring-1 ring-[#00000014] bg-white/60"
>
<img
v-if="item.src && stickerDecorOk[item.id] !== false"
:src="item.src"
class="w-full h-full object-cover"
alt=""
@error="stickerDecorOk[item.id] = false"
/>
</div>
</div>
</div>
<div class="rounded-lg bg-[#0EA5E9]/10 px-3 py-2.5 flex items-center justify-between gap-3">
<div class="min-w-0">
<div class="wrapped-label text-[11px] text-[#00000066]">回温表情</div>
<div class="mt-1.5 wrapped-number text-lg text-[#0EA5E9] font-semibold leading-tight">
{{ formatInt(revivedStickerCount) }}
</div>
<div class="wrapped-label text-[11px] text-[#00000055] leading-tight">
间隔{{ formatInt(revivedMinGapDays) }}最长 {{ formatInt(revivedMaxGapDays) }}
</div>
</div>
<div v-if="revivedStickerDecorDisplayItems.length > 0" class="grid grid-cols-3 gap-2 flex-shrink-0">
<div
v-for="item in revivedStickerDecorDisplayItems"
:key="`rev-chip-${item.id}`"
class="w-11 h-11 sm:w-12 sm:h-12 rounded-[10px] overflow-hidden ring-1 ring-[#00000014] bg-white/60"
>
<img
v-if="item.src && stickerDecorOk[item.id] !== false"
:src="item.src"
class="w-full h-full object-cover"
alt=""
@error="stickerDecorOk[item.id] = false"
/>
</div>
</div>
</div>
</div>
<div class="mt-2 wrapped-body text-[11px] text-[#00000066]">
今年共用过 <span class="wrapped-number text-[#07C160] font-semibold">{{ formatInt(uniqueStickerTypeCount) }}</span> 种表情
回温占比 <span class="wrapped-number text-[#0EA5E9] font-semibold">{{ revivedStickerSharePct }}</span>%
</div>
</div>
</div>
</div>
</div>
</div>
</WrappedCardShell>
<Teleport to="body">
<div v-if="cursorFxEnabled && isCardVisible && cursorTrails.length > 0" class="emoji-cursor-layer" aria-hidden="true">
<img
v-for="item in cursorTrails"
:key="item.id"
class="emoji-cursor-item"
:style="{
left: `${item.x}px`,
top: `${item.y}px`,
width: `${item.size}px`,
height: `${item.size}px`,
'--drift-x': `${item.driftX}px`,
'--drift-y': `${item.driftY}px`
}"
:src="item.src"
alt=""
draggable="false"
/>
</div>
</Teleport>
</div>
</template>
<script setup>
import { computed, onBeforeUnmount, onMounted, reactive, ref, watch } from 'vue'
import Stack from '~/components/wrapped/shared/VueBitsStack.vue'
import WechatEmojiTable, { parseTextWithEmoji } from '~/utils/wechat-emojis'
const props = defineProps({
card: { type: Object, required: true },
variant: { type: String, default: 'panel' } // 'panel' | 'slide'
})
const nfInt = new Intl.NumberFormat('zh-CN', { maximumFractionDigits: 0 })
const formatInt = (n) => nfInt.format(Math.round(Number(n) || 0))
const mediaBase = process.client ? 'http://localhost:8000' : ''
const resolveMediaUrl = (value, opts = { backend: false }) => {
const raw = String(value || '').trim()
if (!raw) return ''
if (/^https?:\/\//i.test(raw)) {
try {
const host = new URL(raw).hostname.toLowerCase()
if (host.endsWith('.qpic.cn') || host.endsWith('.qlogo.cn')) {
return `${mediaBase}/api/chat/media/proxy_image?url=${encodeURIComponent(raw)}`
}
} catch {}
return raw
}
if (opts.backend || raw.startsWith('/api/')) {
return `${mediaBase}${raw.startsWith('/') ? '' : '/'}${raw}`
}
return raw.startsWith('/') ? raw : `/${raw}`
}
const resolveEmojiAsset = (value) => {
const raw = String(value || '').trim()
if (!raw) return ''
if (/^https?:\/\//i.test(raw)) return raw
if (raw.startsWith('/wxemoji/')) return raw
if (raw.startsWith('/')) return raw
return `/wxemoji/${raw}`
}
const resolveStickerUrl = (st) => {
const remote = resolveMediaUrl(st?.emojiUrl, { backend: true })
if (remote) return remote
const asset = resolveEmojiAsset(st?.emojiAssetPath)
return asset || ''
}
const splitTextByNumbers = (text) => {
const raw = String(text || '')
if (!raw) return []
const parts = raw.split(/(\d[\d,.]*)/g)
const out = []
for (const p of parts) {
if (!p) continue
if (/^\d[\d,.]*$/.test(p)) out.push({ type: 'num', content: p })
else out.push({ type: 'text', content: p })
}
return out
}
const appendParsedText = (segments, text) => {
const parsed = parseTextWithEmoji(String(text || ''))
for (const seg of parsed) {
if (seg?.type === 'emoji' && seg.emojiSrc) {
segments.push({
type: 'emoji',
src: resolveEmojiAsset(seg.emojiSrc),
alt: seg.content || 'emoji',
sizeEm: 1.12
})
continue
}
const inner = splitTextByNumbers(seg?.content || '')
for (const x of inner) segments.push(x)
}
}
const sentStickerCount = computed(() => Number(props.card?.data?.sentStickerCount || 0))
const stickerActiveDays = computed(() => Number(props.card?.data?.stickerActiveDays || 0))
const peakHour = computed(() => {
const h = props.card?.data?.peakHour
return Number.isFinite(Number(h)) ? Number(h) : null
})
const peakWeekdayName = computed(() => String(props.card?.data?.peakWeekdayName || '').trim())
const stickerPerActiveDayText = computed(() => {
const v = Number(props.card?.data?.stickerPerActiveDay || 0)
return Number.isFinite(v) ? v.toFixed(1) : '0.0'
})
const uniqueStickerTypeCount = computed(() => Number(props.card?.data?.uniqueStickerTypeCount || 0))
const newStickerCountThisYear = computed(() => Number(props.card?.data?.newStickerCountThisYear || 0))
const revivedStickerCount = computed(() => Number(props.card?.data?.revivedStickerCount || 0))
const revivedMinGapDays = computed(() => Number(props.card?.data?.revivedMinGapDays || 60))
const revivedMaxGapDays = computed(() => Number(props.card?.data?.revivedMaxGapDays || 0))
const newStickerSharePct = computed(() => {
const v = Number(props.card?.data?.newStickerShare)
if (Number.isFinite(v) && v >= 0) return Math.max(0, Math.min(100, Math.round(v * 100)))
const total = Math.max(0, Number(uniqueStickerTypeCount.value || 0))
if (total <= 0) return 0
return Math.max(0, Math.min(100, Math.round((Number(newStickerCountThisYear.value || 0) / total) * 100)))
})
const revivedStickerSharePct = computed(() => {
const v = Number(props.card?.data?.revivedStickerShare)
if (Number.isFinite(v) && v >= 0) return Math.max(0, Math.min(100, Math.round(v * 100)))
const total = Math.max(0, Number(uniqueStickerTypeCount.value || 0))
if (total <= 0) return 0
return Math.max(0, Math.min(100, Math.round((Number(revivedStickerCount.value || 0) / total) * 100)))
})
const newStickerSamples = computed(() => {
const arr = props.card?.data?.newStickerSamples
return Array.isArray(arr) ? arr : []
})
const revivedStickerSamples = computed(() => {
const arr = props.card?.data?.revivedStickerSamples
return Array.isArray(arr) ? arr : []
})
const buildDecorStickerItems = (rows, prefix) => {
const out = []
const seen = new Set()
for (let idx = 0; idx < rows.length; idx += 1) {
const st = rows[idx] || {}
const rawId = String(st?.md5 || st?.emojiAssetPath || st?.emojiUrl || `${prefix}-${idx}`).trim()
if (!rawId || seen.has(rawId)) continue
seen.add(rawId)
out.push({
id: `${prefix}-${rawId}`,
src: resolveStickerUrl(st),
count: Math.max(0, Number(st?.count || 0)),
gapDays: Math.max(0, Number(st?.gapDays || 0))
})
if (out.length >= 4) break
}
return out
}
const topStickers = computed(() => {
const arr = props.card?.data?.topStickers
return Array.isArray(arr) ? arr : []
})
const newStickerDecorItems = computed(() => buildDecorStickerItems(newStickerSamples.value, 'new'))
const revivedStickerDecorItems = computed(() => buildDecorStickerItems(revivedStickerSamples.value, 'revived'))
const topStickerDecorItems = computed(() => buildDecorStickerItems(topStickers.value, 'top'))
const toRenderableDecor = (items) => items.filter((x) => String(x?.src || '').trim())
const decorFallbackPool = computed(() => {
const out = []
const seen = new Set()
for (const item of [
...toRenderableDecor(newStickerDecorItems.value),
...toRenderableDecor(revivedStickerDecorItems.value),
...toRenderableDecor(topStickerDecorItems.value)
]) {
const key = String(item?.src || '').trim()
if (!key || seen.has(key)) continue
seen.add(key)
out.push(item)
if (out.length >= 6) break
}
return out
})
const newStickerDecorDisplayItems = computed(() => {
const own = toRenderableDecor(newStickerDecorItems.value).slice(0, 3)
if (own.length > 0) return own
return decorFallbackPool.value.slice(0, 3)
})
const revivedStickerDecorDisplayItems = computed(() => {
const own = toRenderableDecor(revivedStickerDecorItems.value).slice(0, 3)
if (own.length > 0) return own
const fallback = decorFallbackPool.value.slice(3, 6)
return fallback.length > 0 ? fallback : decorFallbackPool.value.slice(0, 3)
})
const heroSticker = computed(() => {
const arr = topStickers.value
return Array.isArray(arr) && arr.length > 0 ? arr[0] : null
})
const heroStickerUrl = computed(() => resolveStickerUrl(heroSticker.value))
const heroStickerOwnerName = computed(() => String(heroSticker.value?.sampleDisplayName || heroSticker.value?.sampleUsername || '').trim())
const heroStickerOwnerAvatarUrl = computed(() => resolveMediaUrl(heroSticker.value?.sampleAvatarUrl, { backend: true }))
const stackCardDimensions = { width: 140, height: 140 }
const stackCardsData = computed(() => {
const seen = new Set()
const out = []
for (const st of topStickers.value) {
const key = String(st?.md5 || st?.emojiAssetPath || st?.emojiUrl || '').trim()
if (!key || seen.has(key)) continue
const src = resolveStickerUrl(st)
if (!src) continue
seen.add(key)
out.push({ id: key, img: src })
if (out.length >= 4) break
}
if (out.length > 0) return out
return allWechatEmojiAssets.value.slice(0, 4).map((img, idx) => ({ id: `wx-${idx}`, img }))
})
const stackTopCount = computed(() => Number(heroSticker.value?.count || 0))
const hourBars = computed(() => {
const raw = Array.isArray(props.card?.data?.stickerHourCounts) ? props.card.data.stickerHourCounts : []
const counts = Array.from({ length: 24 }, (_, i) => Math.max(0, Number(raw[i] || 0)))
const maxV = Math.max(1, ...counts)
return counts.map((count, hour) => ({
hour,
count,
heightPct: Math.max(8, Math.round((count / maxV) * 100))
}))
})
const topTextEmojis = computed(() => {
const arr = props.card?.data?.topTextEmojis
return Array.isArray(arr) ? arr : []
})
const topWechatEmojis = computed(() => {
const arr = props.card?.data?.topWechatEmojis
return Array.isArray(arr) ? arr : []
})
const topUnicodeEmojis = computed(() => {
const arr = props.card?.data?.topUnicodeEmojis
return Array.isArray(arr) ? arr : []
})
const smallWechatEmojiChips = computed(() => {
if (topWechatEmojis.value.length > 0) {
return topWechatEmojis.value.map((x) => ({
key: String(x?.key || ''),
count: Number(x?.count || 0),
assetPath: String(x?.assetPath || '')
}))
}
return topTextEmojis.value.map((x) => ({
key: String(x?.key || ''),
count: Number(x?.count || 0),
assetPath: String(x?.assetPath || '')
}))
})
const emojiAssetOk = reactive({})
watch(
smallWechatEmojiChips,
(arr) => {
for (const k of Object.keys(emojiAssetOk)) delete emojiAssetOk[k]
if (!Array.isArray(arr)) return
for (const em of arr) {
const key = String(em?.key || '').trim()
if (key) emojiAssetOk[key] = true
}
},
{ immediate: true }
)
const emojiChartRows = computed(() => {
const wechat = smallWechatEmojiChips.value
.slice(0, 4)
.map((x, idx) => ({
id: `w-${String(x?.key || idx)}`,
kind: 'wechat',
key: String(x?.key || idx),
label: String(x?.key || '').trim() || '微信表情',
count: Math.max(0, Number(x?.count || 0)),
assetPath: String(x?.assetPath || '').trim()
}))
const unicode = topUnicodeEmojis.value
.slice(0, 4)
.map((x, idx) => ({
id: `u-${String(x?.emoji || idx)}`,
kind: 'unicode',
key: String(x?.emoji || idx),
label: String(x?.emoji || '').trim() || '😀',
count: Math.max(0, Number(x?.count || 0)),
assetPath: ''
}))
const rows = [...wechat, ...unicode].filter((x) => x.count > 0 && x.label)
const maxV = Math.max(1, ...rows.map((x) => x.count))
return rows
.sort((a, b) => b.count - a.count)
.map((x) => ({
...x,
pct: Math.max(8, Math.round((x.count / maxV) * 100))
}))
})
const emojiBubbleRows = computed(() => {
const rows = emojiChartRows.value
if (!rows.length) return []
const maxV = Math.max(1, ...rows.map((x) => x.count))
return rows.map((x) => {
const ratio = Math.max(0, Math.min(1, x.count / maxV))
const size = Math.round(32 + Math.sqrt(ratio) * 36)
return { ...x, size: Math.max(28, Math.min(72, size)) }
})
})
const stickerDecorOk = reactive({})
watch(
() => [...newStickerDecorItems.value, ...revivedStickerDecorItems.value],
(arr) => {
for (const k of Object.keys(stickerDecorOk)) delete stickerDecorOk[k]
if (!Array.isArray(arr)) return
for (const x of arr) {
const key = String(x?.id || '').trim()
if (key) stickerDecorOk[key] = true
}
},
{ immediate: true }
)
const avatarOk = reactive({ topStickerOwner: true })
watch(heroStickerOwnerAvatarUrl, () => { avatarOk.topStickerOwner = true })
const avatarFallback = (name) => {
const s = String(name || '').trim()
return s ? s[0] : '?'
}
const narrativeSegments = computed(() => {
const out = []
if (sentStickerCount.value > 0) {
appendParsedText(
out,
`这一年,你用 ${formatInt(sentStickerCount.value)} 张表情包把聊天变得更有温度;在 ${formatInt(
stickerActiveDays.value
)} 个活跃日里,日均 ${stickerPerActiveDayText.value} 张。`
)
} else {
appendParsedText(out, '这一年你几乎没发过表情包。')
}
if (peakHour.value !== null && peakWeekdayName.value) {
appendParsedText(out, `你最活跃的时刻是 ${peakWeekdayName.value} ${peakHour.value}:00。`)
}
let hasTail = false
if (heroSticker.value) {
appendParsedText(out, '年度 C 位表情是 ')
if (heroStickerUrl.value) {
out.push({ type: 'sticker', src: heroStickerUrl.value, alt: '年度 C 位表情', sizeEm: 1.22 })
} else {
appendParsedText(out, '(图片缺失)')
}
appendParsedText(out, `${formatInt(Number(heroSticker.value?.count || 0))} 次)`)
hasTail = true
}
const topWechat = topWechatEmojis.value[0]
const topText = topTextEmojis.value[0]
if (topWechat) {
appendParsedText(out, `${hasTail ? '' : ''}你最常用的小黄脸是 `)
if (topWechat.assetPath) {
out.push({
type: 'emoji',
src: resolveEmojiAsset(topWechat.assetPath),
alt: topWechat.key || 'emoji',
sizeEm: 1.16
})
} else {
appendParsedText(out, topWechat.key || '')
}
appendParsedText(out, `${formatInt(Number(topWechat.count || 0))} 次)`)
hasTail = true
} else if (topText) {
appendParsedText(out, `${hasTail ? '' : ''}在文字聊天里,你最常打的小黄脸是 `)
if (topText.assetPath) {
out.push({
type: 'emoji',
src: resolveEmojiAsset(topText.assetPath),
alt: topText.key || 'emoji',
sizeEm: 1.16
})
} else {
appendParsedText(out, topText.key || '')
}
appendParsedText(out, `${formatInt(Number(topText.count || 0))} 次)`)
hasTail = true
} else {
appendParsedText(out, `${hasTail ? '' : ''}今年没有命中可识别的小黄脸`)
hasTail = true
}
const topUnicode = topUnicodeEmojis.value[0]
if (topUnicode) {
appendParsedText(
out,
`${hasTail ? '' : ''}普通 Emoji 最常用 ${topUnicode.emoji}${formatInt(Number(topUnicode.count || 0))} 次)。`
)
} else if (hasTail) {
appendParsedText(out, '。')
}
return out
})
const cardRoot = ref(null)
const isCardVisible = ref(false)
const cursorTrails = ref([])
const cursorFxEnabled = ref(true)
const allWechatEmojiAssets = computed(() => {
const values = Object.values(WechatEmojiTable || {})
const uniq = new Set()
for (const x of values) {
const p = resolveEmojiAsset(String(x || '').trim())
if (p) uniq.add(p)
}
return Array.from(uniq)
})
let trailSeq = 0
let lastSpawnAt = 0
let lastSpawnX = -9999
let lastSpawnY = -9999
const TRAIL_LIFETIME_MS = 780
const MIN_SPAWN_INTERVAL_MS = 28
const MIN_SPAWN_DISTANCE = 10
const MAX_TRAIL_COUNT = 32
const checkCardVisible = () => {
if (!process.client) return false
const rect = cardRoot.value?.getBoundingClientRect?.()
if (!rect) return false
const vh = window.innerHeight || 0
const vw = window.innerWidth || 0
return rect.bottom > vh * 0.12 && rect.top < vh * 0.88 && rect.right > 0 && rect.left < vw
}
const spawnCursorTrail = (x, y) => {
const pool = allWechatEmojiAssets.value
if (!pool.length) return
const src = pool[Math.floor(Math.random() * pool.length)]
const item = {
id: ++trailSeq,
x,
y,
src,
size: 18 + Math.floor(Math.random() * 8),
driftX: Math.round((Math.random() - 0.5) * 30),
driftY: 20 + Math.floor(Math.random() * 20)
}
cursorTrails.value = [...cursorTrails.value.slice(-MAX_TRAIL_COUNT), item]
setTimeout(() => {
cursorTrails.value = cursorTrails.value.filter((t) => t.id !== item.id)
}, TRAIL_LIFETIME_MS)
}
const onWindowPointerMove = (e) => {
if (!process.client || !cursorFxEnabled.value) return
if (e?.pointerType === 'touch') return
const visible = checkCardVisible()
isCardVisible.value = visible
if (!visible) return
const x = Number(e.clientX)
const y = Number(e.clientY)
if (!Number.isFinite(x) || !Number.isFinite(y)) return
const now = performance.now()
const dx = x - lastSpawnX
const dy = y - lastSpawnY
const dist = Math.hypot(dx, dy)
if ((now - lastSpawnAt) < MIN_SPAWN_INTERVAL_MS && dist < MIN_SPAWN_DISTANCE) return
lastSpawnAt = now
lastSpawnX = x
lastSpawnY = y
spawnCursorTrail(x, y)
}
const onWindowPointerLeave = () => {
lastSpawnX = -9999
lastSpawnY = -9999
}
onMounted(() => {
if (!process.client) return
try {
const mq = window.matchMedia('(prefers-reduced-motion: reduce)')
cursorFxEnabled.value = !mq.matches
} catch {
cursorFxEnabled.value = true
}
isCardVisible.value = checkCardVisible()
window.addEventListener('pointermove', onWindowPointerMove, { passive: true })
window.addEventListener('pointerleave', onWindowPointerLeave, { passive: true })
})
onBeforeUnmount(() => {
if (process.client) {
window.removeEventListener('pointermove', onWindowPointerMove)
window.removeEventListener('pointerleave', onWindowPointerLeave)
}
cursorTrails.value = []
})
</script>
<style scoped>
.wx-inline-emoji {
object-fit: contain;
}
.emoji-cursor-layer {
position: fixed;
inset: 0;
pointer-events: none;
overflow: hidden;
z-index: 80;
}
.emoji-cursor-item {
position: fixed;
pointer-events: none;
user-select: none;
transform: translate(-50%, -50%);
opacity: 0.94;
animation: emoji-cursor-float 780ms ease-out forwards;
filter: drop-shadow(0 1px 2px rgba(0, 0, 0, 0.18));
}
@keyframes emoji-cursor-float {
0% {
opacity: 0.94;
transform: translate(-50%, -50%) scale(0.88);
}
100% {
opacity: 0;
transform: translate(calc(-50% + var(--drift-x)), calc(-50% - var(--drift-y))) scale(1.16);
}
}
</style>

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,294 @@
<template>
<div class="vb-stack relative select-none" :style="stackStyle">
<div
v-for="(card, index) in cards"
:key="String(card.id)"
class="vb-stack-card absolute top-1/2 left-1/2 overflow-hidden rounded-2xl bg-white/80 shadow-sm"
:style="{
width: `${dims.width}px`,
height: `${dims.height}px`,
touchAction: index === cards.length - 1 ? 'none' : 'auto',
pointerEvents: index === cards.length - 1 ? 'auto' : 'none'
}"
:ref="(el) => onCardRef(card.id, el)"
@pointerdown="(e) => onPointerDown(e, card.id, index)"
@pointermove="onPointerMove"
@pointerup="onPointerUp"
@pointercancel="onPointerCancel"
>
<img :src="card.img" class="w-full h-full object-cover" alt="" draggable="false" />
</div>
</div>
</template>
<script setup lang="ts">
import { computed, nextTick, onMounted, ref, watch } from 'vue'
import { gsap } from 'gsap'
type StackCard = {
id: string | number
img: string
}
const props = withDefaults(
defineProps<{
randomRotation?: boolean
sensitivity?: number
sendToBackOnClick?: boolean
cardDimensions?: { width?: number; height?: number }
cardsData?: StackCard[]
}>(),
{
randomRotation: false,
sensitivity: 180,
sendToBackOnClick: true,
cardDimensions: () => ({ width: 200, height: 200 }),
cardsData: () => []
}
)
const dims = computed(() => {
const w = Number(props.cardDimensions?.width)
const h = Number(props.cardDimensions?.height)
return {
width: Number.isFinite(w) && w > 0 ? Math.floor(w) : 200,
height: Number.isFinite(h) && h > 0 ? Math.floor(h) : 200
}
})
const stackStyle = computed(() => ({
width: `${dims.value.width}px`,
height: `${dims.value.height}px`
}))
const cards = ref<StackCard[]>([])
const elMap = new Map<string, HTMLDivElement>()
const rotationMap = ref(new Map<string, number>())
const mounted = ref(false)
const ROT_RANGE_DEG = 7
const STACK_OFFSET_X = 2
const STACK_OFFSET_Y = 4
const STACK_SCALE_STEP = 0.03
const MAX_TILT_DEG = 22
function normalizeCards(data: unknown): StackCard[] {
const raw = Array.isArray(data) ? data : []
const out: StackCard[] = []
for (const item of raw) {
const id = (item as any)?.id
const img = String((item as any)?.img || '').trim()
if (id === null || id === undefined) continue
if (!img) continue
out.push({ id, img })
}
return out
}
function ensureRotations(list: StackCard[]) {
const next = new Map(rotationMap.value)
if (!props.randomRotation) {
for (const c of list) next.set(String(c.id), 0)
rotationMap.value = next
return
}
for (const c of list) {
const key = String(c.id)
if (next.has(key)) continue
next.set(key, Math.round((Math.random() * 2 - 1) * ROT_RANGE_DEG))
}
rotationMap.value = next
}
function onCardRef(id: StackCard['id'], el: Element | null) {
const key = String(id)
if (el && el instanceof HTMLDivElement) elMap.set(key, el)
else elMap.delete(key)
}
function applyLayout(animate: boolean) {
if (!mounted.value) return
const total = cards.value.length
if (total === 0) return
cards.value.forEach((card, idx) => {
const key = String(card.id)
const el = elMap.get(key)
if (!el) return
const orderFromTop = total - 1 - idx
const x = orderFromTop * STACK_OFFSET_X
const y = orderFromTop * STACK_OFFSET_Y
const scale = Math.max(0.88, 1 - orderFromTop * STACK_SCALE_STEP)
const rotation = rotationMap.value.get(key) ?? 0
const tweenVars: gsap.TweenVars = {
x,
y,
rotation,
rotationX: 0,
rotationY: 0,
scale,
xPercent: -50,
yPercent: -50,
zIndex: idx + 1,
transformOrigin: 'center center',
ease: 'power3.out',
duration: animate ? 0.32 : 0
}
gsap.killTweensOf(el)
if (animate) gsap.to(el, tweenVars)
else gsap.set(el, tweenVars)
})
}
watch(
() => props.cardsData,
(val) => {
const nextCards = normalizeCards(val)
cards.value = nextCards
ensureRotations(nextCards)
nextTick(() => applyLayout(false))
},
{ immediate: true }
)
watch(
() => [props.randomRotation, props.cardDimensions?.width, props.cardDimensions?.height] as const,
() => {
ensureRotations(cards.value)
nextTick(() => applyLayout(false))
}
)
onMounted(() => {
mounted.value = true
applyLayout(false)
})
const clamp = (v: number, min: number, max: number) => Math.min(max, Math.max(min, v))
let activePointerId: number | null = null
let activeCardId: string | null = null
let startClientX = 0
let startClientY = 0
let startX = 0
let startY = 0
let startRotationZ = 0
let startScale = 1
let lastDx = 0
let lastDy = 0
function sendToBack(id: string) {
if (cards.value.length < 2) return
const list = cards.value.slice()
const idx = list.findIndex((c) => String(c.id) === id)
if (idx < 0) return
const [card] = list.splice(idx, 1)
if (!card) return
cards.value = [card, ...list]
}
function onPointerDown(e: PointerEvent, id: StackCard['id'], index: number) {
if (activePointerId !== null) return
if (index !== cards.value.length - 1) return
const key = String(id)
const el = elMap.get(key)
if (!el) return
activePointerId = e.pointerId
activeCardId = key
startClientX = e.clientX
startClientY = e.clientY
startX = Number(gsap.getProperty(el, 'x')) || 0
startY = Number(gsap.getProperty(el, 'y')) || 0
startRotationZ = Number(gsap.getProperty(el, 'rotation')) || 0
startScale = Number(gsap.getProperty(el, 'scale')) || 1
lastDx = 0
lastDy = 0
try {
el.setPointerCapture(e.pointerId)
} catch {}
gsap.killTweensOf(el)
gsap.set(el, { zIndex: 999 })
gsap.to(el, { scale: startScale * 1.03, duration: 0.12, ease: 'power2.out' })
}
function onPointerMove(e: PointerEvent) {
if (activePointerId === null || e.pointerId !== activePointerId) return
const key = activeCardId
if (!key) return
const el = elMap.get(key)
if (!el) return
const dx = e.clientX - startClientX
const dy = e.clientY - startClientY
lastDx = dx
lastDy = dy
const w = Math.max(1, dims.value.width)
const h = Math.max(1, dims.value.height)
const nx = dx / (w * 0.55)
const ny = dy / (h * 0.55)
const tiltY = clamp(Math.tanh(nx) * MAX_TILT_DEG, -MAX_TILT_DEG, MAX_TILT_DEG)
const tiltX = clamp(-Math.tanh(ny) * MAX_TILT_DEG, -MAX_TILT_DEG, MAX_TILT_DEG)
gsap.set(el, {
x: startX + dx,
y: startY + dy,
rotation: startRotationZ + dx / 18,
rotationX: tiltX,
rotationY: tiltY
})
}
function finishPointer(id: number, shouldSendBack: boolean) {
const key = activeCardId
activePointerId = null
activeCardId = null
const el = key ? elMap.get(key) : null
if (el) {
try {
el.releasePointerCapture(id)
} catch {}
}
if (shouldSendBack && key) sendToBack(key)
applyLayout(true)
}
function onPointerUp(e: PointerEvent) {
if (activePointerId === null || e.pointerId !== activePointerId) return
const dist = Math.hypot(lastDx, lastDy)
const clickLike = dist < 6
const shouldSendBack = dist > Number(props.sensitivity || 0) || (props.sendToBackOnClick && clickLike)
finishPointer(e.pointerId, shouldSendBack)
}
function onPointerCancel(e: PointerEvent) {
if (activePointerId === null || e.pointerId !== activePointerId) return
finishPointer(e.pointerId, false)
}
</script>
<style scoped>
.vb-stack {
perspective: 1000px;
}
.vb-stack-card {
will-change: transform;
transform-style: preserve-3d;
backface-visibility: hidden;
cursor: grab;
}
.vb-stack-card:active {
cursor: grabbing;
}
</style>

View File

@@ -329,6 +329,10 @@ const PREVIEW_BY_KIND = {
'chat/reply_speed': {
summary: '回复速度',
question: '谁是你愿意秒回的那个人?'
},
'emoji/annual_universe': {
summary: '梗图年鉴',
question: '你这一年最常丢出的表情包是哪张?'
}
}

View File

@@ -294,6 +294,7 @@ export const useApi = () => {
media_kinds: Array.isArray(data.media_kinds) ? data.media_kinds : ['image', 'emoji', 'video', 'video_thumb', 'voice', 'file'],
output_dir: data.output_dir == null ? null : String(data.output_dir || '').trim(),
allow_process_key_extract: !!data.allow_process_key_extract,
download_remote_media: !!data.download_remote_media,
privacy_mode: !!data.privacy_mode,
file_name: data.file_name || null
}

View File

@@ -722,7 +722,7 @@
@click.stop="openChatHistoryModal(message)"
>
<div class="wechat-chat-history-body">
<div class="wechat-chat-history-title">{{ message.title || '聊天记录' }}</div>
<div class="wechat-chat-history-title">{{ message.title || '合并消息' }}</div>
<div class="wechat-chat-history-preview" v-if="getChatHistoryPreviewLines(message).length">
<div
v-for="(line, idx) in getChatHistoryPreviewLines(message)"
@@ -734,14 +734,15 @@
</div>
</div>
<div class="wechat-chat-history-bottom">
<span>聊天记录</span>
<span>合并消息</span>
</div>
</div>
<div v-else-if="message.renderType === 'transfer'"
class="wechat-transfer-card msg-radius"
:class="[{ 'wechat-transfer-received': message.transferReceived, 'wechat-transfer-returned': isTransferReturned(message) }, message.isSent ? 'wechat-transfer-sent-side' : 'wechat-transfer-received-side']">
:class="[{ 'wechat-transfer-received': message.transferReceived, 'wechat-transfer-returned': isTransferReturned(message), 'wechat-transfer-overdue': isTransferOverdue(message) }, message.isSent ? 'wechat-transfer-sent-side' : 'wechat-transfer-received-side']">
<div class="wechat-transfer-content">
<img src="/assets/images/wechat/wechat-returned.png" v-if="isTransferReturned(message)" class="wechat-transfer-icon" alt="">
<img src="/assets/images/wechat/overdue.png" v-else-if="isTransferOverdue(message)" class="wechat-transfer-icon" alt="">
<img src="/assets/images/wechat/wechat-trans-icon2.png" v-else-if="message.transferReceived" class="wechat-transfer-icon" alt="">
<img src="/assets/images/wechat/wechat-trans-icon1.png" v-else class="wechat-transfer-icon" alt="">
<div class="wechat-transfer-info">
@@ -1233,7 +1234,7 @@
@click.stop
>
<div class="px-4 py-3 bg-neutral-100 border-b border-gray-200 flex items-center justify-between">
<div class="text-sm text-[#161616] truncate">{{ chatHistoryModalTitle || '聊天记录' }}</div>
<div class="text-sm text-[#161616] truncate">{{ chatHistoryModalTitle || '合并消息' }}</div>
<button
type="button"
class="p-2 rounded hover:bg-black/5"
@@ -1495,6 +1496,10 @@
<input type="radio" value="txt" v-model="exportFormat" class="hidden" />
<span>TXT</span>
</label>
<label class="flex items-center gap-1.5 px-3 py-1.5 rounded-md border cursor-pointer transition-colors" :class="exportFormat === 'html' ? 'bg-[#03C160] text-white border-[#03C160]' : 'bg-white border-gray-200 text-gray-700 hover:bg-gray-50'">
<input type="radio" value="html" v-model="exportFormat" class="hidden" />
<span>HTML</span>
</label>
</div>
</div>
@@ -1516,6 +1521,19 @@
</div>
</div>
<div v-if="exportFormat === 'html'" class="mt-3">
<div class="text-sm font-medium text-gray-800 mb-2">HTML 选项</div>
<div class="p-3 bg-gray-50 rounded-md border border-gray-200">
<label class="flex items-start gap-2 text-sm text-gray-700">
<input type="checkbox" v-model="exportDownloadRemoteMedia" :disabled="privacyMode" />
<span>允许联网下载链接/引用缩略图提高离线完整性</span>
</label>
<div class="mt-1 text-xs text-gray-500">
HTML 生效会在导出时尝试下载远程缩略图并写入 ZIP已做安全限制隐私模式下自动忽略
</div>
</div>
</div>
<div v-if="exportScope === 'selected'" class="mt-3">
<div class="flex items-center gap-2 mb-2">
<button
@@ -2010,6 +2028,7 @@ const messageTypeFilterOptions = [
{ value: 'emoji', label: '表情' },
{ value: 'video', label: '视频' },
{ value: 'voice', label: '语音' },
{ value: 'chatHistory', label: '合并消息' },
{ value: 'transfer', label: '转账' },
{ value: 'redPacket', label: '红包' },
{ value: 'file', label: '文件' },
@@ -2488,13 +2507,15 @@ const exportError = ref('')
// current: 当前会话(映射为 selected + 单个 username
const exportScope = ref('current') // current | selected | all | groups | singles
const exportFormat = ref('json') // json | txt
const exportFormat = ref('json') // json | txt | html
const exportDownloadRemoteMedia = ref(true)
const exportMessageTypeOptions = [
{ value: 'text', label: '文本' },
{ value: 'image', label: '图片' },
{ value: 'emoji', label: '表情' },
{ value: 'video', label: '视频' },
{ value: 'voice', label: '语音' },
{ value: 'chatHistory', label: '合并消息' },
{ value: 'transfer', label: '转账' },
{ value: 'redPacket', label: '红包' },
{ value: 'file', label: '文件' },
@@ -3063,6 +3084,15 @@ const startChatExport = async () => {
const selectedTypeSet = new Set(messageTypes.map((t) => String(t || '').trim()))
const mediaKindSet = new Set()
if (selectedTypeSet.has('chatHistory')) {
// 合并消息内部可能包含任意媒体类型;即使只勾选了 chatHistory也需要打包媒体才可离线查看。
mediaKindSet.add('image')
mediaKindSet.add('emoji')
mediaKindSet.add('video')
mediaKindSet.add('video_thumb')
mediaKindSet.add('voice')
mediaKindSet.add('file')
}
if (selectedTypeSet.has('image')) mediaKindSet.add('image')
if (selectedTypeSet.has('emoji')) mediaKindSet.add('emoji')
if (selectedTypeSet.has('video')) {
@@ -3091,6 +3121,7 @@ const startChatExport = async () => {
message_types: messageTypes,
include_media: includeMedia,
media_kinds: mediaKinds,
download_remote_media: exportFormat.value === 'html' && !!exportDownloadRemoteMedia.value,
output_dir: isDesktopExportRuntime() ? String(exportFolder.value || '').trim() : null,
privacy_mode: !!privacyMode.value,
file_name: exportFileName.value || null
@@ -4017,6 +4048,16 @@ const isTransferReturned = (message) => {
return text.includes('退回') || text.includes('退还')
}
const isTransferOverdue = (message) => {
const paySubType = String(message?.paySubType || '').trim()
if (paySubType === '10') return true
const s = String(message?.transferStatus || '').trim()
const c = String(message?.content || '').trim()
const text = `${s} ${c}`.trim()
if (!text) return false
return text.includes('过期')
}
const getTransferTitle = (message) => {
const paySubType = String(message.paySubType || '').trim()
// paysubtype 含义:
@@ -4952,7 +4993,7 @@ const openChatHistoryQuote = (rec) => {
const openChatHistoryModal = (message) => {
if (!process.client) return
chatHistoryModalTitle.value = String(message?.title || '聊天记录')
chatHistoryModalTitle.value = String(message?.title || '合并消息')
const recordItem = String(message?.recordItem || '').trim()
const parsed = parseChatHistoryRecord(recordItem)
@@ -6100,6 +6141,24 @@ const LinkCard = defineComponent({
color: #fff;
}
/* 过期的转账样式 */
.wechat-transfer-overdue {
background: #E9CFB3;
}
.wechat-transfer-overdue::after {
background: #E9CFB3;
}
.wechat-transfer-overdue .wechat-transfer-amount,
.wechat-transfer-overdue .wechat-transfer-status {
color: #fff;
}
.wechat-transfer-overdue .wechat-transfer-bottom span {
color: #fff;
}
/* 红包消息样式 - 微信风格 */
.wechat-redpacket-card {
width: 210px;

View File

@@ -163,6 +163,12 @@
variant="slide"
class="h-full w-full"
/>
<Card04EmojiUniverse
v-else-if="c && (c.kind === 'emoji/annual_universe' || c.id === 4)"
:card="c"
variant="slide"
class="h-full w-full"
/>
<WrappedCardShell
v-else
:card-id="Number(c?.id || (idx + 1))"

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.5 KiB

File diff suppressed because it is too large Load Diff

View File

@@ -8,7 +8,7 @@ from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from urllib.parse import quote, urlparse
from urllib.parse import parse_qs, quote, urlparse
from fastapi import HTTPException
@@ -634,6 +634,32 @@ def _is_mp_weixin_article_url(url: str) -> bool:
return "mp.weixin.qq.com/" in lu
def _is_mp_weixin_feed_article_url(url: str) -> bool:
"""Detect WeChat's PC feed/recommendation mp.weixin.qq.com share URLs.
These links often carry an `exptype` like:
masonry_feed_brief_content_elite_for_pcfeeds_u2i
WeChat desktop tends to render them in a cover-card style (image + bottom title),
so we use this as a hint to choose the 'cover' linkStyle.
"""
u = str(url or "").strip()
if not u:
return False
try:
parsed = urlparse(u)
q = parse_qs(parsed.query or "")
for v in (q.get("exptype") or []):
if "masonry_feed" in str(v or "").lower():
return True
except Exception:
pass
return "exptype=masonry_feed" in u.lower()
def _classify_link_share(*, app_type: int, url: str, source_username: str, desc: str) -> tuple[str, str]:
src = str(source_username or "").strip().lower()
is_official_article = bool(
@@ -647,7 +673,15 @@ def _classify_link_share(*, app_type: int, url: str, source_username: str, desc:
hashtag_count = len(re.findall(r"#[^#\s]+", d))
# 公众号文章中「封面图 + 底栏标题」卡片特征:摘要以 #话题# 风格为主。
link_style = "cover" if (is_official_article and (d.startswith("#") or hashtag_count >= 2)) else "default"
cover_like = bool(
is_official_article
and (
d.startswith("#")
or hashtag_count >= 2
or _is_mp_weixin_feed_article_url(url)
)
)
link_style = "cover" if cover_like else "default"
return link_type, link_style
@@ -948,8 +982,12 @@ def _parse_app_message(text: str) -> dict[str, Any]:
"recordItem": record_item or "",
}
if app_type in (5, 68) and url:
thumb_url = _normalize_xml_url(_extract_xml_tag_text(text, "thumburl"))
if app_type in (4, 5, 68) and url:
# Many appmsg link cards (notably Bilibili shares with <type>4</type>) include a <patMsg> metadata block.
# DO NOT treat "<patmsg" presence as a pat message: it would misclassify normal link cards as "[拍一拍]".
thumb_url = _normalize_xml_url(
_extract_xml_tag_text(text, "thumburl") or _extract_xml_tag_text(text, "cdnthumburl")
)
link_type, link_style = _classify_link_share(
app_type=app_type,
url=url,
@@ -1093,7 +1131,10 @@ def _parse_app_message(text: str) -> dict[str, Any]:
"quoteVoiceLength": quote_voice_length,
}
if app_type == 62 or "<patmsg" in lower or 'type="patmsg"' in lower or "type='patmsg'" in lower:
# Some versions may mark pat messages via sysmsg/appmsg tag attribute: <sysmsg type="patmsg">...</sysmsg>.
# Be strict here: lots of non-pat appmsg payloads still carry a nested <patMsg>...</patMsg> metadata block.
patmsg_attr = bool(re.search(r"<(sysmsg|appmsg)\b[^>]*\btype=['\"]patmsg['\"]", lower))
if app_type == 62 or patmsg_attr:
return {"renderType": "system", "content": "[拍一拍]"}
if app_type == 2000 or (

View File

@@ -2742,6 +2742,90 @@ def _postprocess_transfer_messages(merged: list[dict[str, Any]]) -> None:
# - 将原始转账消息1/8回填为“已被接收”
# - 若同一 transferId 同时存在原始消息与 paysubtype=3 消息,则将 paysubtype=3 的那条校正为“已收款”
def _is_transfer_expired_system_message(text: Any) -> bool:
content = str(text or "").strip()
if not content:
return False
if "转账" not in content or "过期" not in content:
return False
if "未接收" in content and ("24小时" in content or "二十四小时" in content):
return True
return "已过期" in content and ("收款方" in content or "转账" in content)
def _mark_pending_transfers_expired_by_system_messages() -> set[str]:
expired_system_times: list[int] = []
pending_candidates: list[tuple[int, int]] = [] # (index, createTime)
for idx, msg in enumerate(merged):
rt = str(msg.get("renderType") or "").strip()
if rt == "system":
if _is_transfer_expired_system_message(msg.get("content")):
try:
ts = int(msg.get("createTime") or 0)
except Exception:
ts = 0
if ts > 0:
expired_system_times.append(ts)
continue
if rt != "transfer":
continue
pst = str(msg.get("paySubType") or "").strip()
if pst not in ("1", "8"):
continue
try:
ts = int(msg.get("createTime") or 0)
except Exception:
ts = 0
if ts <= 0:
continue
pending_candidates.append((idx, ts))
if not expired_system_times or not pending_candidates:
return set()
used_pending_indexes: set[int] = set()
expired_transfer_ids: set[str] = set()
# 过期系统提示通常出现在转账发起约 24 小时后。
# 为避免误匹配,要求时间差落在 [22h, 26h] 范围内,并选择最接近 24h 的待收款消息。
for sys_ts in sorted(expired_system_times):
best_index = -1
best_distance = 10**9
for idx, transfer_ts in pending_candidates:
if idx in used_pending_indexes:
continue
delta = sys_ts - transfer_ts
if delta < 0:
continue
if delta < 22 * 3600 or delta > 26 * 3600:
continue
distance = abs(delta - 24 * 3600)
if distance < best_distance:
best_distance = distance
best_index = idx
if best_index < 0:
continue
used_pending_indexes.add(best_index)
transfer_msg = merged[best_index]
transfer_msg["paySubType"] = "10"
transfer_msg["transferStatus"] = "已过期"
tid = str(transfer_msg.get("transferId") or "").strip()
if tid:
expired_transfer_ids.add(tid)
return expired_transfer_ids
expired_transfer_ids = _mark_pending_transfers_expired_by_system_messages()
returned_transfer_ids: set[str] = set() # 退还状态的 transferId
received_transfer_ids: set[str] = set() # 已收款状态的 transferId
returned_amounts_with_time: list[tuple[str, int]] = [] # (金额, 时间戳) 用于退还回退匹配
@@ -2828,6 +2912,8 @@ def _postprocess_transfer_messages(merged: list[dict[str, Any]]) -> None:
tid = str(m.get("transferId") or "").strip()
if not tid or tid not in pending_transfer_ids:
continue
if tid in expired_transfer_ids:
continue
mid = str(m.get("id") or "").strip()
if mid and mid in backfilled_message_ids:
continue

View File

@@ -12,17 +12,31 @@ from ..path_fix import PathFixRoute
router = APIRouter(route_class=PathFixRoute)
ExportFormat = Literal["json", "txt"]
ExportFormat = Literal["json", "txt", "html"]
ExportScope = Literal["selected", "all", "groups", "singles"]
MediaKind = Literal["image", "emoji", "video", "video_thumb", "voice", "file"]
MessageType = Literal["text", "image", "emoji", "video", "voice", "file", "link", "transfer", "redPacket", "system", "quote", "voip"]
MessageType = Literal[
"text",
"image",
"emoji",
"video",
"voice",
"chatHistory",
"file",
"link",
"transfer",
"redPacket",
"system",
"quote",
"voip",
]
class ChatExportCreateRequest(BaseModel):
account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)")
scope: ExportScope = Field("selected", description="导出范围selected=指定会话all=全部groups=仅群聊singles=仅单聊")
usernames: list[str] = Field(default_factory=list, description="会话 username 列表scope=selected 时使用)")
format: ExportFormat = Field("json", description="导出格式json 或 txtzip 内每个会话一个文件)")
format: ExportFormat = Field("json", description="导出格式json/txt/htmlzip 内每个会话一个文件html 可离线打开 index.html 查看")
start_time: Optional[int] = Field(None, description="起始时间Unix 秒,含)")
end_time: Optional[int] = Field(None, description="结束时间Unix 秒,含)")
include_hidden: bool = Field(False, description="是否包含隐藏会话scope!=selected 时)")
@@ -41,6 +55,10 @@ class ChatExportCreateRequest(BaseModel):
False,
description="预留字段:本项目不从微信进程提取媒体密钥,请使用 wx_key 获取并保存/批量解密",
)
download_remote_media: bool = Field(
False,
description="HTML 导出时允许联网下载链接/引用缩略图等远程媒体(提高离线完整性)",
)
privacy_mode: bool = Field(
False,
description="隐私模式导出:隐藏会话/用户名/内容,不打包头像与媒体",
@@ -64,6 +82,7 @@ async def create_chat_export(req: ChatExportCreateRequest):
message_types=req.message_types,
output_dir=req.output_dir,
allow_process_key_extract=req.allow_process_key_extract,
download_remote_media=req.download_remote_media,
privacy_mode=req.privacy_mode,
file_name=req.file_name,
)

File diff suppressed because it is too large Load Diff

View File

@@ -16,15 +16,16 @@ from .cards.card_00_global_overview import build_card_00_global_overview
from .cards.card_01_cyber_schedule import WeekdayHourHeatmap, build_card_01_cyber_schedule, compute_weekday_hour_heatmap
from .cards.card_02_message_chars import build_card_02_message_chars
from .cards.card_03_reply_speed import build_card_03_reply_speed
from .cards.card_04_emoji_universe import build_card_04_emoji_universe
logger = get_logger(__name__)
# We use this number to version the cache filename so adding more cards won't accidentally serve
# an older partial cache.
_IMPLEMENTED_UPTO_ID = 3
_IMPLEMENTED_UPTO_ID = 4
# Bump this when we change card payloads/ordering while keeping the same implemented_upto.
_CACHE_VERSION = 9
_CACHE_VERSION = 15
# "Manifest" is used by the frontend to render the deck quickly, then lazily fetch each card.
@@ -58,6 +59,13 @@ _WRAPPED_CARD_MANIFEST: tuple[dict[str, Any], ...] = (
"category": "B",
"kind": "chat/reply_speed",
},
{
"id": 4,
"title": "这一年,你的表情包里藏了多少心情?",
"scope": "global",
"category": "B",
"kind": "emoji/annual_universe",
},
)
_WRAPPED_CARD_ID_SET = {int(c["id"]) for c in _WRAPPED_CARD_MANIFEST}
@@ -274,7 +282,7 @@ def build_wrapped_annual_response(
) -> dict[str, Any]:
"""Build annual wrapped response for the given account/year.
For now we implement cards up to id=3 (plus a meta overview card id=0).
For now we implement cards up to id=4 (plus a meta overview card id=0).
"""
account_dir = _resolve_account_dir(account)
@@ -317,6 +325,8 @@ def build_wrapped_annual_response(
cards.append(build_card_02_message_chars(account_dir=account_dir, year=y))
# Page 5: reply speed / best chat buddy.
cards.append(build_card_03_reply_speed(account_dir=account_dir, year=y))
# Page 6: annual emoji universe / meme almanac.
cards.append(build_card_04_emoji_universe(account_dir=account_dir, year=y))
obj: dict[str, Any] = {
"account": account_dir.name,
@@ -508,6 +518,8 @@ def build_wrapped_annual_card(
card = build_card_02_message_chars(account_dir=account_dir, year=y)
elif cid == 3:
card = build_card_03_reply_speed(account_dir=account_dir, year=y)
elif cid == 4:
card = build_card_04_emoji_universe(account_dir=account_dir, year=y)
else:
# Should be unreachable due to _WRAPPED_CARD_ID_SET check.
raise ValueError(f"Unknown Wrapped card id: {cid}")

View File

@@ -0,0 +1,50 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _parse_app_message
class TestChatAppMessageType4PatMsgRegression(unittest.TestCase):
def test_type4_link_with_patmsg_metadata_is_not_misclassified_as_pat(self):
raw_text = (
"<msg>"
'<appmsg appid="wxcb8d4298c6a09bcb" sdkver="0">'
"<title>【中配】抽象可能让你的代码变差 - CodeAesthetic</title>"
"<des>UP主黑纹白斑马</des>"
"<type>4</type>"
"<url>https://b23.tv/au68guF</url>"
"<appname>哔哩哔哩</appname>"
"<appattach><cdnthumburl>3057020100044b30</cdnthumburl></appattach>"
"<patMsg><chatUser /></patMsg>"
"</appmsg>"
"</msg>"
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("url"), "https://b23.tv/au68guF")
self.assertEqual(parsed.get("title"), "【中配】抽象可能让你的代码变差 - CodeAesthetic")
self.assertEqual(parsed.get("from"), "哔哩哔哩")
self.assertNotEqual(parsed.get("content"), "[拍一拍]")
def test_type62_is_still_pat(self):
raw_text = '<msg><appmsg><title>"A" 拍了拍 "B"</title><type>62</type></appmsg></msg>'
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "system")
self.assertEqual(parsed.get("content"), "[拍一拍]")
def test_sysmsg_type_patmsg_attr_is_still_pat(self):
raw_text = '<sysmsg type="patmsg"><foo>bar</foo></sysmsg>'
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "system")
self.assertEqual(parsed.get("content"), "[拍一拍]")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,218 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportChatHistoryModal(unittest.TestCase):
_MD5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
record_item = (
"<recorditem>"
"<datalist>"
"<dataitem>"
"<datatype>2</datatype>"
f"<fullmd5>{self._MD5}</fullmd5>"
"</dataitem>"
"</datalist>"
"</recorditem>"
)
chat_history_xml = (
"<msg><appmsg>"
"<type>19</type>"
"<title>聊天记录</title>"
"<des>记录预览</des>"
f"<recorditem><![CDATA[{record_item}]]></recorditem>"
"</appmsg></msg>"
)
conn.execute(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 49, 1, 2, 1735689601, chat_history_xml, None),
)
conn.commit()
finally:
conn.close()
def _seed_media_files(self, account_dir: Path) -> None:
resource_root = account_dir / "resource"
(resource_root / "aa").mkdir(parents=True, exist_ok=True)
(resource_root / "aa" / f"{self._MD5}.jpg").write_bytes(b"\xff\xd8\xff\xd9")
def _prepare_account(self, root: Path, *, account: str, username: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
self._seed_media_files(account_dir)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image"],
message_types=["chatHistory", "image"],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_chat_history_modal_has_media_index_and_record_item(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn(f"media/images/{self._MD5}.jpg", names)
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn('id="chatHistoryModal"', html_text)
self.assertIn('data-wce-chat-history="1"', html_text)
self.assertIn('data-record-item-b64="', html_text)
self.assertIn('id="wceMediaIndex"', html_text)
self.assertIn(self._MD5, html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,353 @@
import os
import json
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportHtmlFormat(unittest.TestCase):
_FILE_MD5 = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
_VOICE_SERVER_ID = 2001
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
image_xml = '<msg><img md5="aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" cdnthumburl="img_file_id_1" /></msg>'
voice_xml = '<msg><voicemsg voicelength="3000" /></msg>'
file_md5 = self._FILE_MD5
file_xml = (
"<msg><appmsg>"
"<type>6</type>"
"<title>demo.pdf</title>"
"<totallen>2048</totallen>"
f"<md5>{file_md5}</md5>"
"</appmsg></msg>"
)
link_xml = (
"<msg><appmsg>"
"<type>5</type>"
"<title>示例链接</title>"
"<des>这是描述</des>"
"<url>https://example.com/</url>"
"<thumburl>https://example.com/thumb.jpg</thumburl>"
"<sourceusername>gh_test</sourceusername>"
"<sourcedisplayname>测试公众号</sourcedisplayname>"
"</appmsg></msg>"
)
chat_history_xml = (
"<msg><appmsg>"
"<type>19</type>"
"<title>聊天记录</title>"
"<des>记录预览</des>"
"<recorditem><desc>张三: hi\n李四: ok</desc></recorditem>"
"</appmsg></msg>"
)
transfer_xml = (
"<msg><appmsg>"
"<type>2000</type>"
"<title>微信转账</title>"
"<wcpayinfo>"
"<pay_memo>转账备注</pay_memo>"
"<feedesc>¥1.23</feedesc>"
"<paysubtype>3</paysubtype>"
"<transferid>transfer_123</transferid>"
"</wcpayinfo>"
"</appmsg></msg>"
)
red_packet_xml = (
"<msg><appmsg>"
"<type>2001</type>"
"<title>红包</title>"
"<wcpayinfo>"
"<sendertitle>恭喜发财,大吉大利</sendertitle>"
"<senderdes>微信红包</senderdes>"
"</wcpayinfo>"
"</appmsg></msg>"
)
voip_xml = (
"<msg><VoIPBubbleMsg>"
"<room_type>1</room_type>"
"<msg>语音通话</msg>"
"</VoIPBubbleMsg></msg>"
)
quote_voice_xml = (
"<msg><appmsg>"
"<type>57</type>"
"<title>回复语音</title>"
"<refermsg>"
"<type>34</type>"
f"<svrid>{self._VOICE_SERVER_ID}</svrid>"
"<fromusr>wxid_friend</fromusr>"
"<displayname>测试好友</displayname>"
"<content>wxid_friend:3000:1:</content>"
"</refermsg>"
"</appmsg></msg>"
)
rows = [
(1, 1001, 3, 1, 2, 1735689601, image_xml, None),
(2, 1002, 1, 2, 2, 1735689602, "普通文本消息[微笑]", None),
(3, 1003, 49, 3, 1, 1735689603, transfer_xml, None),
(4, 1004, 49, 4, 2, 1735689604, red_packet_xml, None),
(5, 1005, 49, 5, 1, 1735689605, file_xml, None),
(6, 1006, 49, 6, 2, 1735689606, link_xml, None),
(7, 1007, 49, 7, 2, 1735689607, chat_history_xml, None),
(8, 1008, 50, 8, 2, 1735689608, voip_xml, None),
(9, self._VOICE_SERVER_ID, 34, 9, 1, 1735689609, voice_xml, None),
(10, 1010, 49, 10, 1, 1735689610, quote_voice_xml, None),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
rows,
)
conn.commit()
finally:
conn.close()
def _seed_media_files(self, account_dir: Path) -> None:
resource_root = account_dir / "resource"
(resource_root / "aa").mkdir(parents=True, exist_ok=True)
(resource_root / "aa" / "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg").write_bytes(b"\xff\xd8\xff\xd9")
(resource_root / "bb").mkdir(parents=True, exist_ok=True)
(resource_root / "bb" / f"{self._FILE_MD5}.dat").write_bytes(b"dummy")
conn = sqlite3.connect(str(account_dir / "media_0.db"))
try:
conn.execute(
"""
CREATE TABLE VoiceInfo (
svr_id INTEGER,
create_time INTEGER,
voice_data BLOB
)
"""
)
conn.execute(
"INSERT INTO VoiceInfo VALUES (?, ?, ?)",
(self._VOICE_SERVER_ID, 1735689609, b"SILK_VOICE_DATA"),
)
conn.commit()
finally:
conn.close()
def _prepare_account(self, root: Path, *, account: str, username: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
self._seed_media_files(account_dir)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image", "emoji", "video", "video_thumb", "voice", "file"],
message_types=[],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_html_export_contains_index_and_conversation_page(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
self.assertTrue(job.zip_path and job.zip_path.exists())
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn("index.html", names)
self.assertIn("assets/wechat-chat-export.css", names)
self.assertIn("assets/wechat-chat-export.js", names)
manifest = json.loads(zf.read("manifest.json").decode("utf-8"))
self.assertEqual(manifest.get("format"), "html")
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn('data-wce-rail-avatar="1"', html_text)
self.assertIn('data-wce-session-list="1"', html_text)
self.assertIn('id="sessionSearchInput"', html_text)
self.assertIn('data-wce-time-divider="1"', html_text)
self.assertIn('id="messageTypeFilter"', html_text)
self.assertIn('value="chatHistory"', html_text)
self.assertIn('id="chatHistoryModal"', html_text)
self.assertIn('data-wce-chat-history="1"', html_text)
self.assertIn('data-record-item-b64="', html_text)
self.assertIn('id="wceMediaIndex"', html_text)
self.assertIn('data-wce-quote-voice-btn="1"', html_text)
self.assertNotIn('title="刷新消息"', html_text)
self.assertNotIn('title="导出聊天记录"', html_text)
self.assertNotIn("搜索聊天记录", html_text)
self.assertNotIn("朋友圈", html_text)
self.assertNotIn("年度总结", html_text)
self.assertNotIn("设置", html_text)
self.assertNotIn("隐私模式", html_text)
self.assertTrue(any(n.startswith("media/images/") for n in names))
self.assertIn("../../media/images/", html_text)
self.assertIn("wechat-transfer-card", html_text)
self.assertIn("wechat-redpacket-card", html_text)
self.assertIn("wechat-chat-history-card", html_text)
self.assertIn("wechat-voip-bubble", html_text)
self.assertIn("wechat-link-card", html_text)
self.assertIn("wechat-file-card", html_text)
self.assertIn("wechat-voice-wrapper", html_text)
css_text = zf.read("assets/wechat-chat-export.css").decode("utf-8", errors="ignore")
self.assertIn("wechat-transfer-card", css_text)
self.assertNotIn("wechat-transfer-card[data-v-", css_text)
js_text = zf.read("assets/wechat-chat-export.js").decode("utf-8", errors="ignore")
self.assertIn("wechat-voice-bubble", js_text)
self.assertIn("voice-playing", js_text)
self.assertIn("data-wce-quote-voice-btn", js_text)
self.assertIn("assets/images/wechat/wechat-trans-icon1.png", names)
self.assertIn("assets/images/wechat/zip.png", names)
self.assertIn("assets/images/wechat/WeChat-Icon-Logo.wine.svg", names)
self.assertTrue(any(n.startswith("fonts/") and n.endswith(".woff2") for n in names))
self.assertIn("wxemoji/Expression_1@2x.png", names)
self.assertIn("../../wxemoji/Expression_1@2x.png", html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,199 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportImageMd5CandidateFallback(unittest.TestCase):
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
good_md5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
bad_md5 = "ffffffffffffffffffffffffffffffff"
image_xml = f'<msg><img md5="{bad_md5}" hdmd5="{good_md5}" cdnthumburl="img_file_id_1" /></msg>'
conn.execute(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 3, 1, 2, 1735689601, image_xml, None),
)
conn.commit()
finally:
conn.close()
def _seed_decrypted_resource(self, account_dir: Path) -> None:
resource_root = account_dir / "resource"
(resource_root / "aa").mkdir(parents=True, exist_ok=True)
(resource_root / "aa" / "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg").write_bytes(b"\xff\xd8\xff\xd9")
def _prepare_account(self, root: Path, *, account: str, username: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
self._seed_decrypted_resource(account_dir)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image"],
message_types=[],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_falls_back_to_secondary_md5_candidate(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn("media/images/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg", names)
self.assertFalse(any("ffffffffffffffffffffffffffffffff" in n for n in names if n.startswith("media/images/")))
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8", errors="ignore")
self.assertIn("../../media/images/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa.jpg", html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,235 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestChatExportImageMd5PrefersMessageResource(unittest.TestCase):
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_source_info(self, account_dir: Path) -> None:
wxid_dir = account_dir / "_wxid_dummy"
db_storage_dir = account_dir / "_db_storage_dummy"
wxid_dir.mkdir(parents=True, exist_ok=True)
db_storage_dir.mkdir(parents=True, exist_ok=True)
(account_dir / "_source.json").write_text(
'{"wxid_dir": "' + str(wxid_dir).replace("\\", "\\\\") + '", "db_storage_path": "' + str(db_storage_dir).replace("\\", "\\\\") + '"}',
encoding="utf-8",
)
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str, bad_md5: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
image_xml = f'<msg><img md5="{bad_md5}" /></msg>'
conn.execute(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 3, 1, 2, 1735689601, image_xml, None),
)
conn.commit()
finally:
conn.close()
def _seed_message_resource_db(self, path: Path, *, good_md5: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE MessageResourceInfo (
message_id INTEGER,
message_svr_id INTEGER,
message_local_type INTEGER,
chat_id INTEGER,
message_local_id INTEGER,
message_create_time INTEGER,
packed_info BLOB
)
"""
)
# packed_info may contain multiple tokens; include a realistic *.dat reference so the extractor prefers it.
packed_info = f"{good_md5}_t.dat".encode("ascii")
conn.execute(
"INSERT INTO MessageResourceInfo VALUES (?, ?, ?, ?, ?, ?, ?)",
(1, 1001, 3, 0, 1, 1735689601, packed_info),
)
conn.commit()
finally:
conn.close()
def _seed_decrypted_resource(self, account_dir: Path, *, good_md5: str) -> None:
resource_root = account_dir / "resource"
(resource_root / good_md5[:2]).mkdir(parents=True, exist_ok=True)
# Minimal JPEG payload (valid SOI/EOI).
(resource_root / good_md5[:2] / f"{good_md5}.jpg").write_bytes(b"\xff\xd8\xff\xd9")
def _prepare_account(self, root: Path, *, account: str, username: str, bad_md5: str, good_md5: str) -> Path:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_source_info(account_dir)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
self._seed_message_db(account_dir / "message_0.db", account=account, username=username, bad_md5=bad_md5)
self._seed_message_resource_db(account_dir / "message_resource.db", good_md5=good_md5)
self._seed_decrypted_resource(account_dir, good_md5=good_md5)
return account_dir
def _create_job(self, manager, *, account: str, username: str):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image"],
message_types=["image"],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_prefers_message_resource_md5_over_xml_md5(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
bad_md5 = "ffffffffffffffffffffffffffffffff"
good_md5 = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
self._prepare_account(root, account=account, username=username, bad_md5=bad_md5, good_md5=good_md5)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
job = self._create_job(svc.CHAT_EXPORT_MANAGER, account=account, username=username)
self.assertEqual(job.status, "done", msg=job.error)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
self.assertIn(f"media/images/{good_md5}.jpg", names)
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8", errors="ignore")
self.assertIn(f"../../media/images/{good_md5}.jpg", html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -198,6 +198,7 @@ class TestChatExportMessageTypesSemantics(unittest.TestCase):
message_types=message_types,
output_dir=None,
allow_process_key_extract=False,
download_remote_media=False,
privacy_mode=privacy_mode,
file_name=None,
)

View File

@@ -0,0 +1,304 @@
import os
import hashlib
import sqlite3
import sys
import unittest
import zipfile
import importlib
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class _FakeResponse:
def __init__(self, body: bytes, *, content_type: str) -> None:
self.status_code = 200
self.headers = {
"Content-Type": str(content_type or "").strip(),
"Content-Length": str(len(body)),
}
self._body = body
def iter_content(self, chunk_size=65536):
data = self._body or b""
for i in range(0, len(data), int(chunk_size or 65536)):
yield data[i : i + int(chunk_size or 65536)]
def close(self):
return None
class TestChatExportRemoteThumbOption(unittest.TestCase):
def _reload_export_modules(self):
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.chat_helpers as chat_helpers
import wechat_decrypt_tool.media_helpers as media_helpers
import wechat_decrypt_tool.chat_export_service as chat_export_service
importlib.reload(app_paths)
importlib.reload(chat_helpers)
importlib.reload(media_helpers)
importlib.reload(chat_export_service)
return chat_export_service
def _seed_contact_db(self, path: Path, *, account: str, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", "测试好友", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, username: str) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
conn.execute(
"INSERT INTO SessionTable VALUES (?, ?, ?)",
(username, 0, 1735689600),
)
conn.commit()
finally:
conn.close()
def _seed_message_db(self, path: Path, *, account: str, username: str) -> tuple[str, str]:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB
)
"""
)
link_thumb = "https://1.1.1.1/thumb.png"
quote_thumb = "https://1.1.1.1/quote.png"
link_xml = (
"<msg><appmsg>"
"<type>5</type>"
"<title>示例链接</title>"
"<des>这是描述</des>"
"<url>https://example.com/</url>"
f"<thumburl>{link_thumb}</thumburl>"
"</appmsg></msg>"
)
quote_xml = (
"<msg><appmsg>"
"<type>57</type>"
"<title>回复</title>"
"<refermsg>"
"<type>49</type>"
"<svrid>8888</svrid>"
"<fromusr>wxid_other</fromusr>"
"<displayname>对方</displayname>"
"<content>"
"<msg><appmsg><type>5</type><title>被引用链接</title><url>https://example.com/</url>"
f"<thumburl>{quote_thumb}</thumburl>"
"</appmsg></msg>"
"</content>"
"</refermsg>"
"</appmsg></msg>"
)
rows = [
(1, 1001, 49, 1, 2, 1735689601, link_xml, None),
(2, 1002, 49, 2, 2, 1735689602, quote_xml, None),
]
conn.executemany(
f"INSERT INTO {table_name} (local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content) VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
rows,
)
conn.commit()
return link_thumb, quote_thumb
finally:
conn.close()
def _prepare_account(self, root: Path, *, account: str, username: str) -> tuple[Path, str, str]:
account_dir = root / "output" / "databases" / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, username=username)
self._seed_session_db(account_dir / "session.db", username=username)
link_thumb, quote_thumb = self._seed_message_db(account_dir / "message_0.db", account=account, username=username)
return account_dir, link_thumb, quote_thumb
def _create_job(self, manager, *, account: str, username: str, download_remote_media: bool):
job = manager.create_job(
account=account,
scope="selected",
usernames=[username],
export_format="html",
start_time=None,
end_time=None,
include_hidden=False,
include_official=False,
include_media=True,
media_kinds=["image", "emoji", "video", "video_thumb", "voice", "file"],
message_types=["link", "quote", "image"],
output_dir=None,
allow_process_key_extract=False,
download_remote_media=download_remote_media,
privacy_mode=False,
file_name=None,
)
for _ in range(200):
latest = manager.get_job(job.export_id)
if latest and latest.status in {"done", "error", "cancelled"}:
return latest
import time as _time
_time.sleep(0.05)
self.fail("export job did not finish in time")
def test_remote_thumb_disabled_does_not_download(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
_, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
with mock.patch.object(
svc.requests,
"get",
side_effect=AssertionError("requests.get should not be called when download_remote_media=False"),
) as m_get:
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
download_remote_media=False,
)
self.assertEqual(job.status, "done", msg=job.error)
self.assertEqual(m_get.call_count, 0)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
self.assertIn(f'src="{link_thumb}"', html_text)
self.assertIn(f'src="{quote_thumb}"', html_text)
self.assertFalse(any(n.startswith("media/remote/") for n in names))
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data
def test_remote_thumb_enabled_downloads_and_rewrites(self):
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_test"
username = "wxid_friend"
_, link_thumb, quote_thumb = self._prepare_account(root, account=account, username=username)
prev_data = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
svc = self._reload_export_modules()
fake_png = b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde"
def _fake_get(url, **_kwargs):
return _FakeResponse(fake_png, content_type="image/png")
with mock.patch.object(svc.requests, "get", side_effect=_fake_get) as m_get:
job = self._create_job(
svc.CHAT_EXPORT_MANAGER,
account=account,
username=username,
download_remote_media=True,
)
self.assertEqual(job.status, "done", msg=job.error)
self.assertGreaterEqual(m_get.call_count, 1)
with zipfile.ZipFile(job.zip_path, "r") as zf:
names = set(zf.namelist())
html_path = next((n for n in names if n.endswith("/messages.html")), "")
self.assertTrue(html_path)
html_text = zf.read(html_path).decode("utf-8")
h1 = hashlib.sha256(link_thumb.encode("utf-8", errors="ignore")).hexdigest()
arc1 = f"media/remote/{h1[:32]}.png"
self.assertIn(arc1, names)
self.assertIn(f"../../{arc1}", html_text)
self.assertNotIn(f'src="{link_thumb}"', html_text)
h2 = hashlib.sha256(quote_thumb.encode("utf-8", errors="ignore")).hexdigest()
arc2 = f"media/remote/{h2[:32]}.png"
self.assertIn(arc2, names)
self.assertIn(f"../../{arc2}", html_text)
self.assertNotIn(f'src="{quote_thumb}"', html_text)
finally:
if prev_data is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data

View File

@@ -0,0 +1,58 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.chat_helpers import _parse_app_message
class TestChatOfficialArticleCoverStyle(unittest.TestCase):
def test_mp_weixin_feed_url_is_cover_style(self):
raw_text = (
"<msg>"
"<appmsg>"
"<title>时尚穿搭「这样的jk你喜欢吗」</title>"
"<des>这样的jk你喜欢吗</des>"
"<type>5</type>"
"<url>"
"http://mp.weixin.qq.com/s?__biz=MzkxOTY4MjIxOA==&amp;mid=2247508015&amp;idx=1&amp;sn=931dce677c6e70b4365792b14e7e8ff0"
"&amp;exptype=masonry_feed_brief_content_elite_for_pcfeeds_u2i&amp;ranksessionid=1770868256_1&amp;req_id=1770867949535989#rd"
"</url>"
"<thumburl>https://mmbiz.qpic.cn/sz_mmbiz_jpg/foo/640?wx_fmt=jpeg&amp;wxfrom=401</thumburl>"
"<sourcedisplayname>甜图社</sourcedisplayname>"
"<sourceusername>gh_abc123</sourceusername>"
"</appmsg>"
"</msg>"
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("linkType"), "official_article")
self.assertEqual(parsed.get("linkStyle"), "cover")
def test_mp_weixin_non_feed_url_keeps_default_style(self):
raw_text = (
"<msg>"
"<appmsg>"
"<title>普通分享</title>"
"<des>这样的jk你喜欢吗</des>"
"<type>5</type>"
"<url>http://mp.weixin.qq.com/s?__biz=foo&amp;mid=1&amp;idx=1&amp;sn=bar#rd</url>"
"<sourcedisplayname>甜图社</sourcedisplayname>"
"<sourceusername>gh_abc123</sourceusername>"
"</appmsg>"
"</msg>"
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("linkType"), "official_article")
self.assertEqual(parsed.get("linkStyle"), "default")
if __name__ == "__main__":
unittest.main()

View File

@@ -62,7 +62,68 @@ class TestTransferPostprocess(unittest.TestCase):
self.assertEqual(merged[0].get("transferStatus"), "已被接收")
def test_pending_transfer_marked_expired_by_system_message(self):
merged = [
{
"id": "message_0:Msg_x:100",
"renderType": "transfer",
"paySubType": "1",
"transferId": "t-expired-1",
"amount": "¥500.00",
"createTime": 1770742598,
"isSent": True,
"transferStatus": "转账",
},
{
"id": "message_0:Msg_x:101",
"renderType": "system",
"type": 10000,
"createTime": 1770829000,
"content": "收款方24小时内未接收你的转账已过期",
},
]
chat_router._postprocess_transfer_messages(merged)
self.assertEqual(merged[0].get("paySubType"), "10")
self.assertEqual(merged[0].get("transferStatus"), "已过期")
def test_expired_matching_wins_over_amount_time_received_fallback(self):
merged = [
{
"id": "message_0:Msg_x:200",
"renderType": "transfer",
"paySubType": "1",
"transferId": "t-expired-2",
"amount": "¥500.00",
"createTime": 1770742598,
"isSent": True,
"transferStatus": "",
},
{
"id": "message_0:Msg_x:201",
"renderType": "transfer",
"paySubType": "3",
"transferId": "t-other",
"amount": "¥500.00",
"createTime": 1770828800,
"isSent": False,
"transferStatus": "已收款",
},
{
"id": "message_0:Msg_x:202",
"renderType": "system",
"type": 10000,
"createTime": 1770829000,
"content": "收款方24小时内未接收你的转账已过期",
},
]
chat_router._postprocess_transfer_messages(merged)
self.assertEqual(merged[0].get("paySubType"), "10")
self.assertEqual(merged[0].get("transferStatus"), "已过期")
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,773 @@
import hashlib
import sqlite3
import sys
import unittest
from datetime import datetime
from pathlib import Path
from tempfile import TemporaryDirectory
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestWrappedEmojiUniverse(unittest.TestCase):
def _ts(self, y: int, m: int, d: int, h: int = 0, mi: int = 0, s: int = 0) -> int:
return int(datetime(y, m, d, h, mi, s).timestamp())
def _seed_contact_db(self, path: Path, *, account: str, usernames: list[str]) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE contact (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"""
CREATE TABLE stranger (
username TEXT,
remark TEXT,
nick_name TEXT,
alias TEXT,
local_type INTEGER,
verify_flag INTEGER,
big_head_url TEXT,
small_head_url TEXT
)
"""
)
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(account, "", "", "", 1, 0, "", ""),
)
for idx, username in enumerate(usernames):
conn.execute(
"INSERT INTO contact VALUES (?, ?, ?, ?, ?, ?, ?, ?)",
(username, "", f"好友{idx + 1}", "", 1, 0, "", ""),
)
conn.commit()
finally:
conn.close()
def _seed_session_db(self, path: Path, *, usernames: list[str]) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT,
is_hidden INTEGER,
sort_timestamp INTEGER
)
"""
)
for username in usernames:
conn.execute("INSERT INTO SessionTable VALUES (?, ?, ?)", (username, 0, 1735689600))
conn.commit()
finally:
conn.close()
def _seed_message_db(
self,
path: Path,
*,
account: str,
username: str,
rows: list[dict[str, object]],
) -> None:
table_name = f"msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE Name2Id (rowid INTEGER PRIMARY KEY, user_name TEXT)")
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (1, account))
conn.execute("INSERT INTO Name2Id(rowid, user_name) VALUES (?, ?)", (2, username))
conn.execute(
f"""
CREATE TABLE {table_name} (
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB,
packed_info_data BLOB
)
"""
)
for row in rows:
conn.execute(
f"""
INSERT INTO {table_name}
(local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content, packed_info_data)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
int(row.get("local_id", 0)),
int(row.get("server_id", 0)),
int(row.get("local_type", 0)),
int(row.get("sort_seq", row.get("local_id", 0))),
int(row.get("real_sender_id", 1)),
int(row.get("create_time", 0)),
str(row.get("message_content", "")),
row.get("compress_content"),
row.get("packed_info_data"),
),
)
conn.commit()
finally:
conn.close()
def _seed_index_db(self, path: Path, *, rows: list[dict[str, object]]) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute(
"""
CREATE TABLE message_fts (
text TEXT,
username TEXT,
render_type TEXT,
create_time INTEGER,
sort_seq INTEGER,
local_id INTEGER,
server_id INTEGER,
local_type INTEGER,
db_stem TEXT,
table_name TEXT,
sender_username TEXT,
is_hidden INTEGER,
is_official INTEGER
)
"""
)
for row in rows:
conn.execute(
"""
INSERT INTO message_fts (
text, username, render_type, create_time, sort_seq, local_id, server_id, local_type,
db_stem, table_name, sender_username, is_hidden, is_official
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
str(row.get("text", "")),
str(row.get("username", "")),
str(row.get("render_type", "")),
int(row.get("create_time", 0)),
int(row.get("sort_seq", 0)),
int(row.get("local_id", 0)),
int(row.get("server_id", 0)),
int(row.get("local_type", 0)),
str(row.get("db_stem", "message_0")),
str(row.get("table_name", "")),
str(row.get("sender_username", "")),
int(row.get("is_hidden", 0)),
int(row.get("is_official", 0)),
),
)
conn.commit()
finally:
conn.close()
def _seed_resource_db(
self,
path: Path,
*,
username: str,
md5: str,
server_id: int,
local_id: int,
create_time: int,
) -> None:
conn = sqlite3.connect(str(path))
try:
conn.execute("CREATE TABLE ChatName2Id (user_name TEXT)")
conn.execute("INSERT INTO ChatName2Id (rowid, user_name) VALUES (?, ?)", (7, username))
conn.execute(
"""
CREATE TABLE MessageResourceInfo (
message_id INTEGER PRIMARY KEY AUTOINCREMENT,
message_svr_id INTEGER,
chat_id INTEGER,
message_local_type INTEGER,
packed_info BLOB,
message_local_id INTEGER,
message_create_time INTEGER
)
"""
)
packed = f"/tmp/{md5}.dat".encode("utf-8")
conn.execute(
"""
INSERT INTO MessageResourceInfo
(message_svr_id, chat_id, message_local_type, packed_info, message_local_id, message_create_time)
VALUES (?, ?, ?, ?, ?, ?)
""",
(int(server_id), 7, 47, packed, int(local_id), int(create_time)),
)
conn.commit()
finally:
conn.close()
def test_only_sticker_messages_outputs_core_stats(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_a"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
md5_a = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"
md5_b = "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb"
rows = [
{
"local_id": 1,
"server_id": 1001,
"local_type": 47,
"create_time": self._ts(2025, 1, 1, 10, 5, 0),
"message_content": f'<msg><emoji md5="{md5_a}" cdnurl="http://cdn/a.gif"/></msg>',
},
{
"local_id": 2,
"server_id": 1002,
"local_type": 47,
"create_time": self._ts(2025, 1, 1, 10, 30, 0),
"message_content": f'<msg><emoji md5="{md5_a}" cdnurl="http://cdn/a2.gif"/></msg>',
},
{
"local_id": 3,
"server_id": 1003,
"local_type": 47,
"create_time": self._ts(2025, 1, 2, 22, 10, 0),
"message_content": f'<msg><emoji md5="{md5_b}" cdnurl="http://cdn/b.gif"/></msg>',
},
]
self._seed_message_db(account_dir / "message_0.db", account=account, username=friend, rows=rows)
table_name = f"msg_{hashlib.md5(friend.encode('utf-8')).hexdigest()}"
fts_rows = []
for row in rows:
fts_rows.append(
{
"text": "[表情]",
"username": friend,
"render_type": "emoji",
"create_time": row["create_time"],
"sort_seq": row["local_id"],
"local_id": row["local_id"],
"server_id": row["server_id"],
"local_type": 47,
"db_stem": "message_0",
"table_name": table_name,
"sender_username": account,
}
)
self._seed_index_db(account_dir / "chat_search_index.db", rows=fts_rows)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertTrue(data["settings"]["usedIndex"])
self.assertEqual(data["sentStickerCount"], 3)
self.assertEqual(data["peakHour"], 10)
self.assertIsNotNone(data["peakWeekday"])
self.assertEqual(data["topBattlePartner"]["username"], friend)
self.assertEqual(data["topBattlePartner"]["stickerCount"], 3)
self.assertEqual(data["topBattlePartner"]["maskedName"], data["topBattlePartner"]["displayName"])
self.assertEqual(data["topStickers"][0]["md5"], md5_a)
self.assertEqual(data["topStickers"][0]["count"], 2)
self.assertTrue(str(data["topStickers"][0].get("sampleDisplayName") or "").strip())
self.assertTrue(str(data["topStickers"][0].get("sampleAvatarUrl") or "").startswith("/api/chat/avatar"))
def test_fallback_to_resource_md5_when_xml_missing(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_b"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
ts = self._ts(2025, 3, 8, 21, 0, 0)
rows = [
{
"local_id": 11,
"server_id": 220011,
"local_type": 47,
"create_time": ts,
"message_content": '<msg><emoji cdnurl="http://cdn/no_md5.gif"/></msg>',
}
]
self._seed_message_db(account_dir / "message_0.db", account=account, username=friend, rows=rows)
md5_fallback = "cccccccccccccccccccccccccccccccc"
self._seed_resource_db(
account_dir / "message_resource.db",
username=friend,
md5=md5_fallback,
server_id=220011,
local_id=11,
create_time=ts,
)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertFalse(data["settings"]["usedIndex"])
self.assertEqual(data["sentStickerCount"], 1)
self.assertEqual(data["topStickers"][0]["md5"], md5_fallback)
self.assertEqual(data["topStickers"][0]["count"], 1)
def test_text_emoji_mapping_from_wechat_emojis_ts(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_c"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
table_name = f"msg_{hashlib.md5(friend.encode('utf-8')).hexdigest()}"
fts_rows = [
{
"text": "早上好[微笑][微笑]🙂🙂",
"username": friend,
"render_type": "text",
"create_time": self._ts(2025, 4, 1, 9, 0, 0),
"local_id": 1,
"server_id": 901,
"local_type": 1,
"db_stem": "message_0",
"table_name": table_name,
"sender_username": account,
},
{
"text": "晚上见[微笑][发呆]😂",
"username": friend,
"render_type": "text",
"create_time": self._ts(2025, 4, 1, 22, 0, 0),
"local_id": 2,
"server_id": 902,
"local_type": 1,
"db_stem": "message_0",
"table_name": table_name,
"sender_username": account,
},
]
self._seed_index_db(account_dir / "chat_search_index.db", rows=fts_rows)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertTrue(data["settings"]["usedIndex"])
self.assertGreaterEqual(len(data["topTextEmojis"]), 1)
self.assertEqual(data["topTextEmojis"][0]["key"], "[微笑]")
self.assertEqual(data["topTextEmojis"][0]["count"], 3)
self.assertTrue(data["topTextEmojis"][0]["assetPath"].endswith("Expression_1@2x.png"))
self.assertGreaterEqual(len(data["topUnicodeEmojis"]), 1)
self.assertEqual(data["topUnicodeEmojis"][0]["emoji"], "🙂")
self.assertEqual(data["topUnicodeEmojis"][0]["count"], 2)
def test_wechat_builtin_emoji_from_packed_info_data(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_e"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
# packed_info_data protobuf varints:
# 08 04 => field#1=4
# 10 33 => field#2=51 (Expression_51@2x)
rows = [
{
"local_id": 1,
"server_id": 501,
"local_type": 47,
"create_time": self._ts(2025, 7, 1, 10, 0, 0),
"message_content": "binary_emoji_payload_a",
"packed_info_data": bytes.fromhex("08041033"),
},
{
"local_id": 2,
"server_id": 502,
"local_type": 47,
"create_time": self._ts(2025, 7, 1, 10, 1, 0),
"message_content": "binary_emoji_payload_b",
"packed_info_data": bytes.fromhex("08041033"),
},
{
"local_id": 3,
"server_id": 503,
"local_type": 47,
"create_time": self._ts(2025, 7, 1, 11, 0, 0),
"message_content": "binary_emoji_payload_c",
"packed_info_data": bytes.fromhex("0804104a"),
},
]
self._seed_message_db(account_dir / "message_0.db", account=account, username=friend, rows=rows)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertFalse(data["settings"]["usedIndex"])
self.assertEqual(data["sentStickerCount"], 3)
self.assertGreaterEqual(len(data["topWechatEmojis"]), 1)
self.assertEqual(data["topWechatEmojis"][0]["id"], 51)
self.assertEqual(data["topWechatEmojis"][0]["count"], 2)
self.assertTrue(data["topWechatEmojis"][0]["assetPath"].endswith("Expression_51@2x.png"))
self.assertGreaterEqual(len(data["topStickers"]), 1)
self.assertEqual(data["topStickers"][0]["emojiId"], 51)
self.assertEqual(data["topStickers"][0]["count"], 2)
self.assertTrue(str(data["topStickers"][0].get("emojiAssetPath") or "").endswith("Expression_51@2x.png"))
def test_index_counts_only_sent_messages(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_sent_only"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
rows = [
{
"text": "[ 微 笑 ]",
"username": friend,
"render_type": "text",
"create_time": self._ts(2025, 6, 2, 9, 0, 0),
"local_id": 101,
"server_id": 4001,
"local_type": 1,
"table_name": "msg_dummy",
"sender_username": account,
},
{
"text": "[ 发 呆 ]",
"username": friend,
"render_type": "text",
"create_time": self._ts(2025, 6, 2, 9, 1, 0),
"local_id": 102,
"server_id": 4002,
"local_type": 1,
"table_name": "msg_dummy",
"sender_username": friend,
},
{
"text": "[表情]",
"username": friend,
"render_type": "emoji",
"create_time": self._ts(2025, 6, 2, 9, 2, 0),
"local_id": 201,
"server_id": 5001,
"local_type": 47,
"table_name": "msg_dummy",
"sender_username": account,
},
{
"text": "[表情]",
"username": friend,
"render_type": "emoji",
"create_time": self._ts(2025, 6, 2, 9, 3, 0),
"local_id": 202,
"server_id": 5002,
"local_type": 47,
"table_name": "msg_dummy",
"sender_username": friend,
},
]
self._seed_index_db(account_dir / "chat_search_index.db", rows=rows)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertTrue(data["settings"]["usedIndex"])
self.assertEqual(data["sentStickerCount"], 1)
keys = {x.get("key") for x in data.get("topTextEmojis") or []}
self.assertIn("[微笑]", keys)
self.assertNotIn("[发呆]", keys)
def test_raw_db_counts_only_sent_messages(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_raw_dir"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
rows = [
{
"local_id": 1,
"server_id": 1001,
"local_type": 1,
"real_sender_id": 1,
"create_time": self._ts(2025, 7, 1, 8, 0, 0),
"message_content": "/::B",
},
{
"local_id": 2,
"server_id": 1002,
"local_type": 1,
"real_sender_id": 2,
"create_time": self._ts(2025, 7, 1, 8, 1, 0),
"message_content": "/::B",
},
{
"local_id": 3,
"server_id": 1101,
"local_type": 47,
"real_sender_id": 1,
"create_time": self._ts(2025, 7, 1, 9, 0, 0),
"message_content": "binary_emoji_payload_a",
"packed_info_data": bytes.fromhex("08031033"),
},
{
"local_id": 4,
"server_id": 1102,
"local_type": 47,
"real_sender_id": 2,
"create_time": self._ts(2025, 7, 1, 9, 1, 0),
"message_content": "binary_emoji_payload_b",
"packed_info_data": bytes.fromhex("08031033"),
},
]
self._seed_message_db(account_dir / "message_0.db", account=account, username=friend, rows=rows)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertFalse(data["settings"]["usedIndex"])
self.assertEqual(data["sentStickerCount"], 1)
self.assertEqual(data["topWechatEmojis"][0]["id"], 51)
self.assertEqual(data["topWechatEmojis"][0]["count"], 1)
self.assertGreaterEqual(len(data["topTextEmojis"]), 1)
self.assertEqual(data["topTextEmojis"][0]["key"], "[色]")
self.assertEqual(data["topTextEmojis"][0]["count"], 1)
self.assertTrue(data["topTextEmojis"][0]["assetPath"].endswith("Expression_3@2x.png"))
def test_new_and_revived_sticker_metrics(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_new_revived"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
md5_revived = "dddddddddddddddddddddddddddddddd"
md5_recent = "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"
md5_new = "ffffffffffffffffffffffffffffffff"
rows = [
{
"local_id": 1,
"server_id": 5001,
"local_type": 47,
"create_time": self._ts(2024, 1, 1, 9, 0, 0),
"message_content": f'<msg><emoji md5="{md5_revived}" /></msg>',
},
{
"local_id": 2,
"server_id": 5002,
"local_type": 47,
"create_time": self._ts(2024, 12, 28, 10, 0, 0),
"message_content": f'<msg><emoji md5="{md5_recent}" /></msg>',
},
{
"local_id": 3,
"server_id": 5003,
"local_type": 47,
"create_time": self._ts(2025, 1, 5, 11, 0, 0),
"message_content": f'<msg><emoji md5="{md5_recent}" /></msg>',
},
{
"local_id": 4,
"server_id": 5004,
"local_type": 47,
"create_time": self._ts(2025, 3, 15, 12, 0, 0),
"message_content": f'<msg><emoji md5="{md5_revived}" /></msg>',
},
{
"local_id": 5,
"server_id": 5005,
"local_type": 47,
"create_time": self._ts(2025, 5, 10, 13, 0, 0),
"message_content": f'<msg><emoji md5="{md5_new}" /></msg>',
},
]
self._seed_message_db(account_dir / "message_0.db", account=account, username=friend, rows=rows)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertEqual(data["sentStickerCount"], 3)
self.assertEqual(data["uniqueStickerTypeCount"], 3)
self.assertEqual(data["newStickerCountThisYear"], 1)
self.assertEqual(data["revivedStickerCount"], 1)
self.assertEqual(data["revivedMinGapDays"], 60)
self.assertGreaterEqual(int(data.get("revivedMaxGapDays") or 0), 400)
new_samples = list(data.get("newStickerSamples") or [])
revived_samples = list(data.get("revivedStickerSamples") or [])
self.assertTrue(any(str(x.get("md5") or "") == md5_new for x in new_samples))
self.assertTrue(any(str(x.get("md5") or "") == md5_revived for x in revived_samples))
revived_item = next((x for x in revived_samples if str(x.get("md5") or "") == md5_revived), {})
self.assertGreaterEqual(int(revived_item.get("gapDays") or 0), 400)
def test_empty_year_returns_safe_empty_state(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import build_card_04_emoji_universe
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[])
self._seed_session_db(account_dir / "session.db", usernames=[])
card = build_card_04_emoji_universe(account_dir=account_dir, year=2025)
self.assertEqual(card["id"], 4)
self.assertEqual(card["status"], "ok")
self.assertEqual(card["data"]["sentStickerCount"], 0)
self.assertIn("几乎没用表情表达", card["narrative"])
self.assertIsInstance(card["data"]["lines"], list)
self.assertGreaterEqual(len(card["data"]["lines"]), 1)
self.assertEqual(card["data"].get("topUnicodeEmojis"), [])
def test_tie_break_is_stable_by_key(self):
from wechat_decrypt_tool.wrapped.cards.card_04_emoji_universe import compute_emoji_universe_stats
with TemporaryDirectory() as td:
root = Path(td)
account = "wxid_me"
friend = "wxid_friend_d"
account_dir = root / account
account_dir.mkdir(parents=True, exist_ok=True)
self._seed_contact_db(account_dir / "contact.db", account=account, usernames=[friend])
self._seed_session_db(account_dir / "session.db", usernames=[friend])
md5_a = "11111111111111111111111111111111"
md5_b = "22222222222222222222222222222222"
rows = [
{
"local_id": 1,
"server_id": 301,
"local_type": 47,
"create_time": self._ts(2025, 6, 1, 8, 0, 0),
"message_content": f'<msg><emoji md5="{md5_a}" /></msg>',
},
{
"local_id": 2,
"server_id": 302,
"local_type": 47,
"create_time": self._ts(2025, 6, 1, 8, 1, 0),
"message_content": f'<msg><emoji md5="{md5_b}" /></msg>',
},
{
"local_id": 3,
"server_id": 303,
"local_type": 47,
"create_time": self._ts(2025, 6, 1, 8, 2, 0),
"message_content": f'<msg><emoji md5="{md5_a}" /></msg>',
},
{
"local_id": 4,
"server_id": 304,
"local_type": 47,
"create_time": self._ts(2025, 6, 1, 8, 3, 0),
"message_content": f'<msg><emoji md5="{md5_b}" /></msg>',
},
]
self._seed_message_db(account_dir / "message_0.db", account=account, username=friend, rows=rows)
table_name = f"msg_{hashlib.md5(friend.encode('utf-8')).hexdigest()}"
fts_rows = []
for row in rows:
fts_rows.append(
{
"text": "[表情]",
"username": friend,
"render_type": "emoji",
"create_time": row["create_time"],
"local_id": row["local_id"],
"server_id": row["server_id"],
"local_type": 47,
"db_stem": "message_0",
"table_name": table_name,
"sender_username": account,
}
)
fts_rows.extend(
[
{
# `chat_search_index` stores text as char-tokens: "[微笑][发呆]" -> "[ 微 笑 ] [ 发 呆 ]"
"text": "[ 微 笑 ] [ 发 呆 ]",
"username": friend,
"render_type": "text",
"create_time": self._ts(2025, 6, 2, 9, 0, 0),
"local_id": 101,
"server_id": 4001,
"local_type": 1,
"db_stem": "message_0",
"table_name": table_name,
"sender_username": account,
},
{
"text": "[ 发 呆 ] [ 微 笑 ]",
"username": friend,
"render_type": "text",
"create_time": self._ts(2025, 6, 2, 9, 1, 0),
"local_id": 102,
"server_id": 4002,
"local_type": 1,
"db_stem": "message_0",
"table_name": table_name,
"sender_username": account,
},
]
)
self._seed_index_db(account_dir / "chat_search_index.db", rows=fts_rows)
data = compute_emoji_universe_stats(account_dir=account_dir, year=2025)
self.assertEqual(data["topStickers"][0]["md5"], md5_a)
expected_emoji_key = sorted(["[微笑]", "[发呆]"])[0]
self.assertEqual(data["topTextEmojis"][0]["key"], expected_emoji_key)
if __name__ == "__main__":
unittest.main()