feat(wrapped): 新增年度总结接口与卡片 #1(赛博作息表)

- 新增 /api/wrapped/annual(year/account/refresh),统计在 worker thread 中执行

- 实现卡片#1:按 周×小时 聚合消息量,默认过滤 biz_message*.db

- 增加 _wrapped/cache JSON 缓存(global_<year>_upto_1.json),refresh 支持强制重算
This commit is contained in:
2977094657
2026-01-30 16:26:04 +08:00
parent 950fb4c7b4
commit 519e9e9299
7 changed files with 314 additions and 0 deletions

View File

@@ -20,6 +20,7 @@ from .routers.keys import router as _keys_router
from .routers.media import router as _media_router from .routers.media import router as _media_router
from .routers.sns import router as _sns_router from .routers.sns import router as _sns_router
from .routers.wechat_detection import router as _wechat_detection_router from .routers.wechat_detection import router as _wechat_detection_router
from .routers.wrapped import router as _wrapped_router
from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown
# 初始化日志系统 # 初始化日志系统
@@ -53,6 +54,7 @@ app.include_router(_chat_router)
app.include_router(_chat_export_router) app.include_router(_chat_export_router)
app.include_router(_chat_media_router) app.include_router(_chat_media_router)
app.include_router(_sns_router) app.include_router(_sns_router)
app.include_router(_wrapped_router)
class _SPAStaticFiles(StaticFiles): class _SPAStaticFiles(StaticFiles):

View File

@@ -0,0 +1,23 @@
from __future__ import annotations
import asyncio
from typing import Optional
from fastapi import APIRouter, Query
from ..path_fix import PathFixRoute
from ..wrapped.service import build_wrapped_annual_response
router = APIRouter(route_class=PathFixRoute)
@router.get("/api/wrapped/annual", summary="微信聊天年度总结WeChat Wrapped- 后端数据")
async def wrapped_annual(
year: Optional[int] = Query(None, description="年份(例如 2026。默认当前年份。"),
account: Optional[str] = Query(None, description="解密后的账号目录名。默认取第一个可用账号。"),
refresh: bool = Query(False, description="是否强制重新计算(忽略缓存)。"),
):
"""返回年度总结数据(目前仅实现第 1 个点子:年度赛博作息表)。"""
# This endpoint performs blocking sqlite/file IO, so run it in a worker thread.
return await asyncio.to_thread(build_wrapped_annual_response, account=account, year=year, refresh=refresh)

View File

@@ -0,0 +1,6 @@
"""WeChat Wrapped (年度总结) backend modules.
This package is intentionally split into small modules so we can implement
ideas incrementally (按点子编号依次实现), avoiding a single giant file.
"""

View File

@@ -0,0 +1,2 @@
"""Card implementations for WeChat Wrapped (年度总结)."""

View File

@@ -0,0 +1,179 @@
from __future__ import annotations
import sqlite3
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Any
from ...chat_helpers import _iter_message_db_paths, _quote_ident
from ...logging_config import get_logger
logger = get_logger(__name__)
# Heatmap row labels: Monday-first Chinese weekday names (7 entries).
_WEEKDAY_LABELS_ZH = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
# Heatmap column labels: zero-padded hours "00".."23" (24 entries).
_HOUR_LABELS = [f"{h:02d}" for h in range(24)]
@dataclass(frozen=True)
class WeekdayHourHeatmap:
    """Immutable aggregate of message counts bucketed by (weekday, hour)."""

    weekday_labels: list[str]  # 7 labels, Monday first
    hour_labels: list[str]  # 24 labels, "00".."23"
    matrix: list[list[int]]  # 7 x 24, weekday major (Mon..Sun) then hour
    total_messages: int  # sum over all cells of `matrix`
def _year_range_epoch_seconds(year: int) -> tuple[int, int]:
# Use local time boundaries (same semantics as sqlite "localtime").
start = int(datetime(year, 1, 1).timestamp())
end = int(datetime(year + 1, 1, 1).timestamp())
return start, end
def _list_message_tables(conn: sqlite3.Connection) -> list[str]:
try:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
except Exception:
return []
names: list[str] = []
for r in rows:
if not r or not r[0]:
continue
name = str(r[0])
ln = name.lower()
if ln.startswith(("msg_", "chat_")):
names.append(name)
return names
def _accumulate_db(
    *,
    db_path: Path,
    start_ts: int,
    end_ts: int,
    matrix: list[list[int]],
) -> int:
    """Accumulate message counts from one message shard DB into `matrix`.

    `matrix` (7 weekdays x 24 hours, Monday first) is mutated in place.
    Returns the number of messages counted; a missing or unreadable DB, or a
    table with an unexpected schema, simply contributes 0.
    """
    if not db_path.exists():
        return 0
    conn: sqlite3.Connection | None = None
    try:
        conn = sqlite3.connect(str(db_path))
        tables = _list_message_tables(conn)
        if not tables:
            return 0
        # Some datasets store create_time in milliseconds; normalize to epoch
        # seconds inside SQL so the range filter below stays correct.
        seconds_expr = (
            "CASE WHEN create_time > 1000000000000 THEN CAST(create_time/1000 AS INTEGER) ELSE create_time END"
        )
        total = 0
        for table in tables:
            quoted = _quote_ident(table)
            query = (
                "SELECT "
                # sqlite's %w is Sunday=0; rotate so Monday=0 .. Sunday=6.
                "((CAST(strftime('%w', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) + 6) % 7) AS weekday, "
                "CAST(strftime('%H', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS hour, "
                "COUNT(1) AS cnt "
                "FROM ("
                f" SELECT {seconds_expr} AS ts"
                f" FROM {quoted}"
                f" WHERE {seconds_expr} >= ? AND {seconds_expr} < ?"
                ") sub "
                "GROUP BY weekday, hour"
            )
            try:
                grouped = conn.execute(query, (start_ts, end_ts)).fetchall()
            except Exception:
                # Table without create_time (or other schema surprise): skip it.
                continue
            for weekday, hour, cnt in grouped:
                try:
                    w, h, c = int(weekday), int(hour), int(cnt)
                except Exception:
                    continue
                # Defensive bounds check before indexing into the 7x24 grid.
                if 0 <= w < 7 and 0 <= h < 24 and c > 0:
                    matrix[w][h] += c
                    total += c
        return total
    finally:
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass
def compute_weekday_hour_heatmap(*, account_dir: Path, year: int) -> WeekdayHourHeatmap:
    """Scan every message shard DB under `account_dir` and bucket the
    messages of `year` into a (weekday x hour) count matrix.
    """
    start_ts, end_ts = _year_range_epoch_seconds(year)
    matrix: list[list[int]] = [[0] * 24 for _ in range(7)]
    # Default: exclude official/biz shards (biz_message*.db) to reduce noise.
    shard_paths = [
        p
        for p in _iter_message_db_paths(account_dir)
        if not p.name.lower().startswith("biz_message")
    ]
    account_name = str(account_dir.name or "").strip()
    started = time.time()
    total = 0
    for shard in shard_paths:
        total += _accumulate_db(db_path=shard, start_ts=start_ts, end_ts=end_ts, matrix=matrix)
    logger.info(
        "Wrapped card#1 heatmap computed: account=%s year=%s total=%s dbs=%s elapsed=%.2fs",
        account_name,
        year,
        total,
        len(shard_paths),
        time.time() - started,
    )
    return WeekdayHourHeatmap(
        weekday_labels=list(_WEEKDAY_LABELS_ZH),
        hour_labels=list(_HOUR_LABELS),
        matrix=matrix,
        total_messages=total,
    )
def build_card_01_cyber_schedule(*, account_dir: Path, year: int) -> dict[str, Any]:
    """Card #1: 年度赛博作息表 (24x7 heatmap)."""
    heatmap = compute_weekday_hour_heatmap(account_dir=account_dir, year=year)
    if heatmap.total_messages > 0:
        # Collapse the 7x24 grid into per-hour totals across all weekdays.
        per_hour = [sum(row[h] for row in heatmap.matrix) for h in range(24)]
        # Deterministic tie-break: among equally active hours, the earliest wins.
        most_active_hour = max(range(24), key=lambda h: (per_hour[h], -h))
        narrative = f"你在 {most_active_hour:02d}:00 最活跃"
    else:
        narrative = "今年你没有聊天消息"
    return {
        "id": 1,
        "title": "年度赛博作息表",
        "scope": "global",
        "category": "A",
        "status": "ok",
        "kind": "time/weekday_hour_heatmap",
        "narrative": narrative,
        "data": {
            "weekdayLabels": heatmap.weekday_labels,
            "hourLabels": heatmap.hour_labels,
            "matrix": heatmap.matrix,
            "totalMessages": heatmap.total_messages,
        },
    }

View File

@@ -0,0 +1,69 @@
from __future__ import annotations
import json
import time
from datetime import datetime
from pathlib import Path
from typing import Any, Optional
from ..chat_helpers import _resolve_account_dir
from ..logging_config import get_logger
from .storage import wrapped_cache_path
from .cards.card_01_cyber_schedule import build_card_01_cyber_schedule
logger = get_logger(__name__)
# We implement cards strictly in the order of `docs/wechat_wrapped_ideas_feasibility.md`.
_IMPLEMENTED_UPTO_ID = 1
def _default_year() -> int:
return datetime.now().year
def build_wrapped_annual_response(
    *,
    account: Optional[str],
    year: Optional[int],
    refresh: bool = False,
) -> dict[str, Any]:
    """Build the annual wrapped response for the given account/year.

    Serves a cached JSON snapshot when one exists (unless `refresh` is True);
    otherwise computes the cards, writes the cache best-effort, and returns
    the fresh payload. Only cards up to id=1 are implemented for now.
    """
    account_dir = _resolve_account_dir(account)
    target_year = int(year or _default_year())
    scope = "global"
    cache_path = wrapped_cache_path(
        account_dir=account_dir,
        scope=scope,
        year=target_year,
        implemented_upto=_IMPLEMENTED_UPTO_ID,
    )
    if not refresh and cache_path.exists():
        try:
            cached = json.loads(cache_path.read_text(encoding="utf-8"))
        except Exception:
            # Unreadable/corrupt cache: fall through and recompute.
            cached = None
        if isinstance(cached, dict) and isinstance(cached.get("cards"), list):
            cached["cached"] = True
            return cached
    payload: dict[str, Any] = {
        "account": account_dir.name,
        "year": target_year,
        "scope": scope,
        "username": None,
        "generated_at": int(time.time()),
        "cached": False,
        "cards": [build_card_01_cyber_schedule(account_dir=account_dir, year=target_year)],
    }
    try:
        cache_path.write_text(json.dumps(payload, ensure_ascii=False, indent=2), encoding="utf-8")
    except Exception:
        # Cache write failures are non-fatal; still serve the fresh data.
        logger.exception("Failed to write wrapped cache: %s", cache_path)
    return payload

View File

@@ -0,0 +1,33 @@
from __future__ import annotations
from pathlib import Path
def wrapped_account_dir(account_dir: Path) -> Path:
    """Return the per-account wrapped working directory.

    All wrapped artifacts live under `<account>/_wrapped` so they travel with
    the decrypted databases and are easy to inspect/backup.
    """
    return account_dir.joinpath("_wrapped")
def wrapped_cache_dir(account_dir: Path) -> Path:
    """Return the per-account cache directory, creating it if missing.

    Equivalent to `<account>/_wrapped/cache`; `mkdir` is idempotent.
    """
    cache_dir = account_dir / "_wrapped" / "cache"
    cache_dir.mkdir(parents=True, exist_ok=True)
    return cache_dir
def wrapped_cache_path(
    *,
    account_dir: Path,
    scope: str,
    year: int,
    implemented_upto: int,
    options_tag: str | None = None,
) -> Path:
    """Return the cache-file path for one (scope, year) snapshot.

    The filename is versioned by `implemented_upto` so that, when more cards
    are added later, a stale partial cache is never served by accident.
    """
    tag = f"_{options_tag}" if options_tag else ""
    filename = f"{scope}_{year}_upto_{implemented_upto}{tag}.json"
    return wrapped_cache_dir(account_dir) / filename