diff --git a/pyproject.toml b/pyproject.toml index 78cd6c2..7209a0b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,6 +18,7 @@ dependencies = [ "loguru>=0.7.0", "zstandard>=0.23.0", "pilk>=0.2.4", + "pypinyin>=0.53.0", ] [project.optional-dependencies] diff --git a/src/wechat_decrypt_tool/routers/wrapped.py b/src/wechat_decrypt_tool/routers/wrapped.py index d3b248a..e95f339 100644 --- a/src/wechat_decrypt_tool/routers/wrapped.py +++ b/src/wechat_decrypt_tool/routers/wrapped.py @@ -3,10 +3,10 @@ from __future__ import annotations import asyncio from typing import Optional -from fastapi import APIRouter, Query +from fastapi import APIRouter, HTTPException, Path, Query from ..path_fix import PathFixRoute -from ..wrapped.service import build_wrapped_annual_response +from ..wrapped.service import build_wrapped_annual_card, build_wrapped_annual_meta, build_wrapped_annual_response router = APIRouter(route_class=PathFixRoute) @@ -17,7 +17,39 @@ async def wrapped_annual( account: Optional[str] = Query(None, description="解密后的账号目录名。默认取第一个可用账号。"), refresh: bool = Query(False, description="是否强制重新计算(忽略缓存)。"), ): - """返回年度总结数据(目前仅实现第 1 个点子:年度赛博作息表)。""" + """返回年度总结完整数据(一次性包含全部卡片,可能较慢)。""" # This endpoint performs blocking sqlite/file IO, so run it in a worker thread. return await asyncio.to_thread(build_wrapped_annual_response, account=account, year=year, refresh=refresh) + + +@router.get("/api/wrapped/annual/meta", summary="微信聊天年度总结(WeChat Wrapped)- 目录(轻量)") +async def wrapped_annual_meta( + year: Optional[int] = Query(None, description="年份(例如 2026)。默认当前年份。"), + account: Optional[str] = Query(None, description="解密后的账号目录名。默认取第一个可用账号。"), + refresh: bool = Query(False, description="是否强制重新计算(忽略缓存)。"), +): + """返回年度总结的目录/元信息,用于前端懒加载每一页。""" + + return await asyncio.to_thread(build_wrapped_annual_meta, account=account, year=year, refresh=refresh) + + +@router.get("/api/wrapped/annual/cards/{card_id}", summary="微信聊天年度总结(WeChat Wrapped)- 单张卡片(按页加载)") +async def wrapped_annual_card( + card_id: int = Path(..., description="卡片ID(与前端页面一一对应)", ge=0), + year: Optional[int] = Query(None, description="年份(例如 2026)。默认当前年份。"), + account: Optional[str] = Query(None, description="解密后的账号目录名。默认取第一个可用账号。"), + refresh: bool = Query(False, description="是否强制重新计算(忽略缓存)。"), +): + """按卡片 ID 返回单页数据(避免首屏一次性计算全部卡片)。""" + + try: + return await asyncio.to_thread( + build_wrapped_annual_card, + account=account, + year=year, + card_id=card_id, + refresh=refresh, + ) + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) from e diff --git a/src/wechat_decrypt_tool/wrapped/cards/card_00_global_overview.py b/src/wechat_decrypt_tool/wrapped/cards/card_00_global_overview.py new file mode 100644 index 0000000..6dfdf4c --- /dev/null +++ b/src/wechat_decrypt_tool/wrapped/cards/card_00_global_overview.py @@ -0,0 +1,759 @@ +from __future__ import annotations + +import hashlib +import re +import sqlite3 +import time +from collections import Counter +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Optional + +from .card_01_cyber_schedule import WeekdayHourHeatmap, compute_weekday_hour_heatmap +from ...chat_search_index import get_chat_search_index_db_path +from ...chat_helpers import ( + _build_avatar_url, + _decode_sqlite_text, + _iter_message_db_paths, + _load_contact_rows, + _pick_avatar_url, + _pick_display_name, + _quote_ident, + _should_keep_session, +) +from ...logging_config import get_logger + +logger = get_logger(__name__) + + +_MD5_HEX_RE = 
re.compile(r"(?i)[0-9a-f]{32}") + + +@dataclass(frozen=True) +class GlobalOverviewStats: + year: int + active_days: int + local_type_counts: dict[int, int] + kind_counts: dict[str, int] + latest_ts: int + top_phrase: Optional[tuple[str, int]] + top_emoji: Optional[tuple[str, int]] + top_contact: Optional[tuple[str, int]] + top_group: Optional[tuple[str, int]] + + +def _year_range_epoch_seconds(year: int) -> tuple[int, int]: + # Keep the same semantics as other parts of the project: local time boundaries. + start = int(datetime(year, 1, 1).timestamp()) + end = int(datetime(year + 1, 1, 1).timestamp()) + return start, end + + +def _list_message_tables(conn: sqlite3.Connection) -> list[str]: + try: + rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + except Exception: + return [] + names: list[str] = [] + for r in rows: + if not r or not r[0]: + continue + name = str(r[0]) + ln = name.lower() + if ln.startswith(("msg_", "chat_")): + names.append(name) + return names + + +def _list_session_usernames(session_db_path: Path) -> list[str]: + if not session_db_path.exists(): + return [] + + conn = sqlite3.connect(str(session_db_path)) + try: + try: + rows = conn.execute("SELECT username FROM SessionTable").fetchall() + except sqlite3.OperationalError: + rows = conn.execute("SELECT username FROM Session").fetchall() + except Exception: + rows = [] + finally: + conn.close() + + out: list[str] = [] + for r in rows: + if not r or not r[0]: + continue + u = str(r[0]).strip() + if u: + out.append(u) + return out + + +def _mask_name(name: str) -> str: + s = str(name or "").strip() + if not s: + return "" + if len(s) == 1: + return "*" + if len(s) == 2: + return s[0] + "*" + return s[0] + ("*" * (len(s) - 2)) + s[-1] + + +def _normalize_phrase(v: Any) -> str: + s = _decode_sqlite_text(v).strip() + if not s: + return "" + s = re.sub(r"\s+", " ", s).strip() + if not s: + return "" + if len(s) > 12: + return "" + lower = s.lower() + if "http://" in lower or "https://" in lower: + return "" + if s.startswith("<"): + return "" + # Avoid pure punctuation / numbers. + if not re.search(r"[\u4e00-\u9fffA-Za-z]", s): + return "" + return s + + +def _normalize_emoji(v: Any) -> str: + s = _decode_sqlite_text(v).strip() + if not s: + return "" + s = re.sub(r"\s+", " ", s).strip() + if not s or len(s) > 48: + return "" + if s.startswith("<"): + return "" + # If it is an md5 or some opaque token, don't show it. + if re.fullmatch(r"(?i)[0-9a-f]{32}", s): + return "" + return s + + +def _kind_from_local_type(t: int) -> str: + # See `_infer_local_type` in chat_helpers for known values. 
+ if t == 1: + return "text" + if t == 3: + return "image" + if t == 34: + return "voice" + if t == 43: + return "video" + if t == 47: + return "emoji" + if t in (49, 17179869233, 21474836529, 154618822705, 12884901937, 270582939697): + return "link" + if t == 25769803825: + return "file" + if t == 10000: + return "system" + if t == 50: + return "voip" + if t == 244813135921: + return "quote" + if t == 8594229559345: + return "red_packet" + if t == 8589934592049: + return "transfer" + if t == 266287972401: + return "pat" + return "other" + + +def _weekday_name_zh(weekday_index: int) -> str: + labels = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"] + if 0 <= weekday_index < len(labels): + return labels[weekday_index] + return "" + + +def _kind_label_zh(kind: str) -> str: + return { + "text": "文字", + "emoji": "表情包", + "voice": "语音", + "image": "图片", + "video": "视频", + "link": "链接/小程序", + "file": "文件", + "system": "系统消息", + "other": "其他", + }.get(kind, kind) + + +def compute_global_overview_stats( + *, + account_dir: Path, + year: int, + sender_username: str | None = None, +) -> GlobalOverviewStats: + """Compute global overview stats for wrapped. + + Notes: + - Best-effort only. Different WeChat versions may store different message types/values. + - We default to excluding `biz_message*.db` to reduce noise. + - If `sender_username` is provided, only messages sent by that sender are counted + (best-effort). + """ + + start_ts, end_ts = _year_range_epoch_seconds(year) + sender = str(sender_username).strip() if sender_username and str(sender_username).strip() else None + + # Prefer using the unified search index if available; it already merges all shards/tables. + index_path = get_chat_search_index_db_path(account_dir) + if index_path.exists(): + conn = sqlite3.connect(str(index_path)) + try: + has_fts = ( + conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1" + ).fetchone() + is not None + ) + if has_fts: + t0 = time.time() + + ts_expr = ( + "CASE " + "WHEN CAST(create_time AS INTEGER) > 1000000000000 " + "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) " + "ELSE CAST(create_time AS INTEGER) " + "END" + ) + where = f"{ts_expr} >= ? AND {ts_expr} < ? AND db_stem NOT LIKE 'biz_message%'" + params: tuple[Any, ...] = (start_ts, end_ts) + if sender: + where += " AND sender_username = ?" + params = (start_ts, end_ts, sender) + + # activeDays + latest_ts in one pass. + sql_meta = ( + "SELECT " + "COUNT(DISTINCT date(datetime(ts, 'unixepoch', 'localtime'))) AS active_days, " + "MAX(ts) AS latest_ts " + "FROM (" + f" SELECT {ts_expr} AS ts" + " FROM message_fts" + f" WHERE {where}" + ") sub" + ) + r = conn.execute(sql_meta, params).fetchone() + active_days_i = int((r[0] if r else 0) or 0) + latest_ts_i = int((r[1] if r else 0) or 0) + + # local_type distribution (for message kind). + local_type_counts_i: Counter[int] = Counter() + kind_counts_i: Counter[str] = Counter() + try: + rows = conn.execute( + f"SELECT CAST(local_type AS INTEGER) AS lt, COUNT(1) AS cnt " + f"FROM message_fts WHERE {where} GROUP BY lt", + params, + ).fetchall() + except Exception: + rows = [] + for rr in rows: + if not rr: + continue + try: + lt = int(rr[0] or 0) + cnt = int(rr[1] or 0) + except Exception: + continue + if cnt <= 0: + continue + local_type_counts_i[lt] += cnt + kind_counts_i[_kind_from_local_type(lt)] += cnt + + # Top conversations (best-effort: only needs a small LIMIT). 
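+                # We only need the single busiest contact and group, so a grouped top-400
+                # (ORDER BY cnt DESC LIMIT 400) comfortably survives the @chatroom /
+                # official-account filtering applied further down.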
+ per_username_counts_i: Counter[str] = Counter() + try: + rows_u = conn.execute( + f"SELECT username, COUNT(1) AS cnt " + f"FROM message_fts WHERE {where} " + "GROUP BY username ORDER BY cnt DESC LIMIT 400", + params, + ).fetchall() + except Exception: + rows_u = [] + for rr in rows_u: + if not rr: + continue + u = str(rr[0] or "").strip() + if not u: + continue + try: + cnt = int(rr[1] or 0) + except Exception: + cnt = 0 + if cnt > 0: + per_username_counts_i[u] = cnt + + # Top phrases (short text only). + phrase_counts_i: Counter[str] = Counter() + try: + rows_p = conn.execute( + f"SELECT \"text\" AS txt, COUNT(1) AS cnt " + f"FROM message_fts WHERE {where} AND render_type = 'text' " + " AND \"text\" IS NOT NULL " + " AND TRIM(\"text\") != '' " + " AND LENGTH(TRIM(\"text\")) <= 12 " + "GROUP BY txt ORDER BY cnt DESC LIMIT 400", + params, + ).fetchall() + except Exception: + rows_p = [] + for rr in rows_p: + if not rr: + continue + phrase = _normalize_phrase(rr[0]) + if not phrase: + continue + try: + cnt = int(rr[1] or 0) + except Exception: + cnt = 0 + if cnt > 0: + phrase_counts_i[phrase] += cnt + + def pick_top(counter: Counter[Any]) -> Optional[tuple[Any, int]]: + if not counter: + return None + best_item = max(counter.items(), key=lambda kv: (kv[1], str(kv[0]))) + if best_item[1] <= 0: + return None + return best_item[0], int(best_item[1]) + + def is_keep_username(u: str) -> bool: + return _should_keep_session(u, include_official=False) + + contact_counts_i = Counter( + { + u: c + for u, c in per_username_counts_i.items() + if (not u.endswith("@chatroom")) and is_keep_username(u) + } + ) + group_counts_i = Counter( + {u: c for u, c in per_username_counts_i.items() if u.endswith("@chatroom") and is_keep_username(u)} + ) + top_contact = pick_top(contact_counts_i) + top_group = pick_top(group_counts_i) + top_phrase = pick_top(phrase_counts_i) + + total_messages = int(sum(local_type_counts_i.values())) + logger.info( + "Wrapped card#0 overview computed (search index): account=%s year=%s total=%s active_days=%s sender=%s db=%s elapsed=%.2fs", + str(account_dir.name or "").strip(), + year, + total_messages, + active_days_i, + sender or "*", + str(index_path.name), + time.time() - t0, + ) + + return GlobalOverviewStats( + year=year, + active_days=active_days_i, + local_type_counts={int(k): int(v) for k, v in local_type_counts_i.items()}, + kind_counts={str(k): int(v) for k, v in kind_counts_i.items()}, + latest_ts=latest_ts_i, + top_phrase=(str(top_phrase[0]), int(top_phrase[1])) if top_phrase else None, + top_emoji=None, + top_contact=(str(top_contact[0]), int(top_contact[1])) if top_contact else None, + top_group=(str(top_group[0]), int(top_group[1])) if top_group else None, + ) + finally: + try: + conn.close() + except Exception: + pass + + # Resolve all sessions (usernames) so we can map msg_xxx/chat_xxx tables back to usernames. 
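+    # Illustration of the mapping built below (the wxid and hash are hypothetical):
+    #   md5("wxid_example") -> "0a1b2c...ff" -> tables "msg_0a1b2c...ff" / "chat_0a1b2c...ff"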
+ session_usernames = _list_session_usernames(account_dir / "session.db") + md5_to_username: dict[str, str] = {} + table_to_username: dict[str, str] = {} + for u in session_usernames: + md5_hex = hashlib.md5(u.encode("utf-8")).hexdigest().lower() + md5_to_username[md5_hex] = u + table_to_username[f"msg_{md5_hex}"] = u + table_to_username[f"chat_{md5_hex}"] = u + + def resolve_username_from_table(table_name: str) -> Optional[str]: + ln = str(table_name or "").lower() + u = table_to_username.get(ln) + if u: + return u + m = _MD5_HEX_RE.search(ln) + if m: + return md5_to_username.get(m.group(0).lower()) + return None + + db_paths = _iter_message_db_paths(account_dir) + db_paths = [p for p in db_paths if not p.name.lower().startswith("biz_message")] + + # Convert millisecond timestamps defensively. + ts_expr = ( + "CASE WHEN create_time > 1000000000000 THEN CAST(create_time/1000 AS INTEGER) ELSE create_time END" + ) + + local_type_counts: Counter[int] = Counter() + kind_counts: Counter[str] = Counter() + active_days: set[str] = set() + per_username_counts: Counter[str] = Counter() + phrase_counts: Counter[str] = Counter() + + latest_ts = 0 + + t0 = time.time() + for db_path in db_paths: + if not db_path.exists(): + continue + conn: sqlite3.Connection | None = None + try: + conn = sqlite3.connect(str(db_path)) + tables = _list_message_tables(conn) + if not tables: + continue + + sender_rowid: int | None = None + if sender: + try: + r2 = conn.execute( + "SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1", + (sender,), + ).fetchone() + if r2 is not None and r2[0] is not None: + sender_rowid = int(r2[0]) + except Exception: + sender_rowid = None + # Can't reliably filter by sender for this shard; skip to avoid mixing directions. + if sender_rowid is None: + continue + + for table_name in tables: + qt = _quote_ident(table_name) + username = resolve_username_from_table(table_name) + sender_where = " AND real_sender_id = ?" if sender_rowid is not None else "" + params = (start_ts, end_ts, sender_rowid) if sender_rowid is not None else (start_ts, end_ts) + + # 1) local_type distribution + table total + sql_types = ( + "SELECT local_type, COUNT(1) AS cnt " + "FROM (" + f" SELECT local_type, {ts_expr} AS ts " + f" FROM {qt} " + f" WHERE {ts_expr} >= ? AND {ts_expr} < ?{sender_where}" + ") sub " + "GROUP BY local_type" + ) + try: + rows = conn.execute(sql_types, params).fetchall() + except Exception: + continue + if not rows: + continue + + table_total = 0 + table_text_cnt = 0 + for r in rows: + if not r: + continue + try: + lt = int(r[0] or 0) + except Exception: + lt = 0 + try: + cnt = int(r[1] or 0) + except Exception: + cnt = 0 + if cnt <= 0: + continue + table_total += cnt + local_type_counts[lt] += cnt + kind_counts[_kind_from_local_type(lt)] += cnt + if lt == 1: + table_text_cnt = cnt + + if table_total <= 0: + continue + if username: + per_username_counts[username] += table_total + + # 3) active days (distinct dates) + sql_days = ( + "SELECT DISTINCT date(datetime(ts, 'unixepoch', 'localtime')) AS d " + "FROM (" + f" SELECT {ts_expr} AS ts" + f" FROM {qt}" + f" WHERE {ts_expr} >= ? AND {ts_expr} < ?{sender_where}" + ") sub" + ) + try: + rows_d = conn.execute(sql_days, params).fetchall() + except Exception: + rows_d = [] + for rd in rows_d: + if not rd or not rd[0]: + continue + active_days.add(str(rd[0])) + + # 4) latest timestamp within this year + sql_max_ts = f"SELECT MAX({ts_expr}) AS mx FROM {qt} WHERE {ts_expr} >= ? 
AND {ts_expr} < ?{sender_where}" + try: + rmax = conn.execute(sql_max_ts, params).fetchone() + except Exception: + rmax = None + try: + mx = int((rmax[0] if rmax else 0) or 0) + except Exception: + mx = 0 + if mx > latest_ts: + latest_ts = mx + + # 5) top phrases (best-effort via short, repeated text messages) + if table_text_cnt > 0: + sql_phrase = ( + "SELECT message_content AS txt, COUNT(1) AS cnt " + f"FROM {qt} " + f"WHERE local_type = 1 " + f" AND {ts_expr} >= ? AND {ts_expr} < ?{sender_where} " + " AND message_content IS NOT NULL " + " AND TRIM(CAST(message_content AS TEXT)) != '' " + " AND LENGTH(TRIM(CAST(message_content AS TEXT))) <= 12 " + "GROUP BY txt " + "ORDER BY cnt DESC " + "LIMIT 60" + ) + try: + rows_p = conn.execute(sql_phrase, params).fetchall() + except Exception: + rows_p = [] + for rp in rows_p: + if not rp: + continue + phrase = _normalize_phrase(rp[0]) + if not phrase: + continue + try: + cnt = int(rp[1] or 0) + except Exception: + cnt = 0 + if cnt > 0: + phrase_counts[phrase] += cnt + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass + + def pick_top(counter: Counter[Any]) -> Optional[tuple[Any, int]]: + if not counter: + return None + # Deterministic tie-breaker: key string ascending. + best_item = max(counter.items(), key=lambda kv: (kv[1], str(kv[0]))) + if best_item[1] <= 0: + return None + return best_item[0], int(best_item[1]) + + # Pick top contact & group (exclude official/service accounts by default). + def is_keep_username(u: str) -> bool: + return _should_keep_session(u, include_official=False) + + contact_counts = Counter({u: c for u, c in per_username_counts.items() if (not u.endswith("@chatroom")) and is_keep_username(u)}) + group_counts = Counter({u: c for u, c in per_username_counts.items() if u.endswith("@chatroom") and is_keep_username(u)}) + top_contact = pick_top(contact_counts) + top_group = pick_top(group_counts) + + top_phrase = pick_top(phrase_counts) + + total_messages = int(sum(local_type_counts.values())) + + logger.info( + "Wrapped card#0 overview computed: account=%s year=%s total=%s active_days=%s sender=%s dbs=%s elapsed=%.2fs", + str(account_dir.name or "").strip(), + year, + total_messages, + len(active_days), + sender or "*", + len(db_paths), + time.time() - t0, + ) + + return GlobalOverviewStats( + year=year, + active_days=len(active_days), + local_type_counts={int(k): int(v) for k, v in local_type_counts.items()}, + kind_counts={str(k): int(v) for k, v in kind_counts.items()}, + latest_ts=int(latest_ts), + top_phrase=(str(top_phrase[0]), int(top_phrase[1])) if top_phrase else None, + top_emoji=None, + top_contact=(str(top_contact[0]), int(top_contact[1])) if top_contact else None, + top_group=(str(top_group[0]), int(top_group[1])) if top_group else None, + ) + + +def build_card_00_global_overview( + *, + account_dir: Path, + year: int, + heatmap: WeekdayHourHeatmap | None = None, +) -> dict[str, Any]: + """Card #0: 年度全局概览(开场综合页,建议作为第2页)。""" + + sender = str(account_dir.name or "").strip() + heatmap = heatmap or compute_weekday_hour_heatmap(account_dir=account_dir, year=year, sender_username=sender) + stats = compute_global_overview_stats(account_dir=account_dir, year=year, sender_username=sender) + + # Resolve display names for top sessions (best-effort). 
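+    # Best-effort fallbacks below: `_pick_display_name(row, u)` degrades to the raw username
+    # when no contact row exists, and the avatar degrades to a `_build_avatar_url(...)` URL.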
+ contact_db_path = account_dir / "contact.db" + top_usernames: list[str] = [] + if stats.top_contact: + top_usernames.append(stats.top_contact[0]) + if stats.top_group: + top_usernames.append(stats.top_group[0]) + contact_rows = _load_contact_rows(contact_db_path, top_usernames) if top_usernames else {} + + top_contact_obj = None + if stats.top_contact: + u, cnt = stats.top_contact + row = contact_rows.get(u) + display = _pick_display_name(row, u) + avatar = _pick_avatar_url(row) or (_build_avatar_url(str(account_dir.name or ""), u) if u else "") + top_contact_obj = { + "username": u, + "displayName": display, + "maskedName": _mask_name(display), + "avatarUrl": avatar, + "messages": int(cnt), + "isGroup": False, + } + + top_group_obj = None + if stats.top_group: + u, cnt = stats.top_group + row = contact_rows.get(u) + display = _pick_display_name(row, u) + avatar = _pick_avatar_url(row) or (_build_avatar_url(str(account_dir.name or ""), u) if u else "") + top_group_obj = { + "username": u, + "displayName": display, + "maskedName": _mask_name(display), + "avatarUrl": avatar, + "messages": int(cnt), + "isGroup": True, + } + + # Derive the top "message kind". + top_kind = None + if stats.kind_counts: + kc = Counter(stats.kind_counts) + # Exclude mostly-unhelpful kinds from the "top" pick. + for drop in ("system", "other"): + if drop in kc: + del kc[drop] + if kc: + kind, count = max(kc.items(), key=lambda kv: (kv[1], str(kv[0]))) + ratio = (float(count) / float(heatmap.total_messages)) if heatmap.total_messages > 0 else 0.0 + top_kind = { + "kind": str(kind), + "label": _kind_label_zh(str(kind)), + "count": int(count), + "ratio": ratio, + } + + messages_per_day = 0.0 + if stats.active_days > 0: + messages_per_day = heatmap.total_messages / float(stats.active_days) + + most_active_hour: Optional[int] = None + most_active_weekday: Optional[int] = None + if heatmap.total_messages > 0: + hour_totals = [sum(heatmap.matrix[w][h] for w in range(7)) for h in range(24)] + most_active_hour = max(range(24), key=lambda h: (hour_totals[h], -h)) + + weekday_totals = [sum(heatmap.matrix[w][h] for h in range(24)) for w in range(7)] + most_active_weekday = max(range(7), key=lambda w: (weekday_totals[w], -w)) + + most_active_weekday_name = _weekday_name_zh(most_active_weekday or -1) if most_active_weekday is not None else "" + + highlight = None + if stats.latest_ts > 0: + dt = datetime.fromtimestamp(int(stats.latest_ts)) + highlight = { + "timestamp": int(stats.latest_ts), + "date": dt.strftime("%Y-%m-%d"), + "time": dt.strftime("%H:%M"), + # Keep it privacy-safe by default: no content/object here. 
+ "action": "你还在微信里发送消息", + } + + lines: list[str] = [] + if heatmap.total_messages > 0: + lines.append(f"今年以来,你在微信里发送了 {heatmap.total_messages:,} 条消息,平均每天 {messages_per_day:.1f} 条。") + else: + lines.append("今年以来,你在微信里还没有发出聊天消息。") + + if stats.active_days > 0: + if most_active_hour is not None and most_active_weekday_name: + lines.append(f"和微信共度的 {stats.active_days} 天里,你最常在 {most_active_hour} 点出没;{most_active_weekday_name}是你最爱聊天的日子。") + else: + lines.append(f"和微信共度的 {stats.active_days} 天里,你留下了很多对话的痕迹。") + + if top_contact_obj or top_group_obj: + parts: list[str] = [] + if top_contact_obj: + parts.append(f"你发消息最多的人是「{top_contact_obj['maskedName']}」({int(top_contact_obj['messages']):,} 条)") + if top_group_obj: + parts.append(f"你最常发言的群是「{top_group_obj['maskedName']}」({int(top_group_obj['messages']):,} 条)") + if parts: + lines.append(",".join(parts) + "。") + + if top_kind and top_kind.get("count", 0) > 0: + pct = float(top_kind.get("ratio") or 0.0) * 100.0 + lines.append(f"你最常用的表达方式是{top_kind['label']}(占 {pct:.0f}%)。") + + if stats.top_phrase and stats.top_phrase[0] and stats.top_phrase[1] > 0: + phrase, cnt = stats.top_phrase + lines.append(f"你今年说得最多的一句话是「{phrase}」(共 {cnt:,} 次)。") + + # NOTE: We keep the `highlight` field in `data` for future use, but do not + # surface it in the page narrative for now (per product requirement). + + narrative = "一屏读懂你的年度微信聊天画像" + + return { + "id": 0, + "title": "年度全局概览", + "scope": "global", + "category": "A", + "status": "ok", + "kind": "global/overview", + "narrative": narrative, + "data": { + "year": int(year), + "totalMessages": int(heatmap.total_messages), + "activeDays": int(stats.active_days), + "messagesPerDay": messages_per_day, + "mostActiveHour": most_active_hour, + "mostActiveWeekday": most_active_weekday, + "mostActiveWeekdayName": most_active_weekday_name, + "topContact": top_contact_obj, + "topGroup": top_group_obj, + "topKind": top_kind, + "topPhrase": {"phrase": stats.top_phrase[0], "count": int(stats.top_phrase[1])} if stats.top_phrase else None, + "topEmoji": {"emoji": stats.top_emoji[0], "count": int(stats.top_emoji[1])} if stats.top_emoji else None, + "highlight": highlight, + "lines": lines, + }, + } diff --git a/src/wechat_decrypt_tool/wrapped/cards/card_01_cyber_schedule.py b/src/wechat_decrypt_tool/wrapped/cards/card_01_cyber_schedule.py index ed256bb..a59111e 100644 --- a/src/wechat_decrypt_tool/wrapped/cards/card_01_cyber_schedule.py +++ b/src/wechat_decrypt_tool/wrapped/cards/card_01_cyber_schedule.py @@ -7,6 +7,7 @@ from datetime import datetime from pathlib import Path from typing import Any +from ...chat_search_index import get_chat_search_index_db_path from ...chat_helpers import _iter_message_db_paths, _quote_ident from ...logging_config import get_logger @@ -25,6 +26,54 @@ class WeekdayHourHeatmap: total_messages: int +def _get_time_personality(hour: int) -> str: + if 5 <= hour <= 8: + return "early_bird" + if 9 <= hour <= 12: + return "office_worker" + if 13 <= hour <= 17: + return "afternoon" + if 18 <= hour <= 23: + return "night_owl" + if 0 <= hour <= 4: + return "late_night" + return "unknown" + + +def _get_weekday_name(weekday_index: int) -> str: + if 0 <= weekday_index < len(_WEEKDAY_LABELS_ZH): + return _WEEKDAY_LABELS_ZH[weekday_index] + return "" + + +def _build_narrative(*, hour: int, weekday: str, total: int) -> str: + personality = _get_time_personality(hour) + + templates: dict[str, str] = { + "early_bird": ( + f"清晨 {hour:02d}:00,当城市还在沉睡,你已经开始了新一天的问候。" + f"{weekday}是你最健谈的一天,这一年你用 {total:,} 条消息记录了这些早起时光。" + ), + 
"office_worker": ( + f"忙碌的上午 {hour:02d}:00,是你最常敲击键盘的时刻。" + f"{weekday}最活跃,这一年你用 {total:,} 条消息把工作与生活都留在了对话里。" + ), + "afternoon": ( + f"午后的阳光里,{hour:02d}:00 是你最爱分享的时刻。" + f"{weekday}的聊天最热闹,这一年共 {total:,} 条消息串起了你的午后时光。" + ), + "night_owl": ( + f"夜幕降临,{hour:02d}:00 是你最常出没的时刻。" + f"{weekday}最活跃,这一年 {total:,} 条消息陪你把每个夜晚都聊得更亮。" + ), + "late_night": ( + f"当世界沉睡,凌晨 {hour:02d}:00 的你依然在线。" + f"{weekday}最活跃,这一年 {total:,} 条深夜消息,是你与这个世界的悄悄话。" + ), + } + return templates.get(personality, f"你在 {hour:02d}:00 最活跃") + + def _year_range_epoch_seconds(year: int) -> tuple[int, int]: # Use local time boundaries (same semantics as sqlite "localtime"). start = int(datetime(year, 1, 1).timestamp()) @@ -54,6 +103,7 @@ def _accumulate_db( start_ts: int, end_ts: int, matrix: list[list[int]], + sender_username: str | None = None, ) -> int: """Accumulate message counts from one message shard DB into matrix. @@ -77,9 +127,30 @@ def _accumulate_db( "CASE WHEN create_time > 1000000000000 THEN CAST(create_time/1000 AS INTEGER) ELSE create_time END" ) + # Optional sender filter (best-effort). When provided, we only count + # messages whose `real_sender_id` maps to `sender_username`. + sender_rowid: int | None = None + if sender_username and str(sender_username).strip(): + try: + r = conn.execute( + "SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1", + (str(sender_username).strip(),), + ).fetchone() + if r is not None and r[0] is not None: + sender_rowid = int(r[0]) + except Exception: + sender_rowid = None + counted = 0 for table_name in tables: qt = _quote_ident(table_name) + sender_where = "" + params: tuple[Any, ...] + if sender_rowid is not None: + sender_where = " AND real_sender_id = ?" + params = (start_ts, end_ts, sender_rowid) + else: + params = (start_ts, end_ts) sql = ( "SELECT " # %w: 0..6 with Sunday=0, so shift to Monday=0..Sunday=6 @@ -89,12 +160,12 @@ def _accumulate_db( "FROM (" f" SELECT {ts_expr} AS ts" f" FROM {qt}" - f" WHERE {ts_expr} >= ? AND {ts_expr} < ?" + f" WHERE {ts_expr} >= ? AND {ts_expr} < ?{sender_where}" ") sub " "GROUP BY weekday, hour" ) try: - rows = conn.execute(sql, (start_ts, end_ts)).fetchall() + rows = conn.execute(sql, params).fetchall() except Exception: continue @@ -119,25 +190,114 @@ def _accumulate_db( pass -def compute_weekday_hour_heatmap(*, account_dir: Path, year: int) -> WeekdayHourHeatmap: +def compute_weekday_hour_heatmap(*, account_dir: Path, year: int, sender_username: str | None = None) -> WeekdayHourHeatmap: start_ts, end_ts = _year_range_epoch_seconds(year) matrix: list[list[int]] = [[0 for _ in range(24)] for _ in range(7)] total = 0 + # Prefer using our unified search index if available; it's much faster than scanning all msg tables. + index_path = get_chat_search_index_db_path(account_dir) + if index_path.exists(): + conn = sqlite3.connect(str(index_path)) + try: + has_fts = ( + conn.execute( + "SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1" + ).fetchone() + is not None + ) + if has_fts: + # Convert millisecond timestamps defensively (some datasets store ms). + ts_expr = ( + "CASE " + "WHEN CAST(create_time AS INTEGER) > 1000000000000 " + "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) " + "ELSE CAST(create_time AS INTEGER) " + "END" + ) + sender_clause = "" + if sender_username and str(sender_username).strip(): + sender_clause = " AND sender_username = ?" 
+ sql = ( + "SELECT " + "((CAST(strftime('%w', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) + 6) % 7) AS weekday, " + "CAST(strftime('%H', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS hour, " + "COUNT(1) AS cnt " + "FROM (" + f" SELECT {ts_expr} AS ts" + " FROM message_fts" + f" WHERE {ts_expr} >= ? AND {ts_expr} < ?" + " AND db_stem NOT LIKE 'biz_message%'" + f"{sender_clause}" + ") sub " + "GROUP BY weekday, hour" + ) + + t0 = time.time() + try: + params: tuple[Any, ...] = (start_ts, end_ts) + if sender_username and str(sender_username).strip(): + params = (start_ts, end_ts, str(sender_username).strip()) + rows = conn.execute(sql, params).fetchall() + except Exception: + rows = [] + + for r in rows: + if not r: + continue + try: + w = int(r[0] or 0) + h = int(r[1] or 0) + cnt = int(r[2] or 0) + except Exception: + continue + if 0 <= w < 7 and 0 <= h < 24 and cnt > 0: + matrix[w][h] += cnt + total += cnt + + logger.info( + "Wrapped heatmap computed (search index): account=%s year=%s total=%s sender=%s db=%s elapsed=%.2fs", + str(account_dir.name or "").strip(), + year, + total, + str(sender_username).strip() if sender_username else "*", + str(index_path.name), + time.time() - t0, + ) + + return WeekdayHourHeatmap( + weekday_labels=list(_WEEKDAY_LABELS_ZH), + hour_labels=list(_HOUR_LABELS), + matrix=matrix, + total_messages=total, + ) + finally: + try: + conn.close() + except Exception: + pass + db_paths = _iter_message_db_paths(account_dir) # Default: exclude official/biz shards (biz_message*.db) to reduce noise. db_paths = [p for p in db_paths if not p.name.lower().startswith("biz_message")] my_wxid = str(account_dir.name or "").strip() t0 = time.time() for db_path in db_paths: - total += _accumulate_db(db_path=db_path, start_ts=start_ts, end_ts=end_ts, matrix=matrix) + total += _accumulate_db( + db_path=db_path, + start_ts=start_ts, + end_ts=end_ts, + matrix=matrix, + sender_username=str(sender_username).strip() if sender_username else None, + ) logger.info( - "Wrapped card#1 heatmap computed: account=%s year=%s total=%s dbs=%s elapsed=%.2fs", + "Wrapped heatmap computed: account=%s year=%s total=%s sender=%s dbs=%s elapsed=%.2fs", my_wxid, year, total, + str(sender_username).strip() if sender_username else "*", len(db_paths), time.time() - t0, ) @@ -150,17 +310,36 @@ def compute_weekday_hour_heatmap(*, account_dir: Path, year: int) -> WeekdayHour ) -def build_card_01_cyber_schedule(*, account_dir: Path, year: int) -> dict[str, Any]: - """Card #1: 年度赛博作息表 (24x7 heatmap).""" +def build_card_01_cyber_schedule( + *, + account_dir: Path, + year: int, + heatmap: WeekdayHourHeatmap | None = None, +) -> dict[str, Any]: + """Card #1: 年度赛博作息表 (24x7 heatmap). - heatmap = compute_weekday_hour_heatmap(account_dir=account_dir, year=year) + `heatmap` can be provided by the caller to reuse computation across cards. + """ - narrative = "今年你没有聊天消息" + sender = str(account_dir.name or "").strip() + heatmap = heatmap or compute_weekday_hour_heatmap(account_dir=account_dir, year=year, sender_username=sender) + + narrative = "今年你没有发出聊天消息" if heatmap.total_messages > 0: hour_totals = [sum(heatmap.matrix[w][h] for w in range(7)) for h in range(24)] # Deterministic: pick earliest hour on ties. most_active_hour = max(range(24), key=lambda h: (hour_totals[h], -h)) - narrative = f"你在 {most_active_hour:02d}:00 最活跃" + + weekday_totals = [sum(heatmap.matrix[w][h] for h in range(24)) for w in range(7)] + # Deterministic: pick earliest weekday on ties. 
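+        # max() over the tuple (count, -index) breaks count ties toward the smaller index:
+        # e.g. equal totals on weekdays 1 and 2 compare as (9, -1) > (9, -2), so 1 wins.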
+        most_active_weekday = max(range(7), key=lambda w: (weekday_totals[w], -w))
+        weekday_name = _get_weekday_name(most_active_weekday)
+
+        narrative = _build_narrative(
+            hour=most_active_hour,
+            weekday=weekday_name,
+            total=heatmap.total_messages,
+        )
 
     return {
         "id": 1,
diff --git a/src/wechat_decrypt_tool/wrapped/cards/card_02_message_chars.py b/src/wechat_decrypt_tool/wrapped/cards/card_02_message_chars.py
new file mode 100644
index 0000000..2a0428e
--- /dev/null
+++ b/src/wechat_decrypt_tool/wrapped/cards/card_02_message_chars.py
@@ -0,0 +1,804 @@
+from __future__ import annotations
+
+import math
+import random
+import sqlite3
+import time
+from collections import Counter
+from datetime import datetime
+from pathlib import Path
+from typing import Any, Optional
+
+from pypinyin import lazy_pinyin, Style
+
+from ...chat_helpers import _decode_message_content, _iter_message_db_paths, _quote_ident
+from ...chat_search_index import get_chat_search_index_db_path
+from ...logging_config import get_logger
+
+logger = get_logger(__name__)
+
+
+# 键盘布局中用于“磨损”展示的按键(字母 + 数字 + 常用标点)。
+# 注意:功能键(Tab/Enter/Backspace 等)不统计;空格键单独放在 spaceHits。
+_KEYBOARD_KEYS = (
+    list("`1234567890-=")
+    + list("qwertyuiop[]\\")
+    + list("asdfghjkl;'")
+    + list("zxcvbnm,./")
+)
+_KEYBOARD_KEY_SET = set(_KEYBOARD_KEYS)
+
+# 将“显示字符”映射到键盘上的“实际按键”(用基础键位表示,如 '!' => '1', '?' => '/')。
+_CHAR_TO_KEY: dict[str, str] = {
+    # ASCII shifted symbols
+    "~": "`",
+    "!": "1",
+    "@": "2",
+    "#": "3",
+    "$": "4",
+    "%": "5",
+    "^": "6",
+    "&": "7",
+    "*": "8",
+    "(": "9",
+    ")": "0",
+    "_": "-",
+    "+": "=",
+    "{": "[",
+    "}": "]",
+    "|": "\\",
+    ":": ";",
+    '"': "'",
+    "<": ",",
+    ">": ".",
+    "?": "/",
+    # Common fullwidth / CJK punctuation (approximate key mapping)
+    "~": "`",
+    "!": "1",
+    "@": "2",
+    "#": "3",
+    "$": "4",
+    "%": "5",
+    "^": "6",
+    "&": "7",
+    "*": "8",
+    "(": "9",
+    ")": "0",
+    "¥": "4",
+    "¥": "4",
+    "_": "-",
+    "+": "=",
+    "{": "[",
+    "}": "]",
+    "|": "\\",
+    ":": ";",
+    """: "'",
+    "<": ",",
+    ">": ".",
+    "?": "/",
+    ",": ",",
+    "、": ",",
+    "。": ".",
+    ".": ".",
+    ";": ";",
+    "“": "'",
+    "”": "'",
+    "‘": "'",
+    "’": "'",
+    "【": "[",
+    "】": "]",
+    "《": ",",
+    "》": ".",
+    "—": "-",
+    "-": "-",
+    "=": "=",
+    "/": "/",
+    "\": "\\",
+    "·": "`", # 常见:中文输入法下“·”常用 ` 键打出
+    "…": ".", # 近似处理:省略号按 '.' 计
+}
+
+# 默认拼音字母频率分布(用于:有中文但采样不足时的兜底估算)
+_DEFAULT_PINYIN_FREQ = {
+    "a": 0.121,
+    "i": 0.118,
+    "n": 0.098,
+    "e": 0.089,
+    "u": 0.082,
+    "g": 0.072,
+    "h": 0.065,
+    "o": 0.052,
+    "z": 0.048,
+    "s": 0.042,
+    "x": 0.038,
+    "y": 0.036,
+    "d": 0.032,
+    "l": 0.028,
+    "j": 0.026,
+    "b": 0.022,
+    "c": 0.020,
+    "w": 0.018,
+    "m": 0.016,
+    "f": 0.014,
+    "t": 0.012,
+    "r": 0.010,
+    "p": 0.009,
+    "k": 0.007,
+    "q": 0.005,
+    "v": 0.001,
+}
+_AVG_PINYIN_LEN = 2.8
+
+
+def _is_cjk_han(ch: str) -> bool:
+    """是否为中文汉字(用于拼音估算)。"""
+    if not ch:
+        return False
+    o = ord(ch)
+    return (0x4E00 <= o <= 0x9FFF) or (0x3400 <= o <= 0x4DBF)
+
+
+def _char_to_key(ch: str) -> str | None:
+    """将单个字符映射为键盘按键 code(与前端键盘布局的 code 保持一致)。"""
+    if not ch:
+        return None
+
+    # Fullwidth digits: '0'..'9' -> ASCII '0'..'9'
+    if "0" <= ch <= "9":
+        return chr(ord(ch) - ord("0") + ord("0"))
+
+    if ch in _KEYBOARD_KEY_SET:
+        return ch
+
+    mapped = _CHAR_TO_KEY.get(ch)
+    if mapped is not None:
+        return mapped
+
+    if ch.isalpha():
+        low = ch.lower()
+        if low in _KEYBOARD_KEY_SET:
+            return low
+
+    return None
+
+
+def _update_keyboard_counters(
+    text: str,
+    *,
+    direct_counter: Counter,
+    pinyin_counter: Counter,
+    pinyin_cache: dict[str, str],
+    do_pinyin: bool,
+) -> tuple[int, int, int]:
+    """
+    扫描一条消息文本,累加:
+    - direct_counter: 非中文汉字部分(英文/数字/标点)可直接映射到按键的统计(精确)
+    - pinyin_counter: 中文汉字部分的拼音字母统计(仅当 do_pinyin=True 时才做;用于采样估算)
+    并返回 (nonspace_chars, cjk_han_chars, space_chars)。
+    """
+    if not text:
+        return 0, 0, 0
+
+    nonspace = 0
+    cjk = 0
+    spaces = 0
+
+    for ch in text:
+        # 真实可见空格:统计进 spaceHits(不计入 sentChars/receivedChars 的口径)
+        if ch == " " or ch == "\u3000":
+            spaces += 1
+            continue
+        if ch.isspace():
+            continue
+
+        nonspace += 1
+
+        if _is_cjk_han(ch):
+            cjk += 1
+            if do_pinyin:
+                py = pinyin_cache.get(ch)
+                if py is None:
+                    lst = lazy_pinyin(ch, style=Style.NORMAL)
+                    py = (lst[0] or "").lower() if lst else ""
+                    pinyin_cache[ch] = py
+                for letter in py:
+                    # pypinyin 在 Style.NORMAL 下通常只会给出 a-z(含 ü=>v),这里再做一次过滤。
+                    if letter in _KEYBOARD_KEY_SET:
+                        pinyin_counter[letter] += 1
+            continue
+
+        k = _char_to_key(ch)
+        if k is not None:
+            direct_counter[k] += 1
+
+    return nonspace, cjk, spaces
+
+
+def compute_keyboard_stats(*, account_dir: Path, year: int, sample_rate: float = 1.0) -> dict[str, Any]:
+    """
+    统计键盘敲击数据。
+
+    - 英文/数字/标点:可直接从消息文本映射到按键(精确统计)
+    - 中文汉字:需要拼音转换,成本高;对“消息”做采样(sample_rate)后估算总体拼音字母分布
+    """
+    start_ts, end_ts = _year_range_epoch_seconds(year)
+    my_username = str(account_dir.name or "").strip()
+
+    sample_rate = max(0.0, min(1.0, float(sample_rate)))
+
+    direct_counter: Counter[str] = Counter()
+    pinyin_counter: Counter[str] = Counter()
+    pinyin_cache: dict[str, str] = {}
+
+    total_cjk_chars = 0
+    sampled_cjk_chars = 0
+    actual_space_chars = 0
+
+    total_messages = 0
+    sampled_messages = 0
+    used_index = False
+
+    # 优先使用搜索索引(更快)
+    index_path = get_chat_search_index_db_path(account_dir)
+    if index_path.exists():
+        conn = sqlite3.connect(str(index_path))
+        try:
+            has_fts = (
+                conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone()
+                is not None
+            )
+            if has_fts and my_username:
+                ts_expr = (
+                    "CASE "
+                    "WHEN CAST(create_time AS INTEGER) > 1000000000000 "
+                    "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
+                    "ELSE CAST(create_time AS INTEGER) "
+                    "END"
+                )
+                where = (
+                    f"{ts_expr} >= ? AND {ts_expr} < ? "
+                    "AND db_stem NOT LIKE 'biz_message%' "
+                    "AND render_type = 'text' "
+                    "AND \"text\" IS NOT NULL "
+                    "AND TRIM(CAST(\"text\" AS TEXT)) != '' "
+                    "AND sender_username = ?"
+                )
+
+                sql = f"SELECT \"text\" FROM message_fts WHERE {where}"
+                try:
+                    cur = conn.execute(sql, (start_ts, end_ts, my_username))
+                    used_index = True
+                    for row in cur:
+                        txt = str(row[0] or "").strip()
+                        if not txt:
+                            continue
+                        total_messages += 1
+
+                        if sample_rate >= 1.0:
+                            do_sample = True
+                        elif sample_rate <= 0.0:
+                            do_sample = False
+                        else:
+                            do_sample = random.random() < sample_rate
+
+                        if do_sample:
+                            sampled_messages += 1
+
+                        _, cjk, spaces = _update_keyboard_counters(
+                            txt,
+                            direct_counter=direct_counter,
+                            pinyin_counter=pinyin_counter,
+                            pinyin_cache=pinyin_cache,
+                            do_pinyin=do_sample,
+                        )
+                        total_cjk_chars += cjk
+                        actual_space_chars += spaces
+                        if do_sample:
+                            sampled_cjk_chars += cjk
+                except Exception:
+                    # 中途失败会留下部分计数:清零后再走回退扫描,避免重复累计。
+                    direct_counter.clear()
+                    pinyin_counter.clear()
+                    total_cjk_chars = sampled_cjk_chars = actual_space_chars = 0
+                    total_messages = sampled_messages = 0
+                    used_index = False
+        finally:
+            try:
+                conn.close()
+            except Exception:
+                pass
+
+    # 如果索引不可用,回退到直接扫描(慢,但兼容)
+    if not used_index:
+        db_paths = _iter_message_db_paths(account_dir)
+        for db_path in db_paths:
+            try:
+                if db_path.name.lower().startswith("biz_message"):
+                    continue
+            except Exception:
+                pass
+            if not db_path.exists():
+                continue
+
+            conn: sqlite3.Connection | None = None
+            try:
+                conn = sqlite3.connect(str(db_path))
+                conn.row_factory = sqlite3.Row
+                conn.text_factory = bytes
+
+                my_rowid: Optional[int]
+                try:
+                    r2 = conn.execute("SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1", (my_username,)).fetchone()
+                    my_rowid = int(r2[0]) if r2 and r2[0] is not None else None
+                except Exception:
+                    my_rowid = None
+
+                if my_rowid is None:
+                    continue
+
+                tables = _list_message_tables(conn)
+                if not tables:
+                    continue
+
+                ts_expr = (
+                    "CASE "
+                    "WHEN CAST(create_time AS INTEGER) > 1000000000000 "
+                    "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
+                    "ELSE CAST(create_time AS INTEGER) "
+                    "END"
+                )
+
+                for table in tables:
+                    qt = _quote_ident(table)
+                    sql = (
+                        "SELECT real_sender_id, message_content, compress_content "
+                        f"FROM {qt} "
+                        "WHERE local_type = 1 "
+                        f"  AND {ts_expr} >= ? AND {ts_expr} < ?"
+ ) + try: + cur = conn.execute(sql, (start_ts, end_ts)) + except Exception: + continue + + for r in cur: + try: + rsid = int(r["real_sender_id"] or 0) + except Exception: + rsid = 0 + + if rsid != my_rowid: + continue + + txt = "" + try: + txt = _decode_message_content(r["compress_content"], r["message_content"]).strip() + except Exception: + txt = "" + if not txt: + continue + total_messages += 1 + if sample_rate >= 1.0: + do_sample = True + elif sample_rate <= 0.0: + do_sample = False + else: + do_sample = random.random() < sample_rate + if do_sample: + sampled_messages += 1 + _, cjk, spaces = _update_keyboard_counters( + txt, + direct_counter=direct_counter, + pinyin_counter=pinyin_counter, + pinyin_cache=pinyin_cache, + do_pinyin=do_sample, + ) + total_cjk_chars += cjk + actual_space_chars += spaces + if do_sample: + sampled_cjk_chars += cjk + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass + + # 中文拼音部分:按“中文汉字数量”缩放(比按总字符缩放更合理,也能让数字/标点更准确) + est_pinyin_counter: Counter[str] = Counter() + sampled_pinyin_hits = int(sum(pinyin_counter.values())) + if total_cjk_chars > 0: + if sampled_cjk_chars > 0 and sampled_pinyin_hits > 0: + scale_factor = total_cjk_chars / sampled_cjk_chars + for k, cnt in pinyin_counter.items(): + est_pinyin_counter[k] = int(round(cnt * scale_factor)) + else: + # 兜底:有中文但采样不足(或采样中无法提取拼音),用默认分布估算 + total_pinyin_hits = int(total_cjk_chars * _AVG_PINYIN_LEN) + for k, freq in _DEFAULT_PINYIN_FREQ.items(): + est_pinyin_counter[k] = int(freq * total_pinyin_hits) + + key_hits_counter: Counter[str] = Counter() + key_hits_counter.update(direct_counter) + key_hits_counter.update(est_pinyin_counter) + + key_hits: dict[str, int] = {k: int(key_hits_counter.get(k, 0)) for k in _KEYBOARD_KEYS} + total_non_space_hits = int(sum(key_hits.values())) + + # 空格键:= 真实空格(如英文句子) + 中文拼音选词带来的“隐含空格”(粗略估算) + implied_space_hits = int(sum(est_pinyin_counter.values()) * 0.15) + space_hits = int(actual_space_chars + implied_space_hits) + + total_key_hits = int(total_non_space_hits + space_hits) + + # 频率只对“非空格键”归一化;空格频率由 spaceHits 单独给出 + key_frequency: dict[str, float] = {} + for k in _KEYBOARD_KEYS: + key_frequency[k] = (key_hits.get(k, 0) / total_non_space_hits) if total_non_space_hits > 0 else 0.0 + + logger.info( + "Keyboard stats computed: account=%s year=%s sample_rate=%.2f msgs=%d sampled=%d cjk=%d sampled_cjk=%d total_hits=%d", + my_username, + year, + float(sample_rate), + int(total_messages), + int(sampled_messages), + int(total_cjk_chars), + int(sampled_cjk_chars), + int(total_key_hits), + ) + + return { + "totalKeyHits": total_key_hits, + "keyHits": key_hits, + "keyFrequency": key_frequency, + "spaceHits": space_hits, + } + + +def _year_range_epoch_seconds(year: int) -> tuple[int, int]: + # Use local time boundaries (same semantics as sqlite "localtime"). + start = int(datetime(year, 1, 1).timestamp()) + end = int(datetime(year + 1, 1, 1).timestamp()) + return start, end + + +def _list_message_tables(conn: sqlite3.Connection) -> list[str]: + try: + rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + except Exception: + return [] + names: list[str] = [] + for r in rows: + if not r or not r[0]: + continue + name = str(r[0]) + ln = name.lower() + if ln.startswith(("msg_", "chat_")): + names.append(name) + return names + + +# Book analogy table (for "sent chars"). 
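+# For example, 300_000 sent chars lands in the 100k-500k bucket below, and `_pick_book_analogy`
+# then picks one of that bucket's options deterministically via `_pick_option(seed=chars)`.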
+_BOOK_ANALOGIES: list[dict[str, Any]] = [ + {"min": 1, "max": 100_000, "level": "小量级", "options": ["一本《小王子》", "一本《解忧杂货店》"]}, + {"min": 100_000, "max": 500_000, "level": "中量级", "options": ["一本《三体Ⅰ:地球往事》", "一套《朝花夕拾+呐喊》(鲁迅经典合集)"]}, + {"min": 500_000, "max": 1_000_000, "level": "大量级", "options": ["一本《红楼梦》(全本)", "一本《百年孤独》(全本无删减)"]}, + {"min": 1_000_000, "max": 5_000_000, "level": "超大量级", "options": ["一套《三体》全三册", "一本《西游记》(全本白话文)"]}, + {"min": 5_000_000, "max": 10_000_000, "level": "千万级Ⅰ", "options": ["一套金庸武侠《射雕+神雕+倚天》(经典三部曲)", "一套《平凡的世界》全三册"]}, + {"min": 10_000_000, "max": 50_000_000, "level": "千万级Ⅱ", "options": ["一套《哈利·波特》全七册(中文版)", "一本《资治通鉴》(文白对照全本)"]}, + {"min": 50_000_000, "max": 100_000_000, "level": "亿级Ⅰ", "options": ["一套《冰与火之歌》全系列(中文版)", "一本《史记》(全本含集解索隐正义)"]}, + {"min": 100_000_000, "max": 500_000_000, "level": "亿级Ⅱ", "options": ["一套《中国大百科全书》(单卷本全册)", "一套《金庸武侠全集》(15部完整版)"]}, + {"min": 500_000_000, "max": None, "level": "亿级Ⅲ", "options": ["一套《四库全书》(文津阁精选集)", "一套《大英百科全书》(国际完整版)"]}, +] + + +# A4 analogy table (for "received chars"). +# Estimation assumptions: +# - A4 (single side) holds about 1700 chars (depends on font/spacing; this is an approximation). +# - 70g A4 paper thickness is roughly 0.1mm => 100 sheets ≈ 1cm. +_A4_CHARS_PER_SHEET = 1700 +_A4_SHEETS_PER_CM = 100.0 + +# "Level" is a coarse grouping by character count; the physical object analogy is picked by the +# estimated stacked height (so the text stays self-consistent). +_A4_LEVELS: list[dict[str, Any]] = [ + {"min": 1, "max": 100_000, "level": "小量级"}, + {"min": 100_000, "max": 500_000, "level": "中量级"}, + {"min": 500_000, "max": 1_000_000, "level": "大量级"}, + {"min": 1_000_000, "max": 5_000_000, "level": "超大量级"}, + {"min": 5_000_000, "max": 10_000_000, "level": "千万级Ⅰ"}, + {"min": 10_000_000, "max": 50_000_000, "level": "千万级Ⅱ"}, + {"min": 50_000_000, "max": 100_000_000, "level": "亿级Ⅰ"}, + {"min": 100_000_000, "max": 500_000_000, "level": "亿级Ⅱ"}, + {"min": 500_000_000, "max": None, "level": "亿级Ⅲ"}, +] + +# Physical object analogies by stacked height (cm). 
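+# Worked example: 1,000,000 received chars -> ceil(1_000_000 / 1700) = 589 sheets,
+# and 589 / 100 ≈ 5.9 cm of stacked paper, which falls into the 2-6 cm bucket below.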
+_A4_HEIGHT_ANALOGIES: list[dict[str, Any]] = [ + {"minCm": 0.0, "maxCm": 0.5, "objects": ["1枚硬币的厚度", "1张银行卡的厚度"]}, + {"minCm": 0.5, "maxCm": 2.0, "objects": ["1叠便利贴", "1本薄款软皮笔记本"]}, + {"minCm": 2.0, "maxCm": 6.0, "objects": ["3-5本加厚硬壳笔记本", "1本厚词典"]}, + {"minCm": 6.0, "maxCm": 30.0, "objects": ["10本办公台账", "1个矮款文件柜单层满装"]}, + {"minCm": 30.0, "maxCm": 60.0, "objects": ["1个标准办公文件盒", "1个登机箱(约55cm)"]}, + {"minCm": 60.0, "maxCm": 200.0, "objects": ["1.7-1.8m成年人身高", "2个办公文件柜叠放"]}, + {"minCm": 200.0, "maxCm": 600.0, "objects": ["2层普通住宅层高", "1棵成年矮树(枇杷树/橘子树)"]}, + {"minCm": 600.0, "maxCm": 2500.0, "objects": ["4-8层居民楼层高", "1棵成年大树(梧桐树/樟树)"]}, + {"minCm": 2500.0, "maxCm": 5000.0, "objects": ["10-18层小高层住宅", "1栋小型临街写字楼"]}, + {"minCm": 5000.0, "maxCm": 25000.0, "objects": ["20-80层超高层住宅", "城市核心区小高层地标"]}, + {"minCm": 25000.0, "maxCm": None, "objects": ["1栋城市核心超高层写字楼", "国内中型摩天大楼(约100层)"]}, +] + + +def _pick_option(options: list[str], *, seed: int) -> str: + if not options: + return "" + idx = abs(int(seed)) % len(options) + return str(options[idx] or "").strip() + + +def _pick_book_analogy(chars: int) -> Optional[dict[str, Any]]: + n = int(chars or 0) + if n <= 0: + return None + + for row in _BOOK_ANALOGIES: + lo = int(row["min"] or 0) + hi = row.get("max") + if n < lo: + continue + if hi is None or n < int(hi): + picked = _pick_option(list(row.get("options") or []), seed=n) + return { + "level": str(row.get("level") or ""), + "book": picked, + "text": f"相当于写了{picked}" if picked else "", + } + return None + + +def _format_height(height_cm: float) -> str: + try: + cm = float(height_cm) + except Exception: + cm = 0.0 + if cm <= 0: + return "0cm" + if cm < 1: + mm = cm * 10.0 + return f"{mm:.1f}mm" + if cm < 100: + if cm < 10: + return f"{cm:.1f}cm" + return f"{cm:.0f}cm" + m = cm / 100.0 + if m < 10: + return f"{m:.1f}m" + return f"{m:.0f}m" + + +def _a4_stats(chars: int) -> dict[str, Any]: + # Rough estimate: 1 A4 page ~ 1700 chars; 100 pages ~ 1cm thick. + n = int(chars or 0) + if n <= 0: + return {"sheets": 0, "heightCm": 0.0, "heightText": "0cm"} + sheets = int(math.ceil(n / float(_A4_CHARS_PER_SHEET))) + height_cm = float(sheets) / float(_A4_SHEETS_PER_CM) + return {"sheets": int(sheets), "heightCm": float(height_cm), "heightText": _format_height(height_cm)} + + +def _pick_a4_analogy(chars: int) -> Optional[dict[str, Any]]: + n = int(chars or 0) + if n <= 0: + return None + + a4 = _a4_stats(n) + + level = "" + for row in _A4_LEVELS: + lo = int(row["min"] or 0) + hi = row.get("max") + if n < lo: + continue + if hi is None or n < int(hi): + level = str(row.get("level") or "") + break + + height_cm = float(a4.get("heightCm") or 0.0) + picked = "" + for row in _A4_HEIGHT_ANALOGIES: + lo = float(row.get("minCm") or 0.0) + hi = row.get("maxCm") + if height_cm < lo: + continue + if hi is None or height_cm < float(hi): + picked = _pick_option(list(row.get("objects") or []), seed=n) + break + + return { + "level": level, + "object": picked, + "a4": a4, + "text": ( + f"大约 {int(a4['sheets']):,} 张 A4,堆起来约 {a4['heightText']}" + (f",差不多是{picked}的高度" if picked else "") + ).strip(","), + } + + +def compute_text_message_char_counts(*, account_dir: Path, year: int) -> tuple[int, int]: + """Return (sent_chars, received_chars) for render_type='text' messages in the year.""" + + start_ts, end_ts = _year_range_epoch_seconds(year) + my_username = str(account_dir.name or "").strip() + + # Prefer search index when available. 
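+    # `create_time` is usually epoch seconds, but some datasets store milliseconds; the CASE
+    # expression used below normalizes both (e.g. 1700000000000 ms -> 1700000000 s).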
+ index_path = get_chat_search_index_db_path(account_dir) + if index_path.exists(): + conn = sqlite3.connect(str(index_path)) + try: + has_fts = ( + conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone() + is not None + ) + if has_fts: + ts_expr = ( + "CASE " + "WHEN CAST(create_time AS INTEGER) > 1000000000000 " + "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) " + "ELSE CAST(create_time AS INTEGER) " + "END" + ) + where = ( + f"{ts_expr} >= ? AND {ts_expr} < ? " + "AND db_stem NOT LIKE 'biz_message%' " + "AND render_type = 'text' " + "AND \"text\" IS NOT NULL " + "AND TRIM(CAST(\"text\" AS TEXT)) != ''" + ) + + sql_total = f"SELECT COALESCE(SUM(LENGTH(REPLACE(\"text\", ' ', ''))), 0) AS chars FROM message_fts WHERE {where}" + r_total = conn.execute(sql_total, (start_ts, end_ts)).fetchone() + total_chars = int((r_total[0] if r_total else 0) or 0) + + if my_username: + sql_sent = f"{sql_total} AND sender_username = ?" + r_sent = conn.execute(sql_sent, (start_ts, end_ts, my_username)).fetchone() + sent_chars = int((r_sent[0] if r_sent else 0) or 0) + else: + sent_chars = 0 + + recv_chars = max(0, total_chars - sent_chars) + return sent_chars, recv_chars + finally: + try: + conn.close() + except Exception: + pass + + # Fallback: scan message shards directly (slower, but works without the index). + t0 = time.time() + sent_total = 0 + recv_total = 0 + + db_paths = _iter_message_db_paths(account_dir) + for db_path in db_paths: + try: + if db_path.name.lower().startswith("biz_message"): + continue + except Exception: + pass + if not db_path.exists(): + continue + + conn: sqlite3.Connection | None = None + try: + conn = sqlite3.connect(str(db_path)) + conn.row_factory = sqlite3.Row + conn.text_factory = bytes + + my_rowid: Optional[int] + try: + r2 = conn.execute("SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1", (my_username,)).fetchone() + my_rowid = int(r2[0]) if r2 and r2[0] is not None else None + except Exception: + my_rowid = None + + tables = _list_message_tables(conn) + if not tables: + continue + + ts_expr = ( + "CASE " + "WHEN CAST(create_time AS INTEGER) > 1000000000000 " + "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) " + "ELSE CAST(create_time AS INTEGER) " + "END" + ) + + for table in tables: + qt = _quote_ident(table) + sql = ( + "SELECT real_sender_id, message_content, compress_content " + f"FROM {qt} " + "WHERE local_type = 1 " + f" AND {ts_expr} >= ? AND {ts_expr} < ?" + ) + try: + cur = conn.execute(sql, (start_ts, end_ts)) + except Exception: + continue + + for r in cur: + try: + rsid = int(r["real_sender_id"] or 0) + except Exception: + rsid = 0 + txt = "" + try: + txt = _decode_message_content(r["compress_content"], r["message_content"]).strip() + except Exception: + txt = "" + if not txt: + continue + + # Match search index semantics: count non-whitespace characters. 
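+                # e.g. "你好 hi\n" -> 4 countable chars (the space and newline are excluded).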
+ cnt = 0 + for ch in txt: + if not ch.isspace(): + cnt += 1 + if cnt <= 0: + continue + + if my_rowid is not None and rsid == my_rowid: + sent_total += cnt + else: + recv_total += cnt + finally: + if conn is not None: + try: + conn.close() + except Exception: + pass + + logger.info( + "Wrapped card#2 message chars computed (fallback scan): account=%s year=%s sent=%s recv=%s dbs=%s elapsed=%.2fs", + str(account_dir.name or "").strip(), + year, + int(sent_total), + int(recv_total), + len(db_paths), + time.time() - t0, + ) + return int(sent_total), int(recv_total) + + +def build_card_02_message_chars(*, account_dir: Path, year: int) -> dict[str, Any]: + sent_chars, recv_chars = compute_text_message_char_counts(account_dir=account_dir, year=year) + + sent_book = _pick_book_analogy(sent_chars) + recv_a4 = _pick_a4_analogy(recv_chars) + + # 计算键盘敲击统计 + keyboard_stats = compute_keyboard_stats(account_dir=account_dir, year=year, sample_rate=1.0) + + if sent_chars > 0 and recv_chars > 0: + narrative = f"你今年在微信里打了 {sent_chars:,} 个字,也收到了 {recv_chars:,} 个字。" + elif sent_chars > 0: + narrative = f"你今年在微信里打了 {sent_chars:,} 个字。" + elif recv_chars > 0: + narrative = f"你今年在微信里收到了 {recv_chars:,} 个字。" + else: + narrative = "今年你还没有文字消息" + + return { + "id": 2, + "title": "年度消息字数", + "scope": "global", + "category": "C", + "status": "ok", + "kind": "text/message_chars", + "narrative": narrative, + "data": { + "year": int(year), + "sentChars": int(sent_chars), + "receivedChars": int(recv_chars), + "sentBook": sent_book, + "receivedA4": recv_a4, + "keyboard": keyboard_stats, + }, + } diff --git a/src/wechat_decrypt_tool/wrapped/service.py b/src/wechat_decrypt_tool/wrapped/service.py index 3e228f7..c865cba 100644 --- a/src/wechat_decrypt_tool/wrapped/service.py +++ b/src/wechat_decrypt_tool/wrapped/service.py @@ -1,27 +1,263 @@ from __future__ import annotations import json +import sqlite3 +import threading import time from datetime import datetime from pathlib import Path from typing import Any, Optional -from ..chat_helpers import _resolve_account_dir +from ..chat_helpers import _iter_message_db_paths, _quote_ident, _resolve_account_dir +from ..chat_search_index import get_chat_search_index_db_path from ..logging_config import get_logger -from .storage import wrapped_cache_path -from .cards.card_01_cyber_schedule import build_card_01_cyber_schedule +from .storage import wrapped_cache_dir, wrapped_cache_path +from .cards.card_00_global_overview import build_card_00_global_overview +from .cards.card_01_cyber_schedule import WeekdayHourHeatmap, build_card_01_cyber_schedule, compute_weekday_hour_heatmap +from .cards.card_02_message_chars import build_card_02_message_chars logger = get_logger(__name__) -# We implement cards strictly in the order of `docs/wechat_wrapped_ideas_feasibility.md`. -_IMPLEMENTED_UPTO_ID = 1 +# We use this number to version the cache filename so adding more cards won't accidentally serve +# an older partial cache. +_IMPLEMENTED_UPTO_ID = 2 +# Bump this when we change card payloads/ordering while keeping the same implemented_upto. +_CACHE_VERSION = 4 + + +# "Manifest" is used by the frontend to render the deck quickly, then lazily fetch each card. +# Keep this list in display order (same as the old monolithic `/api/wrapped/annual` response). +_WRAPPED_CARD_MANIFEST: tuple[dict[str, Any], ...] 
= ( + { + "id": 0, + "title": "年度全局概览", + "scope": "global", + "category": "A", + "kind": "global/overview", + }, + { + "id": 1, + "title": "年度赛博作息表", + "scope": "global", + "category": "A", + "kind": "time/weekday_hour_heatmap", + }, + { + "id": 2, + "title": "年度消息字数", + "scope": "global", + "category": "C", + "kind": "text/message_chars", + }, +) +_WRAPPED_CARD_ID_SET = {int(c["id"]) for c in _WRAPPED_CARD_MANIFEST} + + +# Prevent duplicated heavy computations when multiple card endpoints are hit concurrently. +_LOCKS: dict[str, threading.Lock] = {} +_LOCKS_GUARD = threading.Lock() + + +def _get_lock(key: str) -> threading.Lock: + with _LOCKS_GUARD: + lock = _LOCKS.get(key) + if lock is None: + lock = threading.Lock() + _LOCKS[key] = lock + return lock def _default_year() -> int: return datetime.now().year +def _list_message_tables(conn: sqlite3.Connection) -> list[str]: + try: + rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + except Exception: + return [] + names: list[str] = [] + for r in rows: + if not r or not r[0]: + continue + name = str(r[0]) + ln = name.lower() + if ln.startswith(("msg_", "chat_")): + names.append(name) + return names + + +def list_wrapped_available_years(*, account_dir: Path) -> list[int]: + """List years that have *any* chat messages for the account (best-effort). + + Prefer using `chat_search_index.db` (fast). If not available, fall back to scanning message + shard databases (slower, but works without the index). + """ + + # Try a tiny cache first (years don't change often, but scanning can be expensive). + cache_path = wrapped_cache_dir(account_dir) / "available_years.json" + max_mtime = 0 + try: + index_path = get_chat_search_index_db_path(account_dir) + if index_path.exists(): + max_mtime = max(max_mtime, int(index_path.stat().st_mtime)) + except Exception: + pass + try: + for p in _iter_message_db_paths(account_dir): + try: + if p.name.lower().startswith("biz_message"): + continue + if p.exists(): + max_mtime = max(max_mtime, int(p.stat().st_mtime)) + except Exception: + continue + except Exception: + pass + + if cache_path.exists(): + try: + cached = json.loads(cache_path.read_text(encoding="utf-8")) + if isinstance(cached, dict): + sig = int(cached.get("max_mtime") or 0) + years = cached.get("years") + if sig == max_mtime and isinstance(years, list): + out: list[int] = [] + for x in years: + try: + y = int(x) + except Exception: + continue + if y > 0: + out.append(y) + out.sort(reverse=True) + return out + except Exception: + pass + + # Convert millisecond timestamps defensively (some datasets store ms). + # The expression yields epoch seconds as INTEGER. + ts_expr = ( + "CASE " + "WHEN CAST(create_time AS INTEGER) > 1000000000000 " + "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) " + "ELSE CAST(create_time AS INTEGER) " + "END" + ) + + # Fast path: use our unified search index when available. 
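+    # A single grouped scan over `message_fts` yields per-year message counts; biz shards are
+    # excluded here to keep the result consistent with the fallback scan further down.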
+ index_path = get_chat_search_index_db_path(account_dir) + if index_path.exists(): + conn = sqlite3.connect(str(index_path)) + try: + has_fts = ( + conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone() + is not None + ) + if has_fts: + sql = ( + "SELECT " + "CAST(strftime('%Y', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS y, " + "COUNT(1) AS cnt " + "FROM (" + f" SELECT {ts_expr} AS ts" + " FROM message_fts" + f" WHERE {ts_expr} > 0" + " AND db_stem NOT LIKE 'biz_message%'" + ") sub " + "GROUP BY y " + "HAVING cnt > 0 " + "ORDER BY y DESC" + ) + try: + rows = conn.execute(sql).fetchall() + except Exception: + rows = [] + years: list[int] = [] + for r in rows: + if not r: + continue + try: + y = int(r[0]) + cnt = int(r[1] or 0) + except Exception: + continue + if y > 0 and cnt > 0: + years.append(y) + years.sort(reverse=True) + try: + cache_path.write_text( + json.dumps({"max_mtime": max_mtime, "years": years}, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + except Exception: + pass + return years + finally: + try: + conn.close() + except Exception: + pass + + # Fallback: scan message shard DBs (may be slow on very large datasets, but only runs + # when the index does not exist). + year_counts: dict[int, int] = {} + db_paths = _iter_message_db_paths(account_dir) + db_paths = [p for p in db_paths if not p.name.lower().startswith("biz_message")] + for db_path in db_paths: + if not db_path.exists(): + continue + conn = sqlite3.connect(str(db_path)) + try: + tables = _list_message_tables(conn) + if not tables: + continue + for table_name in tables: + qt = _quote_ident(table_name) + sql = ( + "SELECT " + "CAST(strftime('%Y', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS y, " + "COUNT(1) AS cnt " + "FROM (" + f" SELECT {ts_expr} AS ts" + f" FROM {qt}" + f" WHERE {ts_expr} > 0" + ") sub " + "GROUP BY y" + ) + try: + rows = conn.execute(sql).fetchall() + except Exception: + continue + for r in rows: + if not r: + continue + try: + y = int(r[0]) + cnt = int(r[1] or 0) + except Exception: + continue + if y > 0 and cnt > 0: + year_counts[y] = int(year_counts.get(y, 0)) + cnt + finally: + try: + conn.close() + except Exception: + pass + + years = [y for y, cnt in year_counts.items() if int(cnt) > 0] + years.sort(reverse=True) + try: + cache_path.write_text( + json.dumps({"max_mtime": max_mtime, "years": years}, ensure_ascii=False, indent=2), + encoding="utf-8", + ) + except Exception: + pass + return years + + def build_wrapped_annual_response( *, account: Optional[str], @@ -30,25 +266,47 @@ def build_wrapped_annual_response( ) -> dict[str, Any]: """Build annual wrapped response for the given account/year. - For now we only implement cards up to id=1. + For now we implement cards up to id=2 (plus a meta overview card id=0). """ account_dir = _resolve_account_dir(account) + + available_years = list_wrapped_available_years(account_dir=account_dir) + + # If the requested year has no messages, snap to the latest available year so the selector only + # shows years with data. 
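+    # Illustrative (hypothetical values): with available_years=[2025, 2024], a request for
+    # year=2030 snaps to 2025, while year=2024 is served as requested.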
+    y = int(year or _default_year())
+    if available_years and y not in available_years:
+        y = int(available_years[0])
 
     scope = "global"
-    cache_path = wrapped_cache_path(account_dir=account_dir, scope=scope, year=y, implemented_upto=_IMPLEMENTED_UPTO_ID)
+    cache_path = wrapped_cache_path(
+        account_dir=account_dir,
+        scope=scope,
+        year=y,
+        implemented_upto=_IMPLEMENTED_UPTO_ID,
+        options_tag=f"v{_CACHE_VERSION}",
+    )
 
     if (not refresh) and cache_path.exists():
         try:
             cached_obj = json.loads(cache_path.read_text(encoding="utf-8"))
             if isinstance(cached_obj, dict) and isinstance(cached_obj.get("cards"), list):
                 cached_obj["cached"] = True
+                cached_obj["availableYears"] = available_years
                 return cached_obj
         except Exception:
             pass
 
     cards: list[dict[str, Any]] = []
-    cards.append(build_card_01_cyber_schedule(account_dir=account_dir, year=y))
+    # Wrapped cards default to "messages sent by me" (outgoing), to avoid mixing directions
+    # in first-person narratives like "你最常...".
+    heatmap_sent = _get_or_compute_heatmap_sent(account_dir=account_dir, scope=scope, year=y, refresh=refresh)
+    # Page 2: global overview (page 1 is the frontend cover slide).
+    cards.append(build_card_00_global_overview(account_dir=account_dir, year=y, heatmap=heatmap_sent))
+    # Page 3: cyber schedule heatmap.
+    cards.append(build_card_01_cyber_schedule(account_dir=account_dir, year=y, heatmap=heatmap_sent))
+    # Page 4: message char counts (sent vs. received).
+    cards.append(build_card_02_message_chars(account_dir=account_dir, year=y))
 
     obj: dict[str, Any] = {
         "account": account_dir.name,
@@ -57,6 +315,7 @@
         "username": None,
         "generated_at": int(time.time()),
         "cached": False,
+        "availableYears": available_years,
         "cards": cards,
     }
@@ -67,3 +326,183 @@
 
     return obj
+
+
+def build_wrapped_annual_meta(
+    *,
+    account: Optional[str],
+    year: Optional[int],
+    refresh: bool = False,
+) -> dict[str, Any]:
+    """Return a lightweight manifest for the Wrapped annual deck.
+
+    This is meant to be fast so the frontend can render the deck first, then
+    request each page (card) lazily to avoid freezing on initial load.
+    """
+
+    account_dir = _resolve_account_dir(account)
+
+    available_years = list_wrapped_available_years(account_dir=account_dir)
+
+    # Keep the same year-snapping semantics as `build_wrapped_annual_response`.
+    y = int(year or _default_year())
+    if available_years and y not in available_years:
+        y = int(available_years[0])
+
+    if refresh:
+        # The manifest itself is static today, but we keep the flag for API symmetry.
+        pass
+
+    return {
+        "account": account_dir.name,
+        "year": y,
+        "scope": "global",
+        "availableYears": available_years,
+        # Shallow-copy the entries so callers can't mutate our module-level tuple.
+        "cards": [dict(c) for c in _WRAPPED_CARD_MANIFEST],
+    }
+
+
+def _wrapped_cache_suffix() -> str:
+    return f"_v{_CACHE_VERSION}"
+
+
+def _wrapped_card_cache_path(*, account_dir: Path, scope: str, year: int, card_id: int) -> Path:
+    # Keep stable names; the per-account directory already namespaces the files.
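+    # Example (values hypothetical): scope="global", year=2025, card_id=1 and
+    # _CACHE_VERSION = 4 yield "global_2025_card_1_v4.json".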
+ return wrapped_cache_dir(account_dir) / f"{scope}_{year}_card_{card_id}{_wrapped_cache_suffix()}.json" + + +def _wrapped_heatmap_sent_cache_path(*, account_dir: Path, scope: str, year: int) -> Path: + return wrapped_cache_dir(account_dir) / f"{scope}_{year}_heatmap_sent{_wrapped_cache_suffix()}.json" + + +def _load_cached_heatmap_sent(path: Path) -> WeekdayHourHeatmap | None: + if not path.exists(): + return None + try: + obj = json.loads(path.read_text(encoding="utf-8")) + except Exception: + return None + + if not isinstance(obj, dict): + return None + + weekday_labels = obj.get("weekdayLabels") + hour_labels = obj.get("hourLabels") + matrix = obj.get("matrix") + total = obj.get("totalMessages") + + if not isinstance(weekday_labels, list) or not isinstance(hour_labels, list) or not isinstance(matrix, list): + return None + + try: + total_i = int(total or 0) + except Exception: + total_i = 0 + + # Best-effort sanitize matrix to ints; keep shape if possible. + out_matrix: list[list[int]] = [] + for row in matrix: + if not isinstance(row, list): + return None + out_row: list[int] = [] + for v in row: + try: + out_row.append(int(v or 0)) + except Exception: + out_row.append(0) + out_matrix.append(out_row) + + return WeekdayHourHeatmap( + weekday_labels=[str(x) for x in weekday_labels], + hour_labels=[str(x) for x in hour_labels], + matrix=out_matrix, + total_messages=total_i, + ) + + +def _get_or_compute_heatmap_sent(*, account_dir: Path, scope: str, year: int, refresh: bool) -> WeekdayHourHeatmap: + path = _wrapped_heatmap_sent_cache_path(account_dir=account_dir, scope=scope, year=year) + lock = _get_lock(str(path)) + with lock: + if not refresh: + cached = _load_cached_heatmap_sent(path) + if cached is not None: + return cached + + heatmap = compute_weekday_hour_heatmap(account_dir=account_dir, year=year, sender_username=account_dir.name) + try: + path.write_text( + json.dumps( + { + "weekdayLabels": heatmap.weekday_labels, + "hourLabels": heatmap.hour_labels, + "matrix": heatmap.matrix, + "totalMessages": heatmap.total_messages, + }, + ensure_ascii=False, + indent=2, + ), + encoding="utf-8", + ) + except Exception: + logger.exception("Failed to write wrapped heatmap cache: %s", path) + return heatmap + + +def build_wrapped_annual_card( + *, + account: Optional[str], + year: Optional[int], + card_id: int, + refresh: bool = False, +) -> dict[str, Any]: + """Build one Wrapped card (page) on-demand. + + The result is cached per account/year/card_id to avoid recomputing when users + flip back and forth between pages. 
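+
+    Illustrative call (values hypothetical; the HTTP wiring lives in the wrapped router):
+
+        card = build_wrapped_annual_card(account=None, year=2025, card_id=1)
+        card["id"]  # -> 1, the same envelope the manifest advertises for this page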
+    """
+
+    cid = int(card_id)
+    if cid not in _WRAPPED_CARD_ID_SET:
+        raise ValueError(f"Unknown Wrapped card id: {cid}")
+
+    account_dir = _resolve_account_dir(account)
+
+    available_years = list_wrapped_available_years(account_dir=account_dir)
+    y = int(year or _default_year())
+    if available_years and y not in available_years:
+        y = int(available_years[0])
+
+    scope = "global"
+    cache_path = _wrapped_card_cache_path(account_dir=account_dir, scope=scope, year=y, card_id=cid)
+
+    lock = _get_lock(str(cache_path))
+    with lock:
+        if (not refresh) and cache_path.exists():
+            try:
+                cached_obj = json.loads(cache_path.read_text(encoding="utf-8"))
+                # Card id 0 is falsy, so read the id with a dict default rather than `or`;
+                # otherwise the overview card would never hit its cache.
+                if isinstance(cached_obj, dict) and int(cached_obj.get("id", -1)) == cid:
+                    return cached_obj
+            except Exception:
+                pass
+
+        heatmap_sent: WeekdayHourHeatmap | None = None
+        if cid in (0, 1):
+            heatmap_sent = _get_or_compute_heatmap_sent(account_dir=account_dir, scope=scope, year=y, refresh=refresh)
+
+        if cid == 0:
+            card = build_card_00_global_overview(account_dir=account_dir, year=y, heatmap=heatmap_sent)
+        elif cid == 1:
+            card = build_card_01_cyber_schedule(account_dir=account_dir, year=y, heatmap=heatmap_sent)
+        elif cid == 2:
+            card = build_card_02_message_chars(account_dir=account_dir, year=y)
+        else:
+            # Should be unreachable due to the _WRAPPED_CARD_ID_SET check above.
+            raise ValueError(f"Unknown Wrapped card id: {cid}")
+
+        try:
+            cache_path.write_text(json.dumps(card, ensure_ascii=False, indent=2), encoding="utf-8")
+        except Exception:
+            logger.exception("Failed to write wrapped card cache: %s", cache_path)
+
+        return card
diff --git a/uv.lock b/uv.lock
index fd3b9b3..f2ccf74 100644
--- a/uv.lock
+++ b/uv.lock
@@ -498,6 +498,15 @@ wheels = [
     { url = "https://files.pythonhosted.org/packages/a7/c4/3a096c6e701832443b957b9dac18a163103360d0c7f5842ca41695371148/pyinstaller_hooks_contrib-2025.11-py3-none-any.whl", hash = "sha256:777e163e2942474aa41a8e6d31ac1635292d63422c3646c176d584d04d971c34", size = 449478, upload-time = "2025-12-23T12:59:35.987Z" },
 ]
 
+[[package]]
+name = "pypinyin"
+version = "0.55.0"
+source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/b4/a4/784cf98c09e0dc22776b0d7d8a4a5b761218bcae4608c2416ce1e167c8af/pypinyin-0.55.0.tar.gz", hash = "sha256:b5711b3a0c6f76e67408ec6b2e3c4987a3a806b7c528076e7c7b86fcf0eaa66b", size = 839836, upload-time = "2025-07-20T12:01:50.657Z" }
+wheels = [
+    { url = "https://files.pythonhosted.org/packages/b9/7b/4cabc76fcc21c3c7d5c671d8783984d30ac9d3bb387c4ba784fca3cdfa3a/pypinyin-0.55.0-py2.py3-none-any.whl", hash = "sha256:d53b1e8ad2cdb815fb2cb604ed3123372f5a28c6f447571244aca36fc62a286f", size = 840203, upload-time = "2025-07-20T12:01:48.535Z" },
+]
+
 [[package]]
 name = "python-dotenv"
 version = "1.1.0"
@@ -839,6 +848,7 @@ dependencies = [
     { name = "pilk" },
     { name = "psutil" },
     { name = "pycryptodome" },
+    { name = "pypinyin" },
     { name = "python-multipart" },
     { name = "pywin32", marker = "sys_platform == 'win32'" },
     { name = "requests" },
@@ -862,6 +872,7 @@ requires-dist = [
     { name = "psutil", specifier = ">=7.0.0" },
     { name = "pycryptodome", specifier = ">=3.23.0" },
     { name = "pyinstaller", marker = "extra == 'build'", specifier = ">=6.0.0" },
+    { name = "pypinyin", specifier = ">=0.53.0" },
     { name = "python-multipart", specifier = ">=0.0.6" },
     { name = "pywin32", marker = "sys_platform == 'win32'", specifier = ">=310" },
     { name = "requests", specifier = ">=2.32.4" },