From 6bed048948a09185eba639acd94c496ab7796095 Mon Sep 17 00:00:00 2001 From: 2977094657 <2977094657@qq.com> Date: Mon, 20 Apr 2026 15:24:26 +0800 Subject: [PATCH] =?UTF-8?q?improvement(sns-export):=20=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E6=9C=8B=E5=8F=8B=E5=9C=88=E5=A4=9A=E6=A0=BC=E5=BC=8F=E7=A6=BB?= =?UTF-8?q?=E7=BA=BF=E5=AF=BC=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 html/json/txt 导出格式参数 - 为导出结果补充 index 与 manifest 信息 - 补充总动态数和当前联系人进度字段 --- src/wechat_decrypt_tool/routers/sns_export.py | 6 +- src/wechat_decrypt_tool/sns_export_service.py | 620 +++++++++++++----- 2 files changed, 443 insertions(+), 183 deletions(-) diff --git a/src/wechat_decrypt_tool/routers/sns_export.py b/src/wechat_decrypt_tool/routers/sns_export.py index c6b4f7d..6f4e2a4 100644 --- a/src/wechat_decrypt_tool/routers/sns_export.py +++ b/src/wechat_decrypt_tool/routers/sns_export.py @@ -13,23 +13,26 @@ from ..sns_export_service import SNS_EXPORT_MANAGER router = APIRouter(route_class=PathFixRoute) ExportScope = Literal["selected", "all"] +ExportFormat = Literal["html", "json", "txt"] class SnsExportCreateRequest(BaseModel): account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)") scope: ExportScope = Field("selected", description="导出范围:selected=指定联系人;all=全部联系人") usernames: list[str] = Field(default_factory=list, description="朋友圈 username 列表(scope=selected 时使用)") + format: ExportFormat = Field("html", description="导出格式:html/json/txt") use_cache: bool = Field(True, description="是否复用导出过程中的本地缓存(默认开启)") output_dir: Optional[str] = Field(None, description="导出目录绝对路径(可选;不填时使用默认目录)") file_name: Optional[str] = Field(None, description="导出 zip 文件名(可选,不含/含 .zip 都可)") -@router.post("/api/sns/exports", summary="创建朋友圈导出任务(离线 HTML zip)") +@router.post("/api/sns/exports", summary="创建朋友圈导出任务(离线 ZIP,支持 HTML/JSON/TXT)") async def create_sns_export(req: SnsExportCreateRequest): job = SNS_EXPORT_MANAGER.create_job( account=req.account, scope=req.scope, usernames=req.usernames, + export_format=req.format, use_cache=bool(req.use_cache), output_dir=req.output_dir, file_name=req.file_name, @@ -111,4 +114,3 @@ async def cancel_sns_export(export_id: str): if not ok: raise HTTPException(status_code=404, detail="Export not found.") return {"status": "success"} - diff --git a/src/wechat_decrypt_tool/sns_export_service.py b/src/wechat_decrypt_tool/sns_export_service.py index 75ebb6e..44b5997 100644 --- a/src/wechat_decrypt_tool/sns_export_service.py +++ b/src/wechat_decrypt_tool/sns_export_service.py @@ -1,6 +1,6 @@ from __future__ import annotations -"""SNS (Moments) HTML export service (offline ZIP).""" +"""SNS (Moments) export service (offline ZIP).""" import asyncio from bisect import bisect_left, bisect_right @@ -50,6 +50,7 @@ logger = get_logger(__name__) ExportStatus = Literal["queued", "running", "done", "error", "cancelled"] ExportScope = Literal["selected", "all"] +ExportFormat = Literal["html", "json", "txt"] _INVALID_PATH_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]') _HEX_ONLY_RE = re.compile(r"[^0-9a-fA-F]+") @@ -408,9 +409,71 @@ def _esc_attr(v: Any) -> str: return html.escape(str(v or ""), quote=True) +def _json_safe(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, dict): + return {str(k): _json_safe(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [_json_safe(v) for v in value] + return str(value) + + +def _guess_official_name_from_title(title: str) -> str: + t0 = str(title or "").strip() + if not t0: + return "" + m = re.search(r"[《「【](.+?)[》」】]", t0) + return str(m.group(1) or "").strip() if m and m.group(1) else "" + + +def _format_moment_type_label(post: dict[str, Any]) -> str: + try: + t = int(post.get("type") or 0) + except Exception: + t = 0 + if t == 3: + off = post.get("official") if isinstance(post.get("official"), dict) else {} + st0 = off.get("serviceType") if isinstance(off, dict) else None + try: + st = int(st0) if st0 not in (None, "") else None + except Exception: + st = None + prefix = "服务号" if st == 1 else "公众号" + name = str(off.get("displayName") or "").strip() if isinstance(off, dict) else "" + if not name: + name = _guess_official_name_from_title(str(post.get("title") or "")) + return f"{prefix}·{name}" if name else prefix + if t == 28: + ff = post.get("finderFeed") if isinstance(post.get("finderFeed"), dict) else {} + name = str(ff.get("nickname") or "").strip() if isinstance(ff, dict) else "" + return f"视频号·{name}" if name else "视频号" + if t in (5, 42): + name0 = str(post.get("sourceName") or "").strip() + if name0: + return name0 + url0 = str(post.get("contentUrl") or "").strip() + if not url0: + ml0 = post.get("media") if isinstance(post.get("media"), list) else [] + m0 = ml0[0] if (ml0 and isinstance(ml0[0], dict)) else {} + url0 = str(m0.get("url") or "").strip() + if url0: + s = re.sub(r"^https?://", "", url0.strip(), flags=re.I) + s = s.split("#", 1)[0].split("?", 1)[0].rstrip("/") + return s or ("音乐" if t == 42 else "外部分享") + return "音乐" if t == 42 else "外部分享" + return "" + + _SNS_EXPORT_CSS_PATCH = """ /* Moments export tweaks (keep consistent with frontend `sns.vue`). */ body { background-color: #EDEDED; } +.wse-sns-post-list > .wse-sns-post:first-child { + padding-top: 0; +} +.wse-sns-post-list > .wse-sns-post:first-child > .wse-sns-post-row { + padding-top: 12px; +} .wse-live-photo video { display: none; } .wse-live-photo:hover video { display: block; } .wse-live-photo:hover img { display: none; } @@ -477,7 +540,10 @@ class ExportProgress: users_done: int = 0 current_username: str = "" current_display_name: str = "" + posts_total: int = 0 posts_exported: int = 0 + current_user_posts_total: int = 0 + current_user_posts_done: int = 0 media_copied: int = 0 media_missing: int = 0 @@ -513,7 +579,10 @@ class ExportJob: "usersDone": self.progress.users_done, "currentUsername": self.progress.current_username, "currentDisplayName": self.progress.current_display_name, + "postsTotal": self.progress.posts_total, "postsExported": self.progress.posts_exported, + "currentUserPostsTotal": self.progress.current_user_posts_total, + "currentUserPostsDone": self.progress.current_user_posts_done, "mediaCopied": self.progress.media_copied, "mediaMissing": self.progress.media_missing, }, @@ -554,6 +623,7 @@ class SnsExportManager: account: Optional[str], scope: ExportScope, usernames: list[str], + export_format: ExportFormat, use_cache: bool, output_dir: Optional[str], file_name: Optional[str], @@ -568,6 +638,7 @@ class SnsExportManager: options={ "scope": str(scope or "selected"), "usernames": [str(u or "").strip() for u in (usernames or []) if str(u or "").strip()], + "format": str(export_format or "html"), "useCache": bool(use_cache), "outputDir": str(output_dir or "").strip(), "fileName": str(file_name or "").strip(), @@ -627,6 +698,10 @@ class SnsExportManager: opts = dict(job.options or {}) scope_raw = str(opts.get("scope") or "selected").strip() or "selected" scope: ExportScope = "all" if scope_raw == "all" else "selected" # type: ignore[assignment] + export_format_raw = str(opts.get("format") or "html").strip().lower() or "html" + if export_format_raw not in {"html", "json", "txt"}: + raise ValueError(f"Unsupported export format: {export_format_raw}") + export_format: ExportFormat = export_format_raw # type: ignore[assignment] target_usernames = [str(u or "").strip() for u in (opts.get("usernames") or []) if str(u or "").strip()] if scope == "selected" and not target_usernames: raise ValueError("No target usernames to export.") @@ -638,13 +713,13 @@ class SnsExportManager: base_name = str(opts.get("fileName") or "").strip() if not base_name: if scope == "all": - base_name = f"wechat_sns_export_{account_dir.name}_{ts}_{job.export_id}.zip" + base_name = f"wechat_sns_export_{account_dir.name}_{export_format}_{ts}_{job.export_id}.zip" else: hint = _safe_name(target_usernames[0], max_len=40) or "selected" - base_name = f"wechat_sns_export_{account_dir.name}_{hint}_{ts}_{job.export_id}.zip" + base_name = f"wechat_sns_export_{account_dir.name}_{hint}_{export_format}_{ts}_{job.export_id}.zip" if not base_name.lower().endswith(".zip"): base_name += ".zip" - base_name = _safe_name(base_name, max_len=120) or f"wechat_sns_export_{account_dir.name}_{ts}_{job.export_id}.zip" + base_name = _safe_name(base_name, max_len=120) or f"wechat_sns_export_{account_dir.name}_{export_format}_{ts}_{job.export_id}.zip" final_zip = (exports_root / base_name).resolve() tmp_zip = (exports_root / f".{base_name}.{job.export_id}.part").resolve() @@ -1120,6 +1195,92 @@ class SnsExportManager: media_written[cache_key] = arc return arc + def _build_post_json_record(post: dict[str, Any]) -> dict[str, Any]: + item = _json_safe(post) + if isinstance(item, dict): + item["momentTypeLabel"] = _format_moment_type_label(post) + item["createTimeText"] = _format_dt(post.get("createTime")) + return item if isinstance(item, dict) else {"value": item} + + def _render_post_text(post: dict[str, Any], index: int) -> str: + ts = _format_dt(post.get("createTime")) or "未知时间" + post_id = str(post.get("id") or post.get("tid") or "").strip() + content_desc = str(post.get("contentDesc") or "").strip() + location = str(post.get("location") or "").strip() + title0 = str(post.get("title") or "").strip() + content_url = str(post.get("contentUrl") or "").strip() + moment_label = _format_moment_type_label(post) + media_list = post.get("media") if isinstance(post.get("media"), list) else [] + likes = post.get("likes") if isinstance(post.get("likes"), list) else [] + comments = post.get("comments") if isinstance(post.get("comments"), list) else [] + + lines = [f"#{index}", f"时间: {ts}"] + if post_id: + lines.append(f"ID: {post_id}") + if moment_label: + lines.append(f"类型: {moment_label}") + if content_desc: + lines.append("内容:") + lines.append(content_desc) + if title0: + lines.append(f"标题: {title0}") + if content_url: + lines.append(f"链接: {content_url}") + if location: + lines.append(f"位置: {location}") + + if media_list: + lines.append("媒体:") + for idx0, media0 in enumerate(media_list, start=1): + m = media0 if isinstance(media0, dict) else {} + mtype = str(m.get("type") or "").strip() or "-" + mid = str(m.get("id") or "").strip() + murl = str(m.get("url") or "").strip() + mthumb = str(m.get("thumb") or "").strip() + media_parts = [f"- [{idx0}] type={mtype}"] + if mid: + media_parts.append(f"id={mid}") + if murl: + media_parts.append(f"url={murl}") + if mthumb and mthumb != murl: + media_parts.append(f"thumb={mthumb}") + lines.append(" ".join(media_parts)) + + if likes: + like_names = [str(x or "").strip() for x in likes if str(x or "").strip()] + if like_names: + lines.append("点赞: " + "、".join(like_names)) + + if comments: + lines.append("评论:") + for idx0, comment0 in enumerate(comments, start=1): + comment = comment0 if isinstance(comment0, dict) else {} + cn = _clean_name(comment.get("nickname") or comment.get("displayName") or comment.get("username") or "") or "未知" + refn = _clean_name(comment.get("refNickname") or comment.get("refUsername") or comment.get("refUserName") or "") + text = str(comment.get("content") or "").strip() + prefix = f"- [{idx0}] {cn}" + if refn: + prefix += f" 回复 {refn}" + if text: + prefix += f": {text}" + lines.append(prefix) + + return "\n".join(lines) + + def _render_user_text(*, username: str, display_name: str, post_count: int, posts: list[dict[str, Any]]) -> str: + header = [ + "朋友圈导出", + f"联系人: {display_name or username}", + f"用户名: {username}", + f"条目数: {post_count}", + "", + ] + body: list[str] = [] + for idx0, post0 in enumerate(posts, start=1): + body.append(_render_post_text(post0, idx0)) + body.append("") + return "\n".join(header + body).rstrip() + "\n" + def render_media_block(*, zf: zipfile.ZipFile, post: dict[str, Any]) -> str: media = post.get("media") if isinstance(post.get("media"), list) else [] if not media: @@ -1331,51 +1492,6 @@ class SnsExportManager: likes = post.get("likes") if isinstance(post.get("likes"), list) else [] comments = post.get("comments") if isinstance(post.get("comments"), list) else [] - def guess_official_name_from_title(title: str) -> str: - t0 = str(title or "").strip() - if not t0: - return "" - m = re.search(r"[《「【](.+?)[》」】]", t0) - return str(m.group(1) or "").strip() if m and m.group(1) else "" - - def format_moment_type_label(p: dict[str, Any]) -> str: - try: - t = int(p.get("type") or 0) - except Exception: - t = 0 - if t == 3: - off = p.get("official") if isinstance(p.get("official"), dict) else {} - st0 = off.get("serviceType") if isinstance(off, dict) else None - try: - st = int(st0) if st0 not in (None, "") else None - except Exception: - st = None - prefix = "服务号" if st == 1 else "公众号" - name = str(off.get("displayName") or "").strip() if isinstance(off, dict) else "" - if not name: - name = guess_official_name_from_title(str(p.get("title") or "")) - return f"{prefix}·{name}" if name else prefix - if t == 28: - ff = p.get("finderFeed") if isinstance(p.get("finderFeed"), dict) else {} - name = str(ff.get("nickname") or "").strip() if isinstance(ff, dict) else "" - return f"视频号·{name}" if name else "视频号" - if t in (5, 42): - name0 = str(p.get("sourceName") or "").strip() - if name0: - return name0 - url0 = str(p.get("contentUrl") or "").strip() - if not url0: - ml0 = p.get("media") if isinstance(p.get("media"), list) else [] - m0 = ml0[0] if (ml0 and isinstance(ml0[0], dict)) else {} - url0 = str(m0.get("url") or "").strip() - if url0: - # host+path (no query) as a readable fallback label. - s = re.sub(r"^https?://", "", url0.strip(), flags=re.I) - s = s.split("#", 1)[0].split("?", 1)[0].rstrip("/") - return s or ("音乐" if t == 42 else "外部分享") - return "音乐" if t == 42 else "外部分享" - return "" - def format_finder_feed_card_text(p: dict[str, Any]) -> str: title0 = str(p.get("title") or "").strip() if title0: @@ -1426,15 +1542,15 @@ class SnsExportManager: f'style="background-color:#4B5563">{fallback}' ) - moment_label = format_moment_type_label(post) + moment_label = _format_moment_type_label(post) try: post_type = int(post.get("type") or 1) except Exception: post_type = 1 out: list[str] = [] - out.append(f'