diff --git a/src/wechat_decrypt_tool/wrapped/cards/card_03_reply_speed.py b/src/wechat_decrypt_tool/wrapped/cards/card_03_reply_speed.py index b7e0ce8..0a938d0 100644 --- a/src/wechat_decrypt_tool/wrapped/cards/card_03_reply_speed.py +++ b/src/wechat_decrypt_tool/wrapped/cards/card_03_reply_speed.py @@ -65,6 +65,136 @@ def _format_duration_zh(seconds: int | None) -> str: return f"{d}天{hh}小时" if hh else f"{d}天" +def _compute_streak_days(doys: list[int]) -> int: + if not doys: + return 0 + doys_sorted = sorted({int(x) for x in doys if int(x) > 0}) + if not doys_sorted: + return 0 + + best = 1 + cur = 1 + prev = doys_sorted[0] + for d in doys_sorted[1:]: + if d == prev + 1: + cur += 1 + else: + cur = 1 + if cur > best: + best = cur + prev = d + return int(best) + + +def _compute_best_buddy_extras_from_index(*, account_dir: Path, year: int, buddy_username: str) -> dict[str, Any]: + """Compute a few extra fields for Card07 Bento summary. + + - longestStreakDays: longest consecutive days with any interaction + - peakHour/peakHourLabel: most active hour of day with this buddy + + Best-effort: returns empty dict on any failure. + """ + + buddy = str(buddy_username or "").strip() + if not buddy: + return {} + + index_path = get_chat_search_index_db_path(account_dir) + if not index_path.exists(): + return {} + + start_ts, end_ts = _year_range_epoch_seconds(int(year)) + + ts_expr = ( + "CASE " + "WHEN CAST(create_time AS INTEGER) > 1000000000000 " + "THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) " + "ELSE CAST(create_time AS INTEGER) " + "END" + ) + where = ( + f"{ts_expr} >= ? AND {ts_expr} < ? " + "AND db_stem NOT LIKE 'biz_message%' " + "AND CAST(local_type AS INTEGER) != 10000 " + "AND username = ? " + "AND username NOT LIKE '%@chatroom'" + ) + + sql_days = ( + "SELECT DISTINCT " + "CAST(strftime('%j', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS doy " + "FROM (" + f" SELECT {ts_expr} AS ts " + " FROM message_fts " + f" WHERE {where}" + ") sub " + "WHERE ts > 0 " + "ORDER BY doy ASC" + ) + + sql_peak_hour = ( + "SELECT " + "CAST(strftime('%H', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) AS h, " + "COUNT(1) AS cnt " + "FROM (" + f" SELECT {ts_expr} AS ts " + " FROM message_fts " + f" WHERE {where}" + ") sub " + "WHERE ts > 0 " + "GROUP BY h " + "ORDER BY cnt DESC, h ASC " + "LIMIT 1" + ) + + conn = sqlite3.connect(str(index_path)) + try: + has_fts = ( + conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone() + is not None + ) + if not has_fts: + return {} + + params = (start_ts, end_ts, buddy) + + doys: list[int] = [] + try: + rows = conn.execute(sql_days, params).fetchall() + except Exception: + rows = [] + for r in rows: + if not r or r[0] is None: + continue + try: + doys.append(int(r[0])) + except Exception: + continue + + longest_streak_days = _compute_streak_days(doys) + + peak_hour: int | None = None + try: + row = conn.execute(sql_peak_hour, params).fetchone() + if row and row[0] is not None: + peak_hour = int(row[0]) + except Exception: + peak_hour = None + + out: dict[str, Any] = {"longestStreakDays": int(longest_streak_days)} + if peak_hour is not None and 0 <= peak_hour <= 23: + out["peakHour"] = int(peak_hour) + out["peakHourLabel"] = f"{int(peak_hour):02d}:00" + return out + except Exception: + return {} + finally: + try: + conn.close() + except Exception: + pass + + @dataclass class _ConvAgg: username: str @@ -125,6 +255,9 @@ def compute_reply_speed_stats(*, account_dir: Path, year: int) -> dict[str, Any] global_slowest: int | None = None global_slowest_u: str | None = None + reply_gaps: list[int] = [] + reply_stats: dict[str, Any] | None = None + best_score = -1.0 best_agg: _ConvAgg | None = None @@ -287,6 +420,7 @@ def compute_reply_speed_stats(*, account_dir: Path, year: int) -> dict[str, Any] total_replies += 1 sum_gap += gap sum_gap_capped += min(gap, gap_cap_seconds) + reply_gaps.append(int(gap)) if replies == 1 or gap < min_gap: min_gap = gap @@ -323,6 +457,20 @@ def compute_reply_speed_stats(*, account_dir: Path, year: int) -> dict[str, Any] except Exception: pass + if reply_gaps: + try: + reply_gaps.sort() + n = int(len(reply_gaps)) + # Nearest-rank quantiles (deterministic, integer seconds). + p50_idx = max(0, min(n - 1, int(math.ceil(0.50 * n) - 1))) + p90_idx = max(0, min(n - 1, int(math.ceil(0.90 * n) - 1))) + reply_stats = { + "p50Seconds": int(reply_gaps[p50_idx]), + "p90Seconds": int(reply_gaps[p90_idx]), + } + except Exception: + reply_stats = None + # -------- Fallback path: no index -------- # Best-effort: if the index doesn't exist / isn't ready, auto-start building it (async) so user can # retry this page later. We intentionally do NOT block here. @@ -406,6 +554,14 @@ def compute_reply_speed_stats(*, account_dir: Path, year: int) -> dict[str, Any] best_buddy_obj = None if best_agg is not None: best_buddy_obj = conv_to_obj(best_score, best_agg) + if used_index and isinstance(best_buddy_obj, dict) and best_buddy_obj.get("username"): + extras = _compute_best_buddy_extras_from_index( + account_dir=account_dir, + year=int(year), + buddy_username=str(best_buddy_obj.get("username") or ""), + ) + if extras: + best_buddy_obj.update(extras) fastest_obj = None if global_fastest is not None and global_fastest_u: @@ -645,6 +801,7 @@ def compute_reply_speed_stats(*, account_dir: Path, year: int) -> dict[str, Any] "year": int(year), "sentToContacts": int(len(sent_to_contacts)), "replyEvents": int(total_replies), + "replyStats": reply_stats, "fastestReplySeconds": int(global_fastest) if global_fastest is not None else None, "longestReplySeconds": int(global_slowest) if global_slowest is not None else None, "bestBuddy": best_buddy_obj, diff --git a/src/wechat_decrypt_tool/wrapped/cards/card_07_bento_summary.py b/src/wechat_decrypt_tool/wrapped/cards/card_07_bento_summary.py index 2d97054..2727582 100644 --- a/src/wechat_decrypt_tool/wrapped/cards/card_07_bento_summary.py +++ b/src/wechat_decrypt_tool/wrapped/cards/card_07_bento_summary.py @@ -147,6 +147,77 @@ def build_card_07_bento_summary_from_sources( top_unicode_emoji = _pick_str(x0.get("emoji"), "") top_unicode_emoji_count = _pick_int(x0.get("count"), 0) + # "Top emoji" should be picked across both unicode emoji and WeChat built-in emoji. + # The deck has a separate "sticker" card; here we focus on emoji-like items. + top_emoji: dict[str, Any] | None = None + emoji_candidates: list[dict[str, Any]] = [] + + top_wechat_emojis = emoji_d.get("topWechatEmojis") + if isinstance(top_wechat_emojis, list) and top_wechat_emojis: + for item in top_wechat_emojis: + if not isinstance(item, dict): + continue + key = _pick_str(item.get("key"), "") + cnt = _pick_int(item.get("count"), 0) + if key and cnt > 0: + emoji_candidates.append( + { + "kind": "wechat", + "key": key, + "count": cnt, + "assetPath": _pick_str(item.get("assetPath"), ""), + } + ) + + top_text_emojis = emoji_d.get("topTextEmojis") + if isinstance(top_text_emojis, list) and top_text_emojis: + for item in top_text_emojis: + if not isinstance(item, dict): + continue + key = _pick_str(item.get("key"), "") + cnt = _pick_int(item.get("count"), 0) + if key and cnt > 0: + emoji_candidates.append( + { + "kind": "wechat", + "key": key, + "count": cnt, + "assetPath": _pick_str(item.get("assetPath"), ""), + } + ) + + if isinstance(top_unicode_emojis, list) and top_unicode_emojis: + for item in top_unicode_emojis: + if not isinstance(item, dict): + continue + emo = _pick_str(item.get("emoji"), "") + cnt = _pick_int(item.get("count"), 0) + if emo and cnt > 0: + emoji_candidates.append({"kind": "unicode", "emoji": emo, "count": cnt}) + + if emoji_candidates: + best = max( + emoji_candidates, + key=lambda x: ( + _pick_int(x.get("count"), 0), + 1 if str(x.get("kind")) == "wechat" else 0, + _pick_str(x.get("key") or x.get("emoji"), ""), + ), + ) + if str(best.get("kind")) == "wechat": + top_emoji = { + "kind": "wechat", + "key": _pick_str(best.get("key"), ""), + "count": _pick_int(best.get("count"), 0), + "assetPath": _pick_str(best.get("assetPath"), ""), + } + else: + top_emoji = { + "kind": "unicode", + "emoji": _pick_str(best.get("emoji"), ""), + "count": _pick_int(best.get("count"), 0), + } + monthly_best_buddies: list[dict[str, Any]] = [] months = monthly_d.get("months") if isinstance(months, list) and months: @@ -200,6 +271,7 @@ def build_card_07_bento_summary_from_sources( "topPhrase": top_phrase, "sentStickerCount": int(sent_sticker_count), "topSticker": top_sticker, + "topEmoji": top_emoji, "topUnicodeEmoji": top_unicode_emoji, "topUnicodeEmojiCount": int(top_unicode_emoji_count), "monthlyBestBuddies": monthly_best_buddies, diff --git a/src/wechat_decrypt_tool/wrapped/service.py b/src/wechat_decrypt_tool/wrapped/service.py index 5adb17e..16b7c1c 100644 --- a/src/wechat_decrypt_tool/wrapped/service.py +++ b/src/wechat_decrypt_tool/wrapped/service.py @@ -28,7 +28,7 @@ logger = get_logger(__name__) # an older partial cache. _IMPLEMENTED_UPTO_ID = 7 # Bump this when we change card payloads/ordering while keeping the same implemented_upto. -_CACHE_VERSION = 24 +_CACHE_VERSION = 26 # "Manifest" is used by the frontend to render the deck quickly, then lazily fetch each card. diff --git a/tests/test_wrapped_bento_summary_top_emoji.py b/tests/test_wrapped_bento_summary_top_emoji.py new file mode 100644 index 0000000..8876bc3 --- /dev/null +++ b/tests/test_wrapped_bento_summary_top_emoji.py @@ -0,0 +1,116 @@ +import sys +import unittest +from pathlib import Path + +# Ensure "src/" is importable when running tests from repo root. +ROOT = Path(__file__).resolve().parents[1] +sys.path.insert(0, str(ROOT / "src")) + + +class TestWrappedBentoSummaryTopEmoji(unittest.TestCase): + def _build_sources(self, *, emoji_data): + # Keep sources minimal: card_07_bento_summary only needs a handful of keys. + overview = {"data": {"totalMessages": 100, "addedFriends": 0}} + heatmap = {"data": {"totalMessages": 100, "weekdayLabels": [], "hourLabels": [], "matrix": []}} + message_chars = {"data": {"sentChars": 0}} + reply_speed = {"data": {}} + monthly = {"data": {"months": []}} + emoji = {"data": emoji_data} + return overview, heatmap, message_chars, reply_speed, monthly, emoji + + def test_top_emoji_prefers_wechat_when_count_higher(self): + from wechat_decrypt_tool.wrapped.cards.card_07_bento_summary import build_card_07_bento_summary_from_sources + + overview, heatmap, message_chars, reply_speed, monthly, emoji = self._build_sources( + emoji_data={ + "topWechatEmojis": [{"key": "[微笑]", "count": 5, "assetPath": "/wxemoji/Expression_1@2x.png"}], + "topTextEmojis": [], + "topUnicodeEmojis": [{"emoji": "🙂", "count": 2}], + } + ) + card = build_card_07_bento_summary_from_sources( + year=2025, + overview=overview, + heatmap=heatmap, + message_chars=message_chars, + reply_speed=reply_speed, + monthly=monthly, + emoji=emoji, + ) + snap = card["data"]["snapshot"] + self.assertEqual(snap["topEmoji"]["kind"], "wechat") + self.assertEqual(snap["topEmoji"]["key"], "[微笑]") + self.assertEqual(snap["topEmoji"]["count"], 5) + self.assertTrue(str(snap["topEmoji"]["assetPath"]).startswith("/wxemoji/")) + + def test_top_emoji_prefers_unicode_when_count_higher(self): + from wechat_decrypt_tool.wrapped.cards.card_07_bento_summary import build_card_07_bento_summary_from_sources + + overview, heatmap, message_chars, reply_speed, monthly, emoji = self._build_sources( + emoji_data={ + "topWechatEmojis": [{"key": "[微笑]", "count": 5, "assetPath": "/wxemoji/Expression_1@2x.png"}], + "topTextEmojis": [], + "topUnicodeEmojis": [{"emoji": "🙂", "count": 9}], + } + ) + card = build_card_07_bento_summary_from_sources( + year=2025, + overview=overview, + heatmap=heatmap, + message_chars=message_chars, + reply_speed=reply_speed, + monthly=monthly, + emoji=emoji, + ) + snap = card["data"]["snapshot"] + self.assertEqual(snap["topEmoji"]["kind"], "unicode") + self.assertEqual(snap["topEmoji"]["emoji"], "🙂") + self.assertEqual(snap["topEmoji"]["count"], 9) + + def test_top_emoji_includes_top_text_emojis(self): + from wechat_decrypt_tool.wrapped.cards.card_07_bento_summary import build_card_07_bento_summary_from_sources + + overview, heatmap, message_chars, reply_speed, monthly, emoji = self._build_sources( + emoji_data={ + "topWechatEmojis": [{"key": "[表情1]", "count": 2, "assetPath": "/wxemoji/Expression_1@2x.png"}], + "topTextEmojis": [{"key": "[嘿哈]", "count": 4, "assetPath": "/wxemoji/Expression_99@2x.png"}], + "topUnicodeEmojis": [{"emoji": "🙂", "count": 3}], + } + ) + card = build_card_07_bento_summary_from_sources( + year=2025, + overview=overview, + heatmap=heatmap, + message_chars=message_chars, + reply_speed=reply_speed, + monthly=monthly, + emoji=emoji, + ) + snap = card["data"]["snapshot"] + self.assertEqual(snap["topEmoji"]["kind"], "wechat") + self.assertEqual(snap["topEmoji"]["key"], "[嘿哈]") + self.assertEqual(snap["topEmoji"]["count"], 4) + self.assertTrue(str(snap["topEmoji"]["assetPath"]).endswith("Expression_99@2x.png")) + + def test_top_emoji_none_when_no_emoji_stats(self): + from wechat_decrypt_tool.wrapped.cards.card_07_bento_summary import build_card_07_bento_summary_from_sources + + overview, heatmap, message_chars, reply_speed, monthly, emoji = self._build_sources( + emoji_data={"topWechatEmojis": [], "topTextEmojis": [], "topUnicodeEmojis": []} + ) + card = build_card_07_bento_summary_from_sources( + year=2025, + overview=overview, + heatmap=heatmap, + message_chars=message_chars, + reply_speed=reply_speed, + monthly=monthly, + emoji=emoji, + ) + snap = card["data"]["snapshot"] + self.assertIsNone(snap.get("topEmoji")) + + +if __name__ == "__main__": + unittest.main() +