Files
WeChatDataAnalysis/tests/test_wechat_decrypt_key_modes.py
T
2977094657 8d2dda61d8 fix(decrypt): 非首页 HMAC 异常时保留页面避免后续页整体错位
- 微信 4.x 大库在 1GiB 边界会出现单页 HMAC 不匹配但页本身仍可解密的情况,原先直接丢弃页会让后续页号整体错位,最终导致 SQLite 必然损坏。改为:HMAC 不匹配时照常解密保留,AES 失败用零页占位,保证页号对齐。

- 解密管线全程增加诊断采样:源文件读取前后快照、输入 layout、key 模式、HMAC/AES 异常页 SHA256 与 HMAC 变体匹配,便于后续定位疑难库。

- 解密 SSE 流将 hmac_warning_pages / hmac_warning_samples 透出到前端并参与诊断告警判断,避免警告被静默吞掉。

- 新增回归测试覆盖非首页 HMAC 单字节翻转场景。
2026-04-29 17:24:40 +08:00

108 lines
4.9 KiB
Python

import hashlib
import hmac
import tempfile
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import wechat_decrypt_tool.wechat_decrypt as wechat_decrypt
from wechat_decrypt_tool.wechat_decrypt import (
HMAC_SIZE,
PAGE_SIZE,
RESERVE_SIZE,
SALT_SIZE,
SQLITE_HEADER,
WeChatDatabaseDecryptor,
_derive_mac_key,
_derive_sqlcipher_enc_key,
)
def _build_plain_page(fill: int, *, first_page: bool) -> bytes:
    """Build one synthetic plaintext page for the decrypt tests.

    The page consists of ``PAGE_SIZE - RESERVE_SIZE`` copies of the byte
    ``fill`` followed by ``RESERVE_SIZE`` zero bytes.  When ``first_page`` is
    true, the leading bytes of the filled area are overwritten with
    ``SQLITE_HEADER``.

    Args:
        fill: Byte value (0-255) used to fill the non-reserved area.
        first_page: Whether to stamp ``SQLITE_HEADER`` at the start.

    Returns:
        The plaintext page bytes.
    """
    payload = bytes([fill]) * (PAGE_SIZE - RESERVE_SIZE)
    reserve_tail = b"\x00" * RESERVE_SIZE
    if not first_page:
        return payload + reserve_tail
    # Replace the start of the payload with the header, keeping its length.
    return SQLITE_HEADER + payload[len(SQLITE_HEADER):] + reserve_tail
def _encrypt_page(
    key_material: bytes,
    plain_page: bytes,
    page_num: int,
    salt: bytes,
    iv: bytes,
    *,
    passphrase: bool = False,
) -> bytes:
    """Encrypt one plaintext page into the layout the decryptor is fed.

    The non-reserved part of ``plain_page`` is AES-CBC encrypted with ``iv``.
    On page 1 the first ``SALT_SIZE`` plaintext bytes are skipped and ``salt``
    is emitted in front of the ciphertext instead.  The output is
    ``[salt] + ciphertext + iv + HMAC-SHA512``, where the MAC covers the
    ciphertext and IV followed by the page number as 4 little-endian bytes.

    Args:
        key_material: The AES key itself, or the input to
            ``_derive_sqlcipher_enc_key`` when ``passphrase`` is true.
        plain_page: Plaintext page, e.g. from ``_build_plain_page``.
        page_num: 1-based page number; page 1 carries the salt.
        salt: Salt used for key derivation and prepended to page 1.
        iv: CBC initialisation vector, also stored after the ciphertext.
        passphrase: Derive the encryption key from ``key_material`` and
            ``salt`` instead of using ``key_material`` directly.

    Returns:
        The encrypted page bytes including IV and MAC.
    """
    if passphrase:
        enc_key = _derive_sqlcipher_enc_key(key_material, salt)
    else:
        enc_key = key_material

    is_first = page_num == 1
    # Page 1 gives up its leading SALT_SIZE bytes to the stored salt.
    plaintext = plain_page[SALT_SIZE if is_first else 0: PAGE_SIZE - RESERVE_SIZE]
    header = salt if is_first else b""

    aes_cbc = Cipher(algorithms.AES(enc_key), modes.CBC(iv), backend=default_backend())
    enc = aes_cbc.encryptor()
    ciphertext = enc.update(plaintext) + enc.finalize()

    unsigned_page = header + ciphertext + iv
    mac_offset = SALT_SIZE if is_first else 0  # the salt prefix is not MACed
    tag = hmac.new(_derive_mac_key(enc_key, salt), digestmod=hashlib.sha512)
    tag.update(unsigned_page[mac_offset:])
    tag.update(page_num.to_bytes(4, "little"))
    return unsigned_page + tag.digest()
def _decrypt_sample(key_hex: str, encrypted_db: bytes, monkeypatch) -> bytes:
    """Decrypt ``encrypted_db`` through ``WeChatDatabaseDecryptor``.

    The encrypted bytes are written to a temporary file, decrypted into a
    sibling file, and the decrypted file content is returned.  The SQLite
    diagnostics hooks on ``wechat_decrypt`` are stubbed to report success.

    Args:
        key_hex: Hex-encoded key handed to ``WeChatDatabaseDecryptor``.
        encrypted_db: Raw bytes of the encrypted database.
        monkeypatch: pytest ``monkeypatch`` fixture used for the stubs.

    Returns:
        The bytes of the decrypted output file.
    """
    # Stub the diagnostics helpers so they always report a healthy database.
    monkeypatch.setattr(wechat_decrypt, "collect_sqlite_diagnostics", lambda *args, **kwargs: {"quick_check_ok": True})
    monkeypatch.setattr(wechat_decrypt, "sqlite_diagnostics_status", lambda diagnostics: "ok")

    with tempfile.TemporaryDirectory() as tmpdir:
        workdir = Path(tmpdir)
        src = workdir / "source.db"
        dst = workdir / "out.db"
        src.write_bytes(encrypted_db)

        decryptor = WeChatDatabaseDecryptor(key_hex)
        succeeded = decryptor.decrypt_database(str(src), str(dst))
        assert succeeded
        # Read before the temporary directory is removed on exit.
        return dst.read_bytes()
def test_decrypt_database_accepts_raw_enc_key_like_weflow(monkeypatch):
    """A key used directly as the AES key decrypts a two-page database.

    Both pages are encrypted with ``passphrase=False``, so ``raw_key`` is the
    encryption key itself; the decrypted output must equal the plaintext.
    """
    raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
    salt = bytes.fromhex("50f4090ef6897e146f94109f13743e34")
    iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
    iv2 = bytes.fromhex("1112131415161718191a1b1c1d1e1f20")

    page1 = _build_plain_page(0x41, first_page=True)
    page2 = _build_plain_page(0x42, first_page=False)

    encrypted_pages = [
        _encrypt_page(raw_key, page1, 1, salt, iv1),
        _encrypt_page(raw_key, page2, 2, salt, iv2),
    ]
    encrypted_db = b"".join(encrypted_pages)

    decrypted = _decrypt_sample(raw_key.hex(), encrypted_db, monkeypatch)
    assert decrypted == page1 + page2
def test_decrypt_database_keeps_sqlcipher_passphrase_compatibility(monkeypatch):
    """A key that must go through key derivation still decrypts correctly.

    Both pages are encrypted with ``passphrase=True``, i.e. with the key
    produced by ``_derive_sqlcipher_enc_key``; the decryptor receives the
    underived key as hex and must recover the plaintext.
    """
    passphrase_key = bytes.fromhex("9f5dd0d3b6d0477ea5045c9e380ee272e53927993eb548dd98a022e842d5f7bd")
    salt = bytes.fromhex("40f4090ef6897e146f94109f13743e34")
    iv1 = bytes.fromhex("2122232425262728292a2b2c2d2e2f30")
    iv2 = bytes.fromhex("3132333435363738393a3b3c3d3e3f40")

    page1 = _build_plain_page(0x51, first_page=True)
    page2 = _build_plain_page(0x52, first_page=False)

    encrypted_pages = [
        _encrypt_page(passphrase_key, page1, 1, salt, iv1, passphrase=True),
        _encrypt_page(passphrase_key, page2, 2, salt, iv2, passphrase=True),
    ]
    encrypted_db = b"".join(encrypted_pages)

    decrypted = _decrypt_sample(passphrase_key.hex(), encrypted_db, monkeypatch)
    assert decrypted == page1 + page2
def test_decrypt_database_keeps_page_when_non_first_hmac_mismatch(monkeypatch):
    """A bad HMAC on page 2 must not drop the page from the output.

    The last byte of the encrypted second page is flipped, then the test
    expects the full plaintext back, ``failed_pages == 0`` and
    ``hmac_warning_pages == 1`` in ``decryptor.last_result``.
    """
    raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
    salt = bytes.fromhex("60f4090ef6897e146f94109f13743e34")
    iv1 = bytes.fromhex("4142434445464748494a4b4c4d4e4f50")
    iv2 = bytes.fromhex("5152535455565758595a5b5c5d5e5f60")

    page1 = _build_plain_page(0x61, first_page=True)
    page2 = _build_plain_page(0x62, first_page=False)

    intact_page2 = _encrypt_page(raw_key, page2, 2, salt, iv2)
    # Corrupt only the HMAC (the final byte); ciphertext and IV stay intact.
    tampered_page2 = intact_page2[:-1] + bytes([intact_page2[-1] ^ 0x01])
    encrypted_db = _encrypt_page(raw_key, page1, 1, salt, iv1) + tampered_page2

    # Stub the diagnostics helpers so they always report a healthy database.
    monkeypatch.setattr(wechat_decrypt, "collect_sqlite_diagnostics", lambda *args, **kwargs: {"quick_check_ok": True})
    monkeypatch.setattr(wechat_decrypt, "sqlite_diagnostics_status", lambda diagnostics: "ok")

    with tempfile.TemporaryDirectory() as tmpdir:
        workdir = Path(tmpdir)
        src = workdir / "source.db"
        dst = workdir / "out.db"
        src.write_bytes(encrypted_db)

        decryptor = WeChatDatabaseDecryptor(raw_key.hex())
        assert decryptor.decrypt_database(str(src), str(dst))

        assert dst.read_bytes() == page1 + page2
        result = decryptor.last_result
        assert result["failed_pages"] == 0
        assert result["hmac_warning_pages"] == 1