feat(sns): 支持外部分享卡片并增强媒体阶段识别

- 增强朋友圈 XML 解码与容错清洗,支持 bytes/hex/base64/zstd 场景。

- 补充外部分享(Type 5/42)字段提取与展示,新增 sourceName、链接卡片与导出渲染。

- 通过 Server-Timing + Timing-Allow-Origin 暴露 SNS 媒体阶段信息,前端改为从 ResourceTiming 读取,避免额外探测请求。

- 新增 XML 容错解析与 Server-Timing 注入测试。
This commit is contained in:
2977094657
2026-02-22 18:59:17 +08:00
Unverified
parent 13febebee7
commit 9acbfa7582
7 changed files with 704 additions and 31 deletions
+214 -23
View File
@@ -108,12 +108,13 @@
<div ref="timelineScrollEl" class="flex-1 overflow-auto min-h-0 bg-white" @scroll="onScroll">
<div class="max-w-2xl mx-auto px-4 py-4">
<div class="relative w-full mb-12 -mt-4 bg-white">
<div class="h-64 w-full bg-[#333333] relative overflow-hidden group" @mouseenter="onCoverMediaHover">
<div class="h-64 w-full bg-[#333333] relative overflow-hidden group">
<img
v-if="activeCover && activeCover.media && activeCover.media.length > 0"
:src="getSnsMediaUrl(activeCover, activeCover.media[0], 0, activeCover.media[0].url)"
class="w-full h-full object-cover"
alt="朋友圈封面"
@load="onCoverMediaLoaded(activeCover, $event)"
/>
<div
v-if="snsMediaStageLabel(snsCoverStageKey(activeCover)) || snsMediaStageLoading[snsCoverStageKey(activeCover)]"
@@ -279,13 +280,62 @@
</div>
</div>
<div v-else-if="isExternalShareMoment(post)" class="mt-2 w-full" :class="{ 'privacy-blur': privacyMode }">
<a
v-if="getMomentLinkCardUrl(post)"
:href="getMomentLinkCardUrl(post)"
target="_blank"
rel="noopener noreferrer"
class="block w-full bg-[#F7F7F7] p-2 rounded-sm no-underline hover:bg-[#EFEFEF] transition-colors"
>
<div class="flex items-center gap-3">
<img
v-if="getExternalShareCardThumbSrc(post)"
:src="getExternalShareCardThumbSrc(post)"
class="w-12 h-12 object-cover flex-shrink-0 bg-white"
alt=""
loading="lazy"
referrerpolicy="no-referrer"
@error="onExternalShareCardThumbError(post)"
/>
<div v-else class="w-12 h-12 flex items-center justify-center bg-gray-200 text-gray-400 flex-shrink-0 text-xs">
{{ formatExternalSharePlaceholder(post) }}
</div>
<div class="flex-1 min-w-0 flex items-center overflow-hidden h-12">
<div class="text-[13px] text-gray-900 leading-tight line-clamp-2">{{ formatExternalShareCardTitle(post) }}</div>
</div>
</div>
</a>
<div v-else class="block w-full bg-[#F7F7F7] p-2 rounded-sm">
<div class="flex items-center gap-3">
<img
v-if="getExternalShareCardThumbSrc(post)"
:src="getExternalShareCardThumbSrc(post)"
class="w-12 h-12 object-cover flex-shrink-0 bg-white"
alt=""
loading="lazy"
referrerpolicy="no-referrer"
@error="onExternalShareCardThumbError(post)"
/>
<div v-else class="w-12 h-12 flex items-center justify-center bg-gray-200 text-gray-400 flex-shrink-0 text-xs">
{{ formatExternalSharePlaceholder(post) }}
</div>
<div class="flex-1 min-w-0 flex items-center overflow-hidden h-12">
<div class="text-[13px] text-gray-900 leading-tight line-clamp-2">{{ formatExternalShareCardTitle(post) }}</div>
</div>
</div>
</div>
</div>
<div v-else-if="post.media && post.media.length > 0" class="mt-2" :class="{ 'privacy-blur': privacyMode }">
<div v-if="post.media.length === 1" class="max-w-[360px]">
<div
v-if="!hasMediaError(post.id, 0) && getMediaThumbSrc(post, post.media[0], 0)"
class="inline-block cursor-pointer relative group"
@click.stop="onMediaClick(post, post.media[0], 0)"
@mouseenter="onLivePhotoEnter(post.id, 0, post.media[0]); onSnsMediaHover(post, post.media[0], 0)"
@mouseenter="onLivePhotoEnter(post.id, 0, post.media[0])"
@mouseleave="onLivePhotoLeave(post.id, 0, post.media[0])"
>
<video
@@ -297,7 +347,7 @@
loop
muted
playsinline
@loadeddata="onLocalVideoLoaded(post.id, post.media[0].id)"
@loadeddata="onLocalVideoLoaded(post.id, post.media[0].id); onSnsMediaLoaded(post, post.media[0], 0)"
@error="onLocalVideoError(post.id, post.media[0].id)"
></video>
@@ -311,6 +361,7 @@
loop
:muted="livePhotoHoverMuted"
playsinline
@loadeddata="onSnsMediaLoaded(post, post.media[0], 0)"
@error="onLivePhotoVideoError(post.id, 0)"
></video>
@@ -321,6 +372,7 @@
alt=""
loading="lazy"
referrerpolicy="no-referrer"
@load="onSnsMediaLoaded(post, post.media[0], 0, $event)"
@error="onMediaError(post.id, 0)"
/>
@@ -387,7 +439,7 @@
:key="idx"
class="w-[116px] h-[116px] rounded-[2px] overflow-hidden bg-gray-100 border border-gray-200 flex items-center justify-center cursor-pointer relative group"
@click.stop="onMediaClick(post, m, idx)"
@mouseenter="onLivePhotoEnter(post.id, idx, m); onSnsMediaHover(post, m, idx)"
@mouseenter="onLivePhotoEnter(post.id, idx, m)"
@mouseleave="onLivePhotoLeave(post.id, idx, m)"
>
<video
@@ -399,7 +451,7 @@
loop
muted
playsinline
@loadeddata="onLocalVideoLoaded(post.id, m.id)"
@loadeddata="onLocalVideoLoaded(post.id, m.id); onSnsMediaLoaded(post, m, idx)"
@error="onLocalVideoError(post.id, m.id)"
></video>
<video
@@ -412,6 +464,7 @@
loop
:muted="livePhotoHoverMuted"
playsinline
@loadeddata="onSnsMediaLoaded(post, m, idx)"
@error="onLivePhotoVideoError(post.id, idx)"
></video>
<img
@@ -421,6 +474,7 @@
alt=""
loading="lazy"
referrerpolicy="no-referrer"
@load="onSnsMediaLoaded(post, m, idx, $event)"
@error="onMediaError(post.id, idx)"
/>
@@ -637,6 +691,17 @@ import { SNS_SETTING_USE_CACHE_KEY, readLocalBoolSetting } from '~/utils/desktop
useHead({ title: '朋友圈 - 微信数据分析助手' })
// Nuxt dev mode can load hundreds of module resources, quickly filling the default
// ResourceTiming buffer (150). If it overflows, `<img>` requests may not produce
// entries, making Server-Timing based stage detection always fall back to "unknown".
if (process.client) {
  try {
    // Resolve the global once; it may be missing entirely in non-browser runtimes.
    const perf = typeof performance === 'undefined' ? null : performance
    if (perf?.setResourceTimingBufferSize) {
      perf.setResourceTimingBufferSize(5000)
    }
  } catch {
    // Best-effort only: a failure here just leaves the browser's default buffer size.
  }
}
const api = useApi()
const chatAccounts = useChatAccountsStore()
@@ -893,6 +958,7 @@ const snsMediaStageLabel = (key) => {
if (source === 'manual-pick') return '手动匹配'
if (source === 'local-heuristic') return '本地兜底'
if (source === 'local-heuristic-next') return '本地兜底(跳过)'
if (source === 'browser-cache') return '浏览器缓存'
if (source === 'bkg-cover') return '封面缓存'
if (source === 'proxy') return '远程代理'
if (source === 'unknown') return '未知'
@@ -908,6 +974,7 @@ const snsMediaStageBadgeColorClass = (key) => {
if (source === 'deterministic-hash') return 'bg-sky-600/85 text-white'
if (source.startsWith('local')) return 'bg-blue-600/85 text-white'
if (source === 'manual-pick') return 'bg-amber-600/90 text-white'
if (source === 'browser-cache') return 'bg-slate-600/85 text-white'
if (source === 'proxy') return 'bg-fuchsia-600/85 text-white'
if (source === 'bkg-cover') return 'bg-indigo-600/85 text-white'
if (source === 'error') return 'bg-red-600/85 text-white'
@@ -929,6 +996,48 @@ const snsMediaStageBadgeTitle = (key) => {
return parts.join(' · ')
}
// Read SNS media stage info for `url` from the newest matching ResourceTiming entry.
// Returns `{ source, hitType, xEnc }`, or `null` when not on the client, when no `resource`
// entry exists for the URL, or when nothing can be determined. Never throws.
const readSnsStageFromResourceTiming = (url) => {
  try {
    if (!process.client) return null
    if (typeof performance === 'undefined' || !performance?.getEntriesByName) return null

    const target = String(url || '').trim()
    if (!target) return null

    const candidates = performance.getEntriesByName(target) || []
    const newest = [...candidates].reverse().find((entry) => String(entry?.entryType || '') === 'resource')
    if (!newest) return null

    // Prefer backend-injected stage info from `Server-Timing`.
    const metrics = newest?.serverTiming
    if (Array.isArray(metrics) && metrics.length > 0) {
      const stage = { source: '', hitType: '', xEnc: '' }
      // Checked in this order per metric; the first rule that matches wins.
      const rules = [
        ['sns_source', 'source'],
        ['sns_hit', 'hitType'],
        ['sns_xenc', 'xEnc'],
      ]
      for (const metric of metrics) {
        const metricName = String(metric?.name || '').trim()
        const metricDesc = String(metric?.description || '').trim()
        for (const [base, field] of rules) {
          // Exact name: the value lives in the description.
          if (metricName === base && metricDesc) {
            stage[field] = metricDesc
            break
          }
          // `<base>_<token>` name: the token is the value, the description is the fallback.
          const prefix = `${base}_`
          if (metricName.startsWith(prefix)) {
            stage[field] = metricName.slice(prefix.length) || metricDesc
            break
          }
        }
      }
      if (stage.source) return stage
    }

    // When DevTools shows "(from disk cache)", browsers may not expose `serverTiming` at all.
    // Best-effort: infer a browser cache hit from ResourceTiming sizes.
    const transferred = Number(newest?.transferSize)
    const looksCached = Number.isFinite(transferred) && transferred === 0
    return looksCached ? { source: 'browser-cache', hitType: 'transfer=0', xEnc: '' } : null
  } catch {
    return null
  }
}
const ensureSnsMediaStage = async (key, url) => {
if (!process.client) return
const k = String(key || '').trim()
@@ -936,7 +1045,8 @@ const ensureSnsMediaStage = async (key, url) => {
if (!k || !u) return
if (!isSnsMediaApiUrl(u)) return
if (snsMediaStage.value[k]) return
const existingSource = String(snsMediaStage.value?.[k]?.source || '').trim()
if (existingSource && existingSource !== 'unknown') return
if (snsMediaStageLoading.value[k]) return
if (snsMediaStageInFlight.has(k)) return
@@ -944,36 +1054,42 @@ const ensureSnsMediaStage = async (key, url) => {
snsMediaStageLoading.value[k] = true
try {
const resp = await fetch(u, { method: 'GET', mode: 'cors', cache: 'force-cache' })
const source = String(resp.headers.get('X-SNS-Source') || '').trim() || 'unknown'
const hitType = String(resp.headers.get('X-SNS-Hit-Type') || '').trim()
const xEnc = String(resp.headers.get('X-SNS-X-Enc') || '').trim()
snsMediaStage.value[k] = { source, hitType, xEnc }
try {
resp.body?.cancel?.()
} catch {}
} catch {
snsMediaStage.value[k] = { source: 'error', hitType: '', xEnc: '' }
// Prefer stage info from the *same* request that loaded the <img>/<video> element
// (via Server-Timing + Timing-Allow-Origin), to avoid a non-idempotent extra fetch.
let info = null
for (const delayMs of [0, 0, 16, 50, 120, 250, 500]) {
if (delayMs) await new Promise((resolve) => setTimeout(resolve, delayMs))
info = readSnsStageFromResourceTiming(u)
if (info) break
}
snsMediaStage.value[k] = info || { source: 'unknown', hitType: '', xEnc: '' }
} finally {
snsMediaStageLoading.value[k] = false
snsMediaStageInFlight.delete(k)
}
}
const onSnsMediaHover = (post, m, idx = 0) => {
// Resolve the URL a media element actually loaded from a DOM event:
// `currentSrc` first, then `src`; returns '' when neither is available or on any error.
const eventCurrentSrc = (ev) => {
  try {
    const element = ev?.target || ev?.currentTarget
    const loadedUrl = element?.currentSrc || element?.src || ''
    return String(loadedUrl).trim()
  } catch {
    return ''
  }
}
const onSnsMediaLoaded = (post, m, idx = 0, ev) => {
const pid = String(post?.id || '').trim()
if (!pid) return
const key = snsMediaStageKey(pid, idx, 'thumb')
const u = getMediaThumbSrc(post, m, idx)
const u = eventCurrentSrc(ev) || getMediaThumbSrc(post, m, idx)
ensureSnsMediaStage(key, u)
}
const onCoverMediaHover = () => {
const c = activeCover.value
const onCoverMediaLoaded = (cover, ev) => {
const c = cover || activeCover.value
if (!c || !Array.isArray(c.media) || c.media.length <= 0) return
const u = getSnsMediaUrl(c, c.media[0], 0, c.media[0].url)
const u = eventCurrentSrc(ev) || getSnsMediaUrl(c, c.media[0], 0, c.media[0].url)
ensureSnsMediaStage(snsCoverStageKey(c), u)
}
@@ -1105,6 +1221,68 @@ const getFinderFeedThumbSrc = (post) => {
return getProxyExternalUrl(u)
}
// Jump URL for a share card: the post's own `contentUrl`, else the first media item's `url`
// (trimmed; '' when neither is present).
const getMomentLinkCardUrl = (post) => {
  const contentUrl = String(post?.contentUrl || '').trim()
  if (contentUrl) return contentUrl

  const mediaItems = Array.isArray(post?.media) ? post.media : []
  const firstMedia = mediaItems.length > 0 ? mediaItems[0] : null
  return String(firstMedia?.url || '').trim()
}
// True when the post's numeric `type` is one of the external-share types (5 or 42).
const isExternalShareMoment = (post) => [42, 5].includes(Number(post?.type || 0))
// Turn a share URL into a short readable label: host without a leading `www.` plus the path
// (query string and fragment dropped; a bare `/` path is omitted).
// Returns '' for empty input and the trimmed input unchanged when it is not a parseable URL.
const formatExternalShareUrlLabel = (url) => {
  const u = String(url || '').trim()
  if (!u) return ''
  try {
    const parsed = new URL(u)
    // `\.` must be a single escape inside a regex literal; `\\.` would require a literal
    // backslash after `www` and therefore never strip the prefix.
    const host = String(parsed.hostname || '').replace(/^www\./, '')
    const path = String(parsed.pathname || '')
    const out = `${host}${path && path !== '/' ? path : ''}`
    return out || u
  } catch {
    // Not an absolute URL: show the raw text rather than nothing.
    return u
  }
}
// Text shown in place of a missing share-card thumbnail: music label for type 42, link label otherwise.
const formatExternalSharePlaceholder = (post) => (Number(post?.type || 0) === 42 ? '音乐' : '链接')
// Title for an external share card: the post's own title, else a label derived from its
// link URL, else a generic per-type fallback.
const formatExternalShareCardTitle = (post) => {
  const explicitTitle = String(post?.title || '').trim()
  if (explicitTitle) return explicitTitle

  const linkUrl = String(getMomentLinkCardUrl(post) || '').trim()
  if (linkUrl) return formatExternalShareUrlLabel(linkUrl)

  return Number(post?.type || 0) === 42 ? '音乐分享' : '外部分享'
}
// Thumbnail source for a share card, taken from the post's first media item.
// Returns '' when the post has no id, no first media item, or that item already failed to load.
const getExternalShareCardThumbSrc = (post) => {
  const postId = String(post?.id || '').trim()
  if (!postId) return ''

  const firstMedia = Array.isArray(post?.media) && post.media.length > 0 ? post.media[0] : null
  if (!firstMedia || hasMediaError(postId, 0)) return ''

  return getMediaThumbSrc(post, firstMedia, 0)
}
// Record a load failure for the share card's thumbnail (media index 0); no-op without a post id.
const onExternalShareCardThumbError = (post) => {
  const postId = String(post?.id || '').trim()
  if (postId) {
    onMediaError(postId, 0)
  }
}
const formatFinderFeedCardText = (post) => {
const title = String(post?.title || '').trim()
if (title) return title
@@ -1126,6 +1304,18 @@ const formatMomentOfficialSource = (post) => {
return name ? `${prefix}·${name}` : prefix
}
// Source label for an external share: the post's `sourceName` when present, otherwise a label
// derived from the card URL, otherwise a generic per-type fallback.
const formatExternalShareSourceLabel = (post) => {
  // Prefer DB-provided source name from Moments XML: `<appInfo><appName>...`
  const sourceName = String(post?.sourceName || '').trim()
  if (sourceName) return sourceName

  const linkUrl = String(getMomentLinkCardUrl(post) || '').trim()
  if (linkUrl) return formatExternalShareUrlLabel(linkUrl)

  return Number(post?.type || 0) === 42 ? '音乐' : '外部分享'
}
const formatMomentTypeLabel = (post) => {
const t = Number(post?.type || 0)
if (!t) return ''
@@ -1134,6 +1324,7 @@ const formatMomentTypeLabel = (post) => {
const name = String(post?.finderFeed?.nickname || '').trim()
return name ? `视频号·${name}` : '视频号'
}
if (isExternalShareMoment(post)) return formatExternalShareSourceLabel(post)
return ''
}
+24
View File
@@ -5,6 +5,7 @@ from pathlib import Path
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from starlette.requests import Request
from starlette.exceptions import HTTPException as StarletteHTTPException
from starlette.responses import FileResponse
from starlette.staticfiles import StaticFiles
@@ -25,6 +26,7 @@ from .routers.sns import router as _sns_router
from .routers.sns_export import router as _sns_export_router
from .routers.wechat_detection import router as _wechat_detection_router
from .routers.wrapped import router as _wrapped_router
from .sns_stage_timing import add_sns_stage_timing_headers
from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown
# 初始化日志系统
@@ -50,6 +52,28 @@ app.add_middleware(
expose_headers=["X-SNS-Source", "X-SNS-Hit-Type", "X-SNS-X-Enc"],
)
@app.middleware("http")
async def _add_sns_stage_timing_headers(request: Request, call_next):
    """Mirror the `X-SNS-*` stage headers into `Server-Timing` on every response.

    `<img>` elements cannot read response headers, but browsers surface `Server-Timing`
    through `performance.getEntriesByName(...).serverTiming` when `Timing-Allow-Origin`
    is set, so the frontend gets the stage metadata without an extra request.
    """
    response = await call_next(request)
    try:
        headers = response.headers
        stage_kwargs = {
            kwarg: str(headers.get(header_name) or "")
            for kwarg, header_name in (
                ("source", "X-SNS-Source"),
                ("hit_type", "X-SNS-Hit-Type"),
                ("x_enc", "X-SNS-X-Enc"),
            )
        }
        add_sns_stage_timing_headers(headers, **stage_kwargs)
    except Exception:
        # Best-effort decoration: never let it break the actual response.
        pass
    return response
app.include_router(_health_router)
app.include_router(_wechat_detection_router)
app.include_router(_decrypt_router)
+237 -8
View File
@@ -35,6 +35,11 @@ from ..wcdb_realtime import (
get_sns_timeline as _wcdb_get_sns_timeline,
)
try:
import zstandard as zstd # type: ignore
except Exception:
zstd = None
logger = get_logger(__name__)
router = APIRouter(route_class=PathFixRoute)
@@ -43,6 +48,11 @@ SNS_MEDIA_PICKS_FILE = "_sns_media_picks.json"
_SNS_VIDEO_KEY_RE = re.compile(r'<enc\s+key="(\d+)"', flags=re.IGNORECASE)
_MP_BIZ_RE = re.compile(r"__biz=([A-Za-z0-9_=+-]+)")
# Leading bytes checked before attempting `zstd.decompress` on a decoded blob.
_ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"
# Captures the inner content of an `<appname>...</appname>` element (tag name matched case-insensitively).
_SNS_APP_NAME_RE = re.compile(r"<appname[^>]*>([\s\S]*?)</appname>", flags=re.IGNORECASE)
# Matches one whole `<![CDATA[ ... ]]>` section (non-greedy, may span lines).
_SNS_XML_CDATA_BLOCK_RE = re.compile(r"<!\[CDATA\[[\s\S]*?\]\]>", flags=re.IGNORECASE)
# Matches an `&` that does NOT start a named (`&name;`), decimal (`&#123;`) or hex (`&#x1F;`) reference.
_SNS_XML_BARE_AMP_RE = re.compile(r"&(?!(?:[a-zA-Z]+|#\d+|#x[0-9a-fA-F]+);)")
# Matches control characters below 0x20 other than tab (0x09), LF (0x0a) and CR (0x0d).
_SNS_XML_INVALID_CHARS_RE = re.compile(r"[\x00-\x08\x0b\x0c\x0e-\x1f]")
_SNS_REALTIME_SYNC_STATE_FILE = "_sns_realtime_sync_state.json"
_SNS_DECRYPTED_DB_LOCKS: dict[str, threading.Lock] = {}
@@ -456,14 +466,154 @@ def _get_biz_to_official_index(contact_db_path: Path) -> dict[str, dict[str, Any
def _extract_sns_video_key(raw_xml: Any) -> str:
"""Extract Isaac64 video key from raw XML, e.g. `<enc key="1578806206" ...>`."""
text = _decode_sns_text_blob(raw_xml)
m = _SNS_VIDEO_KEY_RE.search(text or "")
return str(m.group(1) or "").strip() if m else ""
def _looks_like_xml_text(s: str) -> bool:
if not s:
return False
t = str(s).lstrip()
if t.startswith('"') and t.endswith('"'):
t = t.strip('"').lstrip()
return t.startswith("<")
def _sanitize_wechat_xml_for_et(xml_text: str) -> str:
"""Best-effort sanitize for ElementTree parsing.
WeChat Moments "XML" is sometimes not well-formed XML (commonly: raw `&` inside URLs),
which breaks `xml.etree.ElementTree.fromstring`. We keep CDATA blocks intact and:
- strip invalid control chars
- escape bare `&` outside CDATA blocks
"""
s = str(xml_text or "")
if not s:
return ""
s = _SNS_XML_INVALID_CHARS_RE.sub("", s)
parts: list[str] = []
last = 0
for m in _SNS_XML_CDATA_BLOCK_RE.finditer(s):
head = s[last : m.start()]
if head:
parts.append(_SNS_XML_BARE_AMP_RE.sub("&amp;", head))
parts.append(m.group(0))
last = m.end()
tail = s[last:]
if tail:
parts.append(_SNS_XML_BARE_AMP_RE.sub("&amp;", tail))
return "".join(parts)
def _decode_sns_text_blob(value: Any) -> str:
"""Decode text/blob values that may be hex/base64 encoded and/or zstd-compressed.
WeChat WCDB realtime can return TEXT/BLOB fields as:
- plain XML string
- hex string (often a zstd frame starting with 28b52ffd...)
- base64 string (same)
"""
if value is None:
return ""
if isinstance(value, memoryview):
raw = bytes(value)
if raw and zstd is not None and raw.startswith(_ZSTD_MAGIC):
try:
raw = zstd.decompress(raw)
except Exception:
pass
try:
s = raw.decode("utf-8", errors="ignore")
except Exception:
s = ""
s = html.unescape(str(s or "").strip())
return s if _looks_like_xml_text(s) else (str(s or "").strip())
if isinstance(value, (bytes, bytearray)):
raw = bytes(value)
if raw and zstd is not None and raw.startswith(_ZSTD_MAGIC):
try:
raw = zstd.decompress(raw)
except Exception:
pass
try:
s = raw.decode("utf-8", errors="ignore")
except Exception:
s = ""
s = html.unescape(str(s or "").strip())
return s if _looks_like_xml_text(s) else (str(s or "").strip())
try:
text = str(raw_xml or "")
text = str(value or "")
except Exception:
return ""
text = html.unescape(text.strip())
if not text:
return ""
m = _SNS_VIDEO_KEY_RE.search(text)
return str(m.group(1) or "").strip() if m else ""
if _looks_like_xml_text(text):
return text
def _accept_xml(decoded: str) -> str:
s2 = html.unescape(str(decoded or "").strip())
return s2 if _looks_like_xml_text(s2) else ""
# Hex string (optionally prefixed with 0x)
t_hex = text[2:] if text.lower().startswith("0x") else text
if len(t_hex) >= 16 and len(t_hex) % 2 == 0 and re.fullmatch(r"[0-9a-fA-F]+", t_hex):
try:
raw = bytes.fromhex(t_hex)
if raw and zstd is not None and raw.startswith(_ZSTD_MAGIC):
try:
raw = zstd.decompress(raw)
except Exception:
raw = b""
if raw:
s2 = _accept_xml(raw.decode("utf-8", errors="ignore"))
if s2:
return s2
except Exception:
pass
# Base64 string
if len(text) >= 24 and len(text) % 4 == 0 and re.fullmatch(r"[A-Za-z0-9+/=]+", text):
try:
raw = base64.b64decode(text)
if raw and zstd is not None and raw.startswith(_ZSTD_MAGIC):
try:
raw = zstd.decompress(raw)
except Exception:
raw = b""
if raw:
s2 = _accept_xml(raw.decode("utf-8", errors="ignore"))
if s2:
return s2
except Exception:
pass
return text
def _extract_sns_source_name(raw_xml: Any) -> str:
text = _decode_sns_text_blob(raw_xml)
if not text:
return ""
m = _SNS_APP_NAME_RE.search(text)
if not m:
return ""
v = str(m.group(1) or "")
v = v.replace("<![CDATA[", "").replace("]]>", "")
v = re.sub(r"<[^>]+>", "", v)
return html.unescape(v.strip())
def _build_location_text(node: Optional[ET.Element]) -> str:
@@ -508,6 +658,7 @@ def _parse_timeline_xml(xml_text: str, fallback_username: str) -> dict[str, Any]
"createTime": 0,
"contentDesc": "",
"location": "",
"sourceName": "",
"media": [],
"likes": [],
"comments": [],
@@ -517,16 +668,43 @@ def _parse_timeline_xml(xml_text: str, fallback_username: str) -> dict[str, Any]
"finderFeed": {}
}
xml_str = str(xml_text or "").strip()
xml_str = _decode_sns_text_blob(xml_text)
if not xml_str:
return out
try:
root = ET.fromstring(xml_str)
root = ET.fromstring(_sanitize_wechat_xml_for_et(xml_str))
except Exception:
return out
# External share source label (e.g. QQ音乐 / 哔哩哔哩) is usually stored in `<appInfo><appName>...`.
try:
for el in root.iter():
try:
tag = str(el.tag or "").lower()
except Exception:
continue
if tag in {"appname", "sourcename"}:
v = str(el.text or "").strip()
if v:
out["sourceName"] = html.unescape(v).strip()
break
try:
attrs = el.attrib or {}
except Exception:
attrs = {}
for k, v in attrs.items():
if str(k or "").lower() in {"appname", "sourcename"}:
vv = str(v or "").strip()
if vv:
out["sourceName"] = html.unescape(vv).strip()
break
if out["sourceName"]:
break
except Exception:
pass
def _find_text(*paths: str) -> str:
for p in paths:
try:
@@ -560,6 +738,42 @@ def _parse_timeline_xml(xml_text: str, fallback_username: str) -> dict[str, Any]
out["title"] = _find_text(".//ContentObject/title")
out["contentUrl"] = _clean_url(_find_text(".//ContentObject/contentUrl"))
# --- 如果是外部分享链接 (Type 5) ---
if post_type == 5:
out["title"] = _find_text(
".//ContentObject/title",
".//ContentObject/linkTitle",
".//ContentObject/name",
".//ContentObject/desc",
".//ContentObject/description",
)
out["contentUrl"] = _clean_url(
_find_text(
".//ContentObject/contentUrl",
".//ContentObject/linkUrl",
".//ContentObject/url",
".//ContentObject/jumpUrl",
)
)
# --- 如果是音乐分享/链接卡片 (Type 42) ---
if post_type == 42:
# WeChat sometimes stores link/music share metadata under ContentObject fields.
out["title"] = _find_text(
".//ContentObject/title",
".//ContentObject/linkTitle",
".//ContentObject/name",
".//ContentObject/desc",
)
out["contentUrl"] = _clean_url(
_find_text(
".//ContentObject/contentUrl",
".//ContentObject/linkUrl",
".//ContentObject/url",
".//ContentObject/jumpUrl",
)
)
# --- 如果是视频号 (Type 28) ---
if post_type == 28:
out["title"] = _find_text(".//ContentObject/title")
@@ -604,6 +818,14 @@ def _parse_timeline_xml(xml_text: str, fallback_username: str) -> dict[str, Any]
pass
out["media"] = media
# Fallback: some type=42 shares only expose the jump URL via media[0].url.
if post_type in (5, 42):
if (not str(out.get("contentUrl") or "").strip()) and media:
m0 = media[0] if isinstance(media[0], dict) else {}
u0 = str(m0.get("url") or "").strip()
if u0:
out["contentUrl"] = u0
likes: list[str] = []
try:
for u in root.findall(".//likeList//like//username"):
@@ -1500,7 +1722,7 @@ def sync_sns_realtime_timeline_latest(
tid_val = int(rr.get("tid") or 0)
except Exception:
continue
content_xml = str(rr.get("content") or "")
content_xml = _decode_sns_text_blob(rr.get("content"))
if not content_xml:
continue
uname = str(rr.get("user_name") or rr.get("username") or "").strip()
@@ -1675,6 +1897,7 @@ def list_sns_timeline(
"createTime": int(parsed2.get("createTime") or 0),
"contentDesc": str(parsed2.get("contentDesc") or ""),
"location": str(parsed2.get("location") or ""),
"sourceName": str(parsed2.get("sourceName") or ""),
"media": parsed2.get("media") or [],
"likes": parsed2.get("likes") or [],
"comments": parsed2.get("comments") or [],
@@ -1819,7 +2042,7 @@ def list_sns_timeline(
if not uname3:
continue
content_xml3 = str(rr.get("content") or "")
content_xml3 = _decode_sns_text_blob(rr.get("content"))
if not content_xml3:
continue
@@ -1871,6 +2094,7 @@ def list_sns_timeline(
"createTime": int(parsed3.get("createTime") or 0),
"contentDesc": str(parsed3.get("contentDesc") or ""),
"location": str(parsed3.get("location") or ""),
"sourceName": str(parsed3.get("sourceName") or ""),
"media": parsed3.get("media") or [],
"likes": parsed3.get("likes") or [],
"comments": parsed3.get("comments") or [],
@@ -2020,7 +2244,7 @@ def list_sns_timeline(
tid_val = int(rr.get("tid"))
except Exception:
continue
content_xml = str(rr.get("content") or "")
content_xml = _decode_sns_text_blob(rr.get("content"))
if content_xml:
content_by_tid[tid_val] = content_xml
uname1 = str(rr.get("user_name") or rr.get("username") or "").strip()
@@ -2083,6 +2307,7 @@ def list_sns_timeline(
# Enrich with parsed XML when available.
location = str(r.get("location") or "")
source_name = _extract_sns_source_name(r.get("rawXml"))
post_type = 1
title = ""
@@ -2098,6 +2323,9 @@ def list_sns_timeline(
parsed = _parse_timeline_xml(xml, uname)
if parsed.get("location"):
location = str(parsed.get("location") or "")
sn0 = str(parsed.get("sourceName") or "").strip()
if sn0:
source_name = sn0
post_type = parsed.get("type", 1)
@@ -2171,6 +2399,7 @@ def list_sns_timeline(
"createTime": create_time,
"contentDesc": content_desc,
"location": str(location or ""),
"sourceName": str(source_name or ""),
"media": media,
"likes": likes,
"comments": comments,
@@ -1077,6 +1077,21 @@ class SnsExportManager:
ff = p.get("finderFeed") if isinstance(p.get("finderFeed"), dict) else {}
name = str(ff.get("nickname") or "").strip() if isinstance(ff, dict) else ""
return f"视频号·{name}" if name else "视频号"
if t in (5, 42):
name0 = str(p.get("sourceName") or "").strip()
if name0:
return name0
url0 = str(p.get("contentUrl") or "").strip()
if not url0:
ml0 = p.get("media") if isinstance(p.get("media"), list) else []
m0 = ml0[0] if (ml0 and isinstance(ml0[0], dict)) else {}
url0 = str(m0.get("url") or "").strip()
if url0:
# host+path (no query) as a readable fallback label.
s = re.sub(r"^https?://", "", url0.strip(), flags=re.I)
s = s.split("#", 1)[0].split("?", 1)[0].rstrip("/")
return s or ("音乐" if t == 42 else "外部分享")
return "音乐" if t == 42 else "外部分享"
return ""
def format_finder_feed_card_text(p: dict[str, Any]) -> str:
@@ -1200,6 +1215,45 @@ class SnsExportManager:
out.append("</div></div>")
out.append("</a>" if content_url else "</div>")
out.append("</div>")
elif post_type in (5, 42):
# External share card (WeChat-like, clickable).
content_url = str(post.get("contentUrl") or "").strip()
title0 = str(post.get("title") or "").strip()
media_list = post.get("media") if isinstance(post.get("media"), list) else []
m0 = media_list[0] if (media_list and isinstance(media_list[0], dict)) else {}
if not content_url and m0:
content_url = str(m0.get("url") or "").strip()
if not title0:
title0 = content_url or ("音乐分享" if post_type == 42 else "外部分享")
thumb_arc = export_image_to_zip(zf=zf, post=post, media=m0, idx=0, prefer_thumb=True) if m0 else ""
placeholder = "音乐" if post_type == 42 else "链接"
out.append('<div class="mt-2 w-full">')
if content_url:
out.append(
f'<a href="{_esc_attr(content_url)}" target="_blank" rel="noopener noreferrer" '
'class="block w-full bg-[#F7F7F7] p-2 rounded-sm no-underline hover:bg-[#EFEFEF] transition-colors">'
)
else:
out.append('<div class="block w-full bg-[#F7F7F7] p-2 rounded-sm">')
out.append('<div class="flex items-center gap-3">')
if thumb_arc:
out.append(
f'<img src="{_esc_attr(thumb_arc)}" class="w-12 h-12 object-cover flex-shrink-0 bg-white" '
'alt="" loading="lazy" referrerpolicy="no-referrer" />'
)
else:
out.append(
f'<div class="w-12 h-12 flex items-center justify-center bg-gray-200 text-gray-400 flex-shrink-0 text-xs">{_esc_text(placeholder)}</div>'
)
out.append('<div class="flex-1 min-w-0 flex items-center overflow-hidden h-12">')
out.append(f'<div class="text-[13px] text-gray-900 leading-tight line-clamp-2">{_esc_text(title0)}</div>')
out.append("</div></div>")
out.append("</a>" if content_url else "</div>")
out.append("</div>")
elif post_type == 28 and isinstance(post.get("finderFeed"), dict) and post.get("finderFeed"):
ff = post.get("finderFeed") if isinstance(post.get("finderFeed"), dict) else {}
thumb_url = str(ff.get("thumbUrl") or "").strip() if isinstance(ff, dict) else ""
@@ -0,0 +1,63 @@
import re
from collections.abc import MutableMapping
def add_sns_stage_timing_headers(
    headers: MutableMapping[str, str],
    *,
    source: str,
    hit_type: str = "",
    x_enc: str = "",
) -> None:
    """Inject `Server-Timing` + `Timing-Allow-Origin` for SNS media stage inspection.

    The frontend can't read `<img>` response headers, but browsers expose `Server-Timing`
    metrics via `performance.getEntriesByName(...).serverTiming` when `Timing-Allow-Origin`
    allows it.

    Args:
        headers: Response headers, mutated in place.
        source: Stage source label. When blank, nothing is written at all.
        hit_type: Optional hit-type label, emitted as an `sns_hit_*` metric.
        x_enc: Optional encoding label, emitted as an `sns_xenc_*` metric.

    Each value is emitted twice: as a sanitized token in the metric name
    (`sns_source_<token>`) and verbatim in the quoted `desc`. Existing `Server-Timing`
    metrics are kept and ours are appended, unless an `sns_source` metric is already
    present, in which case the header is left untouched.
    """
    src = str(source or "").strip()
    if not src:
        return

    ht = str(hit_type or "").strip()
    xe = str(x_enc or "").strip()

    if "Timing-Allow-Origin" not in headers:
        headers["Timing-Allow-Origin"] = "*"

    def _esc(v: str) -> str:
        # Escape for use inside a double-quoted header parameter value.
        return v.replace("\\", "\\\\").replace('"', '\\"')

    def _token(v: str) -> str:
        # Reduce an arbitrary label to [0-9A-Za-z_.-], capped at 64 characters.
        raw = str(v or "").strip()
        if not raw:
            return ""
        raw = raw.replace(" ", "_")
        safe = re.sub(r"[^0-9A-Za-z_.-]+", "_", raw).strip("_")
        if not safe:
            return ""
        return safe[:64]

    parts: list[str] = []
    src_tok = _token(src) or "unknown"
    parts.append(f'sns_source_{src_tok};dur=0;desc="{_esc(src)}"')

    if ht:
        ht_tok = _token(ht)
        if ht_tok:
            parts.append(f'sns_hit_{ht_tok};dur=0;desc="{_esc(ht)}"')

    if xe:
        xe_tok = _token(xe)
        if xe_tok:
            parts.append(f'sns_xenc_{xe_tok};dur=0;desc="{_esc(xe)}"')

    existing = str(headers.get("Server-Timing") or "").strip()
    # Skip when an `sns_source` metric is already present, either first in the header or
    # after a comma with optional whitespace. In a raw string this must be `\s`, not `\\s`:
    # the doubled form demands a literal backslash and never matches a real header.
    if existing and re.search(r"(^|,\s*)sns_source(_|;)", existing):
        return

    # Responses may already carry upstream `Server-Timing` metrics; append ours so the
    # frontend can consistently read `sns_source_*` via ResourceTiming.serverTiming.
    combined = ", ".join(parts)
    headers["Server-Timing"] = f"{existing}, {combined}" if existing else combined
@@ -0,0 +1,72 @@
import sys
import unittest
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers.sns import _parse_timeline_xml # noqa: E402 pylint: disable=wrong-import-position
class TestSnsParseTimelineXmlSanitization(unittest.TestCase):
    """`_parse_timeline_xml` must cope with raw `&` characters inside share URLs."""

    def test_external_share_type5_parses_with_raw_ampersands(self):
        # Type 5 link share whose URLs contain unescaped ampersands.
        raw_xml = "".join(
            [
                "<SnsDataItem><TimelineObject>",
                "<username>wxid_2az0agby0baa22</username>",
                "<createTime>1771500773</createTime>",
                "<contentDesc>让我看看它和suno有什么区别</contentDesc>",
                "<ContentObject>",
                "<type>5</type>",
                "<title>Google Gemini 上线了AI音乐生成功能</title>",
                "<contentUrl>https://b23.tv/lVa1lpm?share_medium=android&share_source=weixin_moments</contentUrl>",
                "</ContentObject>",
                "<appInfo><appName>哔哩哔哩</appName></appInfo>",
                "<mediaList><media>",
                "<type>4</type><id>m1</id>",
                "<url>https://b23.tv/lVa1lpm?share_medium=android&share_source=weixin_moments</url>",
                "<thumb>http://shmmsns.qpic.cn/mmsns/test/150</thumb>",
                "</media></mediaList>",
                "</TimelineObject></SnsDataItem>",
            ]
        )

        parsed = _parse_timeline_xml(raw_xml, "fallback")

        self.assertEqual(parsed.get("type"), 5)
        self.assertEqual(parsed.get("title"), "Google Gemini 上线了AI音乐生成功能")
        self.assertEqual(parsed.get("sourceName"), "哔哩哔哩")
        self.assertIn("&share_source=weixin_moments", str(parsed.get("contentUrl") or ""))
        media = parsed.get("media")
        self.assertIsInstance(media, list)
        self.assertEqual(len(media), 1)

    def test_external_share_type42_parses_with_raw_ampersands(self):
        # Type 42 music share whose URLs contain unescaped ampersands.
        raw_xml = "".join(
            [
                "<SnsDataItem><TimelineObject>",
                "<username>wxid_all914izz7w222</username>",
                "<createTime>1771504315</createTime>",
                "<contentDesc>2026 恭喜自己 也恭喜你</contentDesc>",
                "<ContentObject>",
                "<type>42</type>",
                "<title>恭喜自己</title>",
                "<description>成龙/周华健</description>",
                "<contentUrl>https://i.y.qq.com/v8/playsong.html?platform=11&appshare=android_qq</contentUrl>",
                "</ContentObject>",
                "<appInfo><appName>QQ音乐</appName></appInfo>",
                "<mediaList><media>",
                "<type>5</type><id>m2</id>",
                "<url>http://c6.y.qq.com/rsc/fcgi-bin/fcg_pyq_play.fcg?songmid=002kNnX90keHGW&fromtag=46</url>",
                "<thumb>http://szmmsns.qpic.cn/mmsns/test/0</thumb>",
                "</media></mediaList>",
                "</TimelineObject></SnsDataItem>",
            ]
        )

        parsed = _parse_timeline_xml(raw_xml, "fallback")

        self.assertEqual(parsed.get("type"), 42)
        self.assertEqual(parsed.get("title"), "恭喜自己")
        self.assertEqual(parsed.get("sourceName"), "QQ音乐")
        self.assertIn("&appshare=android_qq", str(parsed.get("contentUrl") or ""))
        media = parsed.get("media")
        self.assertIsInstance(media, list)
        self.assertEqual(len(media), 1)
if __name__ == "__main__":
unittest.main()
+40
View File
@@ -0,0 +1,40 @@
import sys
import unittest
from pathlib import Path
from starlette.responses import Response
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.sns_stage_timing import add_sns_stage_timing_headers # noqa: E402 pylint: disable=wrong-import-position
class TestSnsStageServerTiming(unittest.TestCase):
    """Checks how `add_sns_stage_timing_headers` writes the `Server-Timing` header."""

    @staticmethod
    def _inject_and_read(response) -> str:
        # Run the helper with a fixed source and return the resulting header value.
        add_sns_stage_timing_headers(response.headers, source="proxy")
        return str(response.headers.get("Server-Timing") or "")

    def test_injects_server_timing_when_missing(self):
        header_value = self._inject_and_read(Response(content=b"ok"))
        self.assertIn("sns_source_", header_value)
        self.assertIn("proxy", header_value)

    def test_appends_when_upstream_server_timing_exists(self):
        response = Response(content=b"ok")
        response.headers["Server-Timing"] = "edge;dur=1"
        header_value = self._inject_and_read(response)
        self.assertIn("edge;dur=1", header_value)
        self.assertIn("sns_source_", header_value)

    def test_does_not_duplicate_existing_sns_source_metric(self):
        response = Response(content=b"ok")
        response.headers["Server-Timing"] = 'sns_source_proxy;dur=0;desc="proxy"'
        header_value = self._inject_and_read(response)
        self.assertEqual(header_value.count("sns_source_"), 1)
if __name__ == "__main__":
unittest.main()