improvement(chat): realtime 直读 WCDB 并完善追踪日志

- SSE 变更扫描改用 asyncio.to_thread,避免阻塞事件循环

- sessions/messages 支持 source=realtime;realtime 下会话预览改用 session 信息避免缓存陈旧

- realtime sync/sync_all 增加 trace_id 与关键步骤日志,便于定位卡顿/锁竞争

- 支持通过 WECHAT_TOOL_LOG_LEVEL 环境变量覆盖日志级别
This commit is contained in:
2977094657
2026-01-24 18:47:06 +08:00
parent c0cddca307
commit 93ad7b7a1c
2 changed files with 306 additions and 30 deletions

View File

@@ -3,6 +3,7 @@
"""
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
@@ -58,6 +59,11 @@ class WeChatLogger:
def setup_logging(self, log_level: str = "INFO"):
"""设置日志配置"""
# Allow overriding via env var for easier debugging (e.g. WECHAT_TOOL_LOG_LEVEL=DEBUG)
env_level = str(os.environ.get("WECHAT_TOOL_LOG_LEVEL", "") or "").strip()
if env_level:
log_level = env_level
# 创建日志目录
now = datetime.now()
log_dir = Path("output/logs") / str(now.year) / f"{now.month:02d}" / f"{now.day:02d}"
@@ -88,46 +94,47 @@ class WeChatLogger:
# 文件处理器
file_handler = logging.FileHandler(self.log_file, encoding='utf-8')
file_handler.setFormatter(file_formatter)
file_handler.setLevel(getattr(logging, log_level.upper()))
level = getattr(logging, str(log_level or "INFO").upper(), logging.INFO)
file_handler.setLevel(level)
# 控制台处理器
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(console_formatter)
console_handler.setLevel(getattr(logging, log_level.upper()))
console_handler.setLevel(level)
# 配置根日志器
root_logger.setLevel(getattr(logging, log_level.upper()))
root_logger.setLevel(level)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
# 只为uvicorn日志器添加文件处理器保持其原有的控制台处理器带颜色
uvicorn_logger = logging.getLogger("uvicorn")
uvicorn_logger.addHandler(file_handler)
uvicorn_logger.setLevel(getattr(logging, log_level.upper()))
uvicorn_logger.setLevel(level)
# 只为uvicorn.access日志器添加文件处理器
uvicorn_access_logger = logging.getLogger("uvicorn.access")
uvicorn_access_logger.addHandler(file_handler)
uvicorn_access_logger.setLevel(getattr(logging, log_level.upper()))
uvicorn_access_logger.setLevel(level)
# 只为uvicorn.error日志器添加文件处理器
uvicorn_error_logger = logging.getLogger("uvicorn.error")
uvicorn_error_logger.addHandler(file_handler)
uvicorn_error_logger.setLevel(getattr(logging, log_level.upper()))
uvicorn_error_logger.setLevel(level)
# 配置FastAPI日志器
fastapi_logger = logging.getLogger("fastapi")
fastapi_logger.handlers = []
fastapi_logger.addHandler(file_handler)
fastapi_logger.addHandler(console_handler)
fastapi_logger.setLevel(getattr(logging, log_level.upper()))
fastapi_logger.setLevel(level)
# 记录初始化信息
logger = logging.getLogger(__name__)
logger.info("=" * 60)
logger.info("微信解密工具日志系统初始化完成")
logger.info(f"日志文件: {self.log_file}")
logger.info(f"日志级别: {log_level}")
logger.info(f"日志级别: {logging.getLevelName(level)}")
logger.info("=" * 60)
return self.log_file

View File

@@ -213,6 +213,13 @@ async def stream_chat_realtime_events(
if not db_storage_dir.exists() or not db_storage_dir.is_dir():
raise HTTPException(status_code=400, detail="db_storage directory not found for this account.")
logger.info(
"[realtime] SSE stream open account=%s interval_ms=%s db_storage=%s",
account_dir.name,
int(interval_ms),
str(db_storage_dir),
)
async def gen():
last_mtime_ns = 0
last_heartbeat = 0.0
@@ -226,27 +233,40 @@ async def stream_chat_realtime_events(
}
yield f"data: {json.dumps(initial, ensure_ascii=False)}\n\n"
while True:
if await request.is_disconnected():
break
try:
while True:
if await request.is_disconnected():
break
mtime_ns = _scan_db_storage_mtime_ns(db_storage_dir)
if mtime_ns and mtime_ns != last_mtime_ns:
last_mtime_ns = mtime_ns
payload = {
"type": "change",
"account": account_dir.name,
"mtimeNs": int(mtime_ns),
"ts": int(time.time() * 1000),
}
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
# Avoid blocking the event loop on a potentially large directory walk.
scan_t0 = time.perf_counter()
try:
mtime_ns = await asyncio.to_thread(_scan_db_storage_mtime_ns, db_storage_dir)
except Exception:
mtime_ns = 0
scan_ms = (time.perf_counter() - scan_t0) * 1000.0
if scan_ms > 1000:
logger.warning("[realtime] SSE scan slow account=%s ms=%.1f", account_dir.name, scan_ms)
now = time.time()
if now - last_heartbeat > 15:
last_heartbeat = now
yield ": ping\n\n"
if mtime_ns and mtime_ns != last_mtime_ns:
last_mtime_ns = mtime_ns
payload = {
"type": "change",
"account": account_dir.name,
"mtimeNs": int(mtime_ns),
"ts": int(time.time() * 1000),
}
logger.info("[realtime] SSE change account=%s mtime_ns=%s", account_dir.name, int(mtime_ns))
yield f"data: {json.dumps(payload, ensure_ascii=False)}\n\n"
await asyncio.sleep(interval_ms / 1000.0)
now = time.time()
if now - last_heartbeat > 15:
last_heartbeat = now
yield ": ping\n\n"
await asyncio.sleep(interval_ms / 1000.0)
finally:
logger.info("[realtime] SSE stream closed account=%s", account_dir.name)
headers = {"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"}
return StreamingResponse(gen(), media_type="text/event-stream", headers=headers)
@@ -337,7 +357,7 @@ def _ensure_session_last_message_table(conn: sqlite3.Connection) -> None:
@router.post("/api/chat/realtime/sync", summary="实时消息同步到解密库(按会话增量)")
async def sync_chat_realtime_messages(
def sync_chat_realtime_messages(
request: Request,
username: str,
account: Optional[str] = None,
@@ -357,11 +377,23 @@ async def sync_chat_realtime_messages(
max_scan = 5000
account_dir = _resolve_account_dir(account)
trace_id = f"rt-sync-{int(time.time() * 1000)}-{threading.get_ident()}"
logger.info(
"[%s] realtime sync start account=%s username=%s max_scan=%s",
trace_id,
account_dir.name,
username,
int(max_scan),
)
# Lock per (account, username) to avoid concurrent writes to the same sqlite tables.
logger.info("[%s] acquiring per-session lock account=%s username=%s", trace_id, account_dir.name, username)
with _realtime_sync_lock(account_dir.name, username):
logger.info("[%s] per-session lock acquired account=%s username=%s", trace_id, account_dir.name, username)
try:
logger.info("[%s] ensure wcdb connected account=%s", trace_id, account_dir.name)
rt_conn = WCDB_REALTIME.ensure_connected(account_dir)
logger.info("[%s] wcdb connected account=%s handle=%s", trace_id, account_dir.name, int(rt_conn.handle))
except WCDBRealtimeError as e:
raise HTTPException(status_code=400, detail=str(e))
@@ -369,6 +401,14 @@ async def sync_chat_realtime_messages(
if not resolved:
raise HTTPException(status_code=404, detail="Conversation table not found in decrypted databases.")
msg_db_path, table_name = resolved
logger.info(
"[%s] resolved decrypted table account=%s username=%s db=%s table=%s",
trace_id,
account_dir.name,
username,
str(msg_db_path),
table_name,
)
msg_conn = sqlite3.connect(str(msg_db_path))
msg_conn.row_factory = sqlite3.Row
@@ -457,8 +497,34 @@ async def sync_chat_realtime_messages(
while scanned < int(max_scan):
take = min(batch_size, int(max_scan) - scanned)
logger.info(
"[%s] wcdb_get_messages account=%s username=%s take=%s offset=%s",
trace_id,
account_dir.name,
username,
int(take),
int(offset),
)
wcdb_t0 = time.perf_counter()
with rt_conn.lock:
raw_rows = _wcdb_get_messages(rt_conn.handle, username, limit=take, offset=offset)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
logger.info(
"[%s] wcdb_get_messages done account=%s username=%s rows=%s ms=%.1f",
trace_id,
account_dir.name,
username,
len(raw_rows or []),
wcdb_ms,
)
if wcdb_ms > 2000:
logger.warning(
"[%s] wcdb_get_messages slow account=%s username=%s ms=%.1f",
trace_id,
account_dir.name,
username,
wcdb_ms,
)
if not raw_rows:
break
@@ -526,9 +592,27 @@ async def sync_chat_realtime_messages(
# Insert older -> newer to keep sqlite btree locality similar to existing data.
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
insert_t0 = time.perf_counter()
msg_conn.executemany(insert_sql, values)
msg_conn.commit()
insert_ms = (time.perf_counter() - insert_t0) * 1000.0
inserted = len(new_rows)
logger.info(
"[%s] sqlite insert done account=%s username=%s inserted=%s ms=%.1f",
trace_id,
account_dir.name,
username,
int(inserted),
insert_ms,
)
if insert_ms > 1000:
logger.warning(
"[%s] sqlite insert slow account=%s username=%s ms=%.1f",
trace_id,
account_dir.name,
username,
insert_ms,
)
if ("packed_info_data" in insert_cols) and backfill_rows:
update_values = []
@@ -539,12 +623,30 @@ async def sync_chat_realtime_messages(
update_values.append((pdata, int(r.get("local_id") or 0)))
if update_values:
before_changes = msg_conn.total_changes
update_t0 = time.perf_counter()
msg_conn.executemany(
f"UPDATE {quoted_table} SET packed_info_data = ? WHERE local_id = ? AND (packed_info_data IS NULL OR length(packed_info_data) = 0)",
update_values,
)
msg_conn.commit()
update_ms = (time.perf_counter() - update_t0) * 1000.0
backfilled = int(msg_conn.total_changes - before_changes)
logger.info(
"[%s] sqlite backfill done account=%s username=%s rows=%s ms=%.1f",
trace_id,
account_dir.name,
username,
int(backfilled),
update_ms,
)
if update_ms > 1000:
logger.warning(
"[%s] sqlite backfill slow account=%s username=%s ms=%.1f",
trace_id,
account_dir.name,
username,
update_ms,
)
# Update session.db so left sidebar ordering/time can follow new messages.
newest = new_rows[0] if new_rows else None
@@ -636,6 +738,16 @@ async def sync_chat_realtime_messages(
finally:
sconn.close()
logger.info(
"[%s] realtime sync done account=%s username=%s scanned=%s inserted=%s backfilled=%s maxLocalIdBefore=%s",
trace_id,
account_dir.name,
username,
int(scanned),
int(inserted),
int(backfilled),
int(max_local_id),
)
return {
"status": "success",
"account": account_dir.name,
@@ -750,8 +862,31 @@ def _sync_chat_realtime_messages_for_table(
while scanned < int(max_scan):
take = min(batch_size, int(max_scan) - scanned)
logger.info(
"[realtime] wcdb_get_messages account=%s username=%s take=%s offset=%s",
account_dir.name,
username,
int(take),
int(offset),
)
wcdb_t0 = time.perf_counter()
with rt_conn.lock:
raw_rows = _wcdb_get_messages(rt_conn.handle, username, limit=take, offset=offset)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
logger.info(
"[realtime] wcdb_get_messages done account=%s username=%s rows=%s ms=%.1f",
account_dir.name,
username,
len(raw_rows or []),
wcdb_ms,
)
if wcdb_ms > 2000:
logger.warning(
"[realtime] wcdb_get_messages slow account=%s username=%s ms=%.1f",
account_dir.name,
username,
wcdb_ms,
)
if not raw_rows:
break
@@ -816,9 +951,25 @@ def _sync_chat_realtime_messages_for_table(
continue
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
insert_t0 = time.perf_counter()
msg_conn.executemany(insert_sql, values)
msg_conn.commit()
insert_ms = (time.perf_counter() - insert_t0) * 1000.0
inserted = len(new_rows)
logger.info(
"[realtime] sqlite insert done account=%s username=%s inserted=%s ms=%.1f",
account_dir.name,
username,
int(inserted),
insert_ms,
)
if insert_ms > 1000:
logger.warning(
"[realtime] sqlite insert slow account=%s username=%s ms=%.1f",
account_dir.name,
username,
insert_ms,
)
if ("packed_info_data" in insert_cols) and backfill_rows:
update_values = []
@@ -829,12 +980,28 @@ def _sync_chat_realtime_messages_for_table(
update_values.append((pdata, int(r.get("local_id") or 0)))
if update_values:
before_changes = msg_conn.total_changes
update_t0 = time.perf_counter()
msg_conn.executemany(
f"UPDATE {quoted_table} SET packed_info_data = ? WHERE local_id = ? AND (packed_info_data IS NULL OR length(packed_info_data) = 0)",
update_values,
)
msg_conn.commit()
update_ms = (time.perf_counter() - update_t0) * 1000.0
backfilled = int(msg_conn.total_changes - before_changes)
logger.info(
"[realtime] sqlite backfill done account=%s username=%s rows=%s ms=%.1f",
account_dir.name,
username,
int(backfilled),
update_ms,
)
if update_ms > 1000:
logger.warning(
"[realtime] sqlite backfill slow account=%s username=%s ms=%.1f",
account_dir.name,
username,
update_ms,
)
newest = new_rows[0] if new_rows else None
preview = ""
@@ -938,7 +1105,7 @@ def _sync_chat_realtime_messages_for_table(
@router.post("/api/chat/realtime/sync_all", summary="实时消息同步到解密库(全会话增量)")
async def sync_chat_realtime_messages_all(
def sync_chat_realtime_messages_all(
request: Request,
account: Optional[str] = None,
max_scan: int = 200,
@@ -953,6 +1120,16 @@ async def sync_chat_realtime_messages_all(
说明这是增量同步不会每次全表扫描priority_username 会优先同步并可设置更大的 priority_max_scan。
"""
account_dir = _resolve_account_dir(account)
trace_id = f"rt-syncall-{int(time.time() * 1000)}-{threading.get_ident()}"
logger.info(
"[%s] realtime sync_all start account=%s max_scan=%s priority=%s include_hidden=%s include_official=%s",
trace_id,
account_dir.name,
int(max_scan),
str(priority_username or "").strip(),
bool(include_hidden),
bool(include_official),
)
if max_scan < 20:
max_scan = 20
@@ -966,15 +1143,29 @@ async def sync_chat_realtime_messages_all(
priority = str(priority_username or "").strip()
started = time.time()
logger.info("[%s] acquiring global sync lock account=%s", trace_id, account_dir.name)
with _realtime_sync_all_lock(account_dir.name):
logger.info("[%s] global sync lock acquired account=%s", trace_id, account_dir.name)
try:
logger.info("[%s] ensure wcdb connected account=%s", trace_id, account_dir.name)
rt_conn = WCDB_REALTIME.ensure_connected(account_dir)
logger.info("[%s] wcdb connected account=%s handle=%s", trace_id, account_dir.name, int(rt_conn.handle))
except WCDBRealtimeError as e:
raise HTTPException(status_code=400, detail=str(e))
try:
logger.info("[%s] wcdb_get_sessions account=%s", trace_id, account_dir.name)
wcdb_t0 = time.perf_counter()
with rt_conn.lock:
raw_sessions = _wcdb_get_sessions(rt_conn.handle)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
logger.info(
"[%s] wcdb_get_sessions done account=%s sessions=%s ms=%.1f",
trace_id,
account_dir.name,
len(raw_sessions or []),
wcdb_ms,
)
except Exception:
raw_sessions = []
@@ -1018,6 +1209,13 @@ async def sync_chat_realtime_messages_all(
sessions = _dedupe(sessions)
sessions.sort(key=lambda x: int(x[0] or 0), reverse=True)
all_usernames = [u for _, u in sessions if u]
logger.info(
"[%s] sessions prepared account=%s raw=%s filtered=%s",
trace_id,
account_dir.name,
len(raw_sessions or []),
len(all_usernames),
)
# Skip sessions whose decrypted session.db already has a newer/equal sort_timestamp.
decrypted_ts_by_user: dict[str, int] = {}
@@ -1080,10 +1278,25 @@ async def sync_chat_realtime_messages_all(
continue
sync_usernames.append(u)
logger.info(
"[%s] sessions need_sync account=%s need_sync=%s skipped_up_to_date=%s",
trace_id,
account_dir.name,
len(sync_usernames),
int(skipped_up_to_date),
)
if priority and priority in sync_usernames:
sync_usernames = [priority] + [u for u in sync_usernames if u != priority]
table_map = _resolve_decrypted_message_tables(account_dir, sync_usernames)
logger.info(
"[%s] resolved decrypted tables account=%s resolved=%s need_sync=%s",
trace_id,
account_dir.name,
len(table_map),
len(sync_usernames),
)
scanned_total = 0
inserted_total = 0
@@ -1116,17 +1329,50 @@ async def sync_chat_realtime_messages_all(
inserted_total += ins
if ins:
updated_sessions += 1
logger.info(
"[%s] synced session account=%s username=%s inserted=%s scanned=%s",
trace_id,
account_dir.name,
uname,
ins,
int(result.get("scanned") or 0),
)
except HTTPException as e:
errors.append(f"{uname}: {str(e.detail or '')}".strip())
logger.warning(
"[%s] sync session failed account=%s username=%s err=%s",
trace_id,
account_dir.name,
uname,
str(e.detail or "").strip(),
)
continue
except Exception as e:
errors.append(f"{uname}: {str(e)}".strip())
logger.exception(
"[%s] sync session crashed account=%s username=%s",
trace_id,
account_dir.name,
uname,
)
continue
elapsed_ms = int((time.time() - started) * 1000)
if len(errors) > 20:
errors = errors[:20] + [f"... and {len(errors) - 20} more"]
logger.info(
"[%s] realtime sync_all done account=%s sessions_total=%s need_sync=%s synced=%s updated=%s inserted_total=%s elapsed_ms=%s errors=%s",
trace_id,
account_dir.name,
len(all_usernames),
len(sync_usernames),
int(synced),
int(updated_sessions),
int(inserted_total),
int(elapsed_ms),
len(errors),
)
return {
"status": "success",
"account": account_dir.name,
@@ -2134,7 +2380,7 @@ async def list_chat_accounts():
@router.get("/api/chat/sessions", summary="获取会话列表(聊天左侧列表)")
async def list_chat_sessions(
def list_chat_sessions(
request: Request,
account: Optional[str] = None,
limit: int = 400,
@@ -2157,10 +2403,32 @@ async def list_chat_sessions(
rows: list[Any]
if source_norm == "realtime":
trace_id = f"rt-sessions-{int(time.time() * 1000)}-{threading.get_ident()}"
logger.info(
"[%s] list_sessions realtime start account=%s limit=%s include_hidden=%s include_official=%s preview=%s",
trace_id,
account_dir.name,
int(limit),
bool(include_hidden),
bool(include_official),
str(preview or ""),
)
try:
logger.info("[%s] ensure wcdb connected account=%s", trace_id, account_dir.name)
conn = WCDB_REALTIME.ensure_connected(account_dir)
logger.info("[%s] wcdb connected account=%s handle=%s", trace_id, account_dir.name, int(conn.handle))
logger.info("[%s] wcdb_get_sessions account=%s", trace_id, account_dir.name)
wcdb_t0 = time.perf_counter()
with conn.lock:
raw = _wcdb_get_sessions(conn.handle)
wcdb_ms = (time.perf_counter() - wcdb_t0) * 1000.0
logger.info(
"[%s] wcdb_get_sessions done account=%s sessions=%s ms=%.1f",
trace_id,
account_dir.name,
len(raw or []),
wcdb_ms,
)
except WCDBRealtimeError as e:
raise HTTPException(status_code=400, detail=str(e))
@@ -2193,6 +2461,7 @@ async def list_chat_sessions(
norm.sort(key=lambda r: _ts(r.get("sort_timestamp")), reverse=True)
rows = norm
logger.info("[%s] list_sessions realtime normalized account=%s rows=%s", trace_id, account_dir.name, len(rows))
else:
session_db_path = account_dir / "session.db"
sconn = sqlite3.connect(str(session_db_path))
@@ -2873,7 +3142,7 @@ def _collect_chat_messages(
@router.get("/api/chat/messages", summary="获取会话消息列表")
async def list_chat_messages(
def list_chat_messages(
request: Request,
username: str,
account: Optional[str] = None,