-
-
+
+
![朋友圈封面]()
+
{{ selfInfo.nickname || '获取中...' }}
@@ -26,9 +32,14 @@
-
{{ error }}
-
加载中…
-
暂无朋友圈数据
+
{{ error }}
+
+
+
+
暂无朋友圈数据
@@ -327,6 +338,8 @@ const hasMore = ref(true)
const isLoading = ref(false)
const error = ref('')
+const coverData = ref(null)
+
const pageSize = 20
const mediaBase = process.client ? 'http://localhost:8000' : ''
@@ -782,10 +795,12 @@ const loadPosts = async ({ reset }) => {
offset
})
const items = resp?.timeline || []
+
if (reset) {
- posts.value = items
+ posts.value = items.filter(p => p.type !== 7)
+ coverData.value = resp?.cover || null
} else {
- posts.value = [...posts.value, ...items]
+ posts.value = [...posts.value, ...items.filter(p => p.type !== 7)]
}
hasMore.value = !!resp?.hasMore
} catch (e) {
diff --git a/src/wechat_decrypt_tool/routers/sns.py b/src/wechat_decrypt_tool/routers/sns.py
index 3c43a53..59254bb 100644
--- a/src/wechat_decrypt_tool/routers/sns.py
+++ b/src/wechat_decrypt_tool/routers/sns.py
@@ -763,6 +763,54 @@ def _list_sns_cached_image_candidate_keys(
return tuple(out)
+def _get_sns_cover(account_dir: Path, target_wxid: str) -> Optional[dict[str, Any]]:
+ """无论多古老,强行揪出用户最近的一次朋友圈封面 (type=7)"""
+ cover_sql = f"SELECT tid, content FROM SnsTimeLine WHERE user_name = '{target_wxid}' AND content LIKE '%7%' ORDER BY tid DESC LIMIT 1"
+ cover_xml = None
+ cover_tid = None
+
+ try:
+ if WCDB_REALTIME.is_connected(account_dir.name):
+ conn = WCDB_REALTIME.ensure_connected(account_dir)
+ with conn.lock:
+ sns_db_path = conn.db_storage_dir / "sns" / "sns.db"
+ if not sns_db_path.exists():
+ sns_db_path = conn.db_storage_dir / "sns.db"
+ # 利用 exec_query 强行查
+ rows = _wcdb_exec_query(conn.handle, kind="media", path=str(sns_db_path), sql=cover_sql)
+ if rows:
+ cover_xml = rows[0].get("content")
+ cover_tid = rows[0].get("tid")
+ except Exception as e:
+ logger.warning(f"[sns] WCDB cover fetch failed: {e}")
+
+ # 2. 如果没查到,降级从本地解密的 sns.db 查
+ if not cover_xml:
+ sns_db_path = account_dir / "sns.db"
+ if sns_db_path.exists():
+ try:
+ # 只读模式防止锁死
+ conn_sq = sqlite3.connect(f"file:{sns_db_path}?mode=ro", uri=True)
+ conn_sq.row_factory = sqlite3.Row
+ row = conn_sq.execute(cover_sql).fetchone()
+ if row:
+ cover_xml = str(row["content"] or "")
+ cover_tid = row["tid"]
+ conn_sq.close()
+ except Exception as e:
+ logger.warning(f"[sns] SQLite cover fetch failed: {e}")
+
+ if cover_xml:
+ parsed = _parse_timeline_xml(cover_xml, target_wxid)
+ return {
+ "id": str(cover_tid or ""),
+ "media": parsed.get("media", []),
+ "type": 7
+ }
+ return None
+
+
+
@router.get("/api/sns/self_info", summary="获取个人信息(wxid和nickname)")
def api_sns_self_info(account: Optional[str] = None):
@@ -864,6 +912,11 @@ def list_sns_timeline(
users = _parse_csv_list(usernames)
kw = str(keyword or "").strip()
+ cover_data = None
+ if offset == 0:
+ target_wxid = users[0] if users else account_dir.name
+ cover_data = _get_sns_cover(account_dir, target_wxid)
+
# Prefer real-time WCDB access (reads the latest encrypted db_storage/sns/sns.db).
# Fallback to the decrypted sqlite copy in output/{account}/sns.db.
try:
@@ -962,6 +1015,10 @@ def list_sns_timeline(
location = str(parsed.get("location") or "")
post_type = parsed.get("type", 1)
+
+ if post_type == 7: # 朋友圈封面
+ continue
+
title = parsed.get("title", "")
content_url = parsed.get("contentUrl", "")
finder_feed = parsed.get("finderFeed", {})
@@ -1009,7 +1066,14 @@ def list_sns_timeline(
}
)
- return {"timeline": timeline, "hasMore": has_more, "limit": limit, "offset": offset, "source": "wcdb"}
+ return {
+ "timeline": timeline,
+ "hasMore": has_more,
+ "limit": limit,
+ "offset": offset,
+ "source": "wcdb",
+ "cover": cover_data,
+ }
except WCDBRealtimeError as e:
logger.info("[sns] wcdb realtime unavailable: %s", e)
except Exception as e:
@@ -1089,7 +1153,13 @@ def list_sns_timeline(
}
)
- return {"timeline": timeline, "hasMore": has_more, "limit": limit, "offset": offset}
+ return {
+ "timeline": timeline,
+ "hasMore": has_more,
+ "limit": limit,
+ "offset": offset,
+ "cover": cover_data,
+ }
class SnsMediaPicksSaveRequest(BaseModel):
@@ -1144,8 +1214,8 @@ async def get_sns_media(
avoid_picked: int = 0,
post_id: Optional[str] = None,
media_id: Optional[str] = None,
- post_type: int = 1, # <--- 接收前端传来的 post_type
- media_type: int = 2, # <--- 接收前端传来的 media_type
+ post_type: int = 1,
+ media_type: int = 2,
pick: Optional[str] = None,
md5: Optional[str] = None,
url: Optional[str] = None,
@@ -1154,6 +1224,17 @@ async def get_sns_media(
wxid_dir = _resolve_account_wxid_dir(account_dir)
if wxid_dir and post_id and media_id:
+ if int(post_type) == 7:
+ raw_key = f"{post_id}_{media_id}_4" # 硬编码
+
+ md5_str = hashlib.md5(raw_key.encode("utf-8")).hexdigest()
+ bkg_path = wxid_dir / "business" / "sns" / "bkg" / md5_str[:2] / md5_str
+
+ if bkg_path.exists() and bkg_path.is_file():
+ print(f"===== Hit Bkg Cover ======= {bkg_path}")
+
+ return FileResponse(bkg_path, media_type="image/jpeg",
+ headers={"Cache-Control": "public, max-age=31536000"})
exact_match_path = None
hit_type = ""