diff --git a/src/wechat_decrypt_tool/routers/decrypt.py b/src/wechat_decrypt_tool/routers/decrypt.py index bf8f90e..50dfe2c 100644 --- a/src/wechat_decrypt_tool/routers/decrypt.py +++ b/src/wechat_decrypt_tool/routers/decrypt.py @@ -403,6 +403,7 @@ async def decrypt_databases_stream( if ( (not bool(db_diagnostic.get("success", ok))) or int(db_diagnostic.get("failed_pages") or 0) > 0 + or int(db_diagnostic.get("hmac_warning_pages") or 0) > 0 or str(db_diagnostic.get("diagnostic_status") or "") != "ok" ): account_diagnostic_warning_count += 1 @@ -434,8 +435,11 @@ async def decrypt_databases_stream( if db_diagnostic: payload["diagnostic_status"] = str(db_diagnostic.get("diagnostic_status") or "") payload["page_failures"] = int(db_diagnostic.get("failed_pages") or 0) + payload["hmac_warning_pages"] = int(db_diagnostic.get("hmac_warning_pages") or 0) if db_diagnostic.get("failed_page_samples"): payload["failed_page_samples"] = db_diagnostic.get("failed_page_samples") + if db_diagnostic.get("hmac_warning_samples"): + payload["hmac_warning_samples"] = db_diagnostic.get("hmac_warning_samples") if db_diagnostic.get("diagnostics"): payload["diagnostics"] = db_diagnostic.get("diagnostics") diff --git a/src/wechat_decrypt_tool/wechat_decrypt.py b/src/wechat_decrypt_tool/wechat_decrypt.py index feb3b2d..90deb15 100644 --- a/src/wechat_decrypt_tool/wechat_decrypt.py +++ b/src/wechat_decrypt_tool/wechat_decrypt.py @@ -13,7 +13,10 @@ import hashlib import hmac import os import json +import struct +import time from pathlib import Path +from typing import Any from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes @@ -56,6 +59,201 @@ def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes: return mac.digest() +def _compute_page_hmac_variant( + mac_key: bytes, + page: bytes, + page_num: int, + *, + endian: str = "little", + include_iv: bool = True, +) -> bytes: + """用于诊断的 HMAC 变体计算,不参与实际解密决策。""" + offset = SALT_SIZE if page_num == 1 else 0 + data_end = PAGE_SIZE - RESERVE_SIZE + (IV_SIZE if include_iv else 0) + mac = hmac.new(mac_key, digestmod=hashlib.sha512) + mac.update(page[offset:data_end]) + mac.update(page_num.to_bytes(4, endian)) + return mac.digest() + + +def _hash_prefix(data: bytes, *, length: int = 16) -> str: + """返回 SHA256 前缀,避免日志输出明文数据。""" + try: + return hashlib.sha256(bytes(data or b"")).hexdigest()[: max(int(length), 8)] + except Exception: + return "" + + +def _hex_prefix(data: bytes, *, length: int = 32) -> str: + try: + return bytes(data or b"")[: max(int(length), 0)].hex() + except Exception: + return "" + + +def _safe_file_snapshot(path: str | Path) -> dict[str, Any]: + """采集源/输出文件与 WAL 旁路文件信息,用于定位解密时文件是否变化。""" + p = Path(path) + out: dict[str, Any] = {"path": str(p), "exists": False} + try: + st = p.stat() + out.update( + { + "exists": True, + "size": int(st.st_size), + "mtime_ns": int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))), + } + ) + except Exception as exc: + out["stat_error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}" + + siblings: dict[str, Any] = {} + for suffix in ("-wal", "-shm", "-journal"): + sp = Path(str(p) + suffix) + try: + st = sp.stat() + siblings[suffix] = { + "exists": True, + "size": int(st.st_size), + "mtime_ns": int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))), + } + except FileNotFoundError: + siblings[suffix] = {"exists": False} + except Exception as exc: + siblings[suffix] = { + "exists": False, + "stat_error": f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}", + } + out["siblings"] = siblings + return out + + +def _read_plain_sqlite_header_debug(path: str | Path) -> dict[str, Any]: + """解析明文 SQLite 头部关键字段,帮助定位输出库结构问题。""" + p = Path(path) + out: dict[str, Any] = {"path": str(p)} + try: + with p.open("rb") as f: + header = f.read(100) + out["header_len"] = len(header) + out["header_ok"] = header.startswith(SQLITE_HEADER) + out["header_hex"] = header[:32].hex() + if len(header) >= 100: + raw_page_size = struct.unpack(">H", header[16:18])[0] + out.update( + { + "page_size_header": 65536 if raw_page_size == 1 else int(raw_page_size), + "write_version": int(header[18]), + "read_version": int(header[19]), + "reserved_space": int(header[20]), + "max_payload_fraction": int(header[21]), + "min_payload_fraction": int(header[22]), + "leaf_payload_fraction": int(header[23]), + "file_change_counter": int.from_bytes(header[24:28], "big"), + "db_size_pages_header": int.from_bytes(header[28:32], "big"), + "freelist_trunk_page": int.from_bytes(header[32:36], "big"), + "freelist_pages": int.from_bytes(header[36:40], "big"), + "schema_cookie": int.from_bytes(header[40:44], "big"), + "schema_format": int.from_bytes(header[44:48], "big"), + "text_encoding": int.from_bytes(header[56:60], "big"), + } + ) + except Exception as exc: + out["error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}" + return out + + +def _plain_page_btree_debug(page_plain: bytes, page_num: int) -> dict[str, Any]: + """解析明文页 B-tree 页头摘要,不输出任何业务明文。""" + out: dict[str, Any] = {"page": int(page_num), "plain_sha256": _hash_prefix(page_plain, length=24)} + try: + hdr = 100 if int(page_num) == 1 else 0 + if len(page_plain) >= hdr + 12: + page_type = int(page_plain[hdr]) + out["btree_header_offset"] = int(hdr) + out["btree_page_type"] = page_type + out["btree_page_type_name"] = { + 2: "interior_index", + 5: "interior_table", + 10: "leaf_index", + 13: "leaf_table", + }.get(page_type, "unknown") + out["first_freeblock"] = int.from_bytes(page_plain[hdr + 1 : hdr + 3], "big") + out["cell_count"] = int.from_bytes(page_plain[hdr + 3 : hdr + 5], "big") + out["cell_content_area"] = int.from_bytes(page_plain[hdr + 5 : hdr + 7], "big") + out["fragmented_free_bytes"] = int(page_plain[hdr + 7]) + if page_type in (2, 5): + out["right_most_pointer"] = int.from_bytes(page_plain[hdr + 8 : hdr + 12], "big") + except Exception as exc: + out["btree_parse_error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:160]}" + return out + + +def _build_page_anomaly_debug( + enc_key: bytes, + mac_key: bytes, + page: bytes, + page_num: int, + *, + stored_hmac: bytes | None = None, + expected_hmac: bytes | None = None, + reason: str = "hmac", +) -> dict[str, Any]: + """构造异常页诊断信息,默认只记录哈希/页头摘要。""" + page = bytes(page or b"") + stored = stored_hmac if stored_hmac is not None else page[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE] + expected = expected_hmac if expected_hmac is not None else _compute_page_hmac(mac_key, page, page_num) + iv = page[PAGE_SIZE - RESERVE_SIZE : PAGE_SIZE - RESERVE_SIZE + IV_SIZE] + encrypted_payload = page[SALT_SIZE if page_num == 1 else 0 : PAGE_SIZE - RESERVE_SIZE] + out: dict[str, Any] = { + "reason": str(reason), + "page": int(page_num), + "byte_start": int((int(page_num) - 1) * PAGE_SIZE), + "byte_end_exclusive": int(int(page_num) * PAGE_SIZE), + "page_size": int(len(page)), + "page_sha256": _hash_prefix(page, length=24), + "encrypted_payload_sha256": _hash_prefix(encrypted_payload, length=24), + "iv_hex": _hex_prefix(iv, length=16), + "stored_hmac_prefix": _hex_prefix(stored, length=16), + "expected_hmac_prefix": _hex_prefix(expected, length=16), + "hmac_match_current": bool(hmac.compare_digest(stored, expected)), + } + + variants: dict[str, bool] = {} + for candidate_page in (page_num - 1, page_num, page_num + 1): + if candidate_page <= 0: + continue + for endian in ("little", "big"): + for include_iv in (True, False): + key = f"page={candidate_page};endian={endian};include_iv={int(include_iv)}" + try: + variants[key] = bool( + hmac.compare_digest( + stored, + _compute_page_hmac_variant( + mac_key, + page, + int(candidate_page), + endian=endian, + include_iv=include_iv, + ), + ) + ) + except Exception: + variants[key] = False + out["hmac_variant_matches"] = [k for k, v in variants.items() if v] + + try: + plain_page = _decrypt_page(enc_key, page, int(page_num)) + out["aes_decrypt_ok"] = True + out["plain"] = _plain_page_btree_debug(plain_page, int(page_num)) + except Exception as exc: + out["aes_decrypt_ok"] = False + out["aes_error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}" + + return out + + def _resolve_page1_key_material(key_material: bytes, page1: bytes) -> tuple[bytes, bytes, str] | None: """Detect whether input key is raw enc_key or SQLCipher passphrase by page-1 HMAC.""" if len(page1) < PAGE_SIZE: @@ -228,8 +426,13 @@ def _resolve_db_storage_roots(storage_path: Path) -> list[Path]: def scan_account_databases_from_path(db_storage_path: str) -> dict: + from .logging_config import get_logger + + logger = get_logger(__name__) storage_path = Path(str(db_storage_path or "").strip()) + logger.info("[decrypt.scan] start db_storage_path=%s", str(storage_path)) if not storage_path.exists(): + logger.warning("[decrypt.scan] path_not_exists db_storage_path=%s", str(storage_path)) return { "status": "error", "message": f"指定的数据库路径不存在: {db_storage_path}", @@ -239,6 +442,10 @@ def scan_account_databases_from_path(db_storage_path: str) -> dict: } db_roots = _resolve_db_storage_roots(storage_path) + logger.info( + "[decrypt.scan] resolved_roots %s", + json.dumps([str(x) for x in db_roots], ensure_ascii=False), + ) if not db_roots: return { "status": "error", @@ -290,6 +497,30 @@ def scan_account_databases_from_path(db_storage_path: str) -> dict: } ) + logger.info( + "[decrypt.scan] databases_found %s", + json.dumps( + { + "account": account_name, + "db_storage_path": str(db_root), + "wxid_dir": str(db_root.parent), + "count": len(databases), + "files": [ + { + "name": str(item.get("name") or ""), + "relative": str(Path(str(item.get("path") or "")).relative_to(db_root)) + if str(item.get("path") or "").startswith(str(db_root)) + else str(item.get("path") or ""), + } + for item in databases[:80] + ], + "truncated": max(0, len(databases) - 80), + }, + ensure_ascii=False, + sort_keys=True, + ), + ) + if not databases: return { "status": "error", @@ -370,6 +601,18 @@ class WeChatDatabaseDecryptor: "failed_pages": 0, "failed_page_samples": [], "failure_reasons": {}, + "hmac_warning_pages": 0, + "hmac_warning_samples": [], + "hmac_debug_samples": [], + "aes_debug_samples": [], + "source_snapshot_before": {}, + "source_snapshot_after": {}, + "source_changed_during_read": False, + "read_ms": 0, + "key_mode": "", + "input_layout": {}, + "expected_output_size": 0, + "output_header_debug": {}, "diagnostics": {}, "diagnostic_status": "not_run", "error": "", @@ -386,6 +629,14 @@ class WeChatDatabaseDecryptor: item["error"] = err[:200] result["failed_page_samples"].append(item) + def _append_hmac_warning_page(page_num: int) -> None: + # 非首页 HMAC 异常不再直接丢弃页面:部分微信 4.x 大库在 1GiB 边界会出现 + # 单页 HMAC 不匹配,但页面本身仍可正常解密。丢页会导致后续页号整体错位。 + result["hmac_warning_pages"] = int(result.get("hmac_warning_pages") or 0) + 1 + if len(result["hmac_warning_samples"]) >= 8: + return + result["hmac_warning_samples"].append({"page": int(page_num), "reason": "hmac"}) + def _finalize(success: bool, error: str = "") -> bool: normalized_success = bool(success) result["success"] = normalized_success @@ -402,6 +653,7 @@ class WeChatDatabaseDecryptor: diagnostics = collect_sqlite_diagnostics(output_file, quick_check=True) result["diagnostics"] = diagnostics result["diagnostic_status"] = sqlite_diagnostics_status(diagnostics) + result["output_header_debug"] = _read_plain_sqlite_header_debug(output_file) if normalized_success: failure_message = _build_decrypt_failure_message(result) @@ -429,6 +681,18 @@ class WeChatDatabaseDecryptor: "failed_pages": result["failed_pages"], "failure_reasons": result["failure_reasons"], "failed_page_samples": result["failed_page_samples"], + "hmac_warning_pages": result["hmac_warning_pages"], + "hmac_warning_samples": result["hmac_warning_samples"], + "hmac_debug_samples": result["hmac_debug_samples"], + "aes_debug_samples": result["aes_debug_samples"], + "source_snapshot_before": result["source_snapshot_before"], + "source_snapshot_after": result["source_snapshot_after"], + "source_changed_during_read": result["source_changed_during_read"], + "read_ms": result["read_ms"], + "key_mode": result["key_mode"], + "input_layout": result["input_layout"], + "expected_output_size": result["expected_output_size"], + "output_header_debug": result["output_header_debug"], "diagnostic_status": result["diagnostic_status"], "diagnostics": result["diagnostics"], "error": result["error"], @@ -437,6 +701,7 @@ class WeChatDatabaseDecryptor: if ( (not result["success"]) or int(result["failed_pages"] or 0) > 0 + or int(result.get("hmac_warning_pages") or 0) > 0 or str(result["diagnostic_status"] or "") != "ok" ): log_fn = logger.warning @@ -447,11 +712,81 @@ class WeChatDatabaseDecryptor: logger.info(f"开始解密数据库: {db_path}") try: + source_snapshot_before = _safe_file_snapshot(db_path) + result["source_snapshot_before"] = source_snapshot_before + logger.info( + "[decrypt.pipeline] source_snapshot_before %s", + json.dumps( + { + "db_name": result["db_name"], + "snapshot": source_snapshot_before, + }, + ensure_ascii=False, + sort_keys=True, + ), + ) + + read_t0 = time.perf_counter() with open(db_path, 'rb') as f: encrypted_data = f.read() - + result["read_ms"] = round((time.perf_counter() - read_t0) * 1000.0, 1) + + source_snapshot_after = _safe_file_snapshot(db_path) + result["source_snapshot_after"] = source_snapshot_after + before_size = int(source_snapshot_before.get("size") or 0) + after_size = int(source_snapshot_after.get("size") or 0) + before_mtime = int(source_snapshot_before.get("mtime_ns") or 0) + after_mtime = int(source_snapshot_after.get("mtime_ns") or 0) + source_changed = bool(before_size != after_size or before_mtime != after_mtime) + result["source_changed_during_read"] = source_changed + logger.info( + "[decrypt.pipeline] source_snapshot_after %s", + json.dumps( + { + "db_name": result["db_name"], + "snapshot": source_snapshot_after, + "read_ms": result["read_ms"], + "source_changed_during_read": source_changed, + }, + ensure_ascii=False, + sort_keys=True, + ), + ) + if source_changed: + logger.warning( + "[decrypt.pipeline] source_changed_during_read db=%s before_size=%s after_size=%s before_mtime_ns=%s after_mtime_ns=%s", + result["db_name"], + before_size, + after_size, + before_mtime, + after_mtime, + ) + logger.info(f"读取文件大小: {len(encrypted_data)} bytes") result["input_size"] = int(len(encrypted_data)) + result["input_layout"] = { + "page_size": PAGE_SIZE, + "reserve_size": RESERVE_SIZE, + "iv_size": IV_SIZE, + "hmac_size": HMAC_SIZE, + "input_size": int(len(encrypted_data)), + "input_size_mod_page": int(len(encrypted_data) % PAGE_SIZE), + "total_pages_floor": int(len(encrypted_data) // PAGE_SIZE), + "total_pages_ceil": int((len(encrypted_data) + PAGE_SIZE - 1) // PAGE_SIZE), + "starts_with_sqlite_header": bool(encrypted_data.startswith(SQLITE_HEADER)), + "first16_hex": encrypted_data[:16].hex(), + } + logger.info( + "[decrypt.pipeline] input_layout %s", + json.dumps( + { + "db_name": result["db_name"], + "input_layout": result["input_layout"], + }, + ensure_ascii=False, + sort_keys=True, + ), + ) if len(encrypted_data) < 4096: logger.warning(f"文件太小,跳过解密: {db_path}") @@ -477,12 +812,33 @@ class WeChatDatabaseDecryptor: enc_key, mac_key, key_mode = resolved_key_material result["key_mode"] = key_mode logger.info("Page 1 HMAC verification passed: mode=%s path=%s", key_mode, db_path) + logger.info( + "[decrypt.pipeline] key_material_resolved %s", + json.dumps( + { + "db_name": result["db_name"], + "key_mode": key_mode, + "salt_sha256": _hash_prefix(page1[:SALT_SIZE], length=24), + "page1_stored_hmac_prefix": _hex_prefix(page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE], length=16), + "page1_expected_hmac_prefix": _hex_prefix(_compute_page_hmac(mac_key, page1, 1), length=16), + }, + ensure_ascii=False, + sort_keys=True, + ), + ) decrypted_data = bytearray() total_pages = (len(encrypted_data) + PAGE_SIZE - 1) // PAGE_SIZE successful_pages = 0 failed_pages = 0 result["total_pages"] = int(total_pages) + result["expected_output_size"] = int(total_pages * PAGE_SIZE) + logger.info( + "[decrypt.pipeline] page_loop_start db=%s total_pages=%s expected_output_size=%s", + result["db_name"], + int(total_pages), + int(result["expected_output_size"]), + ) for cur_page in range(total_pages): page_num = cur_page + 1 @@ -502,10 +858,30 @@ class WeChatDatabaseDecryptor: stored_hmac = page[PAGE_SIZE - HMAC_SIZE: PAGE_SIZE] expected_hmac = _compute_page_hmac(mac_key, page, page_num) if not hmac.compare_digest(stored_hmac, expected_hmac): - logger.warning("Page %s HMAC verification failed", page_num) - failed_pages += 1 - _append_failed_page(page_num, "hmac") - continue + logger.warning("Page %s HMAC verification failed; decrypting page anyway", page_num) + _append_hmac_warning_page(page_num) + anomaly_debug = _build_page_anomaly_debug( + enc_key, + mac_key, + page, + page_num, + stored_hmac=stored_hmac, + expected_hmac=expected_hmac, + reason="hmac", + ) + if len(result["hmac_debug_samples"]) < 8: + result["hmac_debug_samples"].append(anomaly_debug) + logger.warning( + "[decrypt.page_anomaly] %s", + json.dumps( + { + "db_name": result["db_name"], + "anomaly": anomaly_debug, + }, + ensure_ascii=False, + sort_keys=True, + ), + ) try: decrypted_data.extend(_decrypt_page(enc_key, page, page_num)) @@ -514,8 +890,44 @@ class WeChatDatabaseDecryptor: logger.error("Page %s AES decryption failed: %s", page_num, e) failed_pages += 1 _append_failed_page(page_num, "aes", str(e)) + aes_debug = _build_page_anomaly_debug( + enc_key, + mac_key, + page, + page_num, + stored_hmac=stored_hmac, + expected_hmac=expected_hmac, + reason="aes", + ) + if len(result["aes_debug_samples"]) < 8: + result["aes_debug_samples"].append(aes_debug) + logger.error( + "[decrypt.page_anomaly] %s", + json.dumps( + { + "db_name": result["db_name"], + "anomaly": aes_debug, + }, + ensure_ascii=False, + sort_keys=True, + ), + ) + # 保留页占位,避免后续页整体错位导致 SQLite 必然损坏。 + decrypted_data.extend(b"\x00" * PAGE_SIZE) continue + if total_pages >= 100000 and page_num % 50000 == 0: + logger.info( + "[decrypt.pipeline] page_loop_progress db=%s page=%s/%s successful_pages=%s failed_pages=%s hmac_warning_pages=%s output_bytes=%s", + result["db_name"], + int(page_num), + int(total_pages), + int(successful_pages), + int(failed_pages), + int(result.get("hmac_warning_pages") or 0), + int(len(decrypted_data)), + ) + result["successful_pages"] = int(successful_pages) result["failed_pages"] = int(failed_pages) @@ -524,6 +936,14 @@ class WeChatDatabaseDecryptor: f.write(decrypted_data) logger.info(f"解密文件大小: {len(decrypted_data)} bytes") + if int(len(decrypted_data)) != int(result["expected_output_size"]): + logger.warning( + "[decrypt.pipeline] output_size_mismatch db=%s output_size=%s expected_output_size=%s delta=%s", + result["db_name"], + int(len(decrypted_data)), + int(result["expected_output_size"]), + int(len(decrypted_data)) - int(result["expected_output_size"]), + ) if failed_pages > 0: logger.warning( "解密输出包含页失败: db=%s total_pages=%s failed_pages=%s failure_reasons=%s samples=%s", @@ -533,6 +953,14 @@ class WeChatDatabaseDecryptor: json.dumps(result["failure_reasons"], ensure_ascii=False, sort_keys=True), json.dumps(result["failed_page_samples"], ensure_ascii=False), ) + if int(result.get("hmac_warning_pages") or 0) > 0: + logger.warning( + "解密输出包含HMAC告警页但已保留页内容: db=%s total_pages=%s hmac_warning_pages=%s samples=%s", + result["db_name"], + int(total_pages), + int(result.get("hmac_warning_pages") or 0), + json.dumps(result["hmac_warning_samples"], ensure_ascii=False), + ) return _finalize(True) except Exception as e: @@ -721,6 +1149,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di if ( (not bool(db_diagnostic.get("success", ok))) or int(db_diagnostic.get("failed_pages") or 0) > 0 + or int(db_diagnostic.get("hmac_warning_pages") or 0) > 0 or str(db_diagnostic.get("diagnostic_status") or "") != "ok" ): account_diagnostic_warning_count += 1 diff --git a/tests/test_wechat_decrypt_key_modes.py b/tests/test_wechat_decrypt_key_modes.py index 8de1e46..ecb136e 100644 --- a/tests/test_wechat_decrypt_key_modes.py +++ b/tests/test_wechat_decrypt_key_modes.py @@ -80,3 +80,28 @@ def test_decrypt_database_keeps_sqlcipher_passphrase_compatibility(monkeypatch): encrypted_db += _encrypt_page(passphrase_key, page2, 2, salt, bytes.fromhex("3132333435363738393a3b3c3d3e3f40"), passphrase=True) assert _decrypt_sample(passphrase_key.hex(), encrypted_db, monkeypatch) == page1 + page2 + + +def test_decrypt_database_keeps_page_when_non_first_hmac_mismatch(monkeypatch): + raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef") + salt = bytes.fromhex("60f4090ef6897e146f94109f13743e34") + page1 = _build_plain_page(0x61, first_page=True) + page2 = _build_plain_page(0x62, first_page=False) + + encrypted_db = _encrypt_page(raw_key, page1, 1, salt, bytes.fromhex("4142434445464748494a4b4c4d4e4f50")) + encrypted_page2 = bytearray(_encrypt_page(raw_key, page2, 2, salt, bytes.fromhex("5152535455565758595a5b5c5d5e5f60"))) + encrypted_page2[-1] ^= 0x01 # 只破坏 HMAC,不破坏密文和 IV。 + encrypted_db += bytes(encrypted_page2) + + with tempfile.TemporaryDirectory() as tmpdir: + src = Path(tmpdir) / "source.db" + dst = Path(tmpdir) / "out.db" + src.write_bytes(encrypted_db) + monkeypatch.setattr(wechat_decrypt, "collect_sqlite_diagnostics", lambda *args, **kwargs: {"quick_check_ok": True}) + monkeypatch.setattr(wechat_decrypt, "sqlite_diagnostics_status", lambda diagnostics: "ok") + + decryptor = WeChatDatabaseDecryptor(raw_key.hex()) + assert decryptor.decrypt_database(str(src), str(dst)) + assert dst.read_bytes() == page1 + page2 + assert decryptor.last_result["failed_pages"] == 0 + assert decryptor.last_result["hmac_warning_pages"] == 1