From ffa6491ed41a61e9adefea4397af7830bc030b46 Mon Sep 17 00:00:00 2001 From: 2977094657 <2977094657@qq.com> Date: Thu, 19 Feb 2026 20:01:11 +0800 Subject: [PATCH] =?UTF-8?q?feat(wrapped):=20=E5=A2=9E=E5=8A=A0=E6=9C=88?= =?UTF-8?q?=E5=BA=A6=E5=A5=BD=E5=8F=8B=E5=A2=99=E5=8D=A1=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增月度好友墙卡片(chat/monthly_best_friends_wall):按月评选聊天搭子并输出评分维度 - 前端新增拍立得墙展示 12 个月获胜者与指标条,支持头像失败降级 - Wrapped deck 插入新卡片;emoji 卡片 id 顺延为 5,并同步更新测试 - Wrapped 页面默认展示上一年;切换年份时保持当前页并按需懒加载卡片 - WrappedCardShell(slide)支持 wide 布局;更新 wrapped cache version --- .../cards/Card04MonthlyBestFriendsWall.vue | 267 +++++++++++ .../wrapped/shared/WrappedCardShell.vue | 12 +- .../components/wrapped/shared/WrappedHero.vue | 4 + frontend/pages/wrapped/index.vue | 28 +- .../wrapped/cards/card_04_emoji_universe.py | 2 +- .../card_04_monthly_best_friends_wall.py | 452 ++++++++++++++++++ src/wechat_decrypt_tool/wrapped/service.py | 20 +- tests/test_wrapped_emoji_universe.py | 2 +- tests/test_wrapped_monthly_best_friends.py | 271 +++++++++++ 9 files changed, 1045 insertions(+), 13 deletions(-) create mode 100644 frontend/components/wrapped/cards/Card04MonthlyBestFriendsWall.vue create mode 100644 src/wechat_decrypt_tool/wrapped/cards/card_04_monthly_best_friends_wall.py create mode 100644 tests/test_wrapped_monthly_best_friends.py diff --git a/frontend/components/wrapped/cards/Card04MonthlyBestFriendsWall.vue b/frontend/components/wrapped/cards/Card04MonthlyBestFriendsWall.vue new file mode 100644 index 0000000..627b48d --- /dev/null +++ b/frontend/components/wrapped/cards/Card04MonthlyBestFriendsWall.vue @@ -0,0 +1,267 @@ + + + + + diff --git a/frontend/components/wrapped/shared/WrappedCardShell.vue b/frontend/components/wrapped/shared/WrappedCardShell.vue index bb94490..e23906e 100644 --- a/frontend/components/wrapped/shared/WrappedCardShell.vue +++ b/frontend/components/wrapped/shared/WrappedCardShell.vue @@ -20,7 +20,12 @@
-
+

{{ title }}

@@ -47,6 +52,9 @@ defineProps({ cardId: { type: Number, required: true }, title: { type: String, required: true }, narrative: { type: String, default: '' }, - variant: { type: String, default: 'panel' } // 'panel' | 'slide' + variant: { type: String, default: 'panel' }, // 'panel' | 'slide' + // Slide 模式下是否取消 max-width 限制(让内容直接铺满页面宽度)。 + // 用于需要横向展示的可视化(如年度日历热力图)。 + wide: { type: Boolean, default: false } }) diff --git a/frontend/components/wrapped/shared/WrappedHero.vue b/frontend/components/wrapped/shared/WrappedHero.vue index 2c545fb..162ef0d 100644 --- a/frontend/components/wrapped/shared/WrappedHero.vue +++ b/frontend/components/wrapped/shared/WrappedHero.vue @@ -242,6 +242,10 @@ const PREVIEW_BY_KIND = { summary: '回复速度', question: '谁是你愿意秒回的那个人?' }, + 'chat/monthly_best_friends_wall': { + summary: '月度好友墙', + question: '每个月谁是你最有默契的聊天搭子?' + }, 'emoji/annual_universe': { summary: '梗图年鉴', question: '你这一年最常丢出的表情包是哪张?' diff --git a/frontend/pages/wrapped/index.vue b/frontend/pages/wrapped/index.vue index caba0b3..0223126 100644 --- a/frontend/pages/wrapped/index.vue +++ b/frontend/pages/wrapped/index.vue @@ -163,8 +163,14 @@ variant="slide" class="h-full w-full" /> + { await ensureCardLoaded(cardId) } -const reload = async (forceRefresh = false) => { +const reload = async (forceRefresh = false, preserveIndex = false) => { const token = ++reportToken - activeIndex.value = 0 + const keepIndex = preserveIndex ? activeIndex.value : 0 + if (!preserveIndex) activeIndex.value = 0 error.value = '' loading.value = true refreshCards.value = !!forceRefresh @@ -502,6 +511,15 @@ const reload = async (forceRefresh = false) => { } availableYears.value = Array.isArray(resp?.availableYears) ? resp.availableYears : [] + + if (preserveIndex) { + activeIndex.value = clampIndex(keepIndex) + const cardIdx = Number(activeIndex.value) - 1 + if (cardIdx >= 0) { + const id = Number(report.value?.cards?.[cardIdx]?.id) + if (Number.isFinite(id)) void ensureCardLoaded(id) + } + } } catch (e) { if (token !== reportToken) return report.value = null @@ -576,7 +594,7 @@ watch(year, async (newYear, oldYear) => { year.value = oldYear return } - await reload() + await reload(false, true) }) diff --git a/src/wechat_decrypt_tool/wrapped/cards/card_04_emoji_universe.py b/src/wechat_decrypt_tool/wrapped/cards/card_04_emoji_universe.py index b3fb8c3..dce3fd5 100644 --- a/src/wechat_decrypt_tool/wrapped/cards/card_04_emoji_universe.py +++ b/src/wechat_decrypt_tool/wrapped/cards/card_04_emoji_universe.py @@ -1254,7 +1254,7 @@ def build_card_04_emoji_universe(*, account_dir: Path, year: int) -> dict[str, A narrative = "".join(parts) return { - "id": 4, + "id": 5, "title": "这一年,你的表情包里藏了多少心情?", "scope": "global", "category": "B", diff --git a/src/wechat_decrypt_tool/wrapped/cards/card_04_monthly_best_friends_wall.py b/src/wechat_decrypt_tool/wrapped/cards/card_04_monthly_best_friends_wall.py new file mode 100644 index 0000000..705406f --- /dev/null +++ b/src/wechat_decrypt_tool/wrapped/cards/card_04_monthly_best_friends_wall.py @@ -0,0 +1,452 @@ +from __future__ import annotations + +import math +import sqlite3 +import time +from dataclasses import dataclass, field +from datetime import datetime +from pathlib import Path +from typing import Any + +from ...chat_helpers import ( + _build_avatar_url, + _load_contact_rows, + _pick_display_name, + _should_keep_session, +) +from ...chat_search_index import ( + get_chat_search_index_db_path, + get_chat_search_index_status, + start_chat_search_index_build, +) +from ...logging_config import get_logger + +logger = get_logger(__name__) + + +def _year_range_epoch_seconds(year: int) -> tuple[int, int]: + start = int(datetime(year, 1, 1).timestamp()) + end = int(datetime(year + 1, 1, 1).timestamp()) + return start, end + + +def _mask_name(name: str) -> str: + s = str(name or "").strip() + if not s: + return "" + if len(s) == 1: + return "*" + if len(s) == 2: + return s[0] + "*" + return s[0] + ("*" * (len(s) - 2)) + s[-1] + + +@dataclass +class _MonthConvAgg: + username: str + month: int + incoming: int = 0 + outgoing: int = 0 + replies: int = 0 + sum_gap: int = 0 + sum_gap_capped: int = 0 + active_days: set[int] = field(default_factory=set) + time_bucket_mask: int = 0 + + @property + def total(self) -> int: + return int(self.incoming) + int(self.outgoing) + + @property + def interaction(self) -> int: + return min(int(self.incoming), int(self.outgoing)) + + @property + def active_days_count(self) -> int: + return len(self.active_days) + + @property + def time_bucket_count(self) -> int: + m = int(self.time_bucket_mask) & 0xF + return (m & 1) + ((m >> 1) & 1) + ((m >> 2) & 1) + ((m >> 3) & 1) + + def avg_reply_seconds(self) -> float: + if self.replies <= 0: + return 0.0 + return float(self.sum_gap) / float(self.replies) + + def avg_reply_seconds_capped(self) -> float: + if self.replies <= 0: + return 0.0 + return float(self.sum_gap_capped) / float(self.replies) + + def observe(self, *, day: int, hour: int) -> None: + if 1 <= day <= 31: + self.active_days.add(int(day)) + bucket = max(0, min(3, int(hour) // 6)) + self.time_bucket_mask |= 1 << bucket + + +def _score_month_agg( + *, + agg: _MonthConvAgg, + month_max_interaction: int, + month_max_active_days: int, + tau_seconds: float, + weights: dict[str, float], +) -> dict[str, float]: + max_interaction = max(1, int(month_max_interaction)) + max_active = max(1, int(month_max_active_days)) + interaction_score = math.log1p(float(agg.interaction)) / math.log1p(float(max_interaction)) + speed_score = 1.0 / (1.0 + (float(agg.avg_reply_seconds_capped()) / float(max(1.0, tau_seconds)))) + continuity_score = float(agg.active_days_count) / float(max_active) + coverage_score = float(agg.time_bucket_count) / 4.0 + final_score = ( + float(weights["interaction"]) * interaction_score + + float(weights["speed"]) * speed_score + + float(weights["continuity"]) * continuity_score + + float(weights["coverage"]) * coverage_score + ) + return { + "interaction": float(interaction_score), + "speed": float(speed_score), + "continuity": float(continuity_score), + "coverage": float(coverage_score), + "final": float(final_score), + } + + +def compute_monthly_best_friends_wall_stats(*, account_dir: Path, year: int) -> dict[str, Any]: + start_ts, end_ts = _year_range_epoch_seconds(int(year)) + my_username = str(account_dir.name or "").strip() + + gap_cap_seconds = 6 * 60 * 60 + tau_seconds = 30 * 60 + weights = { + "interaction": 0.40, + "speed": 0.30, + "continuity": 0.20, + "coverage": 0.10, + } + eligibility = { + "minTotalMessages": 8, + "minInteraction": 3, + "minReplyCount": 1, + "minActiveDays": 2, + } + + per_month_aggs: dict[int, list[_MonthConvAgg]] = {m: [] for m in range(1, 13)} + used_index = False + index_status: dict[str, Any] | None = None + + index_path = get_chat_search_index_db_path(account_dir) + if index_path.exists(): + conn = sqlite3.connect(str(index_path)) + try: + has_fts = ( + conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone() + is not None + ) + if has_fts and my_username: + used_index = True + t0 = time.time() + + ts_expr = ( + "CASE " + "WHEN CAST(create_time AS INTEGER) > 1000000000000 " + "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) " + "ELSE CAST(create_time AS INTEGER) " + "END" + ) + + where = ( + f"{ts_expr} >= ? AND {ts_expr} < ? " + "AND db_stem NOT LIKE 'biz_message%' " + "AND CAST(local_type AS INTEGER) != 10000 " + "AND username NOT LIKE '%@chatroom'" + ) + + sql = ( + "SELECT " + "username, sender_username, " + f"{ts_expr} AS ts, " + "CAST(sort_seq AS INTEGER) AS sort_seq_i, " + "CAST(local_id AS INTEGER) AS local_id_i " + "FROM message_fts " + f"WHERE {where} " + "ORDER BY username ASC, ts ASC, sort_seq_i ASC, local_id_i ASC" + ) + + cur = conn.execute(sql, (start_ts, end_ts)) + + cur_username = "" + conv_month_aggs: dict[int, _MonthConvAgg] = {} + prev_other_ts: int | None = None + + def flush_conv() -> None: + nonlocal cur_username, conv_month_aggs, prev_other_ts + if not cur_username: + return + for m, agg in conv_month_aggs.items(): + if 1 <= int(m) <= 12 and agg.total > 0: + per_month_aggs[int(m)].append(agg) + conv_month_aggs = {} + prev_other_ts = None + + for row in cur: + try: + username = str(row[0] or "").strip() + sender = str(row[1] or "").strip() + ts = int(row[2] or 0) + except Exception: + continue + + if ts <= 0 or not username: + continue + + if username != cur_username: + flush_conv() + cur_username = username + + if not _should_keep_session(username, include_official=False): + continue + + dt = datetime.fromtimestamp(ts) + month = int(dt.month) + if month < 1 or month > 12: + continue + agg = conv_month_aggs.get(month) + if agg is None: + agg = _MonthConvAgg(username=username, month=month) + conv_month_aggs[month] = agg + agg.observe(day=int(dt.day), hour=int(dt.hour)) + + is_me = sender == my_username + if is_me: + agg.outgoing += 1 + if prev_other_ts is not None and ts >= prev_other_ts: + gap = int(ts - prev_other_ts) + agg.replies += 1 + agg.sum_gap += gap + agg.sum_gap_capped += min(gap, gap_cap_seconds) + prev_other_ts = None + else: + agg.incoming += 1 + prev_other_ts = ts + + flush_conv() + + logger.info( + "Wrapped card#4 monthly_best_friends computed (search index): account=%s year=%s elapsed=%.2fs", + str(account_dir.name or "").strip(), + int(year), + time.time() - t0, + ) + finally: + try: + conn.close() + except Exception: + pass + + if not used_index: + try: + index_status = get_chat_search_index_status(account_dir) + index = dict(index_status.get("index") or {}) + build = dict(index.get("build") or {}) + index_ready = bool(index.get("ready")) + build_status = str(build.get("status") or "") + index_exists = bool(index.get("exists")) + if (not index_ready) and build_status not in {"building", "error"}: + start_chat_search_index_build(account_dir, rebuild=bool(index_exists)) + index_status = get_chat_search_index_status(account_dir) + except Exception: + index_status = None + + month_winner_raw: dict[int, dict[str, Any]] = {} + winner_usernames: list[str] = [] + for month in range(1, 13): + aggs = list(per_month_aggs.get(month) or []) + eligible: list[_MonthConvAgg] = [] + for agg in aggs: + if agg.total < int(eligibility["minTotalMessages"]): + continue + if agg.interaction < int(eligibility["minInteraction"]): + continue + if agg.replies < int(eligibility["minReplyCount"]): + continue + if agg.active_days_count < int(eligibility["minActiveDays"]): + continue + eligible.append(agg) + + if not eligible: + continue + + month_max_interaction = max(agg.interaction for agg in eligible) + month_max_active_days = max(agg.active_days_count for agg in eligible) + scored: list[tuple[tuple[float, float, float, float, str], _MonthConvAgg, dict[str, float]]] = [] + for agg in eligible: + score = _score_month_agg( + agg=agg, + month_max_interaction=month_max_interaction, + month_max_active_days=month_max_active_days, + tau_seconds=float(tau_seconds), + weights=weights, + ) + tie_key = ( + -float(score["final"]), + -float(agg.interaction), + float(agg.avg_reply_seconds_capped()), + -float(agg.active_days_count), + str(agg.username), + ) + scored.append((tie_key, agg, score)) + scored.sort(key=lambda x: x[0]) + _, winner_agg, winner_score = scored[0] + month_winner_raw[month] = { + "agg": winner_agg, + "score": winner_score, + } + winner_usernames.append(winner_agg.username) + + uniq_winner_usernames: list[str] = [] + seen: set[str] = set() + for u in winner_usernames: + if u and u not in seen: + seen.add(u) + uniq_winner_usernames.append(u) + + contact_rows = _load_contact_rows(account_dir / "contact.db", uniq_winner_usernames) if uniq_winner_usernames else {} + + months: list[dict[str, Any]] = [] + for month in range(1, 13): + winner_pack = month_winner_raw.get(month) + if not winner_pack: + months.append( + { + "month": month, + "winner": None, + "metrics": None, + "raw": None, + "isFallback": False, + "reason": "insufficient_data", + } + ) + continue + + agg: _MonthConvAgg = winner_pack["agg"] + score = dict(winner_pack["score"] or {}) + row = contact_rows.get(agg.username) + display = _pick_display_name(row, agg.username) + avatar = _build_avatar_url(str(account_dir.name or ""), agg.username) if agg.username else "" + + months.append( + { + "month": month, + "winner": { + "username": agg.username, + "displayName": display, + "maskedName": _mask_name(display), + "avatarUrl": avatar, + "score": float(score.get("final") or 0.0), + "score100": round(float(score.get("final") or 0.0) * 100.0, 1), + }, + "metrics": { + "interactionScore": float(score.get("interaction") or 0.0), + "speedScore": float(score.get("speed") or 0.0), + "continuityScore": float(score.get("continuity") or 0.0), + "coverageScore": float(score.get("coverage") or 0.0), + }, + "raw": { + "incomingMessages": int(agg.incoming), + "outgoingMessages": int(agg.outgoing), + "totalMessages": int(agg.total), + "interaction": int(agg.interaction), + "replyCount": int(agg.replies), + "avgReplySeconds": float(agg.avg_reply_seconds()), + "avgReplySecondsCapped": float(agg.avg_reply_seconds_capped()), + "activeDays": int(agg.active_days_count), + "timeBucketsCount": int(agg.time_bucket_count), + }, + "isFallback": False, + } + ) + + winner_month_counts: dict[str, int] = {} + for item in months: + w = item.get("winner") + if not isinstance(w, dict): + continue + u = str(w.get("username") or "").strip() + if not u: + continue + winner_month_counts[u] = int(winner_month_counts.get(u, 0)) + 1 + + top_champion = None + if winner_month_counts: + champion_username = sorted(winner_month_counts.items(), key=lambda kv: (-int(kv[1]), str(kv[0])))[0][0] + champion_months = int(winner_month_counts.get(champion_username) or 0) + row = contact_rows.get(champion_username) + display = _pick_display_name(row, champion_username) + top_champion = { + "username": champion_username, + "displayName": display, + "maskedName": _mask_name(display), + "monthsWon": champion_months, + } + + filled_months = [int(x.get("month") or 0) for x in months if isinstance(x.get("winner"), dict)] + + return { + "year": int(year), + "months": months, + "summary": { + "monthsWithWinner": int(len(filled_months)), + "topChampion": top_champion, + "filledMonths": filled_months, + }, + "settings": { + "weights": { + "interaction": float(weights["interaction"]), + "speed": float(weights["speed"]), + "continuity": float(weights["continuity"]), + "coverage": float(weights["coverage"]), + }, + "tauSeconds": int(tau_seconds), + "gapCapSeconds": int(gap_cap_seconds), + "eligibility": { + "minTotalMessages": int(eligibility["minTotalMessages"]), + "minInteraction": int(eligibility["minInteraction"]), + "minReplyCount": int(eligibility["minReplyCount"]), + "minActiveDays": int(eligibility["minActiveDays"]), + }, + "usedIndex": bool(used_index), + "indexStatus": index_status, + }, + } + + +def build_card_04_monthly_best_friends_wall(*, account_dir: Path, year: int) -> dict[str, Any]: + data = compute_monthly_best_friends_wall_stats(account_dir=account_dir, year=year) + summary = dict(data.get("summary") or {}) + top_champion = summary.get("topChampion") + months_with_winner = int(summary.get("monthsWithWinner") or 0) + + if months_with_winner <= 0: + narrative = "今年还没有足够的聊天互动数据来评选每月最佳好友(或搜索索引尚未就绪)。" + elif isinstance(top_champion, dict) and top_champion.get("displayName"): + champ_name = str(top_champion.get("displayName") or "") + months_won = int(top_champion.get("monthsWon") or 0) + narrative = f"{champ_name} 拿下了 {months_won} 个月的月度最佳好友;这一年你们的聊天默契很稳定。" + else: + narrative = f"你在 {months_with_winner} 个月里都出现了稳定的“月度最佳好友”。" + + return { + "id": 4, + "title": "陪你走过每个月的人", + "scope": "global", + "category": "B", + "status": "ok", + "kind": "chat/monthly_best_friends_wall", + "narrative": narrative, + "data": data, + } diff --git a/src/wechat_decrypt_tool/wrapped/service.py b/src/wechat_decrypt_tool/wrapped/service.py index 8417768..aa52621 100644 --- a/src/wechat_decrypt_tool/wrapped/service.py +++ b/src/wechat_decrypt_tool/wrapped/service.py @@ -16,6 +16,7 @@ from .cards.card_00_global_overview import build_card_00_global_overview from .cards.card_01_cyber_schedule import WeekdayHourHeatmap, build_card_01_cyber_schedule, compute_weekday_hour_heatmap from .cards.card_02_message_chars import build_card_02_message_chars from .cards.card_03_reply_speed import build_card_03_reply_speed +from .cards.card_04_monthly_best_friends_wall import build_card_04_monthly_best_friends_wall from .cards.card_04_emoji_universe import build_card_04_emoji_universe logger = get_logger(__name__) @@ -23,9 +24,9 @@ logger = get_logger(__name__) # We use this number to version the cache filename so adding more cards won't accidentally serve # an older partial cache. -_IMPLEMENTED_UPTO_ID = 4 +_IMPLEMENTED_UPTO_ID = 5 # Bump this when we change card payloads/ordering while keeping the same implemented_upto. -_CACHE_VERSION = 15 +_CACHE_VERSION = 18 # "Manifest" is used by the frontend to render the deck quickly, then lazily fetch each card. @@ -61,6 +62,13 @@ _WRAPPED_CARD_MANIFEST: tuple[dict[str, Any], ...] = ( }, { "id": 4, + "title": "这一年,每个月谁最懂你?", + "scope": "global", + "category": "B", + "kind": "chat/monthly_best_friends_wall", + }, + { + "id": 5, "title": "这一年,你的表情包里藏了多少心情?", "scope": "global", "category": "B", @@ -282,7 +290,7 @@ def build_wrapped_annual_response( ) -> dict[str, Any]: """Build annual wrapped response for the given account/year. - For now we implement cards up to id=4 (plus a meta overview card id=0). + For now we implement cards up to id=5 (plus a meta overview card id=0). """ account_dir = _resolve_account_dir(account) @@ -325,7 +333,9 @@ def build_wrapped_annual_response( cards.append(build_card_02_message_chars(account_dir=account_dir, year=y)) # Page 5: reply speed / best chat buddy. cards.append(build_card_03_reply_speed(account_dir=account_dir, year=y)) - # Page 6: annual emoji universe / meme almanac. + # Page 6: monthly best friends wall (photo wall). + cards.append(build_card_04_monthly_best_friends_wall(account_dir=account_dir, year=y)) + # Page 7: annual emoji universe / meme almanac. cards.append(build_card_04_emoji_universe(account_dir=account_dir, year=y)) obj: dict[str, Any] = { @@ -519,6 +529,8 @@ def build_wrapped_annual_card( elif cid == 3: card = build_card_03_reply_speed(account_dir=account_dir, year=y) elif cid == 4: + card = build_card_04_monthly_best_friends_wall(account_dir=account_dir, year=y) + elif cid == 5: card = build_card_04_emoji_universe(account_dir=account_dir, year=y) else: # Should be unreachable due to _WRAPPED_CARD_ID_SET check. diff --git a/tests/test_wrapped_emoji_universe.py b/tests/test_wrapped_emoji_universe.py index 4aed5c3..867f0f6 100644 --- a/tests/test_wrapped_emoji_universe.py +++ b/tests/test_wrapped_emoji_universe.py @@ -659,7 +659,7 @@ class TestWrappedEmojiUniverse(unittest.TestCase): self._seed_session_db(account_dir / "session.db", usernames=[]) card = build_card_04_emoji_universe(account_dir=account_dir, year=2025) - self.assertEqual(card["id"], 4) + self.assertEqual(card["id"], 5) self.assertEqual(card["status"], "ok") self.assertEqual(card["data"]["sentStickerCount"], 0) self.assertIn("几乎没用表情表达", card["narrative"]) diff --git a/tests/test_wrapped_monthly_best_friends.py b/tests/test_wrapped_monthly_best_friends.py new file mode 100644 index 0000000..82067d6 --- /dev/null +++ b/tests/test_wrapped_monthly_best_friends.py @@ -0,0 +1,271 @@ +import sqlite3 +import unittest +from datetime import datetime +from pathlib import Path +from tempfile import TemporaryDirectory +import sys + +# Ensure "src/" is importable when running tests from repo root. +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestWrappedMonthlyBestFriends(unittest.TestCase): + def _ts(self, y: int, m: int, d: int, hh: int, mm: int, ss: int) -> int: + return int(datetime(y, m, d, hh, mm, ss).timestamp()) + + def _seed_contact_db(self, path: Path, usernames: list[str]) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS contact ( + username TEXT PRIMARY KEY, + remark TEXT, + nick_name TEXT, + alias TEXT, + big_head_url TEXT, + small_head_url TEXT + ) + """ + ) + for u in usernames: + conn.execute( + "INSERT INTO contact(username, nick_name) VALUES(?, ?)", + (u, f"Nick_{u}"), + ) + conn.commit() + finally: + conn.close() + + def _seed_index_db(self, path: Path, rows: list[dict]) -> None: + conn = sqlite3.connect(str(path)) + try: + conn.execute( + """ + CREATE TABLE IF NOT EXISTS message_fts ( + username TEXT, + sender_username TEXT, + create_time INTEGER, + sort_seq INTEGER, + local_id INTEGER, + local_type INTEGER, + db_stem TEXT + ) + """ + ) + for r in rows: + conn.execute( + """ + INSERT INTO message_fts( + username, sender_username, create_time, sort_seq, local_id, local_type, db_stem + ) VALUES(?, ?, ?, ?, ?, ?, ?) + """, + ( + r["username"], + r["sender_username"], + int(r["create_time"]), + int(r["sort_seq"]), + int(r["local_id"]), + int(r.get("local_type", 1)), + str(r.get("db_stem", "message_0")), + ), + ) + conn.commit() + finally: + conn.close() + + def test_balanced_profile_can_beat_higher_volume(self): + from wechat_decrypt_tool.wrapped.cards.card_04_monthly_best_friends_wall import ( + compute_monthly_best_friends_wall_stats, + ) + + with TemporaryDirectory() as td: + account = "wxid_me" + account_dir = Path(td) / account + account_dir.mkdir(parents=True, exist_ok=True) + + user_volume = "wxid_volume" + user_balanced = "wxid_balanced" + self._seed_contact_db(account_dir / "contact.db", [user_volume, user_balanced]) + + rows: list[dict] = [] + lid = 1 + # High-volume user: more messages but consistently slow replies and low continuity. + for d in [3, 18]: + for i in range(6): + t = self._ts(2025, 1, d, 21, i * 3, 0) + rows.append( + { + "username": user_volume, + "sender_username": user_volume, + "create_time": t, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + rows.append( + { + "username": user_volume, + "sender_username": account, + "create_time": t + 7200, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + + # Balanced user: slightly fewer interactions, but much faster and spread over more days/hours. + day_hour = [ + (2, 1), + (6, 8), + (9, 13), + (13, 19), + (20, 10), + (24, 22), + (27, 7), + (29, 16), + (30, 12), + (31, 20), + ] + for d, hh in day_hour: + t = self._ts(2025, 1, d, hh, 10, 0) + rows.append( + { + "username": user_balanced, + "sender_username": user_balanced, + "create_time": t, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + rows.append( + { + "username": user_balanced, + "sender_username": account, + "create_time": t + 20, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + + self._seed_index_db(account_dir / "chat_search_index.db", rows) + data = compute_monthly_best_friends_wall_stats(account_dir=account_dir, year=2025) + jan = data["months"][0] + self.assertIsNotNone(jan["winner"]) + self.assertEqual(jan["winner"]["username"], user_balanced) + + def test_allows_consecutive_month_wins(self): + from wechat_decrypt_tool.wrapped.cards.card_04_monthly_best_friends_wall import ( + compute_monthly_best_friends_wall_stats, + ) + + with TemporaryDirectory() as td: + account = "wxid_me" + account_dir = Path(td) / account + account_dir.mkdir(parents=True, exist_ok=True) + + buddy = "wxid_best" + self._seed_contact_db(account_dir / "contact.db", [buddy]) + + rows: list[dict] = [] + lid = 1 + for month in [1, 2]: + for d in [3, 8, 12, 18]: + t = self._ts(2025, month, d, 12, 0, 0) + rows.append( + { + "username": buddy, + "sender_username": buddy, + "create_time": t, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + rows.append( + { + "username": buddy, + "sender_username": account, + "create_time": t + 30, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + + self._seed_index_db(account_dir / "chat_search_index.db", rows) + data = compute_monthly_best_friends_wall_stats(account_dir=account_dir, year=2025) + jan = data["months"][0] + feb = data["months"][1] + self.assertEqual(jan["winner"]["username"], buddy) + self.assertEqual(feb["winner"]["username"], buddy) + + def test_month_without_enough_activity_is_empty(self): + from wechat_decrypt_tool.wrapped.cards.card_04_monthly_best_friends_wall import ( + compute_monthly_best_friends_wall_stats, + ) + + with TemporaryDirectory() as td: + account = "wxid_me" + account_dir = Path(td) / account + account_dir.mkdir(parents=True, exist_ok=True) + + user = "wxid_low" + self._seed_contact_db(account_dir / "contact.db", [user]) + + rows = [] + lid = 1 + # Only 3 reply pairs in March -> total 6 messages, below minTotalMessages=8. + for d in [5, 11, 25]: + t = self._ts(2025, 3, d, 10, 0, 0) + rows.append( + { + "username": user, + "sender_username": user, + "create_time": t, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + rows.append( + { + "username": user, + "sender_username": account, + "create_time": t + 40, + "sort_seq": lid, + "local_id": lid, + } + ) + lid += 1 + + self._seed_index_db(account_dir / "chat_search_index.db", rows) + data = compute_monthly_best_friends_wall_stats(account_dir=account_dir, year=2025) + march = data["months"][2] + self.assertIsNone(march["winner"]) + self.assertEqual(march["reason"], "insufficient_data") + + def test_card_shape_and_kind(self): + from wechat_decrypt_tool.wrapped.cards.card_04_monthly_best_friends_wall import ( + build_card_04_monthly_best_friends_wall, + ) + + with TemporaryDirectory() as td: + account = "wxid_me" + account_dir = Path(td) / account + account_dir.mkdir(parents=True, exist_ok=True) + self._seed_contact_db(account_dir / "contact.db", []) + self._seed_index_db(account_dir / "chat_search_index.db", []) + + card = build_card_04_monthly_best_friends_wall(account_dir=account_dir, year=2025) + self.assertEqual(card["id"], 4) + self.assertEqual(card["kind"], "chat/monthly_best_friends_wall") + self.assertEqual(card["status"], "ok") + self.assertEqual(len(card["data"]["months"]), 12) + +if __name__ == "__main__": + unittest.main()