@@ -422,6 +471,21 @@ const onMediaError = (postId, idx) => {
mediaErrors.value[mediaErrorKey(postId, idx)] = true
}
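+// Track per-post failures when loading article thumbnails so the template can fall back gracefully.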
+const articleThumbErrors = ref({})
+
+const hasArticleThumbError = (postId) => !!articleThumbErrors.value[postId]
+
+const onArticleThumbError = (postId) => {
+ articleThumbErrors.value[postId] = true
+}
+
+// Build a proxy URL for the article-thumbnail endpoint (see routers/sns.py below).
+const getArticleThumbProxyUrl = (contentUrl) => {
+ const u = String(contentUrl || '').trim()
+ if (!u) return ''
+ return `${mediaBase}/api/sns/article_thumb?url=${encodeURIComponent(u)}`
+}
+
// Right-click context menu (copy text / JSON) to help debug SNS parsing issues.
const contextMenu = ref({ visible: false, x: 0, y: 0, post: null })
@@ -954,4 +1018,13 @@ onUnmounted(() => {
document.removeEventListener('click', onGlobalClick)
document.removeEventListener('keydown', onGlobalKeyDown)
})
+
+const getProxyExternalUrl = (url) => {
+  // Computing the enc parameter is not feasible for now, so proxy-fetch the external cover image (thumbnail) instead.
+ const u = String(url || '').trim()
+ if (!u) return ''
+ return `${mediaBase}/api/chat/media/proxy_image?url=${encodeURIComponent(u)}`
+}
+
+
diff --git a/src/wechat_decrypt_tool/routers/sns.py b/src/wechat_decrypt_tool/routers/sns.py
index 7a17690..9cc5c65 100644
--- a/src/wechat_decrypt_tool/routers/sns.py
+++ b/src/wechat_decrypt_tool/routers/sns.py
@@ -4,6 +4,8 @@ from pathlib import Path
import hashlib
import json
import re
+import httpx
+import html  # Undo &amp; escaping in URLs pulled from SNS XML.
import sqlite3
import time
import xml.etree.ElementTree as ET
@@ -93,6 +95,10 @@ def _parse_timeline_xml(xml_text: str, fallback_username: str) -> dict[str, Any]
"media": [],
"likes": [],
"comments": [],
+        # Extra fields for link/video posts (populated below for types 3 and 28).
+        "type": 1,  # default content type
+ "title": "",
+ "contentUrl": "",
+ "finderFeed": {}
}
xml_str = str(xml_text or "").strip()
@@ -113,54 +119,72 @@ def _parse_timeline_xml(xml_text: str, fallback_username: str) -> dict[str, Any]
if isinstance(v, str) and v.strip():
return v.strip()
return ""
+    # SNS XML stores URLs HTML-escaped (&amp;, sometimes doubly); normalize them here.
+ def _clean_url(u: str) -> str:
+ if not u:
+ return ""
- out["username"] = (
- _find_text(".//TimelineObject/username", ".//TimelineObject/user_name", ".//TimelineObject/userName", ".//username")
- or fallback_username
- )
+ cleaned = html.unescape(u)
+        cleaned = cleaned.replace("&amp;", "&")  # catch entities that survive a single unescape
+ return cleaned.strip()
+
+ out["username"] = _find_text(".//TimelineObject/username", ".//TimelineObject/user_name",
+ ".//username") or fallback_username
out["createTime"] = _safe_int(_find_text(".//TimelineObject/createTime", ".//createTime"))
out["contentDesc"] = _find_text(".//TimelineObject/contentDesc", ".//contentDesc")
out["location"] = _build_location_text(root.find(".//location"))
+    # --- Extract the content type ---
+ post_type = _safe_int(_find_text(".//ContentObject/type", ".//type"))
+ out["type"] = post_type
+
+    # --- Official Account article (type 3) ---
+ if post_type == 3:
+ out["title"] = _find_text(".//ContentObject/title")
+ out["contentUrl"] = _clean_url(_find_text(".//ContentObject/contentUrl"))
+
+    # --- Channels video (type 28) ---
+ if post_type == 28:
+ out["title"] = _find_text(".//ContentObject/title")
+ out["contentUrl"] = _clean_url(_find_text(".//ContentObject/contentUrl"))
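+        # Channels posts nest media under finderFeed; prefer thumbUrl, falling back to coverUrl.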
+ out["finderFeed"] = {
+ "nickname": _find_text(".//finderFeed/nickname"),
+ "desc": _find_text(".//finderFeed/desc"),
+ "thumbUrl": _clean_url(
+ _find_text(".//finderFeed/mediaList/media/thumbUrl", ".//finderFeed/mediaList/media/coverUrl")),
+ "url": _clean_url(_find_text(".//finderFeed/mediaList/media/url"))
+ }
+
media: list[dict[str, Any]] = []
try:
for m in root.findall(".//mediaList//media"):
mt = _safe_int(m.findtext("type"))
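+            # A childless Element is falsy, so compare with `is None` rather than `or`-chaining the finds.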
+ url_el = m.find("url") if m.find("url") is not None else m.find("urlV")
+ thumb_el = m.find("thumb") if m.find("thumb") is not None else m.find("thumbV")
- # WeChat stores important download/auth hints in attributes (key/enc_idx/token/md5...).
- # NOTE: xml.etree.ElementTree.Element is falsy when it has no children.
-        # So we must check `is None` instead of using `or`, otherwise `<url>` would be treated as missing.
- url_el = m.find("url")
- if url_el is None:
- url_el = m.find("urlV")
- thumb_el = m.find("thumb")
- if thumb_el is None:
- thumb_el = m.find("thumbV")
-
- url = str((url_el.text if url_el is not None else "") or "").strip()
- thumb = str((thumb_el.text if thumb_el is not None else "") or "").strip()
+ url = _clean_url(url_el.text if url_el is not None else "")
+ thumb = _clean_url(thumb_el.text if thumb_el is not None else "")
url_attrs = dict(url_el.attrib) if url_el is not None and url_el.attrib else {}
thumb_attrs = dict(thumb_el.attrib) if thumb_el is not None and thumb_el.attrib else {}
-
media_id = str(m.findtext("id") or "").strip()
size_el = m.find("size")
size = dict(size_el.attrib) if size_el is not None and size_el.attrib else {}
+
if not url and not thumb:
continue
- media.append(
- {
- "type": mt,
- "id": media_id,
- "url": url,
- "thumb": thumb,
- "urlAttrs": url_attrs,
- "thumbAttrs": thumb_attrs,
- "size": size,
- }
- )
+
+ media.append({
+ "type": mt,
+ "id": media_id,
+ "url": url,
+ "thumb": thumb,
+ "urlAttrs": url_attrs,
+ "thumbAttrs": thumb_attrs,
+ "size": size,
+ })
except Exception:
- media = []
+ pass
out["media"] = media
likes: list[str] = []
@@ -789,6 +813,11 @@ def list_sns_timeline(
# Enrich with parsed XML when available.
location = str(r.get("location") or "")
+
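+        # Defaults for plain photo posts; overwritten below when the XML parse succeeds.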
+ post_type = 1
+ title = ""
+ content_url = ""
+ finder_feed = {}
try:
tid_u = int(r.get("id") or 0)
tid_s = (tid_u & 0xFFFFFFFFFFFFFFFF)
@@ -799,6 +828,12 @@ def list_sns_timeline(
parsed = _parse_timeline_xml(xml, uname)
if parsed.get("location"):
location = str(parsed.get("location") or "")
+
+ post_type = parsed.get("type", 1)
+ title = parsed.get("title", "")
+ content_url = parsed.get("contentUrl", "")
+ finder_feed = parsed.get("finderFeed", {})
+
pmedia = parsed.get("media") or []
if isinstance(pmedia, list) and isinstance(media, list) and pmedia:
# Merge by index (best-effort).
@@ -835,6 +870,10 @@ def list_sns_timeline(
"media": media,
"likes": likes,
"comments": comments,
+ "type": post_type,
+ "title": title,
+ "contentUrl": content_url,
+ "finderFeed": finder_feed,
}
)
@@ -911,6 +950,10 @@ def list_sns_timeline(
"media": parsed.get("media") or [],
"likes": parsed.get("likes") or [],
"comments": parsed.get("comments") or [],
+ "type": parsed.get("type", 1),
+ "title": parsed.get("title", ""),
+ "contentUrl": parsed.get("contentUrl", ""),
+ "finderFeed": parsed.get("finderFeed", {}),
}
)
@@ -987,6 +1030,7 @@ async def get_sns_media(
)
if exact_match_path:
+        logger.debug(f"[sns] exact_match_path: {exact_match_path}")
try:
payload, mtype = _read_and_maybe_decrypt_media(Path(exact_match_path), account_dir)
if payload and str(mtype or "").startswith("image/"):
@@ -997,6 +1041,8 @@ async def get_sns_media(
except Exception:
pass
+    logger.debug("[sns] no exact match path")
+
# 0) User-picked cache key override (stable across candidate ordering).
pick_key = _normalize_hex32(pick)
if pick_key:
@@ -1105,3 +1151,37 @@ async def get_sns_media(
raise
except Exception as e:
raise HTTPException(status_code=502, detail=f"Fetch sns media failed: {e}")
+
+
+@router.get("/api/sns/article_thumb", summary="Extract an Official Account article's cover image")
+async def proxy_article_thumb(url: str):
+ u = str(url or "").strip()
+ if not u.startswith("http"):
+ raise HTTPException(status_code=400, detail="Invalid URL")
+
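+    # Typical input: an mp.weixin.qq.com article URL, percent-encoded by the frontend helper.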
+ try:
+        # follow_redirects: article links often 302 to the canonical page before serving HTML.
+        async with httpx.AsyncClient(timeout=10.0, follow_redirects=True) as client:
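+            # A desktop-browser User-Agent; the page may serve different markup to unknown clients.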
+ headers = {"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"}
+ resp = await client.get(u, headers=headers)
+ resp.raise_for_status()
+ html_text = resp.text
+
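+            # The article HTML references its cover image on the mmbiz CDN; take the first quoted mmbiz_* URL.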
+            match = re.search(r'["\'](https?://[^"\']*?mmbiz_[a-zA-Z]+[^"\']*?)["\']', html_text)
+
+            if not match:
+                raise HTTPException(status_code=404, detail="No image URL found in the article HTML")
+
+            img_url = match.group(1)
+            img_url = html.unescape(img_url).replace("&amp;", "&")  # handle double-escaped entities
+
+            # Fetch the image while the client is still open, then stream it back.
+            img_resp = await client.get(img_url, headers=headers)
+            img_resp.raise_for_status()
+
+            return Response(
+                content=img_resp.content,
+                media_type=img_resp.headers.get("Content-Type", "image/jpeg")
+            )
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.warning(f"[sns] Failed to extract article cover url={u[:50]}... : {e}")
+        raise HTTPException(status_code=404, detail="Unable to fetch the article cover")
\ No newline at end of file