mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
improvement(wrapped): 精简全局概览热力图,移除 highlights 标记
- 移除 annualHeatmap highlights 的扫描与计算逻辑,降低卡片计算复杂度。 - card_00_global_overview 固定返回 highlights: [],保持热力图主体数据不变。 - 清理不再使用的辅助函数与正则依赖。
This commit is contained in:
@@ -6,7 +6,7 @@ import sqlite3
|
||||
import time
|
||||
from collections import Counter
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Any, Optional
|
||||
|
||||
@@ -28,7 +28,6 @@ logger = get_logger(__name__)
|
||||
|
||||
|
||||
_MD5_HEX_RE = re.compile(r"(?i)[0-9a-f]{32}")
|
||||
_EMOJI_CHAR_RE = re.compile(r"[\U0001F300-\U0001FAFF\u2600-\u27BF]")
|
||||
# Best-effort heuristics for "new friends added" detection: WeChat system messages vary by version.
|
||||
_ADDED_FRIEND_PATTERNS: tuple[str, ...] = (
|
||||
"你已添加了",
|
||||
@@ -296,284 +295,6 @@ def compute_annual_daily_counts(*, account_dir: Path, year: int, sender_username
|
||||
return counts
|
||||
|
||||
|
||||
def _ymd_from_doy(*, year: int, doy: int) -> str:
|
||||
try:
|
||||
dt = datetime(int(year), 1, 1) + timedelta(days=int(doy))
|
||||
except Exception:
|
||||
return ""
|
||||
return dt.strftime("%Y-%m-%d")
|
||||
|
||||
|
||||
def _best_doy_by_max(values: list[int]) -> tuple[int, int] | tuple[None, int]:
|
||||
best_doy: int | None = None
|
||||
best = 0
|
||||
for i, v in enumerate(values):
|
||||
try:
|
||||
n = int(v or 0)
|
||||
except Exception:
|
||||
n = 0
|
||||
if n > best or (n == best and best_doy is not None and i < best_doy):
|
||||
best = n
|
||||
best_doy = i
|
||||
return best_doy, best
|
||||
|
||||
|
||||
def compute_annual_heatmap_highlights(
|
||||
*,
|
||||
account_dir: Path,
|
||||
year: int,
|
||||
sender_username: str,
|
||||
sent_daily_counts: list[int],
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Compute special day highlights for the annual calendar heatmap (best-effort).
|
||||
|
||||
We prefer the unified search index when available; fallback mode returns the subset
|
||||
that can be derived from `sent_daily_counts` without scanning all message content.
|
||||
"""
|
||||
|
||||
days = int(len(sent_daily_counts) or _days_in_year(year))
|
||||
out: list[dict[str, Any]] = []
|
||||
|
||||
def add_highlight(
|
||||
*,
|
||||
key: str,
|
||||
label: str,
|
||||
doy: int,
|
||||
value: int | None = None,
|
||||
value_label: str = "",
|
||||
date: str = "",
|
||||
) -> None:
|
||||
try:
|
||||
d = int(doy)
|
||||
except Exception:
|
||||
return
|
||||
if d < 0 or d >= days:
|
||||
return
|
||||
ymd = str(date or "") or _ymd_from_doy(year=int(year), doy=d)
|
||||
if not ymd:
|
||||
return
|
||||
obj: dict[str, Any] = {
|
||||
"key": str(key),
|
||||
"label": str(label),
|
||||
"doy": int(d),
|
||||
"date": ymd,
|
||||
}
|
||||
if value is not None:
|
||||
try:
|
||||
obj["value"] = int(value)
|
||||
except Exception:
|
||||
pass
|
||||
if value_label:
|
||||
obj["valueLabel"] = str(value_label)
|
||||
out.append(obj)
|
||||
|
||||
# 3. Sent messages max day (always available from sent daily counts)
|
||||
best_doy, best_val = _best_doy_by_max(sent_daily_counts or [])
|
||||
if best_doy is not None and best_val > 0:
|
||||
add_highlight(key="sent_messages_max", label="发送消息条数最多的一天", doy=int(best_doy), value=int(best_val), value_label=f"{int(best_val)} 条")
|
||||
|
||||
sender = str(sender_username or "").strip()
|
||||
if not sender:
|
||||
return out
|
||||
|
||||
index_path = get_chat_search_index_db_path(account_dir)
|
||||
if not index_path.exists():
|
||||
return out
|
||||
|
||||
start_ts, end_ts = _year_range_epoch_seconds(year)
|
||||
|
||||
conn = sqlite3.connect(str(index_path))
|
||||
try:
|
||||
has_fts = (
|
||||
conn.execute("SELECT 1 FROM sqlite_master WHERE type='table' AND name='message_fts' LIMIT 1").fetchone()
|
||||
is not None
|
||||
)
|
||||
if not has_fts:
|
||||
return out
|
||||
|
||||
# Convert millisecond timestamps defensively (some datasets store ms).
|
||||
ts_expr = (
|
||||
"CASE "
|
||||
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
|
||||
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
|
||||
"ELSE CAST(create_time AS INTEGER) "
|
||||
"END"
|
||||
)
|
||||
|
||||
base_where = f"{ts_expr} >= ? AND {ts_expr} < ? AND db_stem NOT LIKE 'biz_message%'"
|
||||
base_params: tuple[Any, ...] = (start_ts, end_ts)
|
||||
|
||||
def fetch_best_doy_value(sql: str, params: tuple[Any, ...]) -> tuple[int, int] | None:
|
||||
try:
|
||||
r = conn.execute(sql, params).fetchone()
|
||||
except Exception:
|
||||
r = None
|
||||
if not r:
|
||||
return None
|
||||
try:
|
||||
doy = int(r[0] if r[0] is not None else -1)
|
||||
val = int(r[1] or 0)
|
||||
except Exception:
|
||||
return None
|
||||
if val <= 0 or doy < 0 or doy >= days:
|
||||
return None
|
||||
return doy, val
|
||||
|
||||
# 1. Sent chars max day (text messages only, non-whitespace approximation)
|
||||
char_expr = (
|
||||
"length(replace(replace(replace(replace(coalesce(text,''),' ',''), char(10), ''), char(13), ''), char(9), ''))"
|
||||
)
|
||||
sql_sent_chars = (
|
||||
"SELECT doy, chars FROM ("
|
||||
" SELECT CAST(strftime('%j', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) - 1 AS doy, "
|
||||
f" SUM({char_expr}) AS chars "
|
||||
" FROM ("
|
||||
f" SELECT {ts_expr} AS ts, text "
|
||||
" FROM message_fts "
|
||||
f" WHERE {base_where} AND sender_username = ? AND CAST(local_type AS INTEGER) = 1"
|
||||
" ) sub "
|
||||
" GROUP BY doy"
|
||||
") t ORDER BY chars DESC, doy ASC LIMIT 1"
|
||||
)
|
||||
r_sent_chars = fetch_best_doy_value(sql_sent_chars, base_params + (sender,))
|
||||
if r_sent_chars is not None:
|
||||
doy, v = r_sent_chars
|
||||
add_highlight(key="sent_chars_max", label="发送字最多的一天", doy=doy, value=v, value_label=f"{v} 字")
|
||||
|
||||
# 2. Received chars max day (text messages only)
|
||||
sql_recv_chars = (
|
||||
"SELECT doy, chars FROM ("
|
||||
" SELECT CAST(strftime('%j', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) - 1 AS doy, "
|
||||
f" SUM({char_expr}) AS chars "
|
||||
" FROM ("
|
||||
f" SELECT {ts_expr} AS ts, text "
|
||||
" FROM message_fts "
|
||||
f" WHERE {base_where} AND COALESCE(sender_username,'') != ? AND CAST(local_type AS INTEGER) = 1"
|
||||
" ) sub "
|
||||
" GROUP BY doy"
|
||||
") t ORDER BY chars DESC, doy ASC LIMIT 1"
|
||||
)
|
||||
r_recv_chars = fetch_best_doy_value(sql_recv_chars, base_params + (sender,))
|
||||
if r_recv_chars is not None:
|
||||
doy, v = r_recv_chars
|
||||
add_highlight(key="received_chars_max", label="接收字最多的一天", doy=doy, value=v, value_label=f"{v} 字")
|
||||
|
||||
# 4. Received message count max day (exclude system messages)
|
||||
sql_recv_msgs = (
|
||||
"SELECT doy, cnt FROM ("
|
||||
" SELECT CAST(strftime('%j', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) - 1 AS doy, "
|
||||
" COUNT(1) AS cnt "
|
||||
" FROM ("
|
||||
f" SELECT {ts_expr} AS ts "
|
||||
" FROM message_fts "
|
||||
f" WHERE {base_where} AND COALESCE(sender_username,'') != ? AND CAST(local_type AS INTEGER) != 10000"
|
||||
" ) sub "
|
||||
" GROUP BY doy"
|
||||
") t ORDER BY cnt DESC, doy ASC LIMIT 1"
|
||||
)
|
||||
r_recv_msgs = fetch_best_doy_value(sql_recv_msgs, base_params + (sender,))
|
||||
if r_recv_msgs is not None:
|
||||
doy, v = r_recv_msgs
|
||||
add_highlight(key="received_messages_max", label="接收消息条数最多的一天", doy=doy, value=v, value_label=f"{v} 条")
|
||||
|
||||
# 5. Added friends max day (best-effort via system message patterns, exclude official/chatrooms)
|
||||
added_like_patterns = [f"%{p}%" for p in _ADDED_FRIEND_PATTERNS if str(p or "").strip()]
|
||||
if added_like_patterns:
|
||||
cond_added = " OR ".join(["text LIKE ?"] * len(added_like_patterns))
|
||||
sql_added = (
|
||||
"SELECT doy, cnt FROM ("
|
||||
" SELECT CAST(strftime('%j', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) - 1 AS doy, "
|
||||
" COUNT(DISTINCT username) AS cnt "
|
||||
" FROM ("
|
||||
f" SELECT {ts_expr} AS ts, username, text "
|
||||
" FROM message_fts "
|
||||
f" WHERE {base_where} "
|
||||
" AND CAST(local_type AS INTEGER) = 10000 "
|
||||
" AND COALESCE(is_official, 0) = 0 "
|
||||
" AND username NOT LIKE '%@chatroom' "
|
||||
f" AND ({cond_added})"
|
||||
" ) sub "
|
||||
" GROUP BY doy"
|
||||
") t ORDER BY cnt DESC, doy ASC LIMIT 1"
|
||||
)
|
||||
params_added: tuple[Any, ...] = base_params + tuple(added_like_patterns)
|
||||
r_added = fetch_best_doy_value(sql_added, params_added)
|
||||
if r_added is not None:
|
||||
doy, v = r_added
|
||||
add_highlight(key="added_friends_max", label="加好友最多的一天", doy=doy, value=v, value_label=f"{v} 位")
|
||||
|
||||
# 6. Sticker/emoji messages max day (WeChat local_type=47)
|
||||
sql_emoji_msgs = (
|
||||
"SELECT doy, cnt FROM ("
|
||||
" SELECT CAST(strftime('%j', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) - 1 AS doy, "
|
||||
" COUNT(1) AS cnt "
|
||||
" FROM ("
|
||||
f" SELECT {ts_expr} AS ts "
|
||||
" FROM message_fts "
|
||||
f" WHERE {base_where} AND sender_username = ? AND CAST(local_type AS INTEGER) = 47"
|
||||
" ) sub "
|
||||
" GROUP BY doy"
|
||||
") t ORDER BY cnt DESC, doy ASC LIMIT 1"
|
||||
)
|
||||
r_emoji_msgs = fetch_best_doy_value(sql_emoji_msgs, base_params + (sender,))
|
||||
if r_emoji_msgs is not None:
|
||||
doy, v = r_emoji_msgs
|
||||
add_highlight(key="sticker_messages_max", label="发表情包最多的一天", doy=doy, value=v, value_label=f"{v} 条")
|
||||
|
||||
# 7. Unicode emoji chars max day (best-effort; count emoji codepoints in sent text)
|
||||
emoji_counts: list[int] = [0 for _ in range(days)]
|
||||
sql_emoji_text = (
|
||||
"SELECT doy, text FROM ("
|
||||
" SELECT "
|
||||
" CAST(strftime('%j', datetime(ts, 'unixepoch', 'localtime')) AS INTEGER) - 1 AS doy, "
|
||||
" text "
|
||||
" FROM ("
|
||||
f" SELECT {ts_expr} AS ts, text "
|
||||
" FROM message_fts "
|
||||
f" WHERE {base_where} AND sender_username = ? AND CAST(local_type AS INTEGER) = 1"
|
||||
" ) sub2"
|
||||
") sub "
|
||||
"WHERE text IS NOT NULL AND text != ''"
|
||||
)
|
||||
try:
|
||||
cur = conn.execute(sql_emoji_text, base_params + (sender,))
|
||||
except Exception:
|
||||
cur = None
|
||||
if cur is not None:
|
||||
for r in cur:
|
||||
try:
|
||||
doy = int(r[0] if r and r[0] is not None else -1)
|
||||
except Exception:
|
||||
continue
|
||||
if doy < 0 or doy >= days:
|
||||
continue
|
||||
try:
|
||||
txt = str(r[1] or "")
|
||||
except Exception:
|
||||
txt = ""
|
||||
if not txt:
|
||||
continue
|
||||
emoji_counts[doy] += len(_EMOJI_CHAR_RE.findall(txt))
|
||||
|
||||
best_emoji_doy, best_emoji = _best_doy_by_max(emoji_counts)
|
||||
if best_emoji_doy is not None and best_emoji > 0:
|
||||
add_highlight(
|
||||
key="emoji_chars_max",
|
||||
label="发emoji最多的一天",
|
||||
doy=int(best_emoji_doy),
|
||||
value=int(best_emoji),
|
||||
value_label=f"{int(best_emoji)} 个",
|
||||
)
|
||||
|
||||
finally:
|
||||
try:
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return out
|
||||
|
||||
|
||||
def _list_session_usernames(session_db_path: Path) -> list[str]:
|
||||
if not session_db_path.exists():
|
||||
return []
|
||||
@@ -1268,19 +989,14 @@ def build_card_00_global_overview(
|
||||
}
|
||||
|
||||
daily_counts = compute_annual_daily_counts(account_dir=account_dir, year=year, sender_username=sender)
|
||||
annual_highlights = compute_annual_heatmap_highlights(
|
||||
account_dir=account_dir,
|
||||
year=year,
|
||||
sender_username=sender,
|
||||
sent_daily_counts=daily_counts,
|
||||
)
|
||||
annual_heatmap = {
|
||||
"year": int(year),
|
||||
"startDate": f"{int(year)}-01-01",
|
||||
"endDate": f"{int(year)}-12-31",
|
||||
"days": int(len(daily_counts)),
|
||||
"dailyCounts": daily_counts,
|
||||
"highlights": annual_highlights,
|
||||
# Product decision: keep the calendar heatmap lightweight (no extra "best day" markers).
|
||||
"highlights": [],
|
||||
}
|
||||
|
||||
lines: list[str] = []
|
||||
|
||||
Reference in New Issue
Block a user