+
+
+
@@ -311,6 +361,7 @@
loop
:muted="livePhotoHoverMuted"
playsinline
+ @loadeddata="onSnsMediaLoaded(post, post.media[0], 0)"
@error="onLivePhotoVideoError(post.id, 0)"
>
@@ -321,6 +372,7 @@
alt=""
loading="lazy"
referrerpolicy="no-referrer"
+ @load="onSnsMediaLoaded(post, post.media[0], 0, $event)"
@error="onMediaError(post.id, 0)"
/>
@@ -387,7 +439,7 @@
:key="idx"
class="w-[116px] h-[116px] rounded-[2px] overflow-hidden bg-gray-100 border border-gray-200 flex items-center justify-center cursor-pointer relative group"
@click.stop="onMediaClick(post, m, idx)"
- @mouseenter="onLivePhotoEnter(post.id, idx, m); onSnsMediaHover(post, m, idx)"
+ @mouseenter="onLivePhotoEnter(post.id, idx, m)"
@mouseleave="onLivePhotoLeave(post.id, idx, m)"
>
![]()
@@ -637,6 +691,17 @@ import { SNS_SETTING_USE_CACHE_KEY, readLocalBoolSetting } from '~/utils/desktop
useHead({ title: '朋友圈 - 微信数据分析助手' })
+// Nuxt dev mode can load hundreds of module resources, quickly filling the default
+// ResourceTiming buffer (150). If it overflows, `
![]()
` requests may not produce
+// entries, making Server-Timing based stage detection always fall back to "unknown".
+if (process.client) {
+ try {
+ if (typeof performance !== 'undefined' && performance?.setResourceTimingBufferSize) {
+ performance.setResourceTimingBufferSize(5000)
+ }
+ } catch {}
+}
+
const api = useApi()
const chatAccounts = useChatAccountsStore()
@@ -893,6 +958,7 @@ const snsMediaStageLabel = (key) => {
if (source === 'manual-pick') return '手动匹配'
if (source === 'local-heuristic') return '本地兜底'
if (source === 'local-heuristic-next') return '本地兜底(跳过)'
+ if (source === 'browser-cache') return '浏览器缓存'
if (source === 'bkg-cover') return '封面缓存'
if (source === 'proxy') return '远程代理'
if (source === 'unknown') return '未知'
@@ -908,6 +974,7 @@ const snsMediaStageBadgeColorClass = (key) => {
if (source === 'deterministic-hash') return 'bg-sky-600/85 text-white'
if (source.startsWith('local')) return 'bg-blue-600/85 text-white'
if (source === 'manual-pick') return 'bg-amber-600/90 text-white'
+ if (source === 'browser-cache') return 'bg-slate-600/85 text-white'
if (source === 'proxy') return 'bg-fuchsia-600/85 text-white'
if (source === 'bkg-cover') return 'bg-indigo-600/85 text-white'
if (source === 'error') return 'bg-red-600/85 text-white'
@@ -929,6 +996,48 @@ const snsMediaStageBadgeTitle = (key) => {
return parts.join(' · ')
}
+const readSnsStageFromResourceTiming = (url) => {
+ try {
+ if (!process.client) return null
+ if (typeof performance === 'undefined' || !performance?.getEntriesByName) return null
+ const u = String(url || '').trim()
+ if (!u) return null
+ const entries = performance.getEntriesByName(u) || []
+ const latest = [...entries].reverse().find((e) => String(e?.entryType || '') === 'resource')
+ if (!latest) return null
+
+ // Prefer backend-injected stage info from `Server-Timing`.
+ const st = latest?.serverTiming
+ if (Array.isArray(st) && st.length > 0) {
+ let source = ''
+ let hitType = ''
+ let xEnc = ''
+ for (const item of st) {
+ const name = String(item?.name || '').trim()
+ const desc = String(item?.description || '').trim()
+ if (name === 'sns_source' && desc) source = desc
+ else if (name.startsWith('sns_source_')) source = name.slice('sns_source_'.length) || desc
+ else if (name === 'sns_hit' && desc) hitType = desc
+ else if (name.startsWith('sns_hit_')) hitType = name.slice('sns_hit_'.length) || desc
+ else if (name === 'sns_xenc' && desc) xEnc = desc
+ else if (name.startsWith('sns_xenc_')) xEnc = name.slice('sns_xenc_'.length) || desc
+ }
+ if (source) return { source, hitType, xEnc }
+ }
+
+ // When DevTools shows "(from disk cache)", browsers may not expose `serverTiming` at all.
+ // Best-effort: infer a browser cache hit from ResourceTiming sizes.
+ const transferSize = Number(latest?.transferSize)
+ if (Number.isFinite(transferSize) && transferSize === 0) {
+ return { source: 'browser-cache', hitType: 'transfer=0', xEnc: '' }
+ }
+
+ return null
+ } catch {
+ return null
+ }
+}
+
const ensureSnsMediaStage = async (key, url) => {
if (!process.client) return
const k = String(key || '').trim()
@@ -936,7 +1045,8 @@ const ensureSnsMediaStage = async (key, url) => {
if (!k || !u) return
if (!isSnsMediaApiUrl(u)) return
- if (snsMediaStage.value[k]) return
+ const existingSource = String(snsMediaStage.value?.[k]?.source || '').trim()
+ if (existingSource && existingSource !== 'unknown') return
if (snsMediaStageLoading.value[k]) return
if (snsMediaStageInFlight.has(k)) return
@@ -944,36 +1054,42 @@ const ensureSnsMediaStage = async (key, url) => {
snsMediaStageLoading.value[k] = true
try {
- const resp = await fetch(u, { method: 'GET', mode: 'cors', cache: 'force-cache' })
- const source = String(resp.headers.get('X-SNS-Source') || '').trim() || 'unknown'
- const hitType = String(resp.headers.get('X-SNS-Hit-Type') || '').trim()
- const xEnc = String(resp.headers.get('X-SNS-X-Enc') || '').trim()
-
- snsMediaStage.value[k] = { source, hitType, xEnc }
-
- try {
- resp.body?.cancel?.()
- } catch {}
- } catch {
- snsMediaStage.value[k] = { source: 'error', hitType: '', xEnc: '' }
+ // Prefer stage info from the *same* request that loaded the
![]()
/
")
out.append("" if content_url else "
")
out.append("
")
+ elif post_type in (5, 42):
+ # External share card (WeChat-like, clickable).
+ content_url = str(post.get("contentUrl") or "").strip()
+ title0 = str(post.get("title") or "").strip()
+ media_list = post.get("media") if isinstance(post.get("media"), list) else []
+ m0 = media_list[0] if (media_list and isinstance(media_list[0], dict)) else {}
+ if not content_url and m0:
+ content_url = str(m0.get("url") or "").strip()
+
+ if not title0:
+ title0 = content_url or ("音乐分享" if post_type == 42 else "外部分享")
+
+ thumb_arc = export_image_to_zip(zf=zf, post=post, media=m0, idx=0, prefer_thumb=True) if m0 else ""
+
+ placeholder = "音乐" if post_type == 42 else "链接"
+ out.append('
")
elif post_type == 28 and isinstance(post.get("finderFeed"), dict) and post.get("finderFeed"):
ff = post.get("finderFeed") if isinstance(post.get("finderFeed"), dict) else {}
thumb_url = str(ff.get("thumbUrl") or "").strip() if isinstance(ff, dict) else ""
diff --git a/src/wechat_decrypt_tool/sns_stage_timing.py b/src/wechat_decrypt_tool/sns_stage_timing.py
new file mode 100644
index 0000000..f7221be
--- /dev/null
+++ b/src/wechat_decrypt_tool/sns_stage_timing.py
@@ -0,0 +1,63 @@
+import re
+from collections.abc import MutableMapping
+
+
+def add_sns_stage_timing_headers(
+ headers: MutableMapping[str, str],
+ *,
+ source: str,
+ hit_type: str = "",
+ x_enc: str = "",
+) -> None:
+ """Inject `Server-Timing` + `Timing-Allow-Origin` for SNS media stage inspection.
+
+ The frontend can't read `
![]()
` response headers, but browsers expose `Server-Timing` metrics
+ via `performance.getEntriesByName(...).serverTiming` when `Timing-Allow-Origin` allows it.
+
+ This helper is intentionally side-effect free beyond mutating `headers`.
+ """
+
+ src = str(source or "").strip()
+ if not src:
+ return
+
+ ht = str(hit_type or "").strip()
+ xe = str(x_enc or "").strip()
+
+ if "Timing-Allow-Origin" not in headers:
+ headers["Timing-Allow-Origin"] = "*"
+
+ def _esc(v: str) -> str:
+ return v.replace("\\", "\\\\").replace('"', '\\"')
+
+ def _token(v: str) -> str:
+ raw = str(v or "").strip()
+ if not raw:
+ return ""
+ raw = raw.replace(" ", "_")
+ safe = re.sub(r"[^0-9A-Za-z_.-]+", "_", raw).strip("_")
+ if not safe:
+ return ""
+ return safe[:64]
+
+ parts: list[str] = []
+ src_tok = _token(src) or "unknown"
+ parts.append(f'sns_source_{src_tok};dur=0;desc="{_esc(src)}"')
+ if ht:
+ ht_tok = _token(ht)
+ if ht_tok:
+ parts.append(f'sns_hit_{ht_tok};dur=0;desc="{_esc(ht)}"')
+ if xe:
+ xe_tok = _token(xe)
+ if xe_tok:
+ parts.append(f'sns_xenc_{xe_tok};dur=0;desc="{_esc(xe)}"')
+
+ existing = str(headers.get("Server-Timing") or "").strip()
+ # Some responses may already have upstream `Server-Timing` metrics. Always append ours so
+ # the frontend can consistently read `sns_source_*` via ResourceTiming.serverTiming.
+ if existing and re.search(r"(^|,\\s*)sns_source(_|;)", existing):
+ return
+
+ combined = ", ".join(parts)
+ headers["Server-Timing"] = f"{existing}, {combined}" if existing else combined
+
diff --git a/tests/test_sns_parse_timeline_xml_sanitization.py b/tests/test_sns_parse_timeline_xml_sanitization.py
new file mode 100644
index 0000000..40e5b48
--- /dev/null
+++ b/tests/test_sns_parse_timeline_xml_sanitization.py
@@ -0,0 +1,72 @@
+import sys
+import unittest
+from pathlib import Path
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+
+from wechat_decrypt_tool.routers.sns import _parse_timeline_xml # noqa: E402 pylint: disable=wrong-import-position
+
+
+class TestSnsParseTimelineXmlSanitization(unittest.TestCase):
+ def test_external_share_type5_parses_with_raw_ampersands(self):
+ xml = (
+ "
"
+ "wxid_2az0agby0baa22"
+ "1771500773"
+ "让我看看它和suno有什么区别"
+ ""
+ "5"
+ "Google Gemini 上线了AI音乐生成功能"
+ "https://b23.tv/lVa1lpm?share_medium=android&share_source=weixin_moments"
+ ""
+ "哔哩哔哩"
+ ""
+ "4m1"
+ "https://b23.tv/lVa1lpm?share_medium=android&share_source=weixin_moments"
+ "http://shmmsns.qpic.cn/mmsns/test/150"
+ ""
+ ""
+ )
+
+ out = _parse_timeline_xml(xml, "fallback")
+ self.assertEqual(out.get("type"), 5)
+ self.assertEqual(out.get("title"), "Google Gemini 上线了AI音乐生成功能")
+ self.assertEqual(out.get("sourceName"), "哔哩哔哩")
+ self.assertIn("&share_source=weixin_moments", str(out.get("contentUrl") or ""))
+ self.assertTrue(isinstance(out.get("media"), list) and len(out.get("media") or []) == 1)
+
+ def test_external_share_type42_parses_with_raw_ampersands(self):
+ xml = (
+ "
"
+ "wxid_all914izz7w222"
+ "1771504315"
+ "2026 恭喜自己 也恭喜你"
+ ""
+ "42"
+ "恭喜自己"
+ "成龙/周华健"
+ "https://i.y.qq.com/v8/playsong.html?platform=11&appshare=android_qq"
+ ""
+ "QQ音乐"
+ ""
+ "5m2"
+ "http://c6.y.qq.com/rsc/fcgi-bin/fcg_pyq_play.fcg?songmid=002kNnX90keHGW&fromtag=46"
+ "http://szmmsns.qpic.cn/mmsns/test/0"
+ ""
+ ""
+ )
+
+ out = _parse_timeline_xml(xml, "fallback")
+ self.assertEqual(out.get("type"), 42)
+ self.assertEqual(out.get("title"), "恭喜自己")
+ self.assertEqual(out.get("sourceName"), "QQ音乐")
+ self.assertIn("&appshare=android_qq", str(out.get("contentUrl") or ""))
+ self.assertTrue(isinstance(out.get("media"), list) and len(out.get("media") or []) == 1)
+
+
+if __name__ == "__main__":
+ unittest.main()
+
diff --git a/tests/test_sns_stage_server_timing.py b/tests/test_sns_stage_server_timing.py
new file mode 100644
index 0000000..1fd2a13
--- /dev/null
+++ b/tests/test_sns_stage_server_timing.py
@@ -0,0 +1,40 @@
+import sys
+import unittest
+from pathlib import Path
+
+from starlette.responses import Response
+
+
+ROOT = Path(__file__).resolve().parents[1]
+sys.path.insert(0, str(ROOT / "src"))
+
+
+from wechat_decrypt_tool.sns_stage_timing import add_sns_stage_timing_headers # noqa: E402 pylint: disable=wrong-import-position
+
+
+class TestSnsStageServerTiming(unittest.TestCase):
+ def test_injects_server_timing_when_missing(self):
+ resp = Response(content=b"ok")
+ add_sns_stage_timing_headers(resp.headers, source="proxy")
+ st = str(resp.headers.get("Server-Timing") or "")
+ self.assertIn("sns_source_", st)
+ self.assertIn("proxy", st)
+
+ def test_appends_when_upstream_server_timing_exists(self):
+ resp = Response(content=b"ok")
+ resp.headers["Server-Timing"] = "edge;dur=1"
+ add_sns_stage_timing_headers(resp.headers, source="proxy")
+ st = str(resp.headers.get("Server-Timing") or "")
+ self.assertIn("edge;dur=1", st)
+ self.assertIn("sns_source_", st)
+
+ def test_does_not_duplicate_existing_sns_source_metric(self):
+ resp = Response(content=b"ok")
+ resp.headers["Server-Timing"] = 'sns_source_proxy;dur=0;desc="proxy"'
+ add_sns_stage_timing_headers(resp.headers, source="proxy")
+ st = str(resp.headers.get("Server-Timing") or "")
+ self.assertEqual(st.count("sns_source_"), 1)
+
+
+if __name__ == "__main__":
+ unittest.main()