feat: improve text chat record loading diagnostics

This commit is contained in:
2977094657
2026-04-03 18:39:55 +08:00
Unverified
parent 7ae152af33
commit 4ba2c75332
7 changed files with 564 additions and 36 deletions
+39 -2
View File
@@ -457,6 +457,33 @@ if (scope === 'global') {
}
return (text.charAt(0) || '?').toString()
}
const buildTransientSearchTargetContact = ({ username, displayName = '', avatar = '', isGroup = null } = {}) => {
const u = String(username || '').trim()
if (!u) return null
const name = String(displayName || u).trim() || u
return {
id: u,
username: u,
name,
avatar: String(avatar || '').trim() || null,
avatarColor: '#4B5563',
lastMessage: '',
lastMessageTime: '',
unreadCount: 0,
isGroup: typeof isGroup === 'boolean' ? isGroup : u.endsWith('@chatroom'),
isTop: false
}
}
const resolveSearchTargetContact = ({ username, displayName = '', avatar = '', isGroup = null } = {}) => {
const u = String(username || '').trim()
if (!u) return null
const existing = contacts.value.find((c) => String(c?.username || '').trim() === u)
if (existing) return existing
if (String(selectedContact.value?.username || '').trim() === u) return selectedContact.value
return buildTransientSearchTargetContact({ username: u, displayName, avatar, isGroup })
}
const searchContextBannerText = computed(() => {
if (!searchContext.value?.active) return ''
const kind = String(searchContext.value.kind || 'search')
@@ -986,7 +1013,12 @@ if (!hit?.id) return
const targetUsername = String(hit?.username || selectedContact.value?.username || '').trim()
if (!targetUsername) return
const targetContact = contacts.value.find((c) => c?.username === targetUsername)
const targetContact = resolveSearchTargetContact({
username: targetUsername,
displayName: String(hit?.conversationName || hit?.username || targetUsername).trim(),
avatar: String(hit?.conversationAvatar || hit?.senderAvatar || '').trim(),
isGroup: targetUsername.endsWith('@chatroom')
})
if (targetContact && selectedContact.value?.username !== targetUsername) {
await selectContact(targetContact, { skipLoadMessages: true })
}
@@ -1051,7 +1083,12 @@ const u = String(targetUsername || selectedContact.value?.username || '').trim()
const anchor = String(anchorId || '').trim()
if (!u || !anchor) return
const targetContact = contacts.value.find((c) => c?.username === u)
const targetContact = resolveSearchTargetContact({
username: u,
displayName: String(selectedContact.value?.name || u).trim(),
avatar: String(selectedContact.value?.avatar || '').trim(),
isGroup: u.endsWith('@chatroom')
})
if (targetContact && selectedContact.value?.username !== u) {
await selectContact(targetContact, { skipLoadMessages: true })
}
+39 -2
View File
@@ -119,6 +119,23 @@ const nextMessageLoadToken = () => {
return messageLoadSequence
}
const buildTransientContact = ({ username, name = '', avatar = '', isGroup = null } = {}) => {
const u = String(username || '').trim()
const displayName = String(name || u).trim() || u
return {
id: u,
username: u,
name: displayName,
avatar: String(avatar || '').trim() || null,
avatarColor: '#4B5563',
lastMessage: '',
lastMessageTime: '',
unreadCount: 0,
isGroup: typeof isGroup === 'boolean' ? isGroup : u.endsWith('@chatroom'),
isTop: false
}
}
const buildChatPath = (username) => {
return username ? `/chat/${encodeURIComponent(username)}` : '/chat'
}
@@ -334,14 +351,28 @@ const selectContact = async (contact, options = {}) => {
}
const applyRouteSelection = async (options = {}) => {
const selectionReason = String(options.reason || 'route-selection').trim() || 'route-selection'
const requested = routeUsername.value || ''
if ((!contacts.value || contacts.value.length === 0) && requested) {
if (selectedContact.value?.username === requested) {
return
}
await selectContact(buildTransientContact({ username: requested }), {
syncRoute: false,
deferLoadMessages: !!options.deferLoadMessages,
reason: `${selectionReason}:transient-route-empty-list`
})
return
}
if (!contacts.value || contacts.value.length === 0) {
selectedContact.value = null
return
}
const selectionReason = String(options.reason || 'route-selection').trim() || 'route-selection'
const requested = routeUsername.value || ''
if (requested) {
if (selectedContact.value?.username === requested) {
return
}
const matched = contacts.value.find((contact) => contact.username === requested)
if (matched) {
if (selectedContact.value?.username !== matched.username) {
@@ -353,6 +384,12 @@ const applyRouteSelection = async (options = {}) => {
}
return
}
await selectContact(buildTransientContact({ username: requested }), {
syncRoute: false,
deferLoadMessages: !!options.deferLoadMessages,
reason: `${selectionReason}:transient-route`
})
return
}
await selectContact(contacts.value[0], {
+83 -6
View File
@@ -14,6 +14,7 @@ from fastapi import HTTPException
from .app_paths import get_output_databases_dir
from .logging_config import get_logger
from .sqlite_diagnostics import collect_sqlite_diagnostics, format_sqlite_diagnostics
try:
import zstandard as zstd # type: ignore
@@ -1755,9 +1756,10 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
session_db_path = Path(account_dir) / "session.db"
if session_db_path.exists() and remaining:
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
sconn: Optional[sqlite3.Connection] = None
try:
sconn = sqlite3.connect(str(session_db_path))
sconn.row_factory = sqlite3.Row
uniq = list(dict.fromkeys([u for u in remaining if u]))
chunk_size = 900
for i in range(0, len(uniq), chunk_size):
@@ -1786,10 +1788,24 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
if not u:
continue
expected_ts_by_user[u] = int(r["last_timestamp"] or 0)
except sqlite3.DatabaseError as e:
expected_ts_by_user = {}
logger.warning(
"[sessions.preview] session timestamp lookup failed account=%s db=%s usernames=%s sample_usernames=%s error=%s diag=%s",
account_dir.name,
str(session_db_path),
len(remaining),
sorted([u for u in remaining if u])[:5],
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(session_db_path, quick_check=True, table_name="SessionTable")
),
)
except Exception:
expected_ts_by_user = {}
finally:
sconn.close()
if sconn is not None:
sconn.close()
if _DEBUG_SESSIONS:
logger.info(
@@ -1800,9 +1816,16 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
)
for db_path in db_paths:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn: Optional[sqlite3.Connection] = None
stage = "connect"
stage_username = ""
stage_table = ""
try:
stage = "connect"
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
stage = "sqlite_master"
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
names = [str(r[0]) for r in rows if r and r[0]]
lower_to_actual = {n.lower(): n for n in names}
@@ -1818,9 +1841,12 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
conn.text_factory = bytes
for u, tn in found.items():
stage_username = str(u)
stage_table = str(tn)
quoted = _quote_ident(tn)
try:
try:
stage = "latest_row_with_name2id"
r = conn.execute(
"SELECT "
"m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, "
@@ -1831,6 +1857,7 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
"LIMIT 1"
).fetchone()
except Exception:
stage = "latest_row_without_name2id"
r = conn.execute(
"SELECT "
"local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username "
@@ -1838,6 +1865,20 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
"ORDER BY sort_seq DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except sqlite3.DatabaseError as e:
logger.warning(
"[sessions.preview] latest row query failed account=%s db=%s username=%s table=%s stage=%s error=%s diag=%s",
account_dir.name,
str(db_path),
str(u),
str(tn),
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(db_path, quick_check=True, table_name=tn)
),
)
continue
except Exception as e:
if _DEBUG_SESSIONS:
logger.info(
@@ -1855,6 +1896,7 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
expected_ts = int(expected_ts_by_user.get(u) or 0)
if expected_ts > 0 and create_time > 0 and create_time < expected_ts:
try:
stage = "latest_row_by_create_time_with_name2id"
r2 = conn.execute(
"SELECT "
"m.local_type, m.message_content, m.compress_content, m.create_time, m.sort_seq, m.local_id, "
@@ -1866,6 +1908,7 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
).fetchone()
except Exception:
try:
stage = "latest_row_by_create_time_without_name2id"
r2 = conn.execute(
"SELECT "
"local_type, message_content, compress_content, create_time, sort_seq, local_id, '' AS sender_username "
@@ -1873,6 +1916,20 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
"ORDER BY COALESCE(create_time, 0) DESC, COALESCE(sort_seq, 0) DESC, local_id DESC "
"LIMIT 1"
).fetchone()
except sqlite3.DatabaseError as e:
logger.warning(
"[sessions.preview] latest row requery failed account=%s db=%s username=%s table=%s stage=%s error=%s diag=%s",
account_dir.name,
str(db_path),
str(u),
str(tn),
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(db_path, quick_check=True, table_name=tn)
),
)
r2 = None
except Exception:
r2 = None
@@ -1900,8 +1957,28 @@ def _load_latest_message_previews(account_dir: Path, usernames: list[str]) -> di
prev = best.get(u)
if prev is None or sort_key > prev[0]:
best[u] = (sort_key, preview)
except sqlite3.DatabaseError as e:
logger.warning(
"[sessions.preview] malformed message db account=%s db=%s stage=%s username=%s table=%s remaining=%s sample_usernames=%s error=%s diag=%s",
account_dir.name,
str(db_path),
stage,
stage_username,
stage_table,
len(remaining),
sorted([u for u in remaining if u])[:5],
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(db_path, quick_check=True, table_name=(stage_table or None))
),
)
continue
finally:
conn.close()
if conn is not None:
try:
conn.close()
except Exception:
pass
previews = {u: v[1] for u, v in best.items() if v and v[1]}
if _DEBUG_SESSIONS:
+80 -8
View File
@@ -77,6 +77,7 @@ from ..session_last_message import (
get_session_last_message_status,
load_session_last_messages,
)
from ..sqlite_diagnostics import collect_sqlite_diagnostics, format_sqlite_diagnostics
from ..wcdb_realtime import (
WCDBRealtimeError,
WCDB_REALTIME,
@@ -2022,12 +2023,18 @@ def _sync_chat_realtime_messages_for_table(
if backfill_limit > max_scan:
backfill_limit = max_scan
msg_conn = sqlite3.connect(str(msg_db_path))
msg_conn.row_factory = sqlite3.Row
msg_conn: Optional[sqlite3.Connection] = None
stage = "connect"
try:
stage = "connect"
msg_conn = sqlite3.connect(str(msg_db_path))
msg_conn.row_factory = sqlite3.Row
stage = "resolve_db_storage_paths"
msg_db_path_real, _res_db_path_real = _resolve_db_storage_message_paths(account_dir, msg_db_path.stem)
name2id_synced = False
try:
stage = "sync_name2id"
name2id_result = _sync_output_name2id_from_live(
msg_conn,
rt_conn=rt_conn,
@@ -2050,12 +2057,14 @@ def _sync_chat_realtime_messages_for_table(
)
quoted_table = _quote_ident(table_name)
stage = "max_local_id"
row = msg_conn.execute(f"SELECT MAX(local_id) AS mx FROM {quoted_table}").fetchone()
try:
max_local_id = int((row["mx"] if row is not None else 0) or 0)
except Exception:
max_local_id = 0
stage = "pragma_table_info"
cols = msg_conn.execute(f"PRAGMA table_info({quoted_table})").fetchall()
available_cols = {str(c[1] or "") for c in cols}
base_cols = [
@@ -2075,6 +2084,7 @@ def _sync_chat_realtime_messages_for_table(
placeholders = ",".join(["?"] * len(insert_cols))
insert_sql = f"INSERT OR IGNORE INTO {quoted_table} ({','.join(insert_cols)}) VALUES ({placeholders})"
stage = "collect_realtime_rows"
fetch_result = _collect_realtime_rows_for_session(
trace_id=None,
account_name=account_dir.name,
@@ -2094,6 +2104,7 @@ def _sync_chat_realtime_messages_for_table(
backfilled = 0
if new_rows:
if not name2id_synced:
stage = "upsert_name2id_fallback"
_best_effort_upsert_output_name2id_rows(
msg_conn,
account_name=account_dir.name,
@@ -2102,6 +2113,7 @@ def _sync_chat_realtime_messages_for_table(
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
insert_t0 = time.perf_counter()
stage = "insert_new_rows"
msg_conn.executemany(insert_sql, values)
msg_conn.commit()
insert_ms = (time.perf_counter() - insert_t0) * 1000.0
@@ -2131,6 +2143,7 @@ def _sync_chat_realtime_messages_for_table(
if update_values:
before_changes = msg_conn.total_changes
update_t0 = time.perf_counter()
stage = "backfill_packed_info"
msg_conn.executemany(
f"UPDATE {quoted_table} SET packed_info_data = ? WHERE local_id = ? AND (packed_info_data IS NULL OR length(packed_info_data) = 0)",
update_values,
@@ -2187,8 +2200,11 @@ def _sync_chat_realtime_messages_for_table(
if inserted and newest_ts:
session_db_path = account_dir / "session.db"
sconn = sqlite3.connect(str(session_db_path))
sconn: Optional[sqlite3.Connection] = None
try:
stage = "open_session_db"
sconn = sqlite3.connect(str(session_db_path))
stage = "update_session_table"
sconn.execute("INSERT OR IGNORE INTO SessionTable(username) VALUES (?)", (username,))
sconn.execute(
"""
@@ -2217,6 +2233,7 @@ def _sync_chat_realtime_messages_for_table(
),
)
stage = "update_session_last_message"
_ensure_session_last_message_table(sconn)
sconn.execute(
"""
@@ -2239,8 +2256,25 @@ def _sync_chat_realtime_messages_for_table(
),
)
sconn.commit()
except sqlite3.DatabaseError as e:
logger.warning(
"[realtime] malformed session db during sync account=%s username=%s session_db=%s stage=%s error=%s diag=%s",
account_dir.name,
username,
str(session_db_path),
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(session_db_path, quick_check=True, table_name="SessionTable")
),
)
raise HTTPException(
status_code=500,
detail=f"Malformed session db during realtime sync: {session_db_path.name}",
)
finally:
sconn.close()
if sconn is not None:
sconn.close()
return {
"username": username,
@@ -2250,8 +2284,23 @@ def _sync_chat_realtime_messages_for_table(
"backfilled": int(backfilled),
"preview": preview or "",
}
except sqlite3.DatabaseError as e:
logger.warning(
"[realtime] malformed decrypted message db account=%s username=%s db=%s table=%s stage=%s error=%s diag=%s",
account_dir.name,
username,
str(msg_db_path),
table_name,
stage,
str(e),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(msg_db_path, quick_check=True, table_name=table_name)
),
)
raise HTTPException(status_code=500, detail=f"Malformed decrypted message db: {msg_db_path.name}")
finally:
msg_conn.close()
if msg_conn is not None:
msg_conn.close()
@router.post("/api/chat/realtime/sync_all", summary="实时消息同步到解密库(全会话增量)")
@@ -2545,20 +2594,24 @@ def sync_chat_realtime_messages_all(
except HTTPException as e:
errors.append(f"{uname}: {str(e.detail or '')}".strip())
logger.warning(
"[%s] sync session failed account=%s username=%s err=%s",
"[%s] sync session failed account=%s username=%s db=%s table=%s err=%s",
trace_id,
account_dir.name,
uname,
str(msg_db_path),
str(table_name),
str(e.detail or "").strip(),
)
continue
except Exception as e:
errors.append(f"{uname}: {str(e)}".strip())
logger.exception(
"[%s] sync session crashed account=%s username=%s",
"[%s] sync session crashed account=%s username=%s db=%s table=%s",
trace_id,
account_dir.name,
uname,
str(msg_db_path),
str(table_name),
)
continue
@@ -4173,6 +4226,15 @@ def list_chat_sessions(
)
last_previews = load_session_last_messages(account_dir, usernames)
except Exception:
logger.exception(
"[sessions.list] session_last_message preview load failed account=%s preview_mode=%s usernames=%s diag=%s",
account_dir.name,
preview_mode,
len(usernames),
format_sqlite_diagnostics(
collect_sqlite_diagnostics(account_dir / "session.db", quick_check=True, table_name="session_last_message")
),
)
last_previews = {}
def _is_generic_location_preview(value: Any) -> bool:
@@ -4189,7 +4251,17 @@ def list_chat_sessions(
else [u for u in usernames if u and ((u not in last_previews) or _is_generic_location_preview(last_previews.get(u)))]
)
if targets:
legacy = _load_latest_message_previews(account_dir, targets)
try:
legacy = _load_latest_message_previews(account_dir, targets)
except Exception:
logger.exception(
"[sessions.list] legacy latest-message preview fallback failed account=%s preview_mode=%s targets=%s sample_targets=%s; falling back to session summaries",
account_dir.name,
preview_mode,
len(targets),
[str(u) for u in targets[:5]],
)
legacy = {}
for u, v in legacy.items():
if v:
last_previews[u] = v
+50 -12
View File
@@ -59,6 +59,12 @@ async def decrypt_databases(request: DecryptRequest):
raise HTTPException(status_code=400, detail=results["message"])
logger.info(f"解密完成: 成功 {results['successful_count']}/{results['total_databases']} 个数据库")
if int(results.get("diagnostic_warning_count") or 0) > 0:
logger.warning(
"解密完成但检测到诊断告警: warning_dbs=%s total=%s",
int(results.get("diagnostic_warning_count") or 0),
int(results.get("total_databases") or 0),
)
# 成功解密后,按账号保存数据库密钥(用于前端自动回填)
try:
@@ -77,6 +83,7 @@ async def decrypt_databases(request: DecryptRequest):
"processed_files": results["processed_files"],
"failed_files": results["failed_files"],
"account_results": results.get("account_results", {}),
"diagnostic_warning_count": int(results.get("diagnostic_warning_count") or 0),
}
except HTTPException:
@@ -160,6 +167,7 @@ async def decrypt_databases_stream(
processed_files: list[str] = []
failed_files: list[str] = []
account_results: dict = {}
diagnostic_warning_count = 0
overall_current = 0
for account, dbs in account_databases.items():
@@ -181,6 +189,8 @@ async def decrypt_databases_stream(
account_success = 0
account_processed: list[str] = []
account_failed: list[str] = []
account_db_diagnostics: dict[str, dict] = {}
account_diagnostic_warning_count = 0
for db_info in dbs:
if await request.is_disconnected():
@@ -223,6 +233,23 @@ async def decrypt_databases_stream(
ok = bool(task.result())
except Exception:
ok = False
db_diagnostic = dict(getattr(decryptor, "last_result", {}) or {})
if not db_diagnostic:
db_diagnostic = {
"db_path": str(db_path),
"db_name": str(db_name),
"output_path": str(output_path),
"success": bool(ok),
}
db_diagnostic["account"] = str(account)
account_db_diagnostics[db_name] = db_diagnostic
if (
(not bool(db_diagnostic.get("success", ok)))
or int(db_diagnostic.get("failed_pages") or 0) > 0
or str(db_diagnostic.get("diagnostic_status") or "") != "ok"
):
account_diagnostic_warning_count += 1
if ok:
account_success += 1
@@ -238,18 +265,25 @@ async def decrypt_databases_stream(
status = "fail"
msg = "解密失败"
yield _sse(
{
"type": "progress",
"current": overall_current,
"total": total_databases,
"success_count": success_count,
"fail_count": fail_count,
"current_file": current_file,
"status": status,
"message": msg,
}
)
payload = {
"type": "progress",
"current": overall_current,
"total": total_databases,
"success_count": success_count,
"fail_count": fail_count,
"current_file": current_file,
"status": status,
"message": msg,
}
if db_diagnostic:
payload["diagnostic_status"] = str(db_diagnostic.get("diagnostic_status") or "")
payload["page_failures"] = int(db_diagnostic.get("failed_pages") or 0)
if db_diagnostic.get("failed_page_samples"):
payload["failed_page_samples"] = db_diagnostic.get("failed_page_samples")
if db_diagnostic.get("diagnostics"):
payload["diagnostics"] = db_diagnostic.get("diagnostics")
yield _sse(payload)
if overall_current % 5 == 0:
await asyncio.sleep(0)
@@ -261,7 +295,10 @@ async def decrypt_databases_stream(
"output_dir": str(account_output_dir),
"processed_files": account_processed,
"failed_files": account_failed,
"db_diagnostics": account_db_diagnostics,
"diagnostic_warning_count": int(account_diagnostic_warning_count),
}
diagnostic_warning_count += int(account_diagnostic_warning_count)
# Build cache table (keep behavior consistent with the POST endpoint).
if os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0":
@@ -311,6 +348,7 @@ async def decrypt_databases_stream(
"processed_files": processed_files,
"failed_files": failed_files,
"account_results": account_results,
"diagnostic_warning_count": int(diagnostic_warning_count),
}
# Save db key for frontend autofill.
@@ -0,0 +1,150 @@
from __future__ import annotations
import json
import sqlite3
from pathlib import Path
from typing import Any, Mapping, Optional
SQLITE_HEADER = b"SQLite format 3\x00"
def _clean_text(value: Any, *, limit: int = 240) -> str:
text = " ".join(str(value or "").split()).strip()
if len(text) > limit:
return text[: limit - 3] + "..."
return text
def _clean_error(exc: BaseException, *, limit: int = 240) -> str:
text = _clean_text(exc, limit=limit)
if text:
return f"{type(exc).__name__}: {text}"
return type(exc).__name__
def _quote_ident(name: str) -> str:
return '"' + str(name or "").replace('"', '""') + '"'
def collect_sqlite_diagnostics(
path: str | Path,
*,
quick_check: bool = True,
table_name: Optional[str] = None,
table_sample_limit: int = 5,
) -> dict[str, Any]:
db_path = Path(path)
diagnostics: dict[str, Any] = {
"path": str(db_path),
"exists": bool(db_path.exists()),
}
if not diagnostics["exists"]:
return diagnostics
try:
diagnostics["size"] = int(db_path.stat().st_size)
except Exception as exc:
diagnostics["size_error"] = _clean_error(exc)
try:
with db_path.open("rb") as f:
header = f.read(len(SQLITE_HEADER))
diagnostics["header_ok"] = header == SQLITE_HEADER
diagnostics["header_hex"] = header.hex()
except Exception as exc:
diagnostics["header_error"] = _clean_error(exc)
if not quick_check:
return diagnostics
conn: sqlite3.Connection | None = None
try:
conn = sqlite3.connect(str(db_path))
try:
row = conn.execute("PRAGMA page_size").fetchone()
diagnostics["page_size"] = int((row[0] if row is not None else 0) or 0)
except Exception as exc:
diagnostics["page_size_error"] = _clean_error(exc)
try:
row = conn.execute("PRAGMA page_count").fetchone()
diagnostics["page_count"] = int((row[0] if row is not None else 0) or 0)
except Exception as exc:
diagnostics["page_count_error"] = _clean_error(exc)
try:
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table' ORDER BY name").fetchall()
table_names = [str(row[0]) for row in rows if row and row[0]]
diagnostics["table_count"] = len(table_names)
if table_names:
diagnostics["tables_sample"] = table_names[: max(int(table_sample_limit or 0), 1)]
except Exception as exc:
diagnostics["table_list_error"] = _clean_error(exc)
if table_name:
diagnostics["target_table"] = str(table_name)
try:
cols = conn.execute(f"PRAGMA table_info({_quote_ident(table_name)})").fetchall()
diagnostics["target_table_exists"] = bool(cols)
if cols:
diagnostics["target_table_columns"] = [
str(col[1])
for col in cols[:8]
if len(col) > 1 and str(col[1] or "").strip()
]
except Exception as exc:
diagnostics["target_table_error"] = _clean_error(exc)
try:
rows = conn.execute("PRAGMA quick_check").fetchall()
values = [_clean_text(row[0]) for row in rows if row and row[0] is not None]
if values:
diagnostics["quick_check"] = values[0] if len(values) == 1 else values[:5]
diagnostics["quick_check_ok"] = len(values) == 1 and values[0].lower() == "ok"
if len(values) > 5:
diagnostics["quick_check_truncated"] = len(values) - 5
else:
diagnostics["quick_check"] = ""
diagnostics["quick_check_ok"] = None
except Exception as exc:
diagnostics["quick_check_error"] = _clean_error(exc)
diagnostics["quick_check_ok"] = False
except Exception as exc:
diagnostics["connect_error"] = _clean_error(exc)
finally:
if conn is not None:
try:
conn.close()
except Exception:
pass
return diagnostics
def sqlite_diagnostics_status(diagnostics: Mapping[str, Any]) -> str:
if not diagnostics:
return "not_run"
if not diagnostics.get("exists", False):
return "missing"
if diagnostics.get("header_ok") is False:
return "bad_header"
if diagnostics.get("connect_error"):
return "connect_error"
if diagnostics.get("quick_check_error"):
return "quick_check_error"
if diagnostics.get("quick_check_ok") is False:
return "quick_check_failed"
if diagnostics.get("quick_check_ok") is True:
return "ok"
return "header_only"
def format_sqlite_diagnostics(diagnostics: Mapping[str, Any]) -> str:
compact: dict[str, Any] = {}
for key, value in diagnostics.items():
if value in (None, "", [], {}):
continue
compact[str(key)] = value
return json.dumps(compact, ensure_ascii=False, sort_keys=True)
+123 -6
View File
@@ -21,6 +21,7 @@ from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .app_paths import get_output_databases_dir
from .sqlite_diagnostics import collect_sqlite_diagnostics, sqlite_diagnostics_status
# 注意:不再支持默认密钥,所有密钥必须通过参数传入
@@ -221,6 +222,7 @@ class WeChatDatabaseDecryptor:
self.key_bytes = bytes.fromhex(key_hex)
except ValueError:
raise ValueError("密钥必须是有效的十六进制字符串")
self.last_result: dict = {}
def decrypt_database(self, db_path: str, output_path: str) -> bool:
"""解密微信4.x版本数据库
@@ -234,6 +236,79 @@ class WeChatDatabaseDecryptor:
from .logging_config import get_logger
logger = get_logger(__name__)
result = {
"db_path": str(db_path),
"db_name": Path(str(db_path)).name,
"output_path": str(output_path),
"success": False,
"copied_as_sqlite": False,
"input_size": 0,
"output_size": 0,
"total_pages": 0,
"successful_pages": 0,
"failed_pages": 0,
"failed_page_samples": [],
"failure_reasons": {},
"diagnostics": {},
"diagnostic_status": "not_run",
"error": "",
}
self.last_result = result
def _append_failed_page(page_num: int, reason: str, error: str = "") -> None:
result["failure_reasons"][reason] = int(result["failure_reasons"].get(reason) or 0) + 1
if len(result["failed_page_samples"]) >= 8:
return
item = {"page": int(page_num), "reason": str(reason)}
err = " ".join(str(error or "").split()).strip()
if err:
item["error"] = err[:200]
result["failed_page_samples"].append(item)
def _finalize(success: bool, error: str = "") -> bool:
result["success"] = bool(success)
if error:
result["error"] = " ".join(str(error).split()).strip()
output_file = Path(str(output_path))
if output_file.exists():
try:
result["output_size"] = int(output_file.stat().st_size)
except Exception:
pass
diagnostics = collect_sqlite_diagnostics(output_file, quick_check=True)
result["diagnostics"] = diagnostics
result["diagnostic_status"] = sqlite_diagnostics_status(diagnostics)
payload = {
"db_name": result["db_name"],
"db_path": result["db_path"],
"output_path": result["output_path"],
"success": result["success"],
"copied_as_sqlite": result["copied_as_sqlite"],
"input_size": result["input_size"],
"output_size": result["output_size"],
"total_pages": result["total_pages"],
"successful_pages": result["successful_pages"],
"failed_pages": result["failed_pages"],
"failure_reasons": result["failure_reasons"],
"failed_page_samples": result["failed_page_samples"],
"diagnostic_status": result["diagnostic_status"],
"diagnostics": result["diagnostics"],
"error": result["error"],
}
log_fn = logger.info
if (
(not result["success"])
or int(result["failed_pages"] or 0) > 0
or str(result["diagnostic_status"] or "") != "ok"
):
log_fn = logger.warning
log_fn("[decrypt.diagnostic] %s", json.dumps(payload, ensure_ascii=False, sort_keys=True))
self.last_result = result
return bool(success)
logger.info(f"开始解密数据库: {db_path}")
try:
@@ -241,17 +316,19 @@ class WeChatDatabaseDecryptor:
encrypted_data = f.read()
logger.info(f"读取文件大小: {len(encrypted_data)} bytes")
result["input_size"] = int(len(encrypted_data))
if len(encrypted_data) < 4096:
logger.warning(f"文件太小,跳过解密: {db_path}")
return False
return _finalize(False, "file_too_small")
# 检查是否已经是解密的数据库
if encrypted_data.startswith(SQLITE_HEADER):
logger.info(f"文件已是SQLite格式,直接复制: {db_path}")
with open(output_path, 'wb') as f:
f.write(encrypted_data)
return True
result["copied_as_sqlite"] = True
return _finalize(True)
# 提取salt (前16字节)
salt = encrypted_data[:16]
@@ -295,6 +372,7 @@ class WeChatDatabaseDecryptor:
total_pages = len(encrypted_data) // page_size
successful_pages = 0
failed_pages = 0
result["total_pages"] = int(total_pages)
# 逐页解密
for cur_page in range(total_pages):
@@ -329,6 +407,7 @@ class WeChatDatabaseDecryptor:
if stored_hmac != expected_hmac:
logger.warning(f"页面 {page_num} HMAC验证失败")
failed_pages += 1
_append_failed_page(page_num, "hmac")
continue
# 提取IV和加密数据用于AES解密
@@ -354,20 +433,32 @@ class WeChatDatabaseDecryptor:
except Exception as e:
logger.error(f"页面 {page_num} AES解密失败: {e}")
failed_pages += 1
_append_failed_page(page_num, "aes", str(e))
continue
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages}")
result["successful_pages"] = int(successful_pages)
result["failed_pages"] = int(failed_pages)
# 写入解密后的文件
with open(output_path, 'wb') as f:
f.write(decrypted_data)
logger.info(f"解密文件大小: {len(decrypted_data)} bytes")
return True
if failed_pages > 0:
logger.warning(
"解密输出包含页失败: db=%s total_pages=%s failed_pages=%s failure_reasons=%s samples=%s",
result["db_name"],
int(total_pages),
int(failed_pages),
json.dumps(result["failure_reasons"], ensure_ascii=False, sort_keys=True),
json.dumps(result["failed_page_samples"], ensure_ascii=False),
)
return _finalize(True)
except Exception as e:
logger.error(f"解密失败: {db_path}, 错误: {e}")
return False
return _finalize(False, str(e))
def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> dict:
"""
@@ -493,6 +584,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
processed_files = []
failed_files = []
account_results = {}
diagnostic_warning_count = 0
for account_name, databases in account_databases.items():
logger.info(f"开始解密账号 {account_name}{len(databases)} 个数据库")
@@ -523,6 +615,8 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
account_success = 0
account_processed = []
account_failed = []
account_db_diagnostics = {}
account_diagnostic_warning_count = 0
for db_info in databases:
db_path = db_info['path']
@@ -533,7 +627,26 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
# 解密数据库
logger.info(f"解密 {account_name}/{db_name}")
if decryptor.decrypt_database(db_path, str(output_path)):
ok = decryptor.decrypt_database(db_path, str(output_path))
db_diagnostic = dict(getattr(decryptor, "last_result", {}) or {})
if not db_diagnostic:
db_diagnostic = {
"db_path": str(db_path),
"db_name": str(db_name),
"output_path": str(output_path),
"success": bool(ok),
}
db_diagnostic["account"] = str(account_name)
account_db_diagnostics[db_name] = db_diagnostic
if (
(not bool(db_diagnostic.get("success", ok)))
or int(db_diagnostic.get("failed_pages") or 0) > 0
or str(db_diagnostic.get("diagnostic_status") or "") != "ok"
):
account_diagnostic_warning_count += 1
if ok:
account_success += 1
success_count += 1
account_processed.append(str(output_path))
@@ -551,8 +664,11 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
"failed": len(databases) - account_success,
"output_dir": str(account_output_dir),
"processed_files": account_processed,
"failed_files": account_failed
"failed_files": account_failed,
"db_diagnostics": account_db_diagnostics,
"diagnostic_warning_count": int(account_diagnostic_warning_count),
}
diagnostic_warning_count += int(account_diagnostic_warning_count)
# 构建“会话最后一条消息”缓存表:把耗时挪到解密阶段,后续会话列表直接查表
if os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", "1") != "0":
@@ -586,6 +702,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
"failed_files": failed_files,
"account_results": account_results, # 新增:按账号的详细结果
"detected_accounts": detected_accounts,
"diagnostic_warning_count": int(diagnostic_warning_count),
}
logger.info("=" * 60)