feat(wrapped): 年度关键词卡升级为年度常用语词云

- 新增常用语短句过滤与统计,按重复短句(非分词)生成词云数据。

- 引入常用语扫描元数据(scannedCandidates/matchedCandidates/uniquePhrases 等)。

- 示例语句抽样改为唯一优先,不足补齐重复,提升短句命中率。

- wrapped cache version 升级到 24,并补充常用语相关单元测试。
This commit is contained in:
2977094657
2026-02-22 22:18:43 +08:00
Unverified
parent e537c524f3
commit 236d0ae703
3 changed files with 384 additions and 32 deletions
@@ -41,6 +41,10 @@ _DATEISH_RE = re.compile(
r")$"
)
# Align with WeFlow Annual Report "年度常用语" logic.
# WeFlow counts repeated *phrases* (full short sent messages), not jieba tokens.
_WEFLOW_COMMON_PHRASE_LOCAL_TYPES = (1, 244813135921)
# Small but practical stopword list for chat keywords.
_STOPWORDS_ZH = {
"",
@@ -224,8 +228,8 @@ def _is_good_example_text(text: str) -> bool:
s = _clean_text(text)
if not s:
return False
# 仅过滤极短噪声,不对消息长度设置上限
if len(s) < 4:
# 常用语卡片需要保留短句(如“在吗”“好的”),仅过滤 1 字噪声
if len(s) < 2:
return False
if _URL_RE.search(s):
return False
@@ -341,7 +345,8 @@ def pick_examples(
*,
per_word: int = 3,
) -> list[dict[str, Any]]:
uniq_msgs = list(dict.fromkeys([_clean_text(x) for x in (message_pool or []) if _clean_text(x)]))
all_msgs = [_clean_text(x) for x in (message_pool or []) if _clean_text(x)]
uniq_msgs = list(dict.fromkeys(all_msgs))
out: list[dict[str, Any]] = []
for kw in keywords:
@@ -351,22 +356,28 @@ def pick_examples(
count = int(kw.get("count") or 0)
hits: list[str] = []
if _HAS_CJK_RE.search(word):
for msg in uniq_msgs:
if len(hits) >= int(per_word):
limit = max(1, int(per_word))
def _match(msg: str) -> bool:
if not _is_good_example_text(msg):
return False
if _HAS_CJK_RE.search(word):
return word in msg
return word.lower() in msg.lower()
# Pass 1: prefer unique samples for diversity.
for msg in uniq_msgs:
if len(hits) >= limit:
break
if _match(msg):
hits.append(msg)
# Pass 2: if still not enough, allow repeated samples from original pool.
if len(hits) < limit:
for msg in all_msgs:
if len(hits) >= limit:
break
if not _is_good_example_text(msg):
continue
if word in msg:
hits.append(msg)
else:
wlow = word.lower()
for msg in uniq_msgs:
if len(hits) >= int(per_word):
break
if not _is_good_example_text(msg):
continue
if wlow in msg.lower():
if _match(msg):
hits.append(msg)
out.append({"word": word, "count": int(count), "messages": hits})
@@ -407,6 +418,225 @@ def build_keywords_payload(
}
def _weflow_common_phrase_or_empty(text: Any) -> str:
"""
Match WeFlow "年度常用语" filter:
- Only short messages: 2 <= len <= 20
- Exclude links/markup: contains "http" or "<"
- Exclude bracketed / xml-like payloads: startswith "[" or "<?xml"
Note: We intentionally do NOT strip URLs or collapse whitespace to stay close to WeFlow.
"""
s = str(text or "")
if not s:
return ""
# Invisible chars are common noise across exports; removing them won't change visible text.
s = s.replace("\u200b", "").replace("\ufeff", "")
s = _CTRL_RE.sub("", s)
s = s.strip()
if not s:
return ""
if len(s) < 2 or len(s) > 20:
return ""
if "http" in s:
return ""
if "<" in s:
return ""
if s.startswith("[") or s.startswith("<?xml"):
return ""
return s
def build_common_phrases_payload(
*,
phrase_counts: Counter[str],
seed: int,
top_n: int = 32,
bubble_limit: int = 180,
example_texts: list[str] | None = None,
examples_per_word: int = 3,
) -> dict[str, Any]:
_ = seed # 保留参数以兼容现有调用/测试;气泡抽样不再使用固定 seed。
items = [(p, int(c)) for p, c in (phrase_counts or {}).items() if int(c) >= 2]
if not items:
return {"topKeyword": None, "keywords": [], "bubbleMessages": [], "examples": []}
items.sort(key=lambda kv: (-kv[1], kv[0]))
items = items[: max(0, int(top_n or 0))]
if not items:
return {"topKeyword": None, "keywords": [], "bubbleMessages": [], "examples": []}
vals = [math.sqrt(max(0, c)) for _, c in items]
minv = min(vals) if vals else 0.0
maxv = max(vals) if vals else 0.0
keywords: list[dict[str, Any]] = []
for (phrase, count), v in zip(items, vals):
if maxv <= minv:
weight = 1.0
else:
weight = 0.2 + 0.8 * ((v - minv) / (maxv - minv))
keywords.append({"word": phrase, "count": int(count), "weight": round(float(weight), 4)})
# Bubble pool: unique phrases (not all raw messages). Keep it diverse and lightweight.
bubble_candidates = list(dict.fromkeys([str(p or "").strip() for p in phrase_counts.keys()]))
bubble_candidates = [p for p in bubble_candidates if p]
rnd = random.SystemRandom()
rnd.shuffle(bubble_candidates)
bubble_messages = bubble_candidates[: max(0, int(bubble_limit or 0))]
# Examples: prefer real sampled messages; fallback to phrase itself.
if example_texts:
per_word = max(1, int(examples_per_word or 1))
examples = pick_examples(keywords, list(example_texts), per_word=per_word)
for ex in examples:
msgs = [str(m or "").strip() for m in (ex.get("messages") or []) if str(m or "").strip()]
if not msgs:
w = str(ex.get("word") or "").strip()
ex["messages"] = [w] if w else []
else:
ex["messages"] = msgs[:per_word]
else:
examples = [{"word": kw["word"], "count": int(kw["count"]), "messages": [kw["word"]]} for kw in keywords]
top_kw = {"word": str(keywords[0]["word"]), "count": int(keywords[0]["count"])} if keywords else None
return {
"topKeyword": top_kw,
"keywords": keywords,
"bubbleMessages": bubble_messages,
"examples": examples,
}
def _scan_common_phrase_counts(
*,
account_dir: Path,
year: int,
outgoing_only: bool,
seed: int,
max_seen: int | None = None,
) -> tuple[Counter[str], dict[str, Any]]:
start_ts, end_ts = _year_range_epoch_seconds(int(year))
_ = seed # 保留参数以兼容现有调用;扫描顺序不再使用随机。
db_paths = _iter_message_db_paths(account_dir)
# Prefer chat shards; biz_message often contains service/ads content.
db_paths = [p for p in db_paths if not p.name.lower().startswith("biz_message")]
phrase_counts: Counter[str] = Counter()
scanned = 0
matched = 0
capped = False
t0 = time.time()
for db_path in db_paths:
if not db_path.exists():
continue
conn: sqlite3.Connection | None = None
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
my_rowid: int | None = None
if outgoing_only:
try:
r = conn.execute(
"SELECT rowid FROM Name2Id WHERE user_name = ? LIMIT 1",
(str(account_dir.name),),
).fetchone()
if r is not None and r[0] is not None:
my_rowid = int(r[0])
except Exception:
my_rowid = None
if my_rowid is None:
continue
tables = _list_message_tables(conn)
if not tables:
continue
tables.sort()
ts_expr = (
"CASE "
"WHEN CAST(create_time AS INTEGER) > 1000000000000 "
"THEN CAST(CAST(create_time AS INTEGER)/1000 AS INTEGER) "
"ELSE CAST(create_time AS INTEGER) "
"END"
)
local_types_csv = ",".join(str(int(x)) for x in _WEFLOW_COMMON_PHRASE_LOCAL_TYPES)
for table in tables:
if max_seen is not None and scanned >= int(max_seen):
capped = True
break
qt = _quote_ident(table)
where_sender = ""
params: tuple[Any, ...]
if outgoing_only and my_rowid is not None:
where_sender = " AND CAST(real_sender_id AS INTEGER) = ?"
params = (start_ts, end_ts, int(my_rowid))
else:
params = (start_ts, end_ts)
sql = (
"SELECT message_content, compress_content "
f"FROM {qt} "
f"WHERE CAST(local_type AS INTEGER) IN ({local_types_csv}) "
f" AND {ts_expr} >= ? AND {ts_expr} < ?"
f"{where_sender}"
)
try:
cur = conn.execute(sql, params)
except Exception:
continue
for r in cur:
if max_seen is not None and scanned >= int(max_seen):
capped = True
break
scanned += 1
try:
raw_txt = _decode_message_content(r["compress_content"], r["message_content"])
except Exception:
continue
phrase = _weflow_common_phrase_or_empty(raw_txt)
if not phrase:
continue
phrase_counts[phrase] += 1
matched += 1
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
if max_seen is not None and scanned >= int(max_seen):
break
elapsed = time.time() - t0
meta = {
"scannedCandidates": int(scanned),
"matchedCandidates": int(matched),
"uniquePhrases": int(len(phrase_counts)),
"capped": bool(capped),
"elapsedSec": round(float(elapsed), 3),
"localTypes": list(_WEFLOW_COMMON_PHRASE_LOCAL_TYPES),
}
return phrase_counts, meta
def _scan_message_pool(
*,
account_dir: Path,
@@ -532,24 +762,65 @@ def _scan_message_pool(
def build_card_05_keywords_wordcloud(*, account_dir: Path, year: int) -> dict[str, Any]:
title = "这一年,你把哪些说了一遍又一遍?"
title = "这一年,你把哪些说了一遍又一遍?"
seed = _stable_seed(str(account_dir.name or ""), int(year))
pool, meta = _scan_message_pool(account_dir=account_dir, year=year, outgoing_only=True, seed=seed)
if len(pool) < 80:
pool, meta = _scan_message_pool(account_dir=account_dir, year=year, outgoing_only=False, seed=seed ^ 0x1234)
phrase_counts, scan_meta = _scan_common_phrase_counts(
account_dir=account_dir,
year=year,
outgoing_only=True,
seed=seed,
)
# Fallback only when we cannot scan any candidate rows (e.g. Name2Id row missing).
if int(scan_meta.get("scannedCandidates") or 0) <= 0:
phrase_counts, scan_meta = _scan_common_phrase_counts(
account_dir=account_dir,
year=year,
outgoing_only=False,
seed=seed ^ 0x1234,
)
scan_meta["outgoingOnlyFallback"] = True
payload = build_keywords_payload(texts=pool, seed=seed)
example_pool: list[str] = []
pool_meta: dict[str, Any] = {}
if phrase_counts:
use_outgoing_only = not bool(scan_meta.get("outgoingOnlyFallback") or False)
example_pool, pool_meta = _scan_message_pool(
account_dir=account_dir,
year=year,
outgoing_only=use_outgoing_only,
seed=seed ^ 0x9E37,
max_pool=3000,
max_seen=120_000,
)
if (not example_pool) and use_outgoing_only:
example_pool, pool_meta = _scan_message_pool(
account_dir=account_dir,
year=year,
outgoing_only=False,
seed=seed ^ 0xA53C,
max_pool=3000,
max_seen=120_000,
)
pool_meta["outgoingOnlyFallback"] = True
payload = build_common_phrases_payload(
phrase_counts=phrase_counts,
seed=seed,
example_texts=example_pool,
examples_per_word=3,
)
logger.info(
"Wrapped card#6 keywords computed: account=%s year=%s keywords=%s bubble=%s scanned=%s sampled=%s elapsed=%.2fs",
"Wrapped card#6 common phrases computed: account=%s year=%s phrases=%s bubble=%s scanned=%s matched=%s capped=%s elapsed=%.2fs",
str(account_dir.name or "").strip(),
int(year),
len(payload.get("keywords") or []),
len(payload.get("bubbleMessages") or []),
int(meta.get("scannedMessages") or 0),
int(meta.get("sampledMessages") or 0),
float(meta.get("elapsedSec") or 0.0),
int(scan_meta.get("scannedCandidates") or 0),
int(scan_meta.get("matchedCandidates") or 0),
bool(scan_meta.get("capped") or False),
float(scan_meta.get("elapsedSec") or 0.0),
)
return {
@@ -559,14 +830,20 @@ def build_card_05_keywords_wordcloud(*, account_dir: Path, year: int) -> dict[st
"category": "C",
"status": "ok",
"kind": "text/keywords_wordcloud",
"narrative": "你的年度关键词词云",
"narrative": "你的年度常用语词云",
"data": {
"year": int(year),
**payload,
"meta": {
"scannedMessages": int(meta.get("scannedMessages") or 0),
"sampledMessages": int(meta.get("sampledMessages") or 0),
"sampleRate": float(meta.get("sampleRate") or 0.0),
"scannedCandidates": int(scan_meta.get("scannedCandidates") or 0),
"matchedCandidates": int(scan_meta.get("matchedCandidates") or 0),
"uniquePhrases": int(scan_meta.get("uniquePhrases") or 0),
"capped": bool(scan_meta.get("capped") or False),
"localTypes": list(scan_meta.get("localTypes") or []),
"outgoingOnlyFallback": bool(scan_meta.get("outgoingOnlyFallback") or False),
"examplePoolScannedMessages": int(pool_meta.get("scannedMessages") or 0),
"examplePoolSampledMessages": int(pool_meta.get("sampledMessages") or 0),
"examplePoolOutgoingOnlyFallback": bool(pool_meta.get("outgoingOnlyFallback") or False),
},
},
}
+1 -1
View File
@@ -27,7 +27,7 @@ logger = get_logger(__name__)
# an older partial cache.
_IMPLEMENTED_UPTO_ID = 6
# Bump this when we change card payloads/ordering while keeping the same implemented_upto.
_CACHE_VERSION = 23
_CACHE_VERSION = 24
# "Manifest" is used by the frontend to render the deck quickly, then lazily fetch each card.
+75
View File
@@ -8,6 +8,63 @@ sys.path.insert(0, str(ROOT / "src"))
class TestWrappedKeywordsWordCloud(unittest.TestCase):
def test_weflow_common_phrase_filter(self):
from wechat_decrypt_tool.wrapped.cards.card_05_keywords_wordcloud import _weflow_common_phrase_or_empty
self.assertEqual(_weflow_common_phrase_or_empty(" 在吗 "), "在吗")
self.assertEqual(_weflow_common_phrase_or_empty("ok"), "ok")
self.assertEqual(_weflow_common_phrase_or_empty("a"), "") # too short
self.assertEqual(_weflow_common_phrase_or_empty("x" * 21), "") # too long
self.assertEqual(_weflow_common_phrase_or_empty("看看 http://x.com"), "") # contains http
self.assertEqual(_weflow_common_phrase_or_empty("<msg>xml</msg>"), "") # contains "<"
self.assertEqual(_weflow_common_phrase_or_empty("[捂脸]"), "") # bracketed payload
self.assertEqual(_weflow_common_phrase_or_empty("<?xml version='1.0'?>"), "") # xml payload
def test_build_common_phrases_payload_structure(self):
from collections import Counter
from wechat_decrypt_tool.wrapped.cards.card_05_keywords_wordcloud import build_common_phrases_payload
phrase_counts = Counter({"好的": 5, "在吗": 2, "movie": 2, "单次": 1})
example_texts = [
"好的收到",
"好的好的,明白了",
"你好的呀",
"在吗宝贝",
"movie night is fun",
"MOVIE time now",
]
payload = build_common_phrases_payload(
phrase_counts=phrase_counts,
seed=123456,
top_n=32,
bubble_limit=50,
example_texts=example_texts,
examples_per_word=3,
)
self.assertIn("keywords", payload)
self.assertIn("bubbleMessages", payload)
self.assertIn("examples", payload)
self.assertIn("topKeyword", payload)
self.assertEqual(payload["topKeyword"]["word"], "好的")
self.assertEqual(int(payload["topKeyword"]["count"]), 5)
self.assertTrue(all(int(x.get("count") or 0) >= 2 for x in payload["keywords"]))
self.assertTrue(all(isinstance(x.get("word"), str) and x.get("word") for x in payload["keywords"]))
# Examples should contain real message samples with an upper bound.
for ex in payload["examples"]:
msgs = ex.get("messages") or []
self.assertGreaterEqual(len(msgs), 1)
self.assertLessEqual(len(msgs), 3)
word = str(ex.get("word") or "")
if any("\u4e00" <= ch <= "\u9fff" for ch in word):
self.assertTrue(any(word in str(m) for m in msgs))
else:
self.assertTrue(any(word.lower() in str(m).lower() for m in msgs))
def test_extract_keywords_jieba_basic(self):
from wechat_decrypt_tool.wrapped.cards.card_05_keywords_wordcloud import extract_keywords_jieba
@@ -96,6 +153,24 @@ class TestWrappedKeywordsWordCloud(unittest.TestCase):
m_movie = next(x for x in out if x["word"] == "movie")
self.assertTrue(all("movie" in m.lower() for m in m_movie["messages"]))
def test_pick_examples_short_phrase_can_fill_three(self):
from wechat_decrypt_tool.wrapped.cards.card_05_keywords_wordcloud import pick_examples
keywords = [{"word": "在吗", "count": 9, "weight": 1.0}]
pool = [
"在吗",
"在吗",
"在吗",
"在吗?",
"ok",
]
out = pick_examples(keywords, pool, per_word=3)
self.assertEqual(len(out), 1)
msgs = out[0]["messages"]
self.assertEqual(len(msgs), 3)
self.assertTrue(all("在吗" in m for m in msgs))
def test_build_keywords_payload_structure(self):
from wechat_decrypt_tool.wrapped.cards.card_05_keywords_wordcloud import build_keywords_payload