improvement(sns): 精简朋友圈媒体链路并移除手动挑图入口

- 移除 Server-Timing/X-SNS 阶段透出与前端媒体来源标记
- 移除本地候选图与手动匹配接口及预览交互
- /api/sns/media 默认仅保留远程下载解密路径,未命中直接返回 404
- 将导出所需的本地缓存匹配逻辑下沉到 sns_export_service
This commit is contained in:
2977094657
2026-04-16 01:19:09 +08:00
Unverified
parent 94a8b124c3
commit 7870cd54fc
8 changed files with 343 additions and 1267 deletions
-26
View File
@@ -357,30 +357,6 @@ export const useApi = () => {
return await request(url)
}
// 朋友圈图片本地缓存候选(用于错图时手动选择)
const listSnsMediaCandidates = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
if (params && params.create_time != null) query.set('create_time', String(params.create_time))
if (params && params.width != null) query.set('width', String(params.width))
if (params && params.height != null) query.set('height', String(params.height))
if (params && params.limit != null) query.set('limit', String(params.limit))
if (params && params.offset != null) query.set('offset', String(params.offset))
const url = '/sns/media_candidates' + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
}
// 保存朋友圈图片手动匹配结果(本机)
const saveSnsMediaPicks = async (data = {}) => {
return await request('/sns/media_picks', {
method: 'POST',
body: {
account: data.account || null,
picks: (data && data.picks && typeof data.picks === 'object' && !Array.isArray(data.picks)) ? data.picks : {}
}
})
}
const openChatMediaFolder = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
@@ -667,8 +643,6 @@ export const useApi = () => {
resolveAppMsg,
listSnsTimeline,
listSnsUsers,
listSnsMediaCandidates,
saveSnsMediaPicks,
openChatMediaFolder,
downloadChatEmoji,
saveMediaKeys,
+15 -382
View File
@@ -114,20 +114,7 @@
:src="getSnsMediaUrl(activeCover, activeCover.media[0], 0, activeCover.media[0].url)"
class="w-full h-full object-cover"
alt="朋友圈封面"
@load="onCoverMediaLoaded(activeCover, $event)"
/>
<div
v-if="snsMediaStageLabel(snsCoverStageKey(activeCover)) || snsMediaStageLoading[snsCoverStageKey(activeCover)]"
class="absolute top-3 left-3 z-20 opacity-0 group-hover:opacity-100 transition-opacity pointer-events-none"
>
<div
class="text-[10px] px-2 py-0.5 rounded backdrop-blur-sm shadow-sm"
:class="snsMediaStageBadgeColorClass(snsCoverStageKey(activeCover))"
:title="snsMediaStageBadgeTitle(snsCoverStageKey(activeCover))"
>
{{ snsMediaStageLabel(snsCoverStageKey(activeCover)) || '识别中' }}
</div>
</div>
<div
v-if="(activeCover && Number(activeCover.createTime || 0)) || (covers && covers.length > 1)"
@@ -347,7 +334,7 @@
loop
muted
playsinline
@loadeddata="onLocalVideoLoaded(post.id, post.media[0].id); onSnsMediaLoaded(post, post.media[0], 0)"
@loadeddata="onLocalVideoLoaded(post.id, post.media[0].id)"
@error="onLocalVideoError(post.id, post.media[0].id)"
></video>
@@ -361,7 +348,6 @@
loop
:muted="livePhotoHoverMuted"
playsinline
@loadeddata="onSnsMediaLoaded(post, post.media[0], 0)"
@error="onLivePhotoVideoError(post.id, 0)"
></video>
@@ -372,22 +358,8 @@
alt=""
loading="lazy"
referrerpolicy="no-referrer"
@load="onSnsMediaLoaded(post, post.media[0], 0, $event)"
@error="onMediaError(post.id, 0)"
/>
<div
v-if="snsMediaStageLabel(snsMediaStageKey(post.id, 0, 'thumb')) || snsMediaStageLoading[snsMediaStageKey(post.id, 0, 'thumb')]"
class="absolute top-2 left-2 z-20 opacity-0 group-hover:opacity-100 transition-opacity pointer-events-none"
>
<div
class="text-[10px] px-2 py-0.5 rounded backdrop-blur-sm shadow-sm"
:class="snsMediaStageBadgeColorClass(snsMediaStageKey(post.id, 0, 'thumb'))"
:title="snsMediaStageBadgeTitle(snsMediaStageKey(post.id, 0, 'thumb'))"
>
{{ snsMediaStageLabel(snsMediaStageKey(post.id, 0, 'thumb')) || '识别中' }}
</div>
</div>
<div
v-if="Number(post.media[0]?.type || 0) === 6 && !isLocalVideoLoaded(post.id, post.media[0].id)"
class="absolute inset-0 flex items-center justify-center pointer-events-none"
@@ -451,7 +423,7 @@
loop
muted
playsinline
@loadeddata="onLocalVideoLoaded(post.id, m.id); onSnsMediaLoaded(post, m, idx)"
@loadeddata="onLocalVideoLoaded(post.id, m.id)"
@error="onLocalVideoError(post.id, m.id)"
></video>
<video
@@ -464,7 +436,6 @@
loop
:muted="livePhotoHoverMuted"
playsinline
@loadeddata="onSnsMediaLoaded(post, m, idx)"
@error="onLivePhotoVideoError(post.id, idx)"
></video>
<img
@@ -474,22 +445,8 @@
alt=""
loading="lazy"
referrerpolicy="no-referrer"
@load="onSnsMediaLoaded(post, m, idx, $event)"
@error="onMediaError(post.id, idx)"
/>
<div
v-if="snsMediaStageLabel(snsMediaStageKey(post.id, idx, 'thumb')) || snsMediaStageLoading[snsMediaStageKey(post.id, idx, 'thumb')]"
class="absolute top-1 left-1 z-20 opacity-0 group-hover:opacity-100 transition-opacity pointer-events-none"
>
<div
class="text-[10px] px-1.5 py-0.5 rounded backdrop-blur-sm shadow-sm"
:class="snsMediaStageBadgeColorClass(snsMediaStageKey(post.id, idx, 'thumb'))"
:title="snsMediaStageBadgeTitle(snsMediaStageKey(post.id, idx, 'thumb'))"
>
{{ snsMediaStageLabel(snsMediaStageKey(post.id, idx, 'thumb')) || '识别中' }}
</div>
</div>
<!-- 不知道微信朋友圈可不可以发多视频先这样写吧-->
<span v-else class="text-[10px] text-gray-400">图片失败</span>
@@ -630,7 +587,7 @@
</button>
</div>
<!-- 图片预览弹窗 + 候选匹配选择 -->
<!-- 图片预览弹窗 -->
<div
v-if="previewCtx"
class="fixed inset-0 z-[60] bg-black/90 flex items-center justify-center"
@@ -711,17 +668,6 @@ import { reportServerErrorFromError } from '~/lib/server-error-logging'
useHead({ title: '朋友圈 - 微信数据分析助手' })
// Nuxt dev mode can load hundreds of module resources, quickly filling the default
// ResourceTiming buffer (150). If it overflows, `<img>` requests may not produce
// entries, making Server-Timing based stage detection always fall back to "unknown".
if (process.client) {
try {
if (typeof performance !== 'undefined' && performance?.setResourceTimingBufferSize) {
performance.setResourceTimingBufferSize(5000)
}
} catch {}
}
const api = useApi()
const chatAccounts = useChatAccountsStore()
@@ -937,186 +883,6 @@ const onMediaError = (postId, idx) => {
mediaErrors.value[mediaErrorKey(postId, idx)] = true
}
// Hover badge: show which SNS media pipeline stage produced the image.
// Backend provides `X-SNS-Source` (and optional `X-SNS-Hit-Type`, `X-SNS-X-Enc`) on `/api/sns/media` responses.
const snsMediaStage = ref({}) // stageKey -> { source, hitType, xEnc }
const snsMediaStageLoading = ref({}) // stageKey -> boolean
const snsMediaStageInFlight = new Set()
const isSnsMediaApiUrl = (url) => {
const u = String(url || '').trim()
return !!u && u.includes('/api/sns/media')
}
const snsMediaStageKey = (postId, idx, kind = 'thumb') => {
const acc = String(selectedAccount.value || '').trim()
const pid = String(postId || '').trim()
return `sns:${acc}:${pid}:${String(Number(idx) || 0)}:${String(kind || 'thumb')}`
}
const snsCoverStageKey = (cover) => {
const acc = String(selectedAccount.value || '').trim()
const cid = String(cover?.id || cover?.tid || cover?.createTime || '').trim()
return `sns:${acc}:cover:${cid || '0'}`
}
const snsMediaStageLabel = (key) => {
const k = String(key || '').trim()
if (!k) return ''
const info = snsMediaStage.value[k]
if (!info || typeof info !== 'object') return ''
const source = String(info?.source || '').trim()
const hitType = String(info?.hitType || '').trim()
if (source === 'remote-cache') return '远程缓存'
if (source === 'remote-decrypt') return '远程解密'
if (source === 'remote') return '远程直出'
if (source === 'deterministic-hash') return hitType ? `本地命中(${hitType})` : '本地命中'
if (source === 'manual-pick') return '手动匹配'
if (source === 'local-heuristic') return '本地兜底'
if (source === 'local-heuristic-next') return '本地兜底(跳过)'
if (source === 'browser-cache') return '浏览器缓存'
if (source === 'bkg-cover') return '封面缓存'
if (source === 'proxy') return '远程代理'
if (source === 'unknown') return '未知'
if (source === 'error') return '获取失败'
return source || '未知'
}
const snsMediaStageBadgeColorClass = (key) => {
const k = String(key || '').trim()
const source = String(snsMediaStage.value?.[k]?.source || '').trim()
if (source.startsWith('remote')) return 'bg-emerald-600/85 text-white'
if (source === 'deterministic-hash') return 'bg-sky-600/85 text-white'
if (source.startsWith('local')) return 'bg-blue-600/85 text-white'
if (source === 'manual-pick') return 'bg-amber-600/90 text-white'
if (source === 'browser-cache') return 'bg-slate-600/85 text-white'
if (source === 'proxy') return 'bg-fuchsia-600/85 text-white'
if (source === 'bkg-cover') return 'bg-indigo-600/85 text-white'
if (source === 'error') return 'bg-red-600/85 text-white'
return 'bg-black/50 text-white'
}
const snsMediaStageBadgeTitle = (key) => {
const k = String(key || '').trim()
const info = snsMediaStage.value?.[k]
if (!info || typeof info !== 'object') return ''
const source = String(info?.source || '').trim()
const hitType = String(info?.hitType || '').trim()
const xEnc = String(info?.xEnc || '').trim()
const parts = []
if (source) parts.push(`source=${source}`)
if (hitType) parts.push(`hit=${hitType}`)
if (xEnc) parts.push(`x-enc=${xEnc}`)
return parts.join(' · ')
}
const readSnsStageFromResourceTiming = (url) => {
try {
if (!process.client) return null
if (typeof performance === 'undefined' || !performance?.getEntriesByName) return null
const u = String(url || '').trim()
if (!u) return null
const entries = performance.getEntriesByName(u) || []
const latest = [...entries].reverse().find((e) => String(e?.entryType || '') === 'resource')
if (!latest) return null
// Prefer backend-injected stage info from `Server-Timing`.
const st = latest?.serverTiming
if (Array.isArray(st) && st.length > 0) {
let source = ''
let hitType = ''
let xEnc = ''
for (const item of st) {
const name = String(item?.name || '').trim()
const desc = String(item?.description || '').trim()
if (name === 'sns_source' && desc) source = desc
else if (name.startsWith('sns_source_')) source = name.slice('sns_source_'.length) || desc
else if (name === 'sns_hit' && desc) hitType = desc
else if (name.startsWith('sns_hit_')) hitType = name.slice('sns_hit_'.length) || desc
else if (name === 'sns_xenc' && desc) xEnc = desc
else if (name.startsWith('sns_xenc_')) xEnc = name.slice('sns_xenc_'.length) || desc
}
if (source) return { source, hitType, xEnc }
}
// When DevTools shows "(from disk cache)", browsers may not expose `serverTiming` at all.
// Best-effort: infer a browser cache hit from ResourceTiming sizes.
const transferSize = Number(latest?.transferSize)
if (Number.isFinite(transferSize) && transferSize === 0) {
return { source: 'browser-cache', hitType: 'transfer=0', xEnc: '' }
}
return null
} catch {
return null
}
}
const ensureSnsMediaStage = async (key, url) => {
if (!process.client) return
const k = String(key || '').trim()
const u = String(url || '').trim()
if (!k || !u) return
if (!isSnsMediaApiUrl(u)) return
const existingSource = String(snsMediaStage.value?.[k]?.source || '').trim()
if (existingSource && existingSource !== 'unknown') return
if (snsMediaStageLoading.value[k]) return
if (snsMediaStageInFlight.has(k)) return
snsMediaStageInFlight.add(k)
snsMediaStageLoading.value[k] = true
try {
// Prefer stage info from the *same* request that loaded the <img>/<video> element
// (via Server-Timing + Timing-Allow-Origin), to avoid a non-idempotent extra fetch.
let info = null
for (const delayMs of [0, 0, 16, 50, 120, 250, 500]) {
if (delayMs) await new Promise((resolve) => setTimeout(resolve, delayMs))
info = readSnsStageFromResourceTiming(u)
if (info) break
}
snsMediaStage.value[k] = info || { source: 'unknown', hitType: '', xEnc: '' }
} finally {
snsMediaStageLoading.value[k] = false
snsMediaStageInFlight.delete(k)
}
}
const eventCurrentSrc = (ev) => {
try {
const el = ev?.target || ev?.currentTarget
return String(el?.currentSrc || el?.src || '').trim()
} catch {
return ''
}
}
const onSnsMediaLoaded = (post, m, idx = 0, ev) => {
const pid = String(post?.id || '').trim()
if (!pid) return
const key = snsMediaStageKey(pid, idx, 'thumb')
const u = eventCurrentSrc(ev) || getMediaThumbSrc(post, m, idx)
ensureSnsMediaStage(key, u)
}
const onCoverMediaLoaded = (cover, ev) => {
const c = cover || activeCover.value
if (!c || !Array.isArray(c.media) || c.media.length <= 0) return
const u = eventCurrentSrc(ev) || getSnsMediaUrl(c, c.media[0], 0, c.media[0].url)
ensureSnsMediaStage(snsCoverStageKey(c), u)
}
watch([selectedAccount, snsUseCache], () => {
snsMediaStage.value = {}
snsMediaStageLoading.value = {}
snsMediaStageInFlight.clear()
})
// Article card thumbnail is best-effort: try SNS media thumb first, then fall back to
// extracting the cover from mp.weixin.qq.com HTML. Track per-post stage so we don't
// keep showing a broken <img>.
@@ -1504,36 +1270,6 @@ const upgradeTencentHttps = (u) => {
return raw
}
const normalizeHex32 = (value) => {
const raw = String(value ?? '').trim()
if (!raw) return ''
const hex = raw.replace(/[^0-9a-fA-F]/g, '').toLowerCase()
return hex.length >= 32 ? hex.slice(0, 32) : ''
}
const mediaSizeKey = (m) => {
const t = String(m?.type ?? '')
const w = String(m?.size?.width || m?.size?.w || '').trim()
const h = String(m?.size?.height || m?.size?.h || '').trim()
if (!w || !h) return ''
return `${t}:${w}x${h}`
}
// Our backend matches SNS cache images by width/height and then uses `idx` to
// pick the N-th match. `idx` must be the index within the same size-group,
// not the global media index in the post, otherwise images can shift.
const mediaSizeGroupIndex = (post, m, idx) => {
const list = Array.isArray(post?.media) ? post.media : []
const key = mediaSizeKey(m)
const i0 = Number(idx) || 0
if (!key || i0 <= 0) return i0
let count = 0
for (let i = 0; i < i0; i++) {
if (mediaSizeKey(list[i]) === key) count++
}
return count
}
const getSnsMediaUrl = (post, m, idx, rawUrl) => {
const raw = upgradeTencentHttps(String(rawUrl || '').trim())
if (!raw) return ''
@@ -1550,36 +1286,12 @@ const getSnsMediaUrl = (post, m, idx, rawUrl) => {
const host = new URL(raw).hostname.toLowerCase()
if (host.endsWith('.qpic.cn') || host.endsWith('.qlogo.cn') || host.endsWith('.tc.qq.com')) {
const acc = String(selectedAccount.value || '').trim()
const ct = String(post?.createTime || '').trim()
const w = String(m?.size?.width || m?.size?.w || '').trim()
const h = String(m?.size?.height || m?.size?.h || '').trim()
const ts = String(m?.size?.totalSize || m?.size?.total_size || m?.size?.total || '').trim()
const sizeIdx = mediaSizeGroupIndex(post, m, idx)
// const pick = getSnsMediaOverridePick(post?.id, idx)
let md5 = normalizeHex32(m?.urlAttrs?.md5 || m?.thumbAttrs?.md5 || m?.urlAttrs?.MD5 || m?.thumbAttrs?.MD5)
if (!md5) {
const match = /[?&]md5=([0-9a-fA-F]{16,32})/.exec(raw)
if (match?.[1]) md5 = normalizeHex32(match[1])
}
// Match WeFlow's image pipeline: use a stable URL + key/token and let the
// backend handle cache-first remote fetch/decrypt. Avoid attaching legacy
// local-match metadata to the main image path so browser caching can reuse
// the same request URL for list + preview.
const parts = new URLSearchParams()
if (acc) parts.set('account', acc)
if (ct) parts.set('create_time', ct)
if (w) parts.set('width', w)
if (h) parts.set('height', h)
if (/^\d+$/.test(ts)) parts.set('total_size', ts)
parts.set('idx', String(Number(sizeIdx) || 0))
const pid = String(post?.id || '').trim()
if (pid) parts.set('post_id', pid)
const mid = String(m?.id || '').trim()
if (mid) parts.set('media_id', mid)
const postType = String(post?.type || '1').trim()
if (postType) parts.set('post_type', postType)
const mediaType = String(m?.type || '2').trim()
if (mediaType) parts.set('media_type', mediaType)
const token = String(m?.token || m?.urlAttrs?.token || m?.thumbAttrs?.token || '').trim()
if (token) parts.set('token', token)
@@ -1589,10 +1301,8 @@ const getSnsMediaUrl = (post, m, idx, rawUrl) => {
parts.set('use_cache', snsUseCache.value ? '1' : '0')
// When cache is disabled, bust browser caching so backend really downloads+decrypts each time.
if (!snsUseCache.value) parts.set('_t', String(Date.now()))
if (md5) parts.set('md5', md5)
// Bump this when changing backend matching logic to avoid stale cached wrong images.
parts.set('v', '9')
// Bump this when changing the WeFlow-aligned image pipeline to avoid stale browser caches.
parts.set('v', '10')
parts.set('url', raw)
return `${apiBase}/sns/media?${parts.toString()}`
}
@@ -1607,7 +1317,9 @@ const getMediaThumbSrc = (post, m, idx = 0) => {
}
const getMediaPreviewSrc = (post, m, idx = 0) => {
return getSnsMediaUrl(post, m, idx, m?.url || m?.thumb)
// Align with WeFlow: preview reuses the same prepared image source as the grid
// instead of issuing a second "original image" request on click.
return getMediaThumbSrc(post, m, idx)
}
@@ -1755,26 +1467,8 @@ const getLivePhotoVideoSrc = (post, m, idx = 0) => {
return `${apiBase}/sns/video_remote?${parts.toString()}`
}
// 图片预览 + 候选匹配选择
// 图片预览
const previewCtx = ref(null) // { post, media, idx }
const previewCandidatesOpen = ref(false)
const previewCandidates = reactive({
loading: false,
loadingMore: false,
error: '',
items: [],
count: 0,
hasMore: false
})
const resetPreviewCandidates = () => {
previewCandidates.loading = false
previewCandidates.loadingMore = false
previewCandidates.error = ''
previewCandidates.items = []
previewCandidates.count = 0
previewCandidates.hasMore = false
}
const previewSrc = computed(() => {
const ctx = previewCtx.value
@@ -1897,60 +1591,7 @@ watch(
}
)
const loadPreviewCandidates = async ({ reset }) => {
const ctx = previewCtx.value
if (!ctx) return
const acc = String(selectedAccount.value || '').trim()
if (!acc) return
const toInt = (v) => Number.parseInt(String(v || '').trim(), 10) || 0
const w = toInt(ctx.media?.size?.width || ctx.media?.size?.w)
const h = toInt(ctx.media?.size?.height || ctx.media?.size?.h)
// Without dimensions, local matching is too noisy; keep it empty.
if (w <= 0 || h <= 0) {
resetPreviewCandidates()
return
}
const limit = 24
const offset = reset ? 0 : (previewCandidates.items?.length || 0)
if (reset) {
resetPreviewCandidates()
previewCandidates.loading = true
} else {
previewCandidates.loadingMore = true
}
previewCandidates.error = ''
try {
const resp = await api.listSnsMediaCandidates({
account: acc,
create_time: Number(ctx.post?.createTime || 0),
width: w,
height: h,
limit,
offset
})
const items = Array.isArray(resp?.items) ? resp.items : []
previewCandidates.count = Number(resp?.count || 0)
previewCandidates.hasMore = !!resp?.hasMore
if (reset) {
previewCandidates.items = items
} else {
previewCandidates.items = [...(previewCandidates.items || []), ...items]
}
} catch (e) {
previewCandidates.error = e?.message || '加载候选失败'
} finally {
previewCandidates.loading = false
previewCandidates.loadingMore = false
}
}
const openImagePreview = async (post, m, idx = 0) => {
const openImagePreview = (post, m, idx = 0) => {
if (!process.client) return
resetPreviewVideo()
// Stop any background hover-playing live photo when opening the preview.
@@ -1965,11 +1606,7 @@ const openImagePreview = async (post, m, idx = 0) => {
}
}
previewCtx.value = { post, media: m, idx: Number(idx) || 0 }
previewCandidatesOpen.value = false
resetPreviewCandidates()
document.body.style.overflow = 'hidden'
// Load the first page so we can show the candidate count in the header.
await loadPreviewCandidates({ reset: true })
}
const openVideoPreview = (post, m, idx = 0) => {
@@ -1987,8 +1624,6 @@ const openVideoPreview = (post, m, idx = 0) => {
else previewVideoError.value = '视频地址缺失。'
previewCtx.value = { post, media: m, idx: Number(idx) || 0 }
previewCandidatesOpen.value = false
resetPreviewCandidates()
document.body.style.overflow = 'hidden'
}
@@ -2021,8 +1656,6 @@ const onPreviewVideoError = () => {
const closeImagePreview = () => {
if (!process.client) return
previewCtx.value = null
previewCandidatesOpen.value = false
resetPreviewCandidates()
resetPreviewVideo()
document.body.style.overflow = ''
}
@@ -2038,7 +1671,7 @@ const onMediaClick = (post, m, idx = 0) => {
}
// 图片:打开预览
void openImagePreview(post, m, idx)
openImagePreview(post, m, idx)
}
const formatRelativeTime = (tsSeconds) => {
-23
View File
@@ -35,7 +35,6 @@ from .routers.sns_export import router as _sns_export_router
from .routers.wechat_detection import router as _wechat_detection_router
from .routers.wrapped import router as _wrapped_router
from .request_logging import log_server_errors_middleware
from .sns_stage_timing import add_sns_stage_timing_headers
from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown
from .routers.biz import router as _biz_router
from .routers.system import router as _system_router
@@ -56,31 +55,9 @@ app.add_middleware(
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["X-SNS-Source", "X-SNS-Hit-Type", "X-SNS-X-Enc"],
)
@app.middleware("http")
async def _add_sns_stage_timing_headers(request: Request, call_next):
"""Expose SNS stage metadata to the frontend without extra requests.
`<img>` elements can't read response headers, but browsers can surface `Server-Timing`
via `performance.getEntriesByName(...).serverTiming` when `Timing-Allow-Origin` is set.
"""
response = await call_next(request)
try:
add_sns_stage_timing_headers(
response.headers,
source=str(response.headers.get("X-SNS-Source") or ""),
hit_type=str(response.headers.get("X-SNS-Hit-Type") or ""),
x_enc=str(response.headers.get("X-SNS-X-Enc") or ""),
)
except Exception:
pass
return response
@app.middleware("http")
async def _log_server_errors(request: Request, call_next):
return await log_server_errors_middleware(request_logger, request, call_next)
+3 -729
View File
@@ -1,4 +1,3 @@
from bisect import bisect_left, bisect_right
from functools import lru_cache
from pathlib import Path
import os
@@ -20,7 +19,6 @@ from starlette.background import BackgroundTask
from fastapi import APIRouter, HTTPException
from fastapi.responses import Response, FileResponse # 返回视频文件
from pydantic import BaseModel, Field
from ..chat_helpers import _load_contact_rows, _pick_display_name, _resolve_account_dir
from ..logging_config import get_logger
@@ -44,8 +42,6 @@ logger = get_logger(__name__)
router = APIRouter(route_class=PathFixRoute)
SNS_MEDIA_PICKS_FILE = "_sns_media_picks.json"
_SNS_VIDEO_KEY_RE = re.compile(r'<enc\s+key="(\d+)"', flags=re.IGNORECASE)
_MP_BIZ_RE = re.compile(r"__biz=([A-Za-z0-9_=+-]+)")
_ZSTD_MAGIC = b"\x28\xb5\x2f\xfd"
@@ -860,233 +856,6 @@ def _parse_timeline_xml(xml_text: str, fallback_username: str) -> dict[str, Any]
return out
def _image_size_from_bytes(data: bytes, media_type: str) -> tuple[int, int]:
mt = str(media_type or "").lower()
if mt == "image/png":
# PNG IHDR width/height are stored at byte offsets 16..24
if len(data) >= 24 and data.startswith(b"\x89PNG\r\n\x1a\n"):
try:
w = int.from_bytes(data[16:20], "big")
h = int.from_bytes(data[20:24], "big")
return w, h
except Exception:
return 0, 0
return 0, 0
if mt in {"image/jpeg", "image/jpg"}:
# Minimal JPEG SOF parser.
if len(data) < 4 or (not data.startswith(b"\xFF\xD8")):
return 0, 0
i = 2
while i + 3 < len(data):
if data[i] != 0xFF:
i += 1
continue
# Skip padding 0xFF bytes.
while i < len(data) and data[i] == 0xFF:
i += 1
if i >= len(data):
break
marker = data[i]
i += 1
# Markers without a segment length.
if marker in (0xD8, 0xD9):
continue
if marker == 0xDA: # Start of scan.
break
if i + 1 >= len(data):
break
seg_len = (data[i] << 8) + data[i + 1]
i += 2
if seg_len < 2:
break
# SOF markers which contain width/height.
if marker in {
0xC0,
0xC1,
0xC2,
0xC3,
0xC5,
0xC6,
0xC7,
0xC9,
0xCA,
0xCB,
0xCD,
0xCE,
0xCF,
}:
# segment: [precision(1), height(2), width(2), ...]
if i + 4 < len(data):
try:
h = (data[i + 1] << 8) + data[i + 2]
w = (data[i + 3] << 8) + data[i + 4]
return w, h
except Exception:
return 0, 0
i += seg_len - 2
return 0, 0
return 0, 0
@lru_cache(maxsize=16)
def _sns_img_time_index(wxid_dir_str: str) -> tuple[list[float], list[str]]:
"""Build a (mtime_sorted, path_sorted) index for local Moments cache images.
WeChat stores encrypted SNS cache images under:
`{wxid_dir}/cache/YYYY-MM/Sns/Img/<2hex>/<30hex>`
"""
wxid_dir = Path(str(wxid_dir_str or "").strip())
out: list[tuple[float, str]] = []
cache_root = wxid_dir / "cache"
try:
month_dirs = [p for p in cache_root.iterdir() if p.is_dir()]
except Exception:
month_dirs = []
for mdir in month_dirs:
img_root = mdir / "Sns" / "Img"
try:
if not (img_root.exists() and img_root.is_dir()):
continue
except Exception:
continue
# The Img dir uses a 2-level layout; keep this tight (no global rglob).
try:
for sub in img_root.iterdir():
if not sub.is_dir():
continue
for f in sub.iterdir():
try:
if not f.is_file():
continue
st = f.stat()
out.append((float(st.st_mtime), str(f)))
except Exception:
continue
except Exception:
continue
out.sort(key=lambda x: x[0])
mtimes = [m for m, _p in out]
paths = [_p for _m, _p in out]
return mtimes, paths
def _normalize_hex32(value: Optional[str]) -> str:
"""Return the first 32 hex chars from value, or '' if not present."""
s = str(value or "").strip().lower()
if not s:
return ""
# Keep only hex chars. Some attrs may contain separators or be wrapped.
s = re.sub(r"[^0-9a-f]", "", s)
if len(s) < 32:
return ""
return s[:32]
def _sns_media_picks_path(account_dir: Path) -> Path:
return account_dir / SNS_MEDIA_PICKS_FILE
def _sns_post_id_from_media_key(media_key: str) -> str:
# Frontend stores picks under `${postId}:${idx}`.
s = str(media_key or "").strip()
if not s:
return ""
return s.split(":", 1)[0].strip()
@lru_cache(maxsize=32)
def _load_sns_media_picks_cached(path_str: str, mtime: float) -> dict[str, str]:
p = Path(str(path_str or "").strip())
try:
raw = p.read_text(encoding="utf-8")
except Exception:
return {}
try:
obj = json.loads(raw)
except Exception:
return {}
picks_obj = obj.get("picks") if isinstance(obj, dict) else None
if not isinstance(picks_obj, dict):
return {}
out: dict[str, str] = {}
for k, v in picks_obj.items():
mk = str(k or "").strip()
if not mk:
continue
ck = _normalize_hex32(str(v or ""))
if not ck:
continue
out[mk] = ck
return out
def _load_sns_media_picks(account_dir: Path) -> dict[str, str]:
p = _sns_media_picks_path(account_dir)
try:
st = p.stat()
mtime = float(st.st_mtime)
except Exception:
mtime = 0.0
return _load_sns_media_picks_cached(str(p), mtime)
def _save_sns_media_picks(account_dir: Path, picks: dict[str, str]) -> int:
# Normalize + keep it stable for easier diff/debugging.
out: dict[str, str] = {}
for k, v in (picks or {}).items():
mk = str(k or "").strip()
if not mk:
continue
ck = _normalize_hex32(str(v or ""))
if not ck:
continue
out[mk] = ck
try:
payload = {"updated_at": int(time.time()), "picks": dict(sorted(out.items(), key=lambda x: x[0]))}
_sns_media_picks_path(account_dir).write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
except Exception:
pass
try:
_load_sns_media_picks_cached.cache_clear()
except Exception:
pass
return len(out)
@lru_cache(maxsize=16)
def _sns_img_roots(wxid_dir_str: str) -> tuple[str, ...]:
"""List all month cache roots that contain `Sns/Img`."""
wxid_dir = Path(str(wxid_dir_str or "").strip())
cache_root = wxid_dir / "cache"
try:
month_dirs = [p for p in cache_root.iterdir() if p.is_dir()]
except Exception:
month_dirs = []
roots: list[str] = []
for mdir in month_dirs:
img_root = mdir / "Sns" / "Img"
try:
if img_root.exists() and img_root.is_dir():
roots.append(str(img_root))
except Exception:
continue
# Keep it stable (helps debugging and caching predictability).
roots.sort()
return tuple(roots)
@lru_cache(maxsize=16)
def _sns_video_roots(wxid_dir_str: str) -> tuple[str, ...]:
"""List all month cache roots that contain `Sns/Video`."""
@@ -1139,268 +908,6 @@ def _resolve_sns_cached_video_path(
return None
def _resolve_sns_cached_image_path_by_md5(
*,
wxid_dir: Path,
md5: str,
create_time: int,
) -> Optional[str]:
"""Try to resolve SNS cache image by md5-based cache path layout."""
md5_32 = _normalize_hex32(md5)
if not md5_32:
return None
sub = md5_32[:2]
rest = md5_32[2:]
roots = _sns_img_roots(str(wxid_dir))
if not roots:
return None
best: tuple[float, str] | None = None
for root_str in roots:
try:
p = Path(root_str) / sub / rest
if not (p.exists() and p.is_file()):
continue
# Prefer the cache file closest to the post create_time (if provided),
# otherwise pick the newest one.
st = p.stat()
if create_time > 0:
score = abs(float(st.st_mtime) - float(create_time))
else:
score = -float(st.st_mtime)
if best is None or score < best[0]:
best = (score, str(p))
except Exception:
continue
return best[1] if best else None
def _sns_cache_key_from_path(p: Path) -> str:
"""Return the 32-hex cache key for a SNS cache file path, or ''."""
try:
# cache/.../Sns/Img/<2hex>/<30hex>
key = f"{p.parent.name}{p.name}"
except Exception:
return ""
return _normalize_hex32(key)
def _generate_sns_cache_key(tid: str, media_id: str, media_type: int = 2) -> str:
"""
公式: md5(tid_mediaId_type)
Example: 14852422213384352392_14852422213963625090_2 -> 6d479249ca5a090fab5c42c79bc56b89
"""
if not tid or not media_id:
return ""
raw_key = f"{tid}_{media_id}_{media_type}"
try:
return hashlib.md5(raw_key.encode("utf-8")).hexdigest()
except Exception:
return ""
def _resolve_sns_cached_image_path_by_cache_key(
*,
wxid_dir: Path,
cache_key: str,
create_time: int,
) -> Optional[str]:
"""Resolve SNS cache image by `<2hex>/<30hex>` cache key."""
key32 = _normalize_hex32(cache_key)
if not key32:
return None
sub = key32[:2]
rest = key32[2:]
roots = _sns_img_roots(str(wxid_dir))
if not roots:
return None
best: tuple[float, str] | None = None
for root_str in roots:
try:
p = Path(root_str) / sub / rest
if not (p.exists() and p.is_file()):
continue
st = p.stat()
if create_time > 0:
score = abs(float(st.st_mtime) - float(create_time))
else:
score = -float(st.st_mtime)
if best is None or score < best[0]:
best = (score, str(p))
except Exception:
continue
return best[1] if best else None
@lru_cache(maxsize=4096)
def _resolve_sns_cached_image_path(
*,
account_dir_str: str,
create_time: int,
width: int,
height: int,
idx: int,
total_size: int = 0,
) -> Optional[str]:
"""Best-effort resolve a local cached SNS image for a post+media meta."""
total_size_i = int(total_size or 0)
must_match_size = width > 0 and height > 0
# Without size/total_size, time-only matching is too error-prone and can easily mix images.
if (not must_match_size) and total_size_i <= 0:
return None
account_dir = Path(str(account_dir_str or "").strip())
if not account_dir.exists():
return None
wxid_dir = _resolve_account_wxid_dir(account_dir)
if not wxid_dir:
return None
mtimes, paths = _sns_img_time_index(str(wxid_dir))
if not mtimes:
return None
create_time_i = int(create_time or 0)
if create_time_i > 0:
# We don't know when the image was cached (could be close to create_time, could be hours later).
# Use a generous window but keep it bounded for performance.
window = 72 * 3600 # 72h
lo = create_time_i - window
hi = create_time_i + window
l = bisect_left(mtimes, lo)
r = bisect_right(mtimes, hi)
if l >= r:
# Fallback: search the newest N files if time window has no hits.
l = max(0, len(mtimes) - 800)
r = len(mtimes)
else:
# Missing createTime: only probe the newest cache entries.
l = max(0, len(mtimes) - 800)
r = len(mtimes)
# Rank by time proximity to create_time (or by recency when createTime is missing).
candidates: list[tuple[float, str]] = []
for j in range(l, r):
try:
if create_time_i > 0:
candidates.append((abs(mtimes[j] - float(create_time_i)), paths[j]))
else:
candidates.append((-mtimes[j], paths[j]))
except Exception:
continue
candidates.sort(key=lambda x: x[0])
matched: list[tuple[int, float, str]] = []
# Limit the work per request.
max_probe = 2000 if (r - l) <= 2000 else 2000
for _diff, pstr in candidates[:max_probe]:
try:
p = Path(pstr)
payload, media_type = _read_and_maybe_decrypt_media(p, account_dir)
if not payload or not str(media_type or "").startswith("image/"):
continue
if must_match_size:
w0, h0 = _image_size_from_bytes(payload, str(media_type or ""))
if (w0, h0) != (width, height):
continue
size_diff = abs(len(payload) - total_size_i) if total_size_i > 0 else 0
# When totalSize is available, it tends to be a stronger discriminator than mtime.
matched.append((int(size_diff), float(_diff), pstr))
except Exception:
continue
if not matched:
return None
if must_match_size:
matched.sort(key=lambda x: (x[0], x[1], x[2]))
# If we have totalSize, treat it as a strong discriminator and always take the best match.
if total_size_i > 0:
return matched[0][2]
idx0 = max(0, int(idx or 0))
return matched[idx0][2] if idx0 < len(matched) else None
# No size: only return a best-effort match when totalSize is available.
if total_size_i > 0:
matched.sort(key=lambda x: (x[0], x[1], x[2]))
return matched[0][2]
return None
@lru_cache(maxsize=2048)
def _list_sns_cached_image_candidate_keys(
*,
account_dir_str: str,
create_time: int,
width: int,
height: int,
) -> tuple[str, ...]:
"""List local SNS cache candidates (as 32-hex cache keys) for a media item.
The ordering matches `_resolve_sns_cached_image_path()`'s scan order, so `idx`
is stable within the same (account, create_time, width, height) input.
"""
if create_time <= 0 or width <= 0 or height <= 0:
return tuple()
account_dir = Path(str(account_dir_str or "").strip())
if not account_dir.exists():
return tuple()
wxid_dir = _resolve_account_wxid_dir(account_dir)
if not wxid_dir:
return tuple()
mtimes, paths = _sns_img_time_index(str(wxid_dir))
if not mtimes:
return tuple()
window = 72 * 3600 # 72h
lo = create_time - window
hi = create_time + window
l = bisect_left(mtimes, lo)
r = bisect_right(mtimes, hi)
if l >= r:
l = max(0, len(mtimes) - 800)
r = len(mtimes)
candidates: list[tuple[float, str]] = []
for j in range(l, r):
try:
candidates.append((abs(mtimes[j] - float(create_time)), paths[j]))
except Exception:
continue
candidates.sort(key=lambda x: x[0])
max_probe = 2000 if (r - l) <= 2000 else 2000
out: list[str] = []
seen: set[str] = set()
for _diff, pstr in candidates[:max_probe]:
try:
p = Path(pstr)
payload, media_type = _read_and_maybe_decrypt_media(p, account_dir)
if not payload or not str(media_type or "").startswith("image/"):
continue
w0, h0 = _image_size_from_bytes(payload, str(media_type or ""))
if (w0, h0) != (width, height):
continue
key = _sns_cache_key_from_path(p)
if not key or key in seen:
continue
seen.add(key)
out.append(key)
except Exception:
continue
return tuple(out)
def _get_sns_covers(account_dir: Path, target_wxid: str, limit: int = 20) -> list[dict[str, Any]]:
"""无论多古老,强行揪出用户的朋友圈封面历史 (type=7)。
@@ -2575,47 +2082,6 @@ def list_sns_users(
return {"items": items, "count": len(items), "limit": lim}
class SnsMediaPicksSaveRequest(BaseModel):
account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)")
picks: dict[str, str] = Field(default_factory=dict, description="手动匹配表:`${postId}:${idx}` -> 32hex cacheKey")
@router.post("/api/sns/media_picks", summary="保存朋友圈图片手动匹配结果(本机)")
async def save_sns_media_picks(request: SnsMediaPicksSaveRequest):
account_dir = _resolve_account_dir(request.account)
count = _save_sns_media_picks(account_dir, request.picks or {})
return {"status": "success", "count": int(count)}
@router.get("/api/sns/media_candidates", summary="获取朋友圈图片本地缓存候选")
def list_sns_media_candidates(
account: Optional[str] = None,
create_time: int = 0,
width: int = 0,
height: int = 0,
limit: int = 24,
offset: int = 0,
):
if limit <= 0:
raise HTTPException(status_code=400, detail="Invalid limit.")
if limit > 200:
limit = 200
if offset < 0:
offset = 0
account_dir = _resolve_account_dir(account)
keys = _list_sns_cached_image_candidate_keys(
account_dir_str=str(account_dir),
create_time=int(create_time or 0),
width=int(width or 0),
height=int(height or 0),
)
total = len(keys)
end = min(total, offset + limit)
items = [{"idx": i, "key": keys[i]} for i in range(offset, end)]
return {"count": total, "items": items, "hasMore": end < total, "limit": limit, "offset": offset}
def _is_allowed_sns_media_host(host: str) -> bool:
return _sns_media.is_allowed_sns_media_host(host)
@@ -2902,10 +2368,7 @@ async def _try_fetch_and_decrypt_sns_remote(
token: str,
use_cache: bool,
) -> Optional[Response]:
"""Try remote download+decrypt first (accurate when keys are present).
Returns a Response on success, or None on failure so caller can fall back to local cache matching.
"""
"""Try remote download+decrypt first (accurate when keys are present)."""
res = await _sns_media.try_fetch_and_decrypt_sns_image_remote(
account_dir=account_dir,
url=str(url or ""),
@@ -2918,34 +2381,18 @@ async def _try_fetch_and_decrypt_sns_remote(
resp = Response(content=res.payload, media_type=res.media_type)
resp.headers["Cache-Control"] = "public, max-age=86400" if use_cache else "no-store"
resp.headers["X-SNS-Source"] = str(res.source or "remote")
if res.x_enc:
resp.headers["X-SNS-X-Enc"] = str(res.x_enc)
return resp
@router.get("/api/sns/media", summary="获取朋友圈图片(下载解密优先)")
async def get_sns_media(
account: Optional[str] = None,
create_time: int = 0,
width: int = 0,
height: int = 0,
total_size: int = 0,
idx: int = 0,
avoid_picked: int = 0,
post_id: Optional[str] = None,
media_id: Optional[str] = None,
post_type: int = 1,
media_type: int = 2,
pick: Optional[str] = None,
md5: Optional[str] = None,
token: Optional[str] = None,
key: Optional[str] = None,
use_cache: int = 1,
url: Optional[str] = None,
):
account_dir = _resolve_account_dir(account)
wxid_dir = _resolve_account_wxid_dir(account_dir)
try:
use_cache_flag = bool(int(use_cache or 1))
@@ -2963,179 +2410,7 @@ async def get_sns_media(
if remote_resp is not None:
return remote_resp
# Cache disabled: do not fall back to local cache heuristics.
if not use_cache_flag:
raise HTTPException(status_code=404, detail="SNS media not found (cache disabled).")
if wxid_dir and post_id and media_id:
if int(post_type) == 7:
raw_key = f"{post_id}_{media_id}_4" # 硬编码
md5_str = hashlib.md5(raw_key.encode("utf-8")).hexdigest()
bkg_path = wxid_dir / "business" / "sns" / "bkg" / md5_str[:2] / md5_str
if bkg_path.exists() and bkg_path.is_file():
print(f"===== Hit Bkg Cover ======= {bkg_path}")
return FileResponse(bkg_path, media_type="image/jpeg",
headers={"Cache-Control": "public, max-age=31536000", "X-SNS-Source": "bkg-cover"})
exact_match_path = None
hit_type = ""
# 尝试 1: 使用 post_type 计算 MD5
key_post = _generate_sns_cache_key(post_id, media_id, post_type)
exact_match_path = _resolve_sns_cached_image_path_by_cache_key(
wxid_dir=wxid_dir,
cache_key=key_post,
create_time=0
)
if exact_match_path:
hit_type = "post_type"
# 尝试 2: 如果没找到,并且 media_type 和 post_type 不一样,再试一次
if not exact_match_path and post_type != media_type:
key_media = _generate_sns_cache_key(post_id, media_id, media_type)
exact_match_path = _resolve_sns_cached_image_path_by_cache_key(
wxid_dir=wxid_dir,
cache_key=key_media,
create_time=0
)
if exact_match_path:
hit_type = "media_type"
# 如果通过这两种精确定位找到了文件,直接返回
if exact_match_path:
print(f"=====exact_match_path======={exact_match_path}============= (Hit: {hit_type})")
try:
payload, mtype = _read_and_maybe_decrypt_media(Path(exact_match_path), account_dir)
if payload and str(mtype or "").startswith("image/"):
resp = Response(content=payload, media_type=str(mtype or "image/jpeg"))
resp.headers["Cache-Control"] = "public, max-age=31536000"
resp.headers["X-SNS-Source"] = "deterministic-hash"
# 在 Header 里塞入到底是哪个 type 命中的,方便 F12 调试
resp.headers["X-SNS-Hit-Type"] = hit_type
return resp
except Exception:
pass
print("no exact match path, falling back...")
# 0) User-picked cache key override (stable across candidate ordering).
pick_key = _normalize_hex32(pick)
if pick_key:
wxid_dir = _resolve_account_wxid_dir(account_dir)
if wxid_dir:
local = _resolve_sns_cached_image_path_by_cache_key(
wxid_dir=wxid_dir,
cache_key=pick_key,
create_time=int(create_time or 0),
)
if local:
try:
payload, media_type = _read_and_maybe_decrypt_media(Path(local), account_dir)
if payload and str(media_type or "").startswith("image/"):
resp = Response(content=payload, media_type=str(media_type or "image/jpeg"))
resp.headers["Cache-Control"] = "public, max-age=86400"
resp.headers["X-SNS-Source"] = "manual-pick"
return resp
except Exception:
pass
# Optional: avoid using a cache image that was manually pinned to another post.
# Only applies when frontend enables this setting and the current media has no explicit `pick`.
try:
avoid_flag = bool(int(avoid_picked or 0))
except Exception:
avoid_flag = False
cur_post_id = str(post_id or "").strip()
reserved_other: set[str] = set()
if avoid_flag and (not pick_key) and cur_post_id:
picks_map = _load_sns_media_picks(account_dir)
for mk, ck in (picks_map or {}).items():
pid = _sns_post_id_from_media_key(mk)
if not pid or pid == cur_post_id:
continue
if ck:
reserved_other.add(str(ck))
# 1) Try local decrypted cache first (works for old posts where CDN URLs return placeholders).
local = _resolve_sns_cached_image_path(
account_dir_str=str(account_dir),
create_time=int(create_time or 0),
width=int(width or 0),
height=int(height or 0),
idx=max(0, int(idx or 0)),
total_size=int(total_size or 0),
)
if local and reserved_other:
try:
ck0 = _sns_cache_key_from_path(Path(local))
if ck0 and ck0 in reserved_other:
local = None
except Exception:
pass
if local:
try:
payload, media_type = _read_and_maybe_decrypt_media(Path(local), account_dir)
if payload and str(media_type or "").startswith("image/"):
resp = Response(content=payload, media_type=str(media_type or "image/jpeg"))
resp.headers["Cache-Control"] = "public, max-age=86400"
resp.headers["X-SNS-Source"] = "local-heuristic"
return resp
except Exception:
pass
# 1.5) If enabled, and the default match was skipped (or not found), pick the next candidate
# that is not reserved by a manual pick on another post.
if reserved_other and int(create_time or 0) > 0 and int(width or 0) > 0 and int(height or 0) > 0:
wxid_dir = _resolve_account_wxid_dir(account_dir)
if wxid_dir:
keys = _list_sns_cached_image_candidate_keys(
account_dir_str=str(account_dir),
create_time=int(create_time or 0),
width=int(width or 0),
height=int(height or 0),
)
base_idx = max(0, int(idx or 0))
for ck in keys[base_idx:]:
if not ck or ck in reserved_other:
continue
local2 = _resolve_sns_cached_image_path_by_cache_key(
wxid_dir=wxid_dir,
cache_key=str(ck),
create_time=int(create_time or 0),
)
if not local2:
continue
try:
payload, media_type = _read_and_maybe_decrypt_media(Path(local2), account_dir)
if payload and str(media_type or "").startswith("image/"):
resp = Response(content=payload, media_type=str(media_type or "image/jpeg"))
resp.headers["Cache-Control"] = "public, max-age=86400"
resp.headers["X-SNS-Source"] = "local-heuristic-next"
return resp
except Exception:
continue
# 2) Fallback to the remote URL (may still return a Tencent placeholder image).
u = str(url or "").strip()
if not u:
raise HTTPException(status_code=404, detail="SNS media not found.")
# Delay-import to avoid pulling requests machinery during normal timeline listing.
from .chat_media import proxy_image # pylint: disable=import-outside-toplevel
try:
resp0 = await proxy_image(u)
try:
resp0.headers["X-SNS-Source"] = "proxy"
except Exception:
pass
return resp0
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=502, detail=f"Fetch sns media failed: {e}")
raise HTTPException(status_code=404, detail="SNS media not found.")
@router.get("/api/sns/article_thumb", summary="提取公众号文章封面图")
@@ -3197,8 +2472,7 @@ async def get_sns_video_remote(
if path is None:
raise HTTPException(status_code=404, detail="SNS remote video not found.")
headers = {"X-SNS-Source": "remote-video-cache" if use_cache_flag else "remote-video"}
headers["Cache-Control"] = "public, max-age=86400" if use_cache_flag else "no-store"
headers = {"Cache-Control": "public, max-age=86400" if use_cache_flag else "no-store"}
if use_cache_flag:
return FileResponse(str(path), media_type="video/mp4", headers=headers)
+286 -4
View File
@@ -3,8 +3,10 @@ from __future__ import annotations
"""SNS (Moments) HTML export service (offline ZIP)."""
import asyncio
from bisect import bisect_left, bisect_right
from dataclasses import dataclass, field
from datetime import datetime
from functools import lru_cache
import hashlib
import html
import json
@@ -33,10 +35,6 @@ from .chat_export_service import ( # pylint: disable=protected-access
# Reuse SNS timeline/local cache helpers.
from .routers.sns import ( # pylint: disable=protected-access
_generate_sns_cache_key,
_resolve_sns_cached_image_path,
_resolve_sns_cached_image_path_by_cache_key,
_resolve_sns_cached_image_path_by_md5,
_resolve_sns_cached_video_path,
list_sns_timeline,
)
@@ -54,6 +52,7 @@ ExportStatus = Literal["queued", "running", "done", "error", "cancelled"]
ExportScope = Literal["selected", "all"]
_INVALID_PATH_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
_HEX_ONLY_RE = re.compile(r"[^0-9a-fA-F]+")
def _safe_name(s: str, max_len: int = 80) -> str:
@@ -101,6 +100,289 @@ def _mime_to_ext(mt: str) -> str:
}.get(m, ".bin")
def _normalize_hex32(value: Any) -> str:
raw = str(value or "").strip()
if not raw:
return ""
hex_only = _HEX_ONLY_RE.sub("", raw).lower()
return hex_only[:32] if len(hex_only) >= 32 else ""
def _image_size_from_bytes(data: bytes, media_type: str) -> tuple[int, int]:
mt = str(media_type or "").lower()
if mt == "image/png":
if len(data) >= 24 and data.startswith(b"\x89PNG\r\n\x1a\n"):
try:
w = int.from_bytes(data[16:20], "big")
h = int.from_bytes(data[20:24], "big")
return w, h
except Exception:
return 0, 0
return 0, 0
if mt in {"image/jpeg", "image/jpg"}:
if len(data) < 4 or not data.startswith(b"\xFF\xD8"):
return 0, 0
i = 2
while i + 3 < len(data):
if data[i] != 0xFF:
i += 1
continue
while i < len(data) and data[i] == 0xFF:
i += 1
if i >= len(data):
break
marker = data[i]
i += 1
if marker in (0xD8, 0xD9):
continue
if marker == 0xDA:
break
if i + 1 >= len(data):
break
seg_len = (data[i] << 8) + data[i + 1]
i += 2
if seg_len < 2:
break
if marker in {
0xC0,
0xC1,
0xC2,
0xC3,
0xC5,
0xC6,
0xC7,
0xC9,
0xCA,
0xCB,
0xCD,
0xCE,
0xCF,
}:
if i + 4 < len(data):
try:
h = (data[i + 1] << 8) + data[i + 2]
w = (data[i + 3] << 8) + data[i + 4]
return w, h
except Exception:
return 0, 0
i += seg_len - 2
return 0, 0
return 0, 0
@lru_cache(maxsize=16)
def _sns_img_roots(wxid_dir_str: str) -> tuple[str, ...]:
wxid_dir = Path(str(wxid_dir_str or "").strip())
cache_root = wxid_dir / "cache"
try:
month_dirs = [p for p in cache_root.iterdir() if p.is_dir()]
except Exception:
month_dirs = []
roots: list[str] = []
for mdir in month_dirs:
img_root = mdir / "Sns" / "Img"
try:
if img_root.exists() and img_root.is_dir():
roots.append(str(img_root))
except Exception:
continue
roots.sort()
return tuple(roots)
@lru_cache(maxsize=16)
def _sns_img_time_index(wxid_dir_str: str) -> tuple[list[float], list[str]]:
wxid_dir = Path(str(wxid_dir_str or "").strip())
out: list[tuple[float, str]] = []
cache_root = wxid_dir / "cache"
try:
month_dirs = [p for p in cache_root.iterdir() if p.is_dir()]
except Exception:
month_dirs = []
for mdir in month_dirs:
img_root = mdir / "Sns" / "Img"
try:
if not (img_root.exists() and img_root.is_dir()):
continue
except Exception:
continue
try:
for sub in img_root.iterdir():
if not sub.is_dir():
continue
for f in sub.iterdir():
try:
if not f.is_file():
continue
st = f.stat()
out.append((float(st.st_mtime), str(f)))
except Exception:
continue
except Exception:
continue
out.sort(key=lambda x: x[0])
mtimes = [m for m, _p in out]
paths = [_p for _m, _p in out]
return mtimes, paths
def _generate_sns_cache_key(tid: str, media_id: str, media_type: int = 2) -> str:
if not tid or not media_id:
return ""
raw_key = f"{tid}_{media_id}_{media_type}"
try:
return hashlib.md5(raw_key.encode("utf-8")).hexdigest()
except Exception:
return ""
def _resolve_sns_cached_image_path_by_cache_key(
*,
wxid_dir: Path,
cache_key: str,
create_time: int,
) -> Optional[str]:
key32 = _normalize_hex32(cache_key)
if not key32:
return None
sub = key32[:2]
rest = key32[2:]
roots = _sns_img_roots(str(wxid_dir))
if not roots:
return None
best: tuple[float, str] | None = None
for root_str in roots:
try:
p = Path(root_str) / sub / rest
if not (p.exists() and p.is_file()):
continue
st = p.stat()
score = abs(float(st.st_mtime) - float(create_time)) if create_time > 0 else -float(st.st_mtime)
if best is None or score < best[0]:
best = (score, str(p))
except Exception:
continue
return best[1] if best else None
def _resolve_sns_cached_image_path_by_md5(
*,
wxid_dir: Path,
md5: str,
create_time: int,
) -> Optional[str]:
md5_32 = _normalize_hex32(md5)
if not md5_32:
return None
sub = md5_32[:2]
rest = md5_32[2:]
roots = _sns_img_roots(str(wxid_dir))
if not roots:
return None
best: tuple[float, str] | None = None
for root_str in roots:
try:
p = Path(root_str) / sub / rest
if not (p.exists() and p.is_file()):
continue
st = p.stat()
score = abs(float(st.st_mtime) - float(create_time)) if create_time > 0 else -float(st.st_mtime)
if best is None or score < best[0]:
best = (score, str(p))
except Exception:
continue
return best[1] if best else None
def _resolve_sns_cached_image_path(
*,
account_dir_str: str,
create_time: int,
width: int,
height: int,
idx: int,
total_size: int = 0,
) -> Optional[str]:
total_size_i = int(total_size or 0)
must_match_size = width > 0 and height > 0
if (not must_match_size) and total_size_i <= 0:
return None
account_dir = Path(str(account_dir_str or "").strip())
if not account_dir.exists():
return None
wxid_dir = _resolve_account_wxid_dir(account_dir)
if not wxid_dir:
return None
mtimes, paths = _sns_img_time_index(str(wxid_dir))
if not mtimes:
return None
create_time_i = int(create_time or 0)
if create_time_i > 0:
window = 72 * 3600
lo = create_time_i - window
hi = create_time_i + window
l = bisect_left(mtimes, lo)
r = bisect_right(mtimes, hi)
if l >= r:
l = max(0, len(mtimes) - 800)
r = len(mtimes)
else:
l = max(0, len(mtimes) - 800)
r = len(mtimes)
candidates: list[tuple[float, str]] = []
for j in range(l, r):
try:
if create_time_i > 0:
candidates.append((abs(mtimes[j] - float(create_time_i)), paths[j]))
else:
candidates.append((-mtimes[j], paths[j]))
except Exception:
continue
candidates.sort(key=lambda x: x[0])
matched: list[tuple[int, float, str]] = []
for diff, pstr in candidates[:2000]:
try:
p = Path(pstr)
payload, media_type = _read_and_maybe_decrypt_media(p, account_dir)
if not payload or not str(media_type or "").startswith("image/"):
continue
if must_match_size:
w0, h0 = _image_size_from_bytes(payload, str(media_type or ""))
if (w0, h0) != (width, height):
continue
size_diff = abs(len(payload) - total_size_i) if total_size_i > 0 else 0
matched.append((int(size_diff), float(diff), pstr))
except Exception:
continue
if not matched:
return None
if must_match_size:
matched.sort(key=lambda x: (x[0], x[1], x[2]))
if total_size_i > 0:
return matched[0][2]
idx0 = max(0, int(idx or 0))
return matched[idx0][2] if idx0 < len(matched) else None
if total_size_i > 0:
matched.sort(key=lambda x: (x[0], x[1], x[2]))
return matched[0][2]
return None
def _format_dt(ts_seconds: Any) -> str:
try:
t = int(ts_seconds or 0)
@@ -1,63 +0,0 @@
import re
from collections.abc import MutableMapping
def add_sns_stage_timing_headers(
headers: MutableMapping[str, str],
*,
source: str,
hit_type: str = "",
x_enc: str = "",
) -> None:
"""Inject `Server-Timing` + `Timing-Allow-Origin` for SNS media stage inspection.
The frontend can't read `<img>` response headers, but browsers expose `Server-Timing` metrics
via `performance.getEntriesByName(...).serverTiming` when `Timing-Allow-Origin` allows it.
This helper is intentionally side-effect free beyond mutating `headers`.
"""
src = str(source or "").strip()
if not src:
return
ht = str(hit_type or "").strip()
xe = str(x_enc or "").strip()
if "Timing-Allow-Origin" not in headers:
headers["Timing-Allow-Origin"] = "*"
def _esc(v: str) -> str:
return v.replace("\\", "\\\\").replace('"', '\\"')
def _token(v: str) -> str:
raw = str(v or "").strip()
if not raw:
return ""
raw = raw.replace(" ", "_")
safe = re.sub(r"[^0-9A-Za-z_.-]+", "_", raw).strip("_")
if not safe:
return ""
return safe[:64]
parts: list[str] = []
src_tok = _token(src) or "unknown"
parts.append(f'sns_source_{src_tok};dur=0;desc="{_esc(src)}"')
if ht:
ht_tok = _token(ht)
if ht_tok:
parts.append(f'sns_hit_{ht_tok};dur=0;desc="{_esc(ht)}"')
if xe:
xe_tok = _token(xe)
if xe_tok:
parts.append(f'sns_xenc_{xe_tok};dur=0;desc="{_esc(xe)}"')
existing = str(headers.get("Server-Timing") or "").strip()
# Some responses may already have upstream `Server-Timing` metrics. Always append ours so
# the frontend can consistently read `sns_source_*` via ResourceTiming.serverTiming.
if existing and re.search(r"(^|,\\s*)sns_source(_|;)", existing):
return
combined = ", ".join(parts)
headers["Server-Timing"] = f"{existing}, {combined}" if existing else combined
@@ -0,0 +1,39 @@
import asyncio
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import sns # noqa: E402 pylint: disable=wrong-import-position
class TestSnsMediaRouteWeFlowDefault(unittest.TestCase):
def test_route_stops_after_remote_miss_by_default(self):
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
with mock.patch("wechat_decrypt_tool.routers.sns._resolve_account_dir", return_value=account_dir):
with mock.patch("wechat_decrypt_tool.routers.sns._try_fetch_and_decrypt_sns_remote", return_value=None):
with self.assertRaises(sns.HTTPException) as ctx:
asyncio.run(
sns.get_sns_media(
account="acc",
url="https://mmsns.qpic.cn/sns/test/0",
key="123",
token="tkn",
use_cache=1,
)
)
self.assertEqual(ctx.exception.status_code, 404)
if __name__ == "__main__":
unittest.main()
-40
View File
@@ -1,40 +0,0 @@
import sys
import unittest
from pathlib import Path
from starlette.responses import Response
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.sns_stage_timing import add_sns_stage_timing_headers # noqa: E402 pylint: disable=wrong-import-position
class TestSnsStageServerTiming(unittest.TestCase):
def test_injects_server_timing_when_missing(self):
resp = Response(content=b"ok")
add_sns_stage_timing_headers(resp.headers, source="proxy")
st = str(resp.headers.get("Server-Timing") or "")
self.assertIn("sns_source_", st)
self.assertIn("proxy", st)
def test_appends_when_upstream_server_timing_exists(self):
resp = Response(content=b"ok")
resp.headers["Server-Timing"] = "edge;dur=1"
add_sns_stage_timing_headers(resp.headers, source="proxy")
st = str(resp.headers.get("Server-Timing") or "")
self.assertIn("edge;dur=1", st)
self.assertIn("sns_source_", st)
def test_does_not_duplicate_existing_sns_source_metric(self):
resp = Response(content=b"ok")
resp.headers["Server-Timing"] = 'sns_source_proxy;dur=0;desc="proxy"'
add_sns_stage_timing_headers(resp.headers, source="proxy")
st = str(resp.headers.get("Server-Timing") or "")
self.assertEqual(st.count("sns_source_"), 1)
if __name__ == "__main__":
unittest.main()