mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
improvement(sns-export): 支持朋友圈多格式离线导出
- 新增 html/json/txt 导出格式参数 - 为导出结果补充 index 与 manifest 信息 - 补充总动态数和当前联系人进度字段
This commit is contained in:
@@ -13,23 +13,26 @@ from ..sns_export_service import SNS_EXPORT_MANAGER
|
||||
router = APIRouter(route_class=PathFixRoute)
|
||||
|
||||
ExportScope = Literal["selected", "all"]
|
||||
ExportFormat = Literal["html", "json", "txt"]
|
||||
|
||||
|
||||
class SnsExportCreateRequest(BaseModel):
|
||||
account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)")
|
||||
scope: ExportScope = Field("selected", description="导出范围:selected=指定联系人;all=全部联系人")
|
||||
usernames: list[str] = Field(default_factory=list, description="朋友圈 username 列表(scope=selected 时使用)")
|
||||
format: ExportFormat = Field("html", description="导出格式:html/json/txt")
|
||||
use_cache: bool = Field(True, description="是否复用导出过程中的本地缓存(默认开启)")
|
||||
output_dir: Optional[str] = Field(None, description="导出目录绝对路径(可选;不填时使用默认目录)")
|
||||
file_name: Optional[str] = Field(None, description="导出 zip 文件名(可选,不含/含 .zip 都可)")
|
||||
|
||||
|
||||
@router.post("/api/sns/exports", summary="创建朋友圈导出任务(离线 HTML zip)")
|
||||
@router.post("/api/sns/exports", summary="创建朋友圈导出任务(离线 ZIP,支持 HTML/JSON/TXT)")
|
||||
async def create_sns_export(req: SnsExportCreateRequest):
|
||||
job = SNS_EXPORT_MANAGER.create_job(
|
||||
account=req.account,
|
||||
scope=req.scope,
|
||||
usernames=req.usernames,
|
||||
export_format=req.format,
|
||||
use_cache=bool(req.use_cache),
|
||||
output_dir=req.output_dir,
|
||||
file_name=req.file_name,
|
||||
@@ -111,4 +114,3 @@ async def cancel_sns_export(export_id: str):
|
||||
if not ok:
|
||||
raise HTTPException(status_code=404, detail="Export not found.")
|
||||
return {"status": "success"}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
"""SNS (Moments) HTML export service (offline ZIP)."""
|
||||
"""SNS (Moments) export service (offline ZIP)."""
|
||||
|
||||
import asyncio
|
||||
from bisect import bisect_left, bisect_right
|
||||
@@ -50,6 +50,7 @@ logger = get_logger(__name__)
|
||||
|
||||
ExportStatus = Literal["queued", "running", "done", "error", "cancelled"]
|
||||
ExportScope = Literal["selected", "all"]
|
||||
ExportFormat = Literal["html", "json", "txt"]
|
||||
|
||||
_INVALID_PATH_CHARS = re.compile(r'[<>:"/\\|?*\x00-\x1f]')
|
||||
_HEX_ONLY_RE = re.compile(r"[^0-9a-fA-F]+")
|
||||
@@ -408,9 +409,71 @@ def _esc_attr(v: Any) -> str:
|
||||
return html.escape(str(v or ""), quote=True)
|
||||
|
||||
|
||||
def _json_safe(value: Any) -> Any:
|
||||
if value is None or isinstance(value, (str, int, float, bool)):
|
||||
return value
|
||||
if isinstance(value, dict):
|
||||
return {str(k): _json_safe(v) for k, v in value.items()}
|
||||
if isinstance(value, (list, tuple)):
|
||||
return [_json_safe(v) for v in value]
|
||||
return str(value)
|
||||
|
||||
|
||||
def _guess_official_name_from_title(title: str) -> str:
|
||||
t0 = str(title or "").strip()
|
||||
if not t0:
|
||||
return ""
|
||||
m = re.search(r"[《「【](.+?)[》」】]", t0)
|
||||
return str(m.group(1) or "").strip() if m and m.group(1) else ""
|
||||
|
||||
|
||||
def _format_moment_type_label(post: dict[str, Any]) -> str:
|
||||
try:
|
||||
t = int(post.get("type") or 0)
|
||||
except Exception:
|
||||
t = 0
|
||||
if t == 3:
|
||||
off = post.get("official") if isinstance(post.get("official"), dict) else {}
|
||||
st0 = off.get("serviceType") if isinstance(off, dict) else None
|
||||
try:
|
||||
st = int(st0) if st0 not in (None, "") else None
|
||||
except Exception:
|
||||
st = None
|
||||
prefix = "服务号" if st == 1 else "公众号"
|
||||
name = str(off.get("displayName") or "").strip() if isinstance(off, dict) else ""
|
||||
if not name:
|
||||
name = _guess_official_name_from_title(str(post.get("title") or ""))
|
||||
return f"{prefix}·{name}" if name else prefix
|
||||
if t == 28:
|
||||
ff = post.get("finderFeed") if isinstance(post.get("finderFeed"), dict) else {}
|
||||
name = str(ff.get("nickname") or "").strip() if isinstance(ff, dict) else ""
|
||||
return f"视频号·{name}" if name else "视频号"
|
||||
if t in (5, 42):
|
||||
name0 = str(post.get("sourceName") or "").strip()
|
||||
if name0:
|
||||
return name0
|
||||
url0 = str(post.get("contentUrl") or "").strip()
|
||||
if not url0:
|
||||
ml0 = post.get("media") if isinstance(post.get("media"), list) else []
|
||||
m0 = ml0[0] if (ml0 and isinstance(ml0[0], dict)) else {}
|
||||
url0 = str(m0.get("url") or "").strip()
|
||||
if url0:
|
||||
s = re.sub(r"^https?://", "", url0.strip(), flags=re.I)
|
||||
s = s.split("#", 1)[0].split("?", 1)[0].rstrip("/")
|
||||
return s or ("音乐" if t == 42 else "外部分享")
|
||||
return "音乐" if t == 42 else "外部分享"
|
||||
return ""
|
||||
|
||||
|
||||
_SNS_EXPORT_CSS_PATCH = """
|
||||
/* Moments export tweaks (keep consistent with frontend `sns.vue`). */
|
||||
body { background-color: #EDEDED; }
|
||||
.wse-sns-post-list > .wse-sns-post:first-child {
|
||||
padding-top: 0;
|
||||
}
|
||||
.wse-sns-post-list > .wse-sns-post:first-child > .wse-sns-post-row {
|
||||
padding-top: 12px;
|
||||
}
|
||||
.wse-live-photo video { display: none; }
|
||||
.wse-live-photo:hover video { display: block; }
|
||||
.wse-live-photo:hover img { display: none; }
|
||||
@@ -477,7 +540,10 @@ class ExportProgress:
|
||||
users_done: int = 0
|
||||
current_username: str = ""
|
||||
current_display_name: str = ""
|
||||
posts_total: int = 0
|
||||
posts_exported: int = 0
|
||||
current_user_posts_total: int = 0
|
||||
current_user_posts_done: int = 0
|
||||
media_copied: int = 0
|
||||
media_missing: int = 0
|
||||
|
||||
@@ -513,7 +579,10 @@ class ExportJob:
|
||||
"usersDone": self.progress.users_done,
|
||||
"currentUsername": self.progress.current_username,
|
||||
"currentDisplayName": self.progress.current_display_name,
|
||||
"postsTotal": self.progress.posts_total,
|
||||
"postsExported": self.progress.posts_exported,
|
||||
"currentUserPostsTotal": self.progress.current_user_posts_total,
|
||||
"currentUserPostsDone": self.progress.current_user_posts_done,
|
||||
"mediaCopied": self.progress.media_copied,
|
||||
"mediaMissing": self.progress.media_missing,
|
||||
},
|
||||
@@ -554,6 +623,7 @@ class SnsExportManager:
|
||||
account: Optional[str],
|
||||
scope: ExportScope,
|
||||
usernames: list[str],
|
||||
export_format: ExportFormat,
|
||||
use_cache: bool,
|
||||
output_dir: Optional[str],
|
||||
file_name: Optional[str],
|
||||
@@ -568,6 +638,7 @@ class SnsExportManager:
|
||||
options={
|
||||
"scope": str(scope or "selected"),
|
||||
"usernames": [str(u or "").strip() for u in (usernames or []) if str(u or "").strip()],
|
||||
"format": str(export_format or "html"),
|
||||
"useCache": bool(use_cache),
|
||||
"outputDir": str(output_dir or "").strip(),
|
||||
"fileName": str(file_name or "").strip(),
|
||||
@@ -627,6 +698,10 @@ class SnsExportManager:
|
||||
opts = dict(job.options or {})
|
||||
scope_raw = str(opts.get("scope") or "selected").strip() or "selected"
|
||||
scope: ExportScope = "all" if scope_raw == "all" else "selected" # type: ignore[assignment]
|
||||
export_format_raw = str(opts.get("format") or "html").strip().lower() or "html"
|
||||
if export_format_raw not in {"html", "json", "txt"}:
|
||||
raise ValueError(f"Unsupported export format: {export_format_raw}")
|
||||
export_format: ExportFormat = export_format_raw # type: ignore[assignment]
|
||||
target_usernames = [str(u or "").strip() for u in (opts.get("usernames") or []) if str(u or "").strip()]
|
||||
if scope == "selected" and not target_usernames:
|
||||
raise ValueError("No target usernames to export.")
|
||||
@@ -638,13 +713,13 @@ class SnsExportManager:
|
||||
base_name = str(opts.get("fileName") or "").strip()
|
||||
if not base_name:
|
||||
if scope == "all":
|
||||
base_name = f"wechat_sns_export_{account_dir.name}_{ts}_{job.export_id}.zip"
|
||||
base_name = f"wechat_sns_export_{account_dir.name}_{export_format}_{ts}_{job.export_id}.zip"
|
||||
else:
|
||||
hint = _safe_name(target_usernames[0], max_len=40) or "selected"
|
||||
base_name = f"wechat_sns_export_{account_dir.name}_{hint}_{ts}_{job.export_id}.zip"
|
||||
base_name = f"wechat_sns_export_{account_dir.name}_{hint}_{export_format}_{ts}_{job.export_id}.zip"
|
||||
if not base_name.lower().endswith(".zip"):
|
||||
base_name += ".zip"
|
||||
base_name = _safe_name(base_name, max_len=120) or f"wechat_sns_export_{account_dir.name}_{ts}_{job.export_id}.zip"
|
||||
base_name = _safe_name(base_name, max_len=120) or f"wechat_sns_export_{account_dir.name}_{export_format}_{ts}_{job.export_id}.zip"
|
||||
|
||||
final_zip = (exports_root / base_name).resolve()
|
||||
tmp_zip = (exports_root / f".{base_name}.{job.export_id}.part").resolve()
|
||||
@@ -1120,6 +1195,92 @@ class SnsExportManager:
|
||||
media_written[cache_key] = arc
|
||||
return arc
|
||||
|
||||
def _build_post_json_record(post: dict[str, Any]) -> dict[str, Any]:
|
||||
item = _json_safe(post)
|
||||
if isinstance(item, dict):
|
||||
item["momentTypeLabel"] = _format_moment_type_label(post)
|
||||
item["createTimeText"] = _format_dt(post.get("createTime"))
|
||||
return item if isinstance(item, dict) else {"value": item}
|
||||
|
||||
def _render_post_text(post: dict[str, Any], index: int) -> str:
|
||||
ts = _format_dt(post.get("createTime")) or "未知时间"
|
||||
post_id = str(post.get("id") or post.get("tid") or "").strip()
|
||||
content_desc = str(post.get("contentDesc") or "").strip()
|
||||
location = str(post.get("location") or "").strip()
|
||||
title0 = str(post.get("title") or "").strip()
|
||||
content_url = str(post.get("contentUrl") or "").strip()
|
||||
moment_label = _format_moment_type_label(post)
|
||||
media_list = post.get("media") if isinstance(post.get("media"), list) else []
|
||||
likes = post.get("likes") if isinstance(post.get("likes"), list) else []
|
||||
comments = post.get("comments") if isinstance(post.get("comments"), list) else []
|
||||
|
||||
lines = [f"#{index}", f"时间: {ts}"]
|
||||
if post_id:
|
||||
lines.append(f"ID: {post_id}")
|
||||
if moment_label:
|
||||
lines.append(f"类型: {moment_label}")
|
||||
if content_desc:
|
||||
lines.append("内容:")
|
||||
lines.append(content_desc)
|
||||
if title0:
|
||||
lines.append(f"标题: {title0}")
|
||||
if content_url:
|
||||
lines.append(f"链接: {content_url}")
|
||||
if location:
|
||||
lines.append(f"位置: {location}")
|
||||
|
||||
if media_list:
|
||||
lines.append("媒体:")
|
||||
for idx0, media0 in enumerate(media_list, start=1):
|
||||
m = media0 if isinstance(media0, dict) else {}
|
||||
mtype = str(m.get("type") or "").strip() or "-"
|
||||
mid = str(m.get("id") or "").strip()
|
||||
murl = str(m.get("url") or "").strip()
|
||||
mthumb = str(m.get("thumb") or "").strip()
|
||||
media_parts = [f"- [{idx0}] type={mtype}"]
|
||||
if mid:
|
||||
media_parts.append(f"id={mid}")
|
||||
if murl:
|
||||
media_parts.append(f"url={murl}")
|
||||
if mthumb and mthumb != murl:
|
||||
media_parts.append(f"thumb={mthumb}")
|
||||
lines.append(" ".join(media_parts))
|
||||
|
||||
if likes:
|
||||
like_names = [str(x or "").strip() for x in likes if str(x or "").strip()]
|
||||
if like_names:
|
||||
lines.append("点赞: " + "、".join(like_names))
|
||||
|
||||
if comments:
|
||||
lines.append("评论:")
|
||||
for idx0, comment0 in enumerate(comments, start=1):
|
||||
comment = comment0 if isinstance(comment0, dict) else {}
|
||||
cn = _clean_name(comment.get("nickname") or comment.get("displayName") or comment.get("username") or "") or "未知"
|
||||
refn = _clean_name(comment.get("refNickname") or comment.get("refUsername") or comment.get("refUserName") or "")
|
||||
text = str(comment.get("content") or "").strip()
|
||||
prefix = f"- [{idx0}] {cn}"
|
||||
if refn:
|
||||
prefix += f" 回复 {refn}"
|
||||
if text:
|
||||
prefix += f": {text}"
|
||||
lines.append(prefix)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
def _render_user_text(*, username: str, display_name: str, post_count: int, posts: list[dict[str, Any]]) -> str:
|
||||
header = [
|
||||
"朋友圈导出",
|
||||
f"联系人: {display_name or username}",
|
||||
f"用户名: {username}",
|
||||
f"条目数: {post_count}",
|
||||
"",
|
||||
]
|
||||
body: list[str] = []
|
||||
for idx0, post0 in enumerate(posts, start=1):
|
||||
body.append(_render_post_text(post0, idx0))
|
||||
body.append("")
|
||||
return "\n".join(header + body).rstrip() + "\n"
|
||||
|
||||
def render_media_block(*, zf: zipfile.ZipFile, post: dict[str, Any]) -> str:
|
||||
media = post.get("media") if isinstance(post.get("media"), list) else []
|
||||
if not media:
|
||||
@@ -1331,51 +1492,6 @@ class SnsExportManager:
|
||||
likes = post.get("likes") if isinstance(post.get("likes"), list) else []
|
||||
comments = post.get("comments") if isinstance(post.get("comments"), list) else []
|
||||
|
||||
def guess_official_name_from_title(title: str) -> str:
|
||||
t0 = str(title or "").strip()
|
||||
if not t0:
|
||||
return ""
|
||||
m = re.search(r"[《「【](.+?)[》」】]", t0)
|
||||
return str(m.group(1) or "").strip() if m and m.group(1) else ""
|
||||
|
||||
def format_moment_type_label(p: dict[str, Any]) -> str:
|
||||
try:
|
||||
t = int(p.get("type") or 0)
|
||||
except Exception:
|
||||
t = 0
|
||||
if t == 3:
|
||||
off = p.get("official") if isinstance(p.get("official"), dict) else {}
|
||||
st0 = off.get("serviceType") if isinstance(off, dict) else None
|
||||
try:
|
||||
st = int(st0) if st0 not in (None, "") else None
|
||||
except Exception:
|
||||
st = None
|
||||
prefix = "服务号" if st == 1 else "公众号"
|
||||
name = str(off.get("displayName") or "").strip() if isinstance(off, dict) else ""
|
||||
if not name:
|
||||
name = guess_official_name_from_title(str(p.get("title") or ""))
|
||||
return f"{prefix}·{name}" if name else prefix
|
||||
if t == 28:
|
||||
ff = p.get("finderFeed") if isinstance(p.get("finderFeed"), dict) else {}
|
||||
name = str(ff.get("nickname") or "").strip() if isinstance(ff, dict) else ""
|
||||
return f"视频号·{name}" if name else "视频号"
|
||||
if t in (5, 42):
|
||||
name0 = str(p.get("sourceName") or "").strip()
|
||||
if name0:
|
||||
return name0
|
||||
url0 = str(p.get("contentUrl") or "").strip()
|
||||
if not url0:
|
||||
ml0 = p.get("media") if isinstance(p.get("media"), list) else []
|
||||
m0 = ml0[0] if (ml0 and isinstance(ml0[0], dict)) else {}
|
||||
url0 = str(m0.get("url") or "").strip()
|
||||
if url0:
|
||||
# host+path (no query) as a readable fallback label.
|
||||
s = re.sub(r"^https?://", "", url0.strip(), flags=re.I)
|
||||
s = s.split("#", 1)[0].split("?", 1)[0].rstrip("/")
|
||||
return s or ("音乐" if t == 42 else "外部分享")
|
||||
return "音乐" if t == 42 else "外部分享"
|
||||
return ""
|
||||
|
||||
def format_finder_feed_card_text(p: dict[str, Any]) -> str:
|
||||
title0 = str(p.get("title") or "").strip()
|
||||
if title0:
|
||||
@@ -1426,15 +1542,15 @@ class SnsExportManager:
|
||||
f'style="background-color:#4B5563">{fallback}</div></div>'
|
||||
)
|
||||
|
||||
moment_label = format_moment_type_label(post)
|
||||
moment_label = _format_moment_type_label(post)
|
||||
try:
|
||||
post_type = int(post.get("type") or 1)
|
||||
except Exception:
|
||||
post_type = 1
|
||||
|
||||
out: list[str] = []
|
||||
out.append(f'<div class="bg-white rounded-sm px-4 py-4 mb-3" id="{_esc_attr(pid)}">')
|
||||
out.append('<div class="flex items-start gap-3">')
|
||||
out.append(f'<div class="wse-sns-post bg-white rounded-sm px-4 py-4 mb-3" id="{_esc_attr(pid)}">')
|
||||
out.append('<div class="wse-sns-post-row flex items-start gap-3">')
|
||||
out.append(avatar_html)
|
||||
out.append('<div class="flex-1 min-w-0">')
|
||||
out.append(f'<div class="text-sm font-medium leading-5 text-[#576b95]">{_esc_text(display)}</div>')
|
||||
@@ -1558,6 +1674,7 @@ class SnsExportManager:
|
||||
'<svg class="w-6 h-6 text-white" fill="currentColor" viewBox="0 0 24 24"><path d="M8 5v14l11-7z"/></svg>'
|
||||
)
|
||||
out.append("</div></div></div>")
|
||||
out.append("</div>")
|
||||
else:
|
||||
out.append(render_media_block(zf=zf, post=post))
|
||||
|
||||
@@ -1635,7 +1752,7 @@ class SnsExportManager:
|
||||
avatar_arc = export_avatar_to_zip(zf=zf, username=username, display_name=display_name)
|
||||
|
||||
out: list[str] = []
|
||||
out.append('<div class="relative w-full mb-12 -mt-4 bg-white">')
|
||||
out.append('<div class="wse-sns-cover relative w-full -mt-4">')
|
||||
out.append('<div class="h-64 w-full bg-[#333333] relative overflow-hidden">')
|
||||
if cover_arc:
|
||||
out.append(
|
||||
@@ -1644,7 +1761,7 @@ class SnsExportManager:
|
||||
)
|
||||
out.append("</div>")
|
||||
|
||||
out.append('<div class="absolute right-4 -bottom-6 flex items-end gap-4">')
|
||||
out.append('<div class="absolute right-4 flex items-end gap-4" style="bottom:-12px; z-index:2;">')
|
||||
out.append(
|
||||
f'<div class="text-white font-bold text-xl mb-7 drop-shadow-md">{_esc_text(display_name or username)}</div>'
|
||||
)
|
||||
@@ -1667,22 +1784,24 @@ class SnsExportManager:
|
||||
|
||||
try:
|
||||
with zipfile.ZipFile(str(tmp_zip), mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
|
||||
css_payload = _load_ui_css_bundle(ui_public_dir=ui_public_dir, report=report) + "\n\n" + _SNS_EXPORT_CSS_PATCH
|
||||
zf.writestr("assets/wechat-sns-export.css", css_payload)
|
||||
written.add("assets/wechat-sns-export.css")
|
||||
css_href = "assets/wechat-sns-export.css"
|
||||
if export_format == "html":
|
||||
css_payload = _load_ui_css_bundle(ui_public_dir=ui_public_dir, report=report) + "\n\n" + _SNS_EXPORT_CSS_PATCH
|
||||
zf.writestr(css_href, css_payload)
|
||||
written.add(css_href)
|
||||
|
||||
repo_root = Path(__file__).resolve().parents[2]
|
||||
wxemoji_src: Optional[Path] = None
|
||||
if ui_public_dir is not None:
|
||||
cand = Path(ui_public_dir) / "wxemoji"
|
||||
if cand.is_dir():
|
||||
wxemoji_src = cand
|
||||
if wxemoji_src is None:
|
||||
cand = repo_root / "frontend" / "public" / "wxemoji"
|
||||
if cand.is_dir():
|
||||
wxemoji_src = cand
|
||||
if wxemoji_src is not None:
|
||||
_zip_write_tree(zf=zf, src_dir=wxemoji_src, dest_prefix="wxemoji", written=written)
|
||||
repo_root = Path(__file__).resolve().parents[2]
|
||||
wxemoji_src: Optional[Path] = None
|
||||
if ui_public_dir is not None:
|
||||
cand = Path(ui_public_dir) / "wxemoji"
|
||||
if cand.is_dir():
|
||||
wxemoji_src = cand
|
||||
if wxemoji_src is None:
|
||||
cand = repo_root / "frontend" / "public" / "wxemoji"
|
||||
if cand.is_dir():
|
||||
wxemoji_src = cand
|
||||
if wxemoji_src is not None:
|
||||
_zip_write_tree(zf=zf, src_dir=wxemoji_src, dest_prefix="wxemoji", written=written)
|
||||
|
||||
if scope == "all":
|
||||
users = _load_sns_users(account_dir)
|
||||
@@ -1691,19 +1810,37 @@ class SnsExportManager:
|
||||
order = {u: i for i, u in enumerate(target_usernames)}
|
||||
users.sort(key=lambda x: order.get(str(x.get("username") or ""), 10**9))
|
||||
|
||||
total_posts_est = 0
|
||||
for user_item in users:
|
||||
try:
|
||||
total_posts_est += max(0, int(user_item.get("postCount") or 0))
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
with self._lock:
|
||||
job.progress.users_total = len(users)
|
||||
job.progress.posts_total = total_posts_est
|
||||
job.progress.posts_exported = 0
|
||||
job.progress.current_user_posts_total = 0
|
||||
job.progress.current_user_posts_done = 0
|
||||
|
||||
user_pages: list[dict[str, Any]] = []
|
||||
css_href = "assets/wechat-sns-export.css"
|
||||
user_outputs: list[dict[str, Any]] = []
|
||||
exported_at = datetime.now().isoformat(timespec="seconds")
|
||||
|
||||
for i, u in enumerate(users):
|
||||
should_cancel()
|
||||
uname = str(u.get("username") or "").strip()
|
||||
display = _clean_name(u.get("displayName")) or uname
|
||||
try:
|
||||
post_count_est = max(0, int(u.get("postCount") or 0))
|
||||
except Exception:
|
||||
post_count_est = 0
|
||||
safe_uname = _safe_name(uname, max_len=80) or hashlib.md5(uname.encode("utf-8", errors="ignore")).hexdigest()[:12]
|
||||
with self._lock:
|
||||
job.progress.current_username = uname
|
||||
job.progress.current_display_name = display
|
||||
job.progress.current_user_posts_total = post_count_est
|
||||
job.progress.current_user_posts_done = 0
|
||||
|
||||
posts_all: list[dict[str, Any]] = []
|
||||
cover_data: Optional[dict[str, Any]] = None
|
||||
@@ -1728,132 +1865,252 @@ class SnsExportManager:
|
||||
if not bool(resp.get("hasMore")):
|
||||
break
|
||||
|
||||
post_parts: list[str] = []
|
||||
for p in posts_all:
|
||||
should_cancel()
|
||||
post_parts.append(render_post_html(zf=zf, post=p))
|
||||
actual_post_count = len(posts_all)
|
||||
if actual_post_count != post_count_est:
|
||||
with self._lock:
|
||||
job.progress.posts_exported += 1
|
||||
job.progress.posts_total = max(
|
||||
job.progress.posts_exported,
|
||||
max(0, job.progress.posts_total + (actual_post_count - post_count_est)),
|
||||
)
|
||||
job.progress.current_user_posts_total = actual_post_count
|
||||
else:
|
||||
with self._lock:
|
||||
job.progress.current_user_posts_total = actual_post_count
|
||||
|
||||
safe_uname = _safe_name(uname, max_len=80) or hashlib.md5(uname.encode("utf-8", errors="ignore")).hexdigest()[:12]
|
||||
page_name = f"sns_{safe_uname}.html"
|
||||
title = f"朋友圈导出 - {display}"
|
||||
back_link = (
|
||||
'<a href="index.html" class="text-sm text-[#576b95] hover:underline">← 返回</a>'
|
||||
if scope == "all"
|
||||
else ""
|
||||
)
|
||||
cover_html = render_cover_header_html(zf=zf, username=uname, display_name=display, cover_data=cover_data)
|
||||
page_html = "\n".join(
|
||||
[
|
||||
"<!doctype html>",
|
||||
"<html>",
|
||||
"<head>",
|
||||
'<meta charset="utf-8" />',
|
||||
'<meta name="viewport" content="width=device-width, initial-scale=1" />',
|
||||
f"<title>{_esc_text(title)}</title>",
|
||||
f'<link rel="stylesheet" href="{_esc_attr(css_href)}" />',
|
||||
"</head>",
|
||||
'<body style="background-color:#EDEDED">',
|
||||
'<div class="min-h-screen" style="background-color:#EDEDED">',
|
||||
'<div class="max-w-2xl mx-auto px-4 py-4">',
|
||||
cover_html,
|
||||
('<div class="flex items-center justify-between mb-4">' + back_link + (f'<div class="text-xs text-gray-500 truncate">{_esc_text(uname)}</div>' if uname else "") + "</div>") if back_link else "",
|
||||
"".join(post_parts),
|
||||
"</div>",
|
||||
"</div>",
|
||||
"</body>",
|
||||
"</html>",
|
||||
"",
|
||||
]
|
||||
)
|
||||
zf.writestr(page_name, page_html)
|
||||
written.add(page_name)
|
||||
output_name = ""
|
||||
if export_format == "html":
|
||||
post_parts: list[str] = []
|
||||
for p in posts_all:
|
||||
should_cancel()
|
||||
post_parts.append(render_post_html(zf=zf, post=p))
|
||||
with self._lock:
|
||||
job.progress.posts_exported += 1
|
||||
job.progress.current_user_posts_done += 1
|
||||
|
||||
user_pages.append(
|
||||
output_name = f"sns_{safe_uname}.html"
|
||||
title = f"朋友圈导出 - {display}"
|
||||
back_link = (
|
||||
'<a href="index.html" class="text-sm text-[#576b95] hover:underline">← 返回</a>'
|
||||
if scope == "all"
|
||||
else ""
|
||||
)
|
||||
cover_html = render_cover_header_html(zf=zf, username=uname, display_name=display, cover_data=cover_data)
|
||||
page_html = "\n".join(
|
||||
[
|
||||
"<!doctype html>",
|
||||
"<html>",
|
||||
"<head>",
|
||||
'<meta charset="utf-8" />',
|
||||
'<meta name="viewport" content="width=device-width, initial-scale=1" />',
|
||||
f"<title>{_esc_text(title)}</title>",
|
||||
f'<link rel="stylesheet" href="{_esc_attr(css_href)}" />',
|
||||
"</head>",
|
||||
'<body style="background-color:#EDEDED">',
|
||||
'<div class="min-h-screen" style="background-color:#EDEDED">',
|
||||
'<div class="wse-sns-page max-w-2xl mx-auto px-4 py-4">',
|
||||
cover_html,
|
||||
('<div class="flex items-center justify-between mb-4">' + back_link + (f'<div class="text-xs text-gray-500 truncate">{_esc_text(uname)}</div>' if uname else "") + "</div>") if back_link else "",
|
||||
'<div class="wse-sns-post-list">' + "".join(post_parts) + "</div>",
|
||||
"</div>",
|
||||
"</div>",
|
||||
"</body>",
|
||||
"</html>",
|
||||
"",
|
||||
]
|
||||
)
|
||||
zf.writestr(output_name, page_html)
|
||||
written.add(output_name)
|
||||
elif export_format == "json":
|
||||
exported_posts: list[dict[str, Any]] = []
|
||||
for p in posts_all:
|
||||
should_cancel()
|
||||
exported_posts.append(_build_post_json_record(p))
|
||||
with self._lock:
|
||||
job.progress.posts_exported += 1
|
||||
job.progress.current_user_posts_done += 1
|
||||
|
||||
output_name = f"sns_{safe_uname}.json"
|
||||
json_payload: dict[str, Any] = {
|
||||
"exportedAt": exported_at,
|
||||
"exportId": job.export_id,
|
||||
"account": account_dir.name,
|
||||
"scope": scope,
|
||||
"format": export_format,
|
||||
"username": uname,
|
||||
"displayName": display,
|
||||
"postCount": actual_post_count,
|
||||
"posts": exported_posts,
|
||||
}
|
||||
if isinstance(cover_data, dict) and cover_data:
|
||||
json_payload["cover"] = _json_safe(cover_data)
|
||||
zf.writestr(output_name, json.dumps(json_payload, ensure_ascii=False, indent=2))
|
||||
written.add(output_name)
|
||||
else:
|
||||
for _idx0, _post0 in enumerate(posts_all, start=1):
|
||||
should_cancel()
|
||||
with self._lock:
|
||||
job.progress.posts_exported += 1
|
||||
job.progress.current_user_posts_done += 1
|
||||
|
||||
output_name = f"sns_{safe_uname}.txt"
|
||||
zf.writestr(
|
||||
output_name,
|
||||
_render_user_text(
|
||||
username=uname,
|
||||
display_name=display,
|
||||
post_count=actual_post_count,
|
||||
posts=posts_all,
|
||||
),
|
||||
)
|
||||
written.add(output_name)
|
||||
|
||||
user_outputs.append(
|
||||
{
|
||||
"username": uname,
|
||||
"displayName": display,
|
||||
"postCount": int(u.get("postCount") or 0),
|
||||
"page": page_name,
|
||||
"postCount": actual_post_count,
|
||||
"entry": output_name,
|
||||
}
|
||||
)
|
||||
|
||||
with self._lock:
|
||||
job.progress.users_done = i + 1
|
||||
job.progress.current_user_posts_done = actual_post_count
|
||||
|
||||
if scope == "all":
|
||||
rows: list[str] = []
|
||||
for u in user_pages:
|
||||
uname = str(u.get("username") or "").strip()
|
||||
display = _clean_name(u.get("displayName")) or uname
|
||||
pc = int(u.get("postCount") or 0)
|
||||
href = str(u.get("page") or "").strip()
|
||||
avatar_arc = export_avatar_to_zip(zf=zf, username=uname, display_name=display)
|
||||
if avatar_arc:
|
||||
avatar_html = (
|
||||
'<div class="w-8 h-8 rounded-md overflow-hidden bg-gray-300 flex-shrink-0">'
|
||||
f'<img src="{_esc_attr(avatar_arc)}" class="w-full h-full object-cover" '
|
||||
f'alt="{_esc_attr(display or uname)}" loading="lazy" referrerpolicy="no-referrer" />'
|
||||
"</div>"
|
||||
if export_format == "html":
|
||||
if scope == "all":
|
||||
rows: list[str] = []
|
||||
for u in user_outputs:
|
||||
uname = str(u.get("username") or "").strip()
|
||||
display = _clean_name(u.get("displayName")) or uname
|
||||
pc = int(u.get("postCount") or 0)
|
||||
href = str(u.get("entry") or "").strip()
|
||||
avatar_arc = export_avatar_to_zip(zf=zf, username=uname, display_name=display)
|
||||
if avatar_arc:
|
||||
avatar_html = (
|
||||
'<div class="w-8 h-8 rounded-md overflow-hidden bg-gray-300 flex-shrink-0">'
|
||||
f'<img src="{_esc_attr(avatar_arc)}" class="w-full h-full object-cover" '
|
||||
f'alt="{_esc_attr(display or uname)}" loading="lazy" referrerpolicy="no-referrer" />'
|
||||
"</div>"
|
||||
)
|
||||
else:
|
||||
fallback = _esc_text((display or uname or "友")[:1] or "友")
|
||||
avatar_html = (
|
||||
'<div class="w-8 h-8 rounded-md overflow-hidden bg-gray-300 flex-shrink-0">'
|
||||
'<div class="w-full h-full flex items-center justify-center text-white text-xs font-bold" '
|
||||
f'style="background-color:#4B5563">{fallback}</div></div>'
|
||||
)
|
||||
rows.append(
|
||||
'<a class="px-3 py-2 text-sm cursor-pointer flex items-center gap-2 border-b border-gray-100 hover:bg-gray-50" '
|
||||
f'href="{_esc_attr(href)}">'
|
||||
f"{avatar_html}"
|
||||
'<div class="flex-1 min-w-0">'
|
||||
f'<div class="truncate">{_esc_text(display)}</div>'
|
||||
f'<div class="text-[11px] text-gray-400 truncate">{_esc_text(uname)} · {pc} 条</div>'
|
||||
"</div></a>"
|
||||
)
|
||||
else:
|
||||
fallback = _esc_text((display or uname or "友")[:1] or "友")
|
||||
avatar_html = (
|
||||
'<div class="w-8 h-8 rounded-md overflow-hidden bg-gray-300 flex-shrink-0">'
|
||||
'<div class="w-full h-full flex items-center justify-center text-white text-xs font-bold" '
|
||||
f'style="background-color:#4B5563">{fallback}</div></div>'
|
||||
)
|
||||
rows.append(
|
||||
'<a class="px-3 py-2 text-sm cursor-pointer flex items-center gap-2 border-b border-gray-100 hover:bg-gray-50" '
|
||||
f'href="{_esc_attr(href)}">'
|
||||
f"{avatar_html}"
|
||||
'<div class="flex-1 min-w-0">'
|
||||
f'<div class="truncate">{_esc_text(display)}</div>'
|
||||
f'<div class="text-[11px] text-gray-400 truncate">{_esc_text(uname)} · {pc} 条</div>'
|
||||
"</div></a>"
|
||||
)
|
||||
|
||||
index_html = "\n".join(
|
||||
[
|
||||
"<!doctype html>",
|
||||
"<html>",
|
||||
"<head>",
|
||||
'<meta charset="utf-8" />',
|
||||
'<meta name="viewport" content="width=device-width, initial-scale=1" />',
|
||||
"<title>朋友圈导出</title>",
|
||||
f'<link rel="stylesheet" href="{_esc_attr(css_href)}" />',
|
||||
"</head>",
|
||||
'<body style="background-color:#EDEDED">',
|
||||
'<div class="min-h-screen" style="background-color:#EDEDED">',
|
||||
'<div class="max-w-2xl mx-auto px-4 py-4">',
|
||||
'<div class="mb-4 flex items-center justify-between">',
|
||||
'<div class="text-sm font-semibold text-gray-700">朋友圈联系人</div>',
|
||||
f'<div class="text-xs text-gray-500">{len(user_pages)} 人</div>',
|
||||
"</div>",
|
||||
'<div class="bg-white rounded-sm overflow-hidden border border-gray-200">',
|
||||
"".join(rows),
|
||||
"</div>",
|
||||
"</div>",
|
||||
"</div>",
|
||||
"</body>",
|
||||
"</html>",
|
||||
"",
|
||||
]
|
||||
)
|
||||
zf.writestr("index.html", index_html)
|
||||
written.add("index.html")
|
||||
else:
|
||||
only_page = user_pages[0]["page"] if user_pages else ""
|
||||
if only_page:
|
||||
index_html = (
|
||||
"<!doctype html><html><head>"
|
||||
'<meta charset="utf-8" />'
|
||||
f'<meta http-equiv="refresh" content="0; url={_esc_attr(only_page)}" />'
|
||||
"</head><body></body></html>"
|
||||
index_html = "\n".join(
|
||||
[
|
||||
"<!doctype html>",
|
||||
"<html>",
|
||||
"<head>",
|
||||
'<meta charset="utf-8" />',
|
||||
'<meta name="viewport" content="width=device-width, initial-scale=1" />',
|
||||
"<title>朋友圈导出</title>",
|
||||
f'<link rel="stylesheet" href="{_esc_attr(css_href)}" />',
|
||||
"</head>",
|
||||
'<body style="background-color:#EDEDED">',
|
||||
'<div class="min-h-screen" style="background-color:#EDEDED">',
|
||||
'<div class="max-w-2xl mx-auto px-4 py-4">',
|
||||
'<div class="mb-4 flex items-center justify-between">',
|
||||
'<div class="text-sm font-semibold text-gray-700">朋友圈联系人</div>',
|
||||
f'<div class="text-xs text-gray-500">{len(user_outputs)} 人</div>',
|
||||
"</div>",
|
||||
'<div class="bg-white rounded-sm overflow-hidden border border-gray-200">',
|
||||
"".join(rows),
|
||||
"</div>",
|
||||
"</div>",
|
||||
"</div>",
|
||||
"</body>",
|
||||
"</html>",
|
||||
"",
|
||||
]
|
||||
)
|
||||
zf.writestr("index.html", index_html)
|
||||
written.add("index.html")
|
||||
else:
|
||||
only_page = user_outputs[0]["entry"] if user_outputs else ""
|
||||
if only_page:
|
||||
index_html = (
|
||||
"<!doctype html><html><head>"
|
||||
'<meta charset="utf-8" />'
|
||||
f'<meta http-equiv="refresh" content="0; url={_esc_attr(only_page)}" />'
|
||||
"</head><body></body></html>"
|
||||
)
|
||||
zf.writestr("index.html", index_html)
|
||||
written.add("index.html")
|
||||
elif export_format == "json":
|
||||
zf.writestr(
|
||||
"index.json",
|
||||
json.dumps(
|
||||
{
|
||||
"exportedAt": exported_at,
|
||||
"exportId": job.export_id,
|
||||
"account": account_dir.name,
|
||||
"scope": scope,
|
||||
"format": export_format,
|
||||
"users": user_outputs,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
),
|
||||
)
|
||||
written.add("index.json")
|
||||
else:
|
||||
lines = [
|
||||
"朋友圈导出",
|
||||
f"导出时间: {exported_at}",
|
||||
f"账号: {account_dir.name}",
|
||||
f"范围: {'全部联系人' if scope == 'all' else '指定联系人'}",
|
||||
f"格式: {export_format}",
|
||||
"",
|
||||
]
|
||||
for item in user_outputs:
|
||||
lines.append(
|
||||
f"- {item.get('displayName') or item.get('username') or ''} "
|
||||
f"({item.get('username') or ''}) · {int(item.get('postCount') or 0)} 条 -> {item.get('entry') or ''}"
|
||||
)
|
||||
zf.writestr("index.txt", "\n".join(lines).rstrip() + "\n")
|
||||
written.add("index.txt")
|
||||
|
||||
zf.writestr(
|
||||
"manifest.json",
|
||||
json.dumps(
|
||||
{
|
||||
"schemaVersion": 1,
|
||||
"exportedAt": exported_at,
|
||||
"exportId": job.export_id,
|
||||
"account": account_dir.name,
|
||||
"scope": scope,
|
||||
"format": export_format,
|
||||
"options": {
|
||||
"useCache": use_cache,
|
||||
},
|
||||
"stats": {
|
||||
"users": len(user_outputs),
|
||||
"postsExported": job.progress.posts_exported,
|
||||
"postsTotal": job.progress.posts_total,
|
||||
"mediaCopied": job.progress.media_copied,
|
||||
"mediaMissing": job.progress.media_missing,
|
||||
},
|
||||
"entries": user_outputs,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
),
|
||||
)
|
||||
written.add("manifest.json")
|
||||
|
||||
try:
|
||||
zf.writestr("export_report.json", json.dumps(report, ensure_ascii=False, indent=2))
|
||||
@@ -1881,8 +2138,9 @@ class SnsExportManager:
|
||||
if job.status != "cancelled":
|
||||
job.status = "done"
|
||||
job.finished_at = time.time()
|
||||
job.progress.current_user_posts_done = job.progress.current_user_posts_total
|
||||
|
||||
return tmp_zip
|
||||
return final_out
|
||||
|
||||
|
||||
SNS_EXPORT_MANAGER = SnsExportManager()
|
||||
|
||||
Reference in New Issue
Block a user