feat(wrapped): add manifest/single-card endpoints for the annual wrap, plus cards #0/#2

- Add /api/wrapped/annual/meta and /api/wrapped/annual/cards/{card_id} so the frontend can lazy-load cards one page at a time
- Add a card manifest, cache version tagging, and per-key locks to avoid duplicate computation and stale caches serving mixed data
- Add Card #0 "Annual Global Overview": active days, top contact/group, and summaries of favorite expressions/catchphrases/emoji
- Add Card #2 "Annual Message Characters": sent/received character counts, physical analogies, and keyboard keystroke stats
- Polish Card #1, the cyber schedule: faster index-backed computation and richer narrative copy
2977094657
2026-01-31 14:54:11 +08:00
parent 79da96b2d3
commit 77a60bde70
7 changed files with 2246 additions and 21 deletions

View File

@@ -3,10 +3,10 @@ from __future__ import annotations
import asyncio
from typing import Optional
from fastapi import APIRouter, Query
from fastapi import APIRouter, HTTPException, Path, Query
from ..path_fix import PathFixRoute
from ..wrapped.service import build_wrapped_annual_response
from ..wrapped.service import build_wrapped_annual_card, build_wrapped_annual_meta, build_wrapped_annual_response
router = APIRouter(route_class=PathFixRoute)
@@ -17,7 +17,39 @@ async def wrapped_annual(
account: Optional[str] = Query(None, description="解密后的账号目录名。默认取第一个可用账号。"),
refresh: bool = Query(False, description="是否强制重新计算(忽略缓存)。"),
):
"""返回年度总结数据(目前仅实现第 1 个点子:年度赛博作息表)。"""
"""返回年度总结完整数据(一次性包含全部卡片,可能较慢)。"""
# This endpoint performs blocking sqlite/file IO, so run it in a worker thread.
return await asyncio.to_thread(build_wrapped_annual_response, account=account, year=year, refresh=refresh)
@router.get("/api/wrapped/annual/meta", summary="微信聊天年度总结WeChat Wrapped- 目录(轻量)")
async def wrapped_annual_meta(
year: Optional[int] = Query(None, description="年份(例如 2026。默认当前年份。"),
account: Optional[str] = Query(None, description="解密后的账号目录名。默认取第一个可用账号。"),
refresh: bool = Query(False, description="是否强制重新计算(忽略缓存)。"),
):
"""返回年度总结的目录/元信息,用于前端懒加载每一页。"""
return await asyncio.to_thread(build_wrapped_annual_meta, account=account, year=year, refresh=refresh)
@router.get("/api/wrapped/annual/cards/{card_id}", summary="微信聊天年度总结WeChat Wrapped- 单张卡片(按页加载)")
async def wrapped_annual_card(
card_id: int = Path(..., description="卡片ID与前端页面一一对应", ge=0),
year: Optional[int] = Query(None, description="年份(例如 2026。默认当前年份。"),
account: Optional[str] = Query(None, description="解密后的账号目录名。默认取第一个可用账号。"),
refresh: bool = Query(False, description="是否强制重新计算(忽略缓存)。"),
):
"""按卡片 ID 返回单页数据(避免首屏一次性计算全部卡片)。"""
try:
return await asyncio.to_thread(
build_wrapped_annual_card,
account=account,
year=year,
card_id=card_id,
refresh=refresh,
)
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e)) from e
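
A minimal client sketch of the lazy-loading flow these two endpoints enable (standard library only; the base URL and year are illustrative):

import json
from urllib.request import urlopen

BASE = "http://127.0.0.1:8000"  # assumed local dev address

# 1) Fetch the lightweight manifest so the deck can render immediately.
meta = json.load(urlopen(f"{BASE}/api/wrapped/annual/meta?year=2025"))
# 2) Then request each card lazily, one page at a time.
for card in meta["cards"]:
    url = f"{BASE}/api/wrapped/annual/cards/{card['id']}?year={meta['year']}"
    payload = json.load(urlopen(url))
    print(payload["id"], payload["title"])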

View File

@@ -0,0 +1,759 @@
from __future__ import annotations
import hashlib
import re
import sqlite3
import time
from collections import Counter
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from .card_01_cyber_schedule import WeekdayHourHeatmap, compute_weekday_hour_heatmap
from ...chat_search_index import get_chat_search_index_db_path
from ...chat_helpers import (
_build_avatar_url,
_decode_sqlite_text,
_iter_message_db_paths,
_load_contact_rows,
_pick_avatar_url,
_pick_display_name,
_quote_ident,
_should_keep_session,
)
from ...logging_config import get_logger
logger = get_logger(__name__)
_MD5_HEX_RE = re.compile(r"(?i)[0-9a-f]{32}")
@dataclass(frozen=True)
class GlobalOverviewStats:
year: int
active_days: int
local_type_counts: dict[int, int]
kind_counts: dict[str, int]
latest_ts: int
top_phrase: Optional[tuple[str, int]]
top_emoji: Optional[tuple[str, int]]
top_contact: Optional[tuple[str, int]]
top_group: Optional[tuple[str, int]]
def _year_range_epoch_seconds(year: int) -> tuple[int, int]:
# Keep the same semantics as other parts of the project: local time boundaries.
start = int(datetime(year, 1, 1).timestamp())
end = int(datetime(year + 1, 1, 1).timestamp())
return start, end
def _list_message_tables(conn: sqlite3.Connection) -> list[str]:
try:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
except Exception:
return []
names: list[str] = []
for r in rows:
if not r or not r[0]:
continue
name = str(r[0])
ln = name.lower()
if ln.startswith(("msg_", "chat_")):
names.append(name)
return names
def _list_session_usernames(session_db_path: Path) -> list[str]:
if not session_db_path.exists():
return []
conn = sqlite3.connect(str(session_db_path))
try:
try:
rows = conn.execute("SELECT username FROM SessionTable").fetchall()
except sqlite3.OperationalError:
rows = conn.execute("SELECT username FROM Session").fetchall()
except Exception:
rows = []
finally:
conn.close()
out: list[str] = []
for r in rows:
if not r or not r[0]:
continue
u = str(r[0]).strip()
if u:
out.append(u)
return out
def _mask_name(name: str) -> str:
s = str(name or "").strip()
if not s:
return ""
if len(s) == 1:
return "*"
if len(s) == 2:
return s[0] + "*"
return s[0] + ("*" * (len(s) - 2)) + s[-1]
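# Illustrative: _mask_name("张三") == "张*", _mask_name("王小明") == "王*明",
# _mask_name("Alice") == "A***e".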
def _normalize_phrase(v: Any) -> str:
s = _decode_sqlite_text(v).strip()
if not s:
return ""
s = re.sub(r"\s+", " ", s).strip()
if not s:
return ""
if len(s) > 12:
return ""
lower = s.lower()
if "http://" in lower or "https://" in lower:
return ""
if s.startswith("<"):
return ""
# Avoid pure punctuation / numbers.
if not re.search(r"[\u4e00-\u9fffA-Za-z]", s):
return ""
return s
def _normalize_emoji(v: Any) -> str:
s = _decode_sqlite_text(v).strip()
if not s:
return ""
s = re.sub(r"\s+", " ", s).strip()
if not s or len(s) > 48:
return ""
if s.startswith("<"):
return ""
# If it is an md5 or some opaque token, don't show it.
if re.fullmatch(r"(?i)[0-9a-f]{32}", s):
return ""
return s
def _kind_from_local_type(t: int) -> str:
# See `_infer_local_type` in chat_helpers for known values.
if t == 1:
return "text"
if t == 3:
return "image"
if t == 34:
return "voice"
if t == 43:
return "video"
if t == 47:
return "emoji"
if t in (49, 17179869233, 21474836529, 154618822705, 12884901937, 270582939697):
return "link"
if t == 25769803825:
return "file"
if t == 10000:
return "system"
if t == 50:
return "voip"
if t == 244813135921:
return "quote"
if t == 8594229559345:
return "red_packet"
if t == 8589934592049:
return "transfer"
if t == 266287972401:
return "pat"
return "other"
def _weekday_name_zh(weekday_index: int) -> str:
labels = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
if 0 <= weekday_index < len(labels):
return labels[weekday_index]
return ""
def _kind_label_zh(kind: str) -> str:
return {
"text": "文字",
"emoji": "表情包",
"voice": "语音",
"image": "图片",
"video": "视频",
"link": "链接/小程序",
"file": "文件",
"system": "系统消息",
"other": "其他",
}.get(kind, kind)
def compute_global_overview_stats(
*,
account_dir: Path,
year: int,
sender_username: str | None = None,
) -> GlobalOverviewStats:
"""Compute global overview stats for wrapped.
Notes:
- Best-effort only. Different WeChat versions may store different message types/values.
- We default to excluding `biz_message*.db` to reduce noise.
- If `sender_username` is provided, only messages sent by that sender are counted
(best-effort).
"""
start_ts, end_ts = _year_range_epoch_seconds(year)
sender = str(sender_username).strip() if sender_username and str(sender_username).strip() else None
# Prefer using the unified search index if available; it already merges all shards/tables.
index_path = get_chat_search_index_db_path(account_dir)
if index_path.exists():
conn = sqlite3.connect(str(index_path))
try:
has_fts = (
conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1"
).fetchone()
is not None
)
if has_fts:
t0 = time.time()
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
where = f"{ts_expr} >= ? AND {ts_expr} < ? AND db_stem NOT LIKE 'biz_message%'"
params: tuple[Any, ...] = (start_ts, end_ts)
if sender:
where += " AND sender_username = ?"
params = (start_ts, end_ts, sender)
# activeDays + latest_ts in one pass.
sql_meta = (
"SELECT "
"COUNT(DISTINCT date(datetime(ts, 'unixepoch', 'localtime'))) AS active_days, "
"MAX(ts) AS latest_ts "
"FROM ("
f" SELECT {ts_expr} AS ts"
" FROM message_fts"
f" WHERE {where}"
") sub"
)
r = conn.execute(sql_meta, params).fetchone()
active_days_i = int((r[0] if r else 0) or 0)
latest_ts_i = int((r[1] if r else 0) or 0)
# local_type distribution (for message kind).
local_type_counts_i: Counter[int] = Counter()
kind_counts_i: Counter[str] = Counter()
try:
rows = conn.execute(
f"SELECT CAST(local_type AS INTEGER) AS lt, COUNT(1) AS cnt "
f"FROM message_fts WHERE {where} GROUP BY lt",
params,
).fetchall()
except Exception:
rows = []
for rr in rows:
if not rr:
continue
try:
lt = int(rr[0] or 0)
cnt = int(rr[1] or 0)
except Exception:
continue
if cnt <= 0:
continue
local_type_counts_i[lt] += cnt
kind_counts_i[_kind_from_local_type(lt)] += cnt
# Top conversations (best-effort: only needs a small LIMIT).
per_username_counts_i: Counter[str] = Counter()
try:
rows_u = conn.execute(
f"SELECT username, COUNT(1) AS cnt "
f"FROM message_fts WHERE {where} "
"GROUP BY username ORDER BY cnt DESC LIMIT 400",
params,
).fetchall()
except Exception:
rows_u = []
for rr in rows_u:
if not rr:
continue
u = str(rr[0] or "").strip()
if not u:
continue
try:
cnt = int(rr[1] or 0)
except Exception:
cnt = 0
if cnt > 0:
per_username_counts_i[u] = cnt
# Top phrases (short text only).
phrase_counts_i: Counter[str] = Counter()
try:
rows_p = conn.execute(
f"SELECT \"text\" AS txt, COUNT(1) AS cnt "
f"FROM message_fts WHERE {where} AND render_type = 'text' "
" AND \"text\" IS NOT NULL "
" AND TRIM(\"text\") != '' "
" AND LENGTH(TRIM(\"text\")) <= 12 "
"GROUP BY txt ORDER BY cnt DESC LIMIT 400",
params,
).fetchall()
except Exception:
rows_p = []
for rr in rows_p:
if not rr:
continue
phrase = _normalize_phrase(rr[0])
if not phrase:
continue
try:
cnt = int(rr[1] or 0)
except Exception:
cnt = 0
if cnt > 0:
phrase_counts_i[phrase] += cnt
def pick_top(counter: Counter[Any]) -> Optional[tuple[Any, int]]:
if not counter:
return None
best_item = max(counter.items(), key=lambda kv: (kv[1], str(kv[0])))
if best_item[1] <= 0:
return None
return best_item[0], int(best_item[1])
def is_keep_username(u: str) -> bool:
return _should_keep_session(u, include_official=False)
contact_counts_i = Counter(
{
u: c
for u, c in per_username_counts_i.items()
if (not u.endswith("@chatroom")) and is_keep_username(u)
}
)
group_counts_i = Counter(
{u: c for u, c in per_username_counts_i.items() if u.endswith("@chatroom") and is_keep_username(u)}
)
top_contact = pick_top(contact_counts_i)
top_group = pick_top(group_counts_i)
top_phrase = pick_top(phrase_counts_i)
total_messages = int(sum(local_type_counts_i.values()))
logger.info(
"Wrapped card#0 overview computed (search index): account=%s year=%s total=%s active_days=%s sender=%s db=%s elapsed=%.2fs",
str(account_dir.name or "").strip(),
year,
total_messages,
active_days_i,
sender or "*",
str(index_path.name),
time.time() - t0,
)
return GlobalOverviewStats(
year=year,
active_days=active_days_i,
local_type_counts={int(k): int(v) for k, v in local_type_counts_i.items()},
kind_counts={str(k): int(v) for k, v in kind_counts_i.items()},
latest_ts=latest_ts_i,
top_phrase=(str(top_phrase[0]), int(top_phrase[1])) if top_phrase else None,
top_emoji=None,
top_contact=(str(top_contact[0]), int(top_contact[1])) if top_contact else None,
top_group=(str(top_group[0]), int(top_group[1])) if top_group else None,
)
finally:
try:
conn.close()
except Exception:
pass
# Resolve all sessions (usernames) so we can map msg_xxx/chat_xxx tables back to usernames.
session_usernames = _list_session_usernames(account_dir / "session.db")
md5_to_username: dict[str, str] = {}
table_to_username: dict[str, str] = {}
for u in session_usernames:
md5_hex = hashlib.md5(u.encode("utf-8")).hexdigest().lower()
md5_to_username[md5_hex] = u
table_to_username[f"msg_{md5_hex}"] = u
table_to_username[f"chat_{md5_hex}"] = u
def resolve_username_from_table(table_name: str) -> Optional[str]:
ln = str(table_name or "").lower()
u = table_to_username.get(ln)
if u:
return u
m = _MD5_HEX_RE.search(ln)
if m:
return md5_to_username.get(m.group(0).lower())
return None
db_paths = _iter_message_db_paths(account_dir)
db_paths = [p for p in db_paths if not p.name.lower().startswith("biz_message")]
# Convert millisecond timestamps defensively.
ts_expr = (
"CASE WHEN create_time > 1000000000000 THEN CAST(create_time/1000 AS INTEGER) ELSE create_time END"
)
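# Illustrative: 1706659200000 (ms) -> 1706659200 (s); 1706659200 (s) stays
# unchanged, since only values above 1e12 are treated as milliseconds.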
local_type_counts: Counter[int] = Counter()
kind_counts: Counter[str] = Counter()
active_days: set[str] = set()
per_username_counts: Counter[str] = Counter()
phrase_counts: Counter[str] = Counter()
latest_ts = 0
t0 = time.time()
for db_path in db_paths:
if not db_path.exists():
continue
conn: sqlite3.Connection | None = None
try:
conn = sqlite3.connect(str(db_path))
tables = _list_message_tables(conn)
if not tables:
continue
sender_rowid: int | None = None
if sender:
try:
r2 = conn.execute(
"SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1",
(sender,),
).fetchone()
if r2 is not None and r2[0] is not None:
sender_rowid = int(r2[0])
except Exception:
sender_rowid = None
# Can't reliably filter by sender for this shard; skip to avoid mixing directions.
if sender_rowid is None:
continue
for table_name in tables:
qt = _quote_ident(table_name)
username = resolve_username_from_table(table_name)
sender_where = " AND real_sender_id = ?" if sender_rowid is not None else ""
params = (start_ts, end_ts, sender_rowid) if sender_rowid is not None else (start_ts, end_ts)
# 1) local_type distribution + table total
sql_types = (
"SELECT local_type, COUNT(1) AS cnt "
"FROM ("
f" SELECT local_type, {ts_expr} AS ts "
f" FROM {qt} "
f" WHERE {ts_expr} >= ? AND {ts_expr} < ?{sender_where}"
") sub "
"GROUP BY local_type"
)
try:
rows = conn.execute(sql_types, params).fetchall()
except Exception:
continue
if not rows:
continue
table_total = 0
table_text_cnt = 0
for r in rows:
if not r:
continue
try:
lt = int(r[0] or 0)
except Exception:
lt = 0
try:
cnt = int(r[1] or 0)
except Exception:
cnt = 0
if cnt <= 0:
continue
table_total += cnt
local_type_counts[lt] += cnt
kind_counts[_kind_from_local_type(lt)] += cnt
if lt == 1:
table_text_cnt = cnt
if table_total <= 0:
continue
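# 2) per-conversation totals (feeds the top contact/group picks below)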
if username:
per_username_counts[username] += table_total
# 3) active days (distinct dates)
sql_days = (
"SELECT DISTINCT date(datetime(ts, 'unixepoch', 'localtime')) AS d "
"FROM ("
f" SELECT {ts_expr} AS ts"
f" FROM {qt}"
f" WHERE {ts_expr} >= ? AND {ts_expr} < ?{sender_where}"
") sub"
)
try:
rows_d = conn.execute(sql_days, params).fetchall()
except Exception:
rows_d = []
for rd in rows_d:
if not rd or not rd[0]:
continue
active_days.add(str(rd[0]))
# 4) latest timestamp within this year
sql_max_ts = f"SELECT MAX({ts_expr}) AS mx FROM {qt} WHERE {ts_expr} >= ? AND {ts_expr} < ?{sender_where}"
try:
rmax = conn.execute(sql_max_ts, params).fetchone()
except Exception:
rmax = None
try:
mx = int((rmax[0] if rmax else 0) or 0)
except Exception:
mx = 0
if mx > latest_ts:
latest_ts = mx
# 5) top phrases (best-effort via short, repeated text messages)
if table_text_cnt > 0:
sql_phrase = (
"SELECT message_content AS txt, COUNT(1) AS cnt "
f"FROM {qt} "
f"WHERE local_type = 1 "
f" AND {ts_expr} >= ? AND {ts_expr} < ?{sender_where} "
" AND message_content IS NOT NULL "
" AND TRIM(CAST(message_content AS TEXT)) != '' "
" AND LENGTH(TRIM(CAST(message_content AS TEXT))) <= 12 "
"GROUP BY txt "
"ORDER BY cnt DESC "
"LIMIT 60"
)
try:
rows_p = conn.execute(sql_phrase, params).fetchall()
except Exception:
rows_p = []
for rp in rows_p:
if not rp:
continue
phrase = _normalize_phrase(rp[0])
if not phrase:
continue
try:
cnt = int(rp[1] or 0)
except Exception:
cnt = 0
if cnt > 0:
phrase_counts[phrase] += cnt
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
def pick_top(counter: Counter[Any]) -> Optional[tuple[Any, int]]:
if not counter:
return None
# Deterministic tie-breaker: on equal counts, the lexicographically larger key wins.
best_item = max(counter.items(), key=lambda kv: (kv[1], str(kv[0])))
if best_item[1] <= 0:
return None
return best_item[0], int(best_item[1])
# Pick top contact & group (exclude official/service accounts by default).
def is_keep_username(u: str) -> bool:
return _should_keep_session(u, include_official=False)
contact_counts = Counter({u: c for u, c in per_username_counts.items() if (not u.endswith("@chatroom")) and is_keep_username(u)})
group_counts = Counter({u: c for u, c in per_username_counts.items() if u.endswith("@chatroom") and is_keep_username(u)})
top_contact = pick_top(contact_counts)
top_group = pick_top(group_counts)
top_phrase = pick_top(phrase_counts)
total_messages = int(sum(local_type_counts.values()))
logger.info(
"Wrapped card#0 overview computed: account=%s year=%s total=%s active_days=%s sender=%s dbs=%s elapsed=%.2fs",
str(account_dir.name or "").strip(),
year,
total_messages,
len(active_days),
sender or "*",
len(db_paths),
time.time() - t0,
)
return GlobalOverviewStats(
year=year,
active_days=len(active_days),
local_type_counts={int(k): int(v) for k, v in local_type_counts.items()},
kind_counts={str(k): int(v) for k, v in kind_counts.items()},
latest_ts=int(latest_ts),
top_phrase=(str(top_phrase[0]), int(top_phrase[1])) if top_phrase else None,
top_emoji=None,
top_contact=(str(top_contact[0]), int(top_contact[1])) if top_contact else None,
top_group=(str(top_group[0]), int(top_group[1])) if top_group else None,
)
def build_card_00_global_overview(
*,
account_dir: Path,
year: int,
heatmap: WeekdayHourHeatmap | None = None,
) -> dict[str, Any]:
"""Card #0: 年度全局概览开场综合页建议作为第2页"""
sender = str(account_dir.name or "").strip()
heatmap = heatmap or compute_weekday_hour_heatmap(account_dir=account_dir, year=year, sender_username=sender)
stats = compute_global_overview_stats(account_dir=account_dir, year=year, sender_username=sender)
# Resolve display names for top sessions (best-effort).
contact_db_path = account_dir / "contact.db"
top_usernames: list[str] = []
if stats.top_contact:
top_usernames.append(stats.top_contact[0])
if stats.top_group:
top_usernames.append(stats.top_group[0])
contact_rows = _load_contact_rows(contact_db_path, top_usernames) if top_usernames else {}
top_contact_obj = None
if stats.top_contact:
u, cnt = stats.top_contact
row = contact_rows.get(u)
display = _pick_display_name(row, u)
avatar = _pick_avatar_url(row) or (_build_avatar_url(str(account_dir.name or ""), u) if u else "")
top_contact_obj = {
"username": u,
"displayName": display,
"maskedName": _mask_name(display),
"avatarUrl": avatar,
"messages": int(cnt),
"isGroup": False,
}
top_group_obj = None
if stats.top_group:
u, cnt = stats.top_group
row = contact_rows.get(u)
display = _pick_display_name(row, u)
avatar = _pick_avatar_url(row) or (_build_avatar_url(str(account_dir.name or ""), u) if u else "")
top_group_obj = {
"username": u,
"displayName": display,
"maskedName": _mask_name(display),
"avatarUrl": avatar,
"messages": int(cnt),
"isGroup": True,
}
# Derive the top "message kind".
top_kind = None
if stats.kind_counts:
kc = Counter(stats.kind_counts)
# Exclude mostly-unhelpful kinds from the "top" pick.
for drop in ("system", "other"):
if drop in kc:
del kc[drop]
if kc:
kind, count = max(kc.items(), key=lambda kv: (kv[1], str(kv[0])))
ratio = (float(count) / float(heatmap.total_messages)) if heatmap.total_messages > 0 else 0.0
top_kind = {
"kind": str(kind),
"label": _kind_label_zh(str(kind)),
"count": int(count),
"ratio": ratio,
}
messages_per_day = 0.0
if stats.active_days > 0:
messages_per_day = heatmap.total_messages / float(stats.active_days)
most_active_hour: Optional[int] = None
most_active_weekday: Optional[int] = None
if heatmap.total_messages > 0:
hour_totals = [sum(heatmap.matrix[w][h] for w in range(7)) for h in range(24)]
most_active_hour = max(range(24), key=lambda h: (hour_totals[h], -h))
weekday_totals = [sum(heatmap.matrix[w][h] for h in range(24)) for w in range(7)]
most_active_weekday = max(range(7), key=lambda w: (weekday_totals[w], -w))
most_active_weekday_name = _weekday_name_zh(most_active_weekday or -1) if most_active_weekday is not None else ""
highlight = None
if stats.latest_ts > 0:
dt = datetime.fromtimestamp(int(stats.latest_ts))
highlight = {
"timestamp": int(stats.latest_ts),
"date": dt.strftime("%Y-%m-%d"),
"time": dt.strftime("%H:%M"),
# Keep it privacy-safe by default: no content/object here.
"action": "你还在微信里发送消息",
}
lines: list[str] = []
if heatmap.total_messages > 0:
lines.append(f"今年以来,你在微信里发送了 {heatmap.total_messages:,} 条消息,平均每天 {messages_per_day:.1f} 条。")
else:
lines.append("今年以来,你在微信里还没有发出聊天消息。")
if stats.active_days > 0:
if most_active_hour is not None and most_active_weekday_name:
lines.append(f"和微信共度的 {stats.active_days} 天里,你最常在 {most_active_hour} 点出没;{most_active_weekday_name}是你最爱聊天的日子。")
else:
lines.append(f"和微信共度的 {stats.active_days} 天里,你留下了很多对话的痕迹。")
if top_contact_obj or top_group_obj:
parts: list[str] = []
if top_contact_obj:
parts.append(f"你发消息最多的人是「{top_contact_obj['maskedName']}」({int(top_contact_obj['messages']):,} 条)")
if top_group_obj:
parts.append(f"你最常发言的群是「{top_group_obj['maskedName']}」({int(top_group_obj['messages']):,} 条)")
if parts:
lines.append("".join(parts) + "")
if top_kind and top_kind.get("count", 0) > 0:
pct = float(top_kind.get("ratio") or 0.0) * 100.0
lines.append(f"你最常用的表达方式是{top_kind['label']}(占 {pct:.0f}%)。")
if stats.top_phrase and stats.top_phrase[0] and stats.top_phrase[1] > 0:
phrase, cnt = stats.top_phrase
lines.append(f"你今年说得最多的一句话是「{phrase}」(共 {cnt:,} 次)。")
# NOTE: We keep the `highlight` field in `data` for future use, but do not
# surface it in the page narrative for now (per product requirement).
narrative = "一屏读懂你的年度微信聊天画像"
return {
"id": 0,
"title": "年度全局概览",
"scope": "global",
"category": "A",
"status": "ok",
"kind": "global/overview",
"narrative": narrative,
"data": {
"year": int(year),
"totalMessages": int(heatmap.total_messages),
"activeDays": int(stats.active_days),
"messagesPerDay": messages_per_day,
"mostActiveHour": most_active_hour,
"mostActiveWeekday": most_active_weekday,
"mostActiveWeekdayName": most_active_weekday_name,
"topContact": top_contact_obj,
"topGroup": top_group_obj,
"topKind": top_kind,
"topPhrase": {"phrase": stats.top_phrase[0], "count": int(stats.top_phrase[1])} if stats.top_phrase else None,
"topEmoji": {"emoji": stats.top_emoji[0], "count": int(stats.top_emoji[1])} if stats.top_emoji else None,
"highlight": highlight,
"lines": lines,
},
}

View File

@@ -7,6 +7,7 @@ from datetime import datetime
from pathlib import Path
from typing import Any
from ...chat_search_index import get_chat_search_index_db_path
from ...chat_helpers import _iter_message_db_paths, _quote_ident
from ...logging_config import get_logger
@@ -25,6 +26,54 @@ class WeekdayHourHeatmap:
total_messages: int
def _get_time_personality(hour: int) -> str:
if 5 <= hour <= 8:
return "early_bird"
if 9 <= hour <= 12:
return "office_worker"
if 13 <= hour <= 17:
return "afternoon"
if 18 <= hour <= 23:
return "night_owl"
if 0 <= hour <= 4:
return "late_night"
return "unknown"
def _get_weekday_name(weekday_index: int) -> str:
if 0 <= weekday_index < len(_WEEKDAY_LABELS_ZH):
return _WEEKDAY_LABELS_ZH[weekday_index]
return ""
def _build_narrative(*, hour: int, weekday: str, total: int) -> str:
personality = _get_time_personality(hour)
templates: dict[str, str] = {
"early_bird": (
f"清晨 {hour:02d}:00当城市还在沉睡你已经开始了新一天的问候。"
f"{weekday}是你最健谈的一天,这一年你用 {total:,} 条消息记录了这些早起时光。"
),
"office_worker": (
f"忙碌的上午 {hour:02d}:00是你最常敲击键盘的时刻。"
f"{weekday}最活跃,这一年你用 {total:,} 条消息把工作与生活都留在了对话里。"
),
"afternoon": (
f"午后的阳光里,{hour:02d}:00 是你最爱分享的时刻。"
f"{weekday}的聊天最热闹,这一年共 {total:,} 条消息串起了你的午后时光。"
),
"night_owl": (
f"夜幕降临,{hour:02d}:00 是你最常出没的时刻。"
f"{weekday}最活跃,这一年 {total:,} 条消息陪你把每个夜晚都聊得更亮。"
),
"late_night": (
f"当世界沉睡,凌晨 {hour:02d}:00 的你依然在线。"
f"{weekday}最活跃,这一年 {total:,} 条深夜消息,是你与这个世界的悄悄话。"
),
}
return templates.get(personality, f"你在 {hour:02d}:00 最活跃")
def _year_range_epoch_seconds(year: int) -> tuple[int, int]:
# Use local time boundaries (same semantics as sqlite "localtime").
start = int(datetime(year, 1, 1).timestamp())
@@ -54,6 +103,7 @@ def _accumulate_db(
start_ts: int,
end_ts: int,
matrix: list[list[int]],
sender_username: str | None = None,
) -> int:
"""Accumulate message counts from one message shard DB into matrix.
@@ -77,9 +127,30 @@ def _accumulate_db(
"CASE WHEN create_time > 1000000000000 THEN CAST(create_time/1000 AS INTEGER) ELSE create_time END"
)
# Optional sender filter (best-effort). When provided, we only count
# messages whose `real_sender_id` maps to `sender_username`.
sender_rowid: int | None = None
if sender_username and str(sender_username).strip():
try:
r = conn.execute(
"SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1",
(str(sender_username).strip(),),
).fetchone()
if r is not None and r[0] is not None:
sender_rowid = int(r[0])
except Exception:
sender_rowid = None
counted = 0
for table_name in tables:
qt = _quote_ident(table_name)
sender_where = ""
params: tuple[Any, ...]
if sender_rowid is not None:
sender_where = " AND real_sender_id = ?"
params = (start_ts, end_ts, sender_rowid)
else:
params = (start_ts, end_ts)
sql = (
"SELECT "
# %w: 0..6 with Sunday=0, so shift to Monday=0..Sunday=6
@@ -89,12 +160,12 @@ def _accumulate_db(
"FROM ("
f" SELECT {ts_expr} AS ts"
f" FROM {qt}"
f" WHERE {ts_expr} >= ? AND {ts_expr} < ?"
f" WHERE {ts_expr} >= ? AND {ts_expr} < ?{sender_where}"
") sub "
"GROUP BY weekday, hour"
)
try:
rows = conn.execute(sql, (start_ts, end_ts)).fetchall()
rows = conn.execute(sql, params).fetchall()
except Exception:
continue
@@ -119,25 +190,114 @@ def _accumulate_db(
pass
def compute_weekday_hour_heatmap(*, account_dir: Path, year: int) -> WeekdayHourHeatmap:
def compute_weekday_hour_heatmap(*, account_dir: Path, year: int, sender_username: str | None = None) -> WeekdayHourHeatmap:
start_ts, end_ts = _year_range_epoch_seconds(year)
matrix: list[list[int]] = [[0 for _ in range(24)] for _ in range(7)]
total = 0
# Prefer using our unified search index if available; it's much faster than scanning all msg tables.
index_path = get_chat_search_index_db_path(account_dir)
if index_path.exists():
conn = sqlite3.connect(str(index_path))
try:
has_fts = (
conn.execute(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1"
).fetchone()
is not None
)
if has_fts:
# Convert millisecond timestamps defensively (some datasets store ms).
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
sender_clause = ""
if sender_username and str(sender_username).strip():
sender_clause = " AND sender_username = ?"
sql = (
"SELECT "
"((CAST(strftime('%w', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) + 6) % 7) AS weekday, "
"CAST(strftime('%H', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS hour, "
"COUNT(1) AS cnt "
"FROM ("
f" SELECT {ts_expr} AS ts"
" FROM message_fts"
f" WHERE {ts_expr} >= ? AND {ts_expr} < ?"
" AND db_stem NOT LIKE 'biz_message%'"
f"{sender_clause}"
") sub "
"GROUP BY weekday, hour"
)
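# Illustrative: strftime('%w') yields Sunday=0..Saturday=6, so the (+6) % 7 shift
# remaps Monday -> 0 and Sunday -> 6 (e.g. '%w'=1 for Monday: (1 + 6) % 7 == 0).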
t0 = time.time()
try:
params: tuple[Any, ...] = (start_ts, end_ts)
if sender_username and str(sender_username).strip():
params = (start_ts, end_ts, str(sender_username).strip())
rows = conn.execute(sql, params).fetchall()
except Exception:
rows = []
for r in rows:
if not r:
continue
try:
w = int(r[0] or 0)
h = int(r[1] or 0)
cnt = int(r[2] or 0)
except Exception:
continue
if 0 <= w < 7 and 0 <= h < 24 and cnt > 0:
matrix[w][h] += cnt
total += cnt
logger.info(
"Wrapped heatmap computed (search index): account=%s year=%s total=%s sender=%s db=%s elapsed=%.2fs",
str(account_dir.name or "").strip(),
year,
total,
str(sender_username).strip() if sender_username else "*",
str(index_path.name),
time.time() - t0,
)
return WeekdayHourHeatmap(
weekday_labels=list(_WEEKDAY_LABELS_ZH),
hour_labels=list(_HOUR_LABELS),
matrix=matrix,
total_messages=total,
)
finally:
try:
conn.close()
except Exception:
pass
db_paths = _iter_message_db_paths(account_dir)
# Default: exclude official/biz shards (biz_message*.db) to reduce noise.
db_paths = [p for p in db_paths if not p.name.lower().startswith("biz_message")]
my_wxid = str(account_dir.name or "").strip()
t0 = time.time()
for db_path in db_paths:
total += _accumulate_db(db_path=db_path, start_ts=start_ts, end_ts=end_ts, matrix=matrix)
total += _accumulate_db(
db_path=db_path,
start_ts=start_ts,
end_ts=end_ts,
matrix=matrix,
sender_username=str(sender_username).strip() if sender_username else None,
)
logger.info(
"Wrapped card#1 heatmap computed: account=%s year=%s total=%s dbs=%s elapsed=%.2fs",
"Wrapped heatmap computed: account=%s year=%s total=%s sender=%s dbs=%s elapsed=%.2fs",
my_wxid,
year,
total,
str(sender_username).strip() if sender_username else "*",
len(db_paths),
time.time() - t0,
)
@@ -150,17 +310,36 @@ def compute_weekday_hour_heatmap(*, account_dir: Path, year: int) -> WeekdayHour
)
def build_card_01_cyber_schedule(*, account_dir: Path, year: int) -> dict[str, Any]:
"""Card #1: 年度赛博作息表 (24x7 heatmap)."""
def build_card_01_cyber_schedule(
*,
account_dir: Path,
year: int,
heatmap: WeekdayHourHeatmap | None = None,
) -> dict[str, Any]:
"""Card #1: 年度赛博作息表 (24x7 heatmap).
heatmap = compute_weekday_hour_heatmap(account_dir=account_dir, year=year)
`heatmap` can be provided by the caller to reuse computation across cards.
"""
narrative = "今年你没有聊天消息"
sender = str(account_dir.name or "").strip()
heatmap = heatmap or compute_weekday_hour_heatmap(account_dir=account_dir, year=year, sender_username=sender)
narrative = "今年你没有发出聊天消息"
if heatmap.total_messages > 0:
hour_totals = [sum(heatmap.matrix[w][h] for w in range(7)) for h in range(24)]
# Deterministic: pick earliest hour on ties.
most_active_hour = max(range(24), key=lambda h: (hour_totals[h], -h))
narrative = f"你在 {most_active_hour:02d}:00 最活跃"
weekday_totals = [sum(heatmap.matrix[w][h] for h in range(24)) for w in range(7)]
# Deterministic: pick earliest weekday on ties.
most_active_weekday = max(range(7), key=lambda w: (weekday_totals[w], -w))
weekday_name = _get_weekday_name(most_active_weekday)
narrative = _build_narrative(
hour=most_active_hour,
weekday=weekday_name,
total=heatmap.total_messages,
)
return {
"id": 1,

View File

@@ -0,0 +1,804 @@
from __future__ import annotations
import math
import random
import sqlite3
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from pypinyin import lazy_pinyin, Style
from ...chat_helpers import _decode_message_content, _iter_message_db_paths, _quote_ident
from ...chat_search_index import get_chat_search_index_db_path
from ...logging_config import get_logger
logger = get_logger(__name__)
# 键盘布局中用于“磨损”展示的按键(字母 + 数字 + 常用标点)。
# 注意:功能键(Tab/Enter/Backspace 等)不统计;空格键单独放在 spaceHits。
_KEYBOARD_KEYS = (
list("`1234567890-=")
+ list("qwertyuiop[]\\")
+ list("asdfghjkl;\'")
+ list("zxcvbnm,./")
)
_KEYBOARD_KEY_SET = set(_KEYBOARD_KEYS)
# 将“显示字符”映射到键盘上的“实际按键”(用基础键位表示,如 '!' => '1', '?' => '/')。
_CHAR_TO_KEY: dict[str, str] = {
# ASCII shifted symbols
"~": "`",
"!": "1",
"@": "2",
"#": "3",
"$": "4",
"%": "5",
"^": "6",
"&": "7",
"*": "8",
"(": "9",
")": "0",
"_": "-",
"+": "=",
"{": "[",
"}": "]",
"|": "\\",
":": ";",
'"': "'",
"<": ",",
">": ".",
"?": "/",
# Common fullwidth / CJK punctuation (approximate key mapping)
"": "`",
"": "1",
"": "2",
"": "3",
"": "4",
"": "5",
"": "6",
"": "7",
"": "8",
"": "9",
"": "0",
"¥": "4",
"": "4",
"_": "-",
"": "=",
"": "[",
"": "]",
"": "\\",
"": ";",
"": "'",
"": ",",
"": ".",
"": "/",
"": ",",
"": ",",
"": ".",
"": ".",
"": ";",
"": "'",
"": "'",
"": "'",
"": "'",
"": "[",
"": "]",
"": ",",
"": ".",
"": "-",
"": "-",
"": "=",
"": "/",
"": "\\",
"·": "`", # 常见:中文输入法下“·”常用 ` 键打出
"": ".", # 近似处理:省略号按 '.' 计
}
# 默认拼音字母频率分布(用于:有中文但采样不足时的兜底估算)
_DEFAULT_PINYIN_FREQ = {
"a": 0.121,
"i": 0.118,
"n": 0.098,
"e": 0.089,
"u": 0.082,
"g": 0.072,
"h": 0.065,
"o": 0.052,
"z": 0.048,
"s": 0.042,
"x": 0.038,
"y": 0.036,
"d": 0.032,
"l": 0.028,
"j": 0.026,
"b": 0.022,
"c": 0.020,
"w": 0.018,
"m": 0.016,
"f": 0.014,
"t": 0.012,
"r": 0.010,
"p": 0.009,
"k": 0.007,
"q": 0.005,
"v": 0.001,
}
_AVG_PINYIN_LEN = 2.8
def _is_cjk_han(ch: str) -> bool:
"""是否为中文汉字(用于拼音估算)。"""
if not ch:
return False
o = ord(ch)
return (0x4E00 <= o <= 0x9FFF) or (0x3400 <= o <= 0x4DBF)
def _char_to_key(ch: str) -> str | None:
"""将单个字符映射为键盘按键 code与前端键盘布局的 code 保持一致)。"""
if not ch:
return None
# Fullwidth digits: '0'..'9'
if "0" <= ch <= "9":
return chr(ord(ch) - ord("0") + ord("0"))
if ch in _KEYBOARD_KEY_SET:
return ch
mapped = _CHAR_TO_KEY.get(ch)
if mapped is not None:
return mapped
if ch.isalpha():
low = ch.lower()
if low in _KEYBOARD_KEY_SET:
return low
return None
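# Illustrative: _char_to_key("5") == "5" (fullwidth digit), _char_to_key("A") == "a",
# _char_to_key("你") is None -- Han characters go through the pinyin path instead.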
def _update_keyboard_counters(
text: str,
*,
direct_counter: Counter,
pinyin_counter: Counter,
pinyin_cache: dict[str, str],
do_pinyin: bool,
) -> tuple[int, int, int]:
"""
扫描一条消息文本,累加:
- direct_counter: 非中文汉字部分(英文/数字/标点)可直接映射到按键的统计(精确)
- pinyin_counter: 中文汉字部分的拼音字母统计(仅当 do_pinyin=True 时才做;用于采样估算)
并返回 (nonspace_chars, cjk_han_chars, space_chars)。
"""
if not text:
return 0, 0, 0
nonspace = 0
cjk = 0
spaces = 0
for ch in text:
# 真实可见空格:统计进 spaceHits(不计入 sentChars/receivedChars 的口径)
if ch == " " or ch == "\u3000":
spaces += 1
continue
if ch.isspace():
continue
nonspace += 1
if _is_cjk_han(ch):
cjk += 1
if do_pinyin:
py = pinyin_cache.get(ch)
if py is None:
lst = lazy_pinyin(ch, style=Style.NORMAL)
py = (lst[0] or "").lower() if lst else ""
pinyin_cache[ch] = py
for letter in py:
# pypinyin 在 Style.NORMAL 下通常只会给出 a-z含 ü=>v这里再做一次过滤。
if letter in _KEYBOARD_KEY_SET:
pinyin_counter[letter] += 1
continue
k = _char_to_key(ch)
if k is not None:
direct_counter[k] += 1
return nonspace, cjk, spaces
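# Illustrative: _update_keyboard_counters("hello 你好", ..., do_pinyin=True) adds
# h/e/l/l/o to direct_counter, n/i/h/a/o (from "ni" + "hao") to pinyin_counter,
# and returns (7, 2, 1): 7 non-space chars, 2 Han chars, 1 space.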
def compute_keyboard_stats(*, account_dir: Path, year: int, sample_rate: float = 1.0) -> dict[str, Any]:
"""
统计键盘敲击数据。
- 英文/数字/标点:可直接从消息文本映射到按键(精确统计)
- 中文汉字需要拼音转换成本高对“消息”做采样sample_rate后估算总体拼音字母分布
"""
start_ts, end_ts = _year_range_epoch_seconds(year)
my_username = str(account_dir.name or "").strip()
sample_rate = max(0.0, min(1.0, float(sample_rate)))
direct_counter: Counter[str] = Counter()
pinyin_counter: Counter[str] = Counter()
pinyin_cache: dict[str, str] = {}
total_cjk_chars = 0
sampled_cjk_chars = 0
actual_space_chars = 0
total_messages = 0
sampled_messages = 0
used_index = False
# 优先使用搜索索引(更快)
index_path = get_chat_search_index_db_path(account_dir)
if index_path.exists():
conn = sqlite3.connect(str(index_path))
try:
has_fts = (
conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone()
is not None
)
if has_fts and my_username:
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
where = (
f"{ts_expr} >= ? AND {ts_expr} < ? "
"AND db_stem NOT LIKE 'biz_message%' "
"AND render_type = 'text' "
"AND \"text\" IS NOT NULL "
"AND TRIM(CAST(\"text\" AS TEXT)) != '' "
"AND sender_username = ?"
)
sql = f"SELECT \"text\" FROM message_fts WHERE {where}"
try:
cur = conn.execute(sql, (start_ts, end_ts, my_username))
used_index = True
for row in cur:
txt = str(row[0] or "").strip()
if not txt:
continue
total_messages += 1
if sample_rate >= 1.0:
do_sample = True
elif sample_rate <= 0.0:
do_sample = False
else:
do_sample = random.random() < sample_rate
if do_sample:
sampled_messages += 1
_, cjk, spaces = _update_keyboard_counters(
txt,
direct_counter=direct_counter,
pinyin_counter=pinyin_counter,
pinyin_cache=pinyin_cache,
do_pinyin=do_sample,
)
total_cjk_chars += cjk
actual_space_chars += spaces
if do_sample:
sampled_cjk_chars += cjk
except Exception:
used_index = False
finally:
try:
conn.close()
except Exception:
pass
# 如果索引不可用,回退到直接扫描(慢,但兼容)
if not used_index:
db_paths = _iter_message_db_paths(account_dir)
for db_path in db_paths:
try:
if db_path.name.lower().startswith("biz_message"):
continue
except Exception:
pass
if not db_path.exists():
continue
conn: sqlite3.Connection | None = None
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
my_rowid: Optional[int]
try:
r2 = conn.execute("SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1", (my_username,)).fetchone()
my_rowid = int(r2[0]) if r2 and r2[0] is not None else None
except Exception:
my_rowid = None
if my_rowid is None:
continue
tables = _list_message_tables(conn)
if not tables:
continue
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
for table in tables:
qt = _quote_ident(table)
sql = (
"SELECT real_sender_id, message_content, compress_content "
f"FROM {qt} "
"WHERE local_type = 1 "
f" AND {ts_expr} >= ? AND {ts_expr} < ?"
)
try:
cur = conn.execute(sql, (start_ts, end_ts))
except Exception:
continue
for r in cur:
try:
rsid = int(r["real_sender_id"] or 0)
except Exception:
rsid = 0
if rsid != my_rowid:
continue
txt = ""
try:
txt = _decode_message_content(r["compress_content"], r["message_content"]).strip()
except Exception:
txt = ""
if not txt:
continue
total_messages += 1
if sample_rate >= 1.0:
do_sample = True
elif sample_rate <= 0.0:
do_sample = False
else:
do_sample = random.random() < sample_rate
if do_sample:
sampled_messages += 1
_, cjk, spaces = _update_keyboard_counters(
txt,
direct_counter=direct_counter,
pinyin_counter=pinyin_counter,
pinyin_cache=pinyin_cache,
do_pinyin=do_sample,
)
total_cjk_chars += cjk
actual_space_chars += spaces
if do_sample:
sampled_cjk_chars += cjk
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
# 中文拼音部分:按“中文汉字数量”缩放(比按总字符缩放更合理,也能让数字/标点更准确)
est_pinyin_counter: Counter[str] = Counter()
sampled_pinyin_hits = int(sum(pinyin_counter.values()))
if total_cjk_chars > 0:
if sampled_cjk_chars > 0 and sampled_pinyin_hits > 0:
scale_factor = total_cjk_chars / sampled_cjk_chars
for k, cnt in pinyin_counter.items():
est_pinyin_counter[k] = int(round(cnt * scale_factor))
else:
# 兜底:有中文但采样不足(或采样中无法提取拼音),用默认分布估算
total_pinyin_hits = int(total_cjk_chars * _AVG_PINYIN_LEN)
for k, freq in _DEFAULT_PINYIN_FREQ.items():
est_pinyin_counter[k] = int(freq * total_pinyin_hits)
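# Illustrative scaling: with 100,000 Han chars total but only 10,000 sampled,
# scale_factor == 10.0, so a sampled count of 3,000 hits for "n" becomes ~30,000.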
key_hits_counter: Counter[str] = Counter()
key_hits_counter.update(direct_counter)
key_hits_counter.update(est_pinyin_counter)
key_hits: dict[str, int] = {k: int(key_hits_counter.get(k, 0)) for k in _KEYBOARD_KEYS}
total_non_space_hits = int(sum(key_hits.values()))
# 空格键:= 真实空格(如英文句子) + 中文拼音选词带来的“隐含空格”(粗略估算)
implied_space_hits = int(sum(est_pinyin_counter.values()) * 0.15)
space_hits = int(actual_space_chars + implied_space_hits)
total_key_hits = int(total_non_space_hits + space_hits)
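# Illustrative: 1,000 real spaces plus 200,000 estimated pinyin letters gives
# space_hits = 1,000 + int(200,000 * 0.15) = 31,000.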
# 频率只对“非空格键”归一化;空格频率由 spaceHits 单独给出
key_frequency: dict[str, float] = {}
for k in _KEYBOARD_KEYS:
key_frequency[k] = (key_hits.get(k, 0) / total_non_space_hits) if total_non_space_hits > 0 else 0.0
logger.info(
"Keyboard stats computed: account=%s year=%s sample_rate=%.2f msgs=%d sampled=%d cjk=%d sampled_cjk=%d total_hits=%d",
my_username,
year,
float(sample_rate),
int(total_messages),
int(sampled_messages),
int(total_cjk_chars),
int(sampled_cjk_chars),
int(total_key_hits),
)
return {
"totalKeyHits": total_key_hits,
"keyHits": key_hits,
"keyFrequency": key_frequency,
"spaceHits": space_hits,
}
def _year_range_epoch_seconds(year: int) -> tuple[int, int]:
# Use local time boundaries (same semantics as sqlite "localtime").
start = int(datetime(year, 1, 1).timestamp())
end = int(datetime(year + 1, 1, 1).timestamp())
return start, end
def _list_message_tables(conn: sqlite3.Connection) -> list[str]:
try:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
except Exception:
return []
names: list[str] = []
for r in rows:
if not r or not r[0]:
continue
name = str(r[0])
ln = name.lower()
if ln.startswith(("msg_", "chat_")):
names.append(name)
return names
# Book analogy table (for "sent chars").
_BOOK_ANALOGIES: list[dict[str, Any]] = [
{"min": 1, "max": 100_000, "level": "小量级", "options": ["一本《小王子》", "一本《解忧杂货店》"]},
{"min": 100_000, "max": 500_000, "level": "中量级", "options": ["一本《三体Ⅰ:地球往事》", "一套《朝花夕拾+呐喊》(鲁迅经典合集)"]},
{"min": 500_000, "max": 1_000_000, "level": "大量级", "options": ["一本《红楼梦》(全本)", "一本《百年孤独》(全本无删减)"]},
{"min": 1_000_000, "max": 5_000_000, "level": "超大量级", "options": ["一套《三体》全三册", "一本《西游记》(全本白话文)"]},
{"min": 5_000_000, "max": 10_000_000, "level": "千万级Ⅰ", "options": ["一套金庸武侠《射雕+神雕+倚天》(经典三部曲)", "一套《平凡的世界》全三册"]},
{"min": 10_000_000, "max": 50_000_000, "level": "千万级Ⅱ", "options": ["一套《哈利·波特》全七册(中文版)", "一本《资治通鉴》(文白对照全本)"]},
{"min": 50_000_000, "max": 100_000_000, "level": "亿级Ⅰ", "options": ["一套《冰与火之歌》全系列(中文版)", "一本《史记》(全本含集解索隐正义)"]},
{"min": 100_000_000, "max": 500_000_000, "level": "亿级Ⅱ", "options": ["一套《中国大百科全书》(单卷本全册)", "一套《金庸武侠全集》15部完整版"]},
{"min": 500_000_000, "max": None, "level": "亿级Ⅲ", "options": ["一套《四库全书》(文津阁精选集)", "一套《大英百科全书》(国际完整版)"]},
]
# A4 analogy table (for "received chars").
# Estimation assumptions:
# - A4 (single side) holds about 1700 chars (depends on font/spacing; this is an approximation).
# - 70g A4 paper thickness is roughly 0.1mm => 100 sheets ≈ 1cm.
_A4_CHARS_PER_SHEET = 1700
_A4_SHEETS_PER_CM = 100.0
# "Level" is a coarse grouping by character count; the physical object analogy is picked by the
# estimated stacked height (so the text stays self-consistent).
_A4_LEVELS: list[dict[str, Any]] = [
{"min": 1, "max": 100_000, "level": "小量级"},
{"min": 100_000, "max": 500_000, "level": "中量级"},
{"min": 500_000, "max": 1_000_000, "level": "大量级"},
{"min": 1_000_000, "max": 5_000_000, "level": "超大量级"},
{"min": 5_000_000, "max": 10_000_000, "level": "千万级Ⅰ"},
{"min": 10_000_000, "max": 50_000_000, "level": "千万级Ⅱ"},
{"min": 50_000_000, "max": 100_000_000, "level": "亿级Ⅰ"},
{"min": 100_000_000, "max": 500_000_000, "level": "亿级Ⅱ"},
{"min": 500_000_000, "max": None, "level": "亿级Ⅲ"},
]
# Physical object analogies by stacked height (cm).
_A4_HEIGHT_ANALOGIES: list[dict[str, Any]] = [
{"minCm": 0.0, "maxCm": 0.5, "objects": ["1枚硬币的厚度", "1张银行卡的厚度"]},
{"minCm": 0.5, "maxCm": 2.0, "objects": ["1叠便利贴", "1本薄款软皮笔记本"]},
{"minCm": 2.0, "maxCm": 6.0, "objects": ["3-5本加厚硬壳笔记本", "1本厚词典"]},
{"minCm": 6.0, "maxCm": 30.0, "objects": ["10本办公台账", "1个矮款文件柜单层满装"]},
{"minCm": 30.0, "maxCm": 60.0, "objects": ["1个标准办公文件盒", "1个登机箱约55cm"]},
{"minCm": 60.0, "maxCm": 200.0, "objects": ["1.7-1.8m成年人身高", "2个办公文件柜叠放"]},
{"minCm": 200.0, "maxCm": 600.0, "objects": ["2层普通住宅层高", "1棵成年矮树枇杷树/橘子树)"]},
{"minCm": 600.0, "maxCm": 2500.0, "objects": ["4-8层居民楼层高", "1棵成年大树梧桐树/樟树)"]},
{"minCm": 2500.0, "maxCm": 5000.0, "objects": ["10-18层小高层住宅", "1栋小型临街写字楼"]},
{"minCm": 5000.0, "maxCm": 25000.0, "objects": ["20-80层超高层住宅", "城市核心区小高层地标"]},
{"minCm": 25000.0, "maxCm": None, "objects": ["1栋城市核心超高层写字楼", "国内中型摩天大楼约100层"]},
]
def _pick_option(options: list[str], *, seed: int) -> str:
if not options:
return ""
idx = abs(int(seed)) % len(options)
return str(options[idx] or "").strip()
def _pick_book_analogy(chars: int) -> Optional[dict[str, Any]]:
n = int(chars or 0)
if n <= 0:
return None
for row in _BOOK_ANALOGIES:
lo = int(row["min"] or 0)
hi = row.get("max")
if n < lo:
continue
if hi is None or n < int(hi):
picked = _pick_option(list(row.get("options") or []), seed=n)
return {
"level": str(row.get("level") or ""),
"book": picked,
"text": f"相当于写了{picked}" if picked else "",
}
return None
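# Illustrative: _pick_book_analogy(120_000) lands in the 100k-500k bracket and,
# since 120_000 % 2 == 0, deterministically picks that bracket's first option.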
def _format_height(height_cm: float) -> str:
try:
cm = float(height_cm)
except Exception:
cm = 0.0
if cm <= 0:
return "0cm"
if cm < 1:
mm = cm * 10.0
return f"{mm:.1f}mm"
if cm < 100:
if cm < 10:
return f"{cm:.1f}cm"
return f"{cm:.0f}cm"
m = cm / 100.0
if m < 10:
return f"{m:.1f}m"
return f"{m:.0f}m"
def _a4_stats(chars: int) -> dict[str, Any]:
# Rough estimate: 1 A4 page ~ 1700 chars; 100 pages ~ 1cm thick.
n = int(chars or 0)
if n <= 0:
return {"sheets": 0, "heightCm": 0.0, "heightText": "0cm"}
sheets = int(math.ceil(n / float(_A4_CHARS_PER_SHEET)))
height_cm = float(sheets) / float(_A4_SHEETS_PER_CM)
return {"sheets": int(sheets), "heightCm": float(height_cm), "heightText": _format_height(height_cm)}
def _pick_a4_analogy(chars: int) -> Optional[dict[str, Any]]:
n = int(chars or 0)
if n <= 0:
return None
a4 = _a4_stats(n)
level = ""
for row in _A4_LEVELS:
lo = int(row["min"] or 0)
hi = row.get("max")
if n < lo:
continue
if hi is None or n < int(hi):
level = str(row.get("level") or "")
break
height_cm = float(a4.get("heightCm") or 0.0)
picked = ""
for row in _A4_HEIGHT_ANALOGIES:
lo = float(row.get("minCm") or 0.0)
hi = row.get("maxCm")
if height_cm < lo:
continue
if hi is None or height_cm < float(hi):
picked = _pick_option(list(row.get("objects") or []), seed=n)
break
return {
"level": level,
"object": picked,
"a4": a4,
"text": (
f"大约 {int(a4['sheets']):,} 张 A4堆起来约 {a4['heightText']}" + (f",差不多是{picked}的高度" if picked else "")
).strip(""),
}
def compute_text_message_char_counts(*, account_dir: Path, year: int) -> tuple[int, int]:
"""Return (sent_chars, received_chars) for render_type='text' messages in the year."""
start_ts, end_ts = _year_range_epoch_seconds(year)
my_username = str(account_dir.name or "").strip()
# Prefer search index when available.
index_path = get_chat_search_index_db_path(account_dir)
if index_path.exists():
conn = sqlite3.connect(str(index_path))
try:
has_fts = (
conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone()
is not None
)
if has_fts:
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
where = (
f"{ts_expr} >= ? AND {ts_expr} < ? "
"AND db_stem NOT LIKE 'biz_message%' "
"AND render_type = 'text' "
"AND \"text\" IS NOT NULL "
"AND TRIM(CAST(\"text\" AS TEXT)) != ''"
)
sql_total = f"SELECT COALESCE(SUM(LENGTH(REPLACE(\"text\", ' ', ''))), 0) AS chars FROM message_fts WHERE {where}"
r_total = conn.execute(sql_total, (start_ts, end_ts)).fetchone()
total_chars = int((r_total[0] if r_total else 0) or 0)
if my_username:
sql_sent = f"{sql_total} AND sender_username = ?"
r_sent = conn.execute(sql_sent, (start_ts, end_ts, my_username)).fetchone()
sent_chars = int((r_sent[0] if r_sent else 0) or 0)
else:
sent_chars = 0
recv_chars = max(0, total_chars - sent_chars)
return sent_chars, recv_chars
finally:
try:
conn.close()
except Exception:
pass
# Fallback: scan message shards directly (slower, but works without the index).
t0 = time.time()
sent_total = 0
recv_total = 0
db_paths = _iter_message_db_paths(account_dir)
for db_path in db_paths:
try:
if db_path.name.lower().startswith("biz_message"):
continue
except Exception:
pass
if not db_path.exists():
continue
conn: sqlite3.Connection | None = None
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
my_rowid: Optional[int]
try:
r2 = conn.execute("SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1", (my_username,)).fetchone()
my_rowid = int(r2[0]) if r2 and r2[0] is not None else None
except Exception:
my_rowid = None
tables = _list_message_tables(conn)
if not tables:
continue
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
for table in tables:
qt = _quote_ident(table)
sql = (
"SELECT real_sender_id, message_content, compress_content "
f"FROM {qt} "
"WHERE local_type = 1 "
f" AND {ts_expr} >= ? AND {ts_expr} < ?"
)
try:
cur = conn.execute(sql, (start_ts, end_ts))
except Exception:
continue
for r in cur:
try:
rsid = int(r["real_sender_id"] or 0)
except Exception:
rsid = 0
txt = ""
try:
txt = _decode_message_content(r["compress_content"], r["message_content"]).strip()
except Exception:
txt = ""
if not txt:
continue
# Match search index semantics: count non-whitespace characters.
cnt = 0
for ch in txt:
if not ch.isspace():
cnt += 1
if cnt <= 0:
continue
if my_rowid is not None and rsid == my_rowid:
sent_total += cnt
else:
recv_total += cnt
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
logger.info(
"Wrapped card#2 message chars computed (fallback scan): account=%s year=%s sent=%s recv=%s dbs=%s elapsed=%.2fs",
str(account_dir.name or "").strip(),
year,
int(sent_total),
int(recv_total),
len(db_paths),
time.time() - t0,
)
return int(sent_total), int(recv_total)
def build_card_02_message_chars(*, account_dir: Path, year: int) -> dict[str, Any]:
sent_chars, recv_chars = compute_text_message_char_counts(account_dir=account_dir, year=year)
sent_book = _pick_book_analogy(sent_chars)
recv_a4 = _pick_a4_analogy(recv_chars)
# 计算键盘敲击统计
keyboard_stats = compute_keyboard_stats(account_dir=account_dir, year=year, sample_rate=1.0)
if sent_chars > 0 and recv_chars > 0:
narrative = f"你今年在微信里打了 {sent_chars:,} 个字,也收到了 {recv_chars:,} 个字。"
elif sent_chars > 0:
narrative = f"你今年在微信里打了 {sent_chars:,} 个字。"
elif recv_chars > 0:
narrative = f"你今年在微信里收到了 {recv_chars:,} 个字。"
else:
narrative = "今年你还没有文字消息"
return {
"id": 2,
"title": "年度消息字数",
"scope": "global",
"category": "C",
"status": "ok",
"kind": "text/message_chars",
"narrative": narrative,
"data": {
"year": int(year),
"sentChars": int(sent_chars),
"receivedChars": int(recv_chars),
"sentBook": sent_book,
"receivedA4": recv_a4,
"keyboard": keyboard_stats,
},
}

View File

@@ -1,27 +1,263 @@
from __future__ import annotations
import json
import sqlite3
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from ..chat_helpers import _resolve_account_dir
from ..chat_helpers import _iter_message_db_paths, _quote_ident, _resolve_account_dir
from ..chat_search_index import get_chat_search_index_db_path
from ..logging_config import get_logger
from .storage import wrapped_cache_path
from .cards.card_01_cyber_schedule import build_card_01_cyber_schedule
from .storage import wrapped_cache_dir, wrapped_cache_path
from .cards.card_00_global_overview import build_card_00_global_overview
from .cards.card_01_cyber_schedule import WeekdayHourHeatmap, build_card_01_cyber_schedule, compute_weekday_hour_heatmap
from .cards.card_02_message_chars import build_card_02_message_chars
logger = get_logger(__name__)
# We implement cards strictly in the order of `docs/wechat_wrapped_ideas_feasibility.md`.
_IMPLEMENTED_UPTO_ID = 1
# We use this number to version the cache filename so adding more cards won't accidentally serve
# an older partial cache.
_IMPLEMENTED_UPTO_ID = 2
# Bump this when we change card payloads/ordering while keeping the same implemented_upto.
_CACHE_VERSION = 4
# "Manifest" is used by the frontend to render the deck quickly, then lazily fetch each card.
# Keep this list in display order (same as the old monolithic `/api/wrapped/annual` response).
_WRAPPED_CARD_MANIFEST: tuple[dict[str, Any], ...] = (
{
"id": 0,
"title": "年度全局概览",
"scope": "global",
"category": "A",
"kind": "global/overview",
},
{
"id": 1,
"title": "年度赛博作息表",
"scope": "global",
"category": "A",
"kind": "time/weekday_hour_heatmap",
},
{
"id": 2,
"title": "年度消息字数",
"scope": "global",
"category": "C",
"kind": "text/message_chars",
},
)
_WRAPPED_CARD_ID_SET = {int(c["id"]) for c in _WRAPPED_CARD_MANIFEST}
# Prevent duplicated heavy computations when multiple card endpoints are hit concurrently.
_LOCKS: dict[str, threading.Lock] = {}
_LOCKS_GUARD = threading.Lock()
def _get_lock(key: str) -> threading.Lock:
with _LOCKS_GUARD:
lock = _LOCKS.get(key)
if lock is None:
lock = threading.Lock()
_LOCKS[key] = lock
return lock
def _default_year() -> int:
return datetime.now().year
def _list_message_tables(conn: sqlite3.Connection) -> list[str]:
try:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
except Exception:
return []
names: list[str] = []
for r in rows:
if not r or not r[0]:
continue
name = str(r[0])
ln = name.lower()
if ln.startswith(("msg_", "chat_")):
names.append(name)
return names
def list_wrapped_available_years(*, account_dir: Path) -> list[int]:
"""List years that have *any* chat messages for the account (best-effort).
Prefer using `chat_search_index.db` (fast). If not available, fall back to scanning message
shard databases (slower, but works without the index).
"""
# Try a tiny cache first (years don't change often, but scanning can be expensive).
cache_path = wrapped_cache_dir(account_dir) / "available_years.json"
max_mtime = 0
try:
index_path = get_chat_search_index_db_path(account_dir)
if index_path.exists():
max_mtime = max(max_mtime, int(index_path.stat().st_mtime))
except Exception:
pass
try:
for p in _iter_message_db_paths(account_dir):
try:
if p.name.lower().startswith("biz_message"):
continue
if p.exists():
max_mtime = max(max_mtime, int(p.stat().st_mtime))
except Exception:
continue
except Exception:
pass
if cache_path.exists():
try:
cached = json.loads(cache_path.read_text(encoding="utf-8"))
if isinstance(cached, dict):
sig = int(cached.get("max_mtime") or 0)
years = cached.get("years")
if sig == max_mtime and isinstance(years, list):
out: list[int] = []
for x in years:
try:
y = int(x)
except Exception:
continue
if y > 0:
out.append(y)
out.sort(reverse=True)
return out
except Exception:
pass
# Convert millisecond timestamps defensively (some datasets store ms).
# The expression yields epoch seconds as INTEGER.
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
# Fast path: use our unified search index when available.
index_path = get_chat_search_index_db_path(account_dir)
if index_path.exists():
conn = sqlite3.connect(str(index_path))
try:
has_fts = (
conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone()
is not None
)
if has_fts:
sql = (
"SELECT "
"CAST(strftime('%Y', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS y, "
"COUNT(1) AS cnt "
"FROM ("
f" SELECT {ts_expr} AS ts"
" FROM message_fts"
f" WHERE {ts_expr} > 0"
" AND db_stem NOT LIKE 'biz_message%'"
") sub "
"GROUP BY y "
"HAVING cnt > 0 "
"ORDER BY y DESC"
)
try:
rows = conn.execute(sql).fetchall()
except Exception:
rows = []
years: list[int] = []
for r in rows:
if not r:
continue
try:
y = int(r[0])
cnt = int(r[1] or 0)
except Exception:
continue
if y > 0 and cnt > 0:
years.append(y)
years.sort(reverse=True)
try:
cache_path.write_text(
json.dumps({"max_mtime": max_mtime, "years": years}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
except Exception:
pass
return years
finally:
try:
conn.close()
except Exception:
pass
# Fallback: scan message shard DBs (may be slow on very large datasets, but only runs
# when the index does not exist).
year_counts: dict[int, int] = {}
db_paths = _iter_message_db_paths(account_dir)
db_paths = [p for p in db_paths if not p.name.lower().startswith("biz_message")]
for db_path in db_paths:
if not db_path.exists():
continue
conn = sqlite3.connect(str(db_path))
try:
tables = _list_message_tables(conn)
if not tables:
continue
for table_name in tables:
qt = _quote_ident(table_name)
sql = (
"SELECT "
"CAST(strftime('%Y', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS y, "
"COUNT(1) AS cnt "
"FROM ("
f" SELECT {ts_expr} AS ts"
f" FROM {qt}"
f" WHERE {ts_expr} > 0"
") sub "
"GROUP BY y"
)
try:
rows = conn.execute(sql).fetchall()
except Exception:
continue
for r in rows:
if not r:
continue
try:
y = int(r[0])
cnt = int(r[1] or 0)
except Exception:
continue
if y > 0 and cnt > 0:
year_counts[y] = int(year_counts.get(y, 0)) + cnt
finally:
try:
conn.close()
except Exception:
pass
years = [y for y, cnt in year_counts.items() if int(cnt) > 0]
years.sort(reverse=True)
try:
cache_path.write_text(
json.dumps({"max_mtime": max_mtime, "years": years}, ensure_ascii=False, indent=2),
encoding="utf-8",
)
except Exception:
pass
return years
def build_wrapped_annual_response(
*,
account: Optional[str],
@@ -30,25 +266,47 @@ def build_wrapped_annual_response(
) -> dict[str, Any]:
"""Build annual wrapped response for the given account/year.
For now we only implement cards up to id=1.
For now we implement cards up to id=2 (plus a meta overview card id=0).
"""
account_dir = _resolve_account_dir(account)
available_years = list_wrapped_available_years(account_dir=account_dir)
# If the requested year has no messages, snap to the latest available year so the selector only
# shows years with data.
y = int(year or _default_year())
if available_years and y not in available_years:
y = int(available_years[0])
scope = "global"
cache_path = wrapped_cache_path(account_dir=account_dir, scope=scope, year=y, implemented_upto=_IMPLEMENTED_UPTO_ID)
cache_path = wrapped_cache_path(
account_dir=account_dir,
scope=scope,
year=y,
implemented_upto=_IMPLEMENTED_UPTO_ID,
options_tag=f"v{_CACHE_VERSION}",
)
if (not refresh) and cache_path.exists():
try:
cached_obj = json.loads(cache_path.read_text(encoding="utf-8"))
if isinstance(cached_obj, dict) and isinstance(cached_obj.get("cards"), list):
cached_obj["cached"] = True
cached_obj["availableYears"] = available_years
return cached_obj
except Exception:
pass
cards: list[dict[str, Any]] = []
cards.append(build_card_01_cyber_schedule(account_dir=account_dir, year=y))
# Wrapped cards default to "messages sent by me" (outgoing), to avoid mixing directions
# in first-person narratives like "你最常...".
heatmap_sent = _get_or_compute_heatmap_sent(account_dir=account_dir, scope=scope, year=y, refresh=refresh)
# Page 2: global overview (page 1 is the frontend cover slide).
cards.append(build_card_00_global_overview(account_dir=account_dir, year=y, heatmap=heatmap_sent))
# Page 3: cyber schedule heatmap.
cards.append(build_card_01_cyber_schedule(account_dir=account_dir, year=y, heatmap=heatmap_sent))
# Page 4: message char counts (sent vs received).
cards.append(build_card_02_message_chars(account_dir=account_dir, year=y))
obj: dict[str, Any] = {
"account": account_dir.name,
@@ -57,6 +315,7 @@ def build_wrapped_annual_response(
"username": None,
"generated_at": int(time.time()),
"cached": False,
"availableYears": available_years,
"cards": cards,
}
@@ -67,3 +326,183 @@ def build_wrapped_annual_response(
return obj
def build_wrapped_annual_meta(
*,
account: Optional[str],
year: Optional[int],
refresh: bool = False,
) -> dict[str, Any]:
"""Return a light-weight manifest for the Wrapped annual deck.
This is meant to be fast so the frontend can render the deck first, then
request each page (card) lazily to avoid freezing on initial load.
"""
account_dir = _resolve_account_dir(account)
available_years = list_wrapped_available_years(account_dir=account_dir)
# Keep the same year snapping semantics as `build_wrapped_annual_response`.
y = int(year or _default_year())
if available_years and y not in available_years:
y = int(available_years[0])
if refresh:
# The manifest itself is static today, but we keep the flag for API symmetry.
pass
return {
"account": account_dir.name,
"year": y,
"scope": "global",
"availableYears": available_years,
# Shallow copy so callers can't mutate our module-level tuple.
"cards": [dict(c) for c in _WRAPPED_CARD_MANIFEST],
}
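# Abridged response shape (values illustrative):
# {
#   "account": "wxid_xxx",
#   "year": 2025,
#   "scope": "global",
#   "availableYears": [2025, 2024],
#   "cards": [{"id": 0, "title": "年度全局概览", "scope": "global", ...}, ...]
# }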
def _wrapped_cache_suffix() -> str:
return f"_v{_CACHE_VERSION}"
def _wrapped_card_cache_path(*, account_dir: Path, scope: str, year: int, card_id: int) -> Path:
# Keep stable names; per-account directory already namespaces the files.
return wrapped_cache_dir(account_dir) / f"{scope}_{year}_card_{card_id}{_wrapped_cache_suffix()}.json"
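# Illustrative: scope="global", year=2025, card_id=2 with _CACHE_VERSION = 4
# -> <wrapped_cache_dir>/global_2025_card_2_v4.json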
def _wrapped_heatmap_sent_cache_path(*, account_dir: Path, scope: str, year: int) -> Path:
return wrapped_cache_dir(account_dir) / f"{scope}_{year}_heatmap_sent{_wrapped_cache_suffix()}.json"
def _load_cached_heatmap_sent(path: Path) -> WeekdayHourHeatmap | None:
if not path.exists():
return None
try:
obj = json.loads(path.read_text(encoding="utf-8"))
except Exception:
return None
if not isinstance(obj, dict):
return None
weekday_labels = obj.get("weekdayLabels")
hour_labels = obj.get("hourLabels")
matrix = obj.get("matrix")
total = obj.get("totalMessages")
if not isinstance(weekday_labels, list) or not isinstance(hour_labels, list) or not isinstance(matrix, list):
return None
try:
total_i = int(total or 0)
except Exception:
total_i = 0
# Best-effort sanitize matrix to ints; keep shape if possible.
out_matrix: list[list[int]] = []
for row in matrix:
if not isinstance(row, list):
return None
out_row: list[int] = []
for v in row:
try:
out_row.append(int(v or 0))
except Exception:
out_row.append(0)
out_matrix.append(out_row)
return WeekdayHourHeatmap(
weekday_labels=[str(x) for x in weekday_labels],
hour_labels=[str(x) for x in hour_labels],
matrix=out_matrix,
total_messages=total_i,
)
def _get_or_compute_heatmap_sent(*, account_dir: Path, scope: str, year: int, refresh: bool) -> WeekdayHourHeatmap:
path = _wrapped_heatmap_sent_cache_path(account_dir=account_dir, scope=scope, year=year)
lock = _get_lock(str(path))
with lock:
if not refresh:
cached = _load_cached_heatmap_sent(path)
if cached is not None:
return cached
heatmap = compute_weekday_hour_heatmap(account_dir=account_dir, year=year, sender_username=account_dir.name)
try:
path.write_text(
json.dumps(
{
"weekdayLabels": heatmap.weekday_labels,
"hourLabels": heatmap.hour_labels,
"matrix": heatmap.matrix,
"totalMessages": heatmap.total_messages,
},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
except Exception:
logger.exception("Failed to write wrapped heatmap cache: %s", path)
return heatmap
def build_wrapped_annual_card(
*,
account: Optional[str],
year: Optional[int],
card_id: int,
refresh: bool = False,
) -> dict[str, Any]:
"""Build one Wrapped card (page) on-demand.
The result is cached per account/year/card_id to avoid recomputing when users
flip back and forth between pages.
"""
cid = int(card_id)
if cid not in _WRAPPED_CARD_ID_SET:
raise ValueError(f"Unknown Wrapped card id: {cid}")
account_dir = _resolve_account_dir(account)
available_years = list_wrapped_available_years(account_dir=account_dir)
y = int(year or _default_year())
if available_years and y not in available_years:
y = int(available_years[0])
scope = "global"
cache_path = _wrapped_card_cache_path(account_dir=account_dir, scope=scope, year=y, card_id=cid)
lock = _get_lock(str(cache_path))
with lock:
if (not refresh) and cache_path.exists():
try:
cached_obj = json.loads(cache_path.read_text(encoding="utf-8"))
if isinstance(cached_obj, dict) and int(cached_obj.get("id", -1)) == cid:
return cached_obj
except Exception:
pass
heatmap_sent: WeekdayHourHeatmap | None = None
if cid in (0, 1):
heatmap_sent = _get_or_compute_heatmap_sent(account_dir=account_dir, scope=scope, year=y, refresh=refresh)
if cid == 0:
card = build_card_00_global_overview(account_dir=account_dir, year=y, heatmap=heatmap_sent)
elif cid == 1:
card = build_card_01_cyber_schedule(account_dir=account_dir, year=y, heatmap=heatmap_sent)
elif cid == 2:
card = build_card_02_message_chars(account_dir=account_dir, year=y)
else:
# Should be unreachable due to _WRAPPED_CARD_ID_SET check.
raise ValueError(f"Unknown Wrapped card id: {cid}")
try:
cache_path.write_text(json.dumps(card, ensure_ascii=False, indent=2), encoding="utf-8")
except Exception:
logger.exception("Failed to write wrapped card cache: %s", cache_path)
return card