diff --git a/src/wechat_decrypt_tool/wechat_decrypt.py b/src/wechat_decrypt_tool/wechat_decrypt.py index a8fd440..1b3b7f2 100644 --- a/src/wechat_decrypt_tool/wechat_decrypt.py +++ b/src/wechat_decrypt_tool/wechat_decrypt.py @@ -43,6 +43,29 @@ def _derive_mac_key(raw_key: bytes, salt: bytes) -> bytes: return hashlib.pbkdf2_hmac("sha512", raw_key, mac_salt, 2, dklen=KEY_SIZE) +def _derive_sqlcipher_enc_key(key_material: bytes, salt: bytes) -> bytes: + return hashlib.pbkdf2_hmac("sha512", key_material, salt, 256000, dklen=KEY_SIZE) + + +def _resolve_page1_key_material(key_material: bytes, page1: bytes) -> tuple[bytes, bytes, str] | None: + salt = page1[:SALT_SIZE] + stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE] + + candidates = [ + ("raw_enc_key", key_material, _derive_mac_key(key_material, salt)), + ] + + derived_key = _derive_sqlcipher_enc_key(key_material, salt) + candidates.append(("sqlcipher_passphrase", derived_key, _derive_mac_key(derived_key, salt))) + + for mode, enc_key, mac_key in candidates: + expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1) + if stored_page1_hmac == expected_page1_hmac: + return enc_key, mac_key, mode + + return None + + def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes: offset = SALT_SIZE if page_num == 1 else 0 data_end = PAGE_SIZE - RESERVE_SIZE + IV_SIZE @@ -323,8 +346,9 @@ class WeChatDatabaseDecryptor: def decrypt_database(self, db_path: str, output_path: str) -> bool: """解密微信4.x版本数据库 - 这里传入的 key 已经是从微信进程内存提取出的 raw enc_key, - 不是 SQLCipher 的口令,因此不能再做一轮 PBKDF2。 + 兼容两种输入形态: + - raw enc_key(部分内存扫描/工具直接返回) + - SQLCipher 口令/基础 key(需先用数据库 salt 做一轮 PBKDF2) """ from .logging_config import get_logger logger = get_logger(__name__) @@ -370,15 +394,14 @@ class WeChatDatabaseDecryptor: tmp_output_path = "" return True - salt = page1[:SALT_SIZE] - mac_key = _derive_mac_key(self.key_bytes, salt) - expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1) - stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE] - if stored_page1_hmac != expected_page1_hmac: + resolved_key_material = _resolve_page1_key_material(self.key_bytes, page1) + if resolved_key_material is None: message = f"当前数据库密钥不正确,或该密钥不属于当前账号/当前设备: {db_path}" self._set_last_error("key_mismatch", message) logger.error(f"页面 1 HMAC验证失败,密钥与数据库不匹配: {db_path}") return False + enc_key, mac_key, key_mode = resolved_key_material + logger.info(f"页面 1 HMAC验证通过: mode={key_mode} path={db_path}") total_pages = (file_size + PAGE_SIZE - 1) // PAGE_SIZE successful_pages = 0 @@ -406,7 +429,7 @@ class WeChatDatabaseDecryptor: logger.error(f"页面 {page_num} HMAC验证失败,终止解密: {db_path}") return False - target.write(_decrypt_page(self.key_bytes, page, page_num)) + target.write(_decrypt_page(enc_key, page, page_num)) successful_pages += 1 logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 0 页") diff --git a/tests/test_wechat_decrypt_raw_key.py b/tests/test_wechat_decrypt_raw_key.py index 4588ad2..64bd992 100644 --- a/tests/test_wechat_decrypt_raw_key.py +++ b/tests/test_wechat_decrypt_raw_key.py @@ -19,11 +19,22 @@ from wechat_decrypt_tool.wechat_decrypt import ( SQLITE_HEADER, WeChatDatabaseDecryptor, _derive_mac_key, + _derive_sqlcipher_enc_key, decrypt_wechat_databases, ) -def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes) -> bytes: +def _encrypt_page( + raw_key: bytes, + plain_page: bytes, + page_num: int, + salt: bytes, + iv: bytes, + *, + sqlcipher_passphrase: bool = False, +) -> bytes: + enc_key = _derive_sqlcipher_enc_key(raw_key, salt) if sqlcipher_passphrase else raw_key + if page_num == 1: encrypted_input = plain_page[SALT_SIZE : PAGE_SIZE - RESERVE_SIZE] prefix = salt @@ -32,7 +43,7 @@ def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, prefix = b"" cipher = Cipher( - algorithms.AES(raw_key), + algorithms.AES(enc_key), modes.CBC(iv), backend=default_backend(), ) @@ -40,7 +51,7 @@ def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, encrypted = encryptor.update(encrypted_input) + encryptor.finalize() page_without_hmac = prefix + encrypted + iv - mac = hmac.new(_derive_mac_key(raw_key, salt), digestmod=hashlib.sha512) + mac = hmac.new(_derive_mac_key(enc_key, salt), digestmod=hashlib.sha512) mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0 :]) mac.update(page_num.to_bytes(4, "little")) return page_without_hmac + mac.digest() @@ -74,6 +85,39 @@ class WeChatDecryptRawKeyTests(unittest.TestCase): self.assertTrue(decryptor.decrypt_database(str(src), str(dst))) self.assertEqual(dst.read_bytes(), page1 + page2) + def test_decrypt_database_falls_back_to_sqlcipher_passphrase_mode(self): + passphrase_key = bytes.fromhex("9f5dd0d3b6d0477ea5045c9e380ee272e53927993eb548dd98a022e842d5f7bd") + salt = bytes.fromhex("50f4090ef6897e146f94109f13743e34") + iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10") + iv2 = bytes.fromhex("1112131415161718191a1b1c1d1e1f20") + + page1 = _build_plain_page(0x41, first_page=True) + page2 = _build_plain_page(0x42, first_page=False) + encrypted_db = _encrypt_page( + passphrase_key, + page1, + 1, + salt, + iv1, + sqlcipher_passphrase=True, + ) + _encrypt_page( + passphrase_key, + page2, + 2, + salt, + iv2, + sqlcipher_passphrase=True, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + src = Path(tmpdir) / "source.db" + dst = Path(tmpdir) / "out.db" + src.write_bytes(encrypted_db) + + decryptor = WeChatDatabaseDecryptor(passphrase_key.hex()) + self.assertTrue(decryptor.decrypt_database(str(src), str(dst))) + self.assertEqual(dst.read_bytes(), page1 + page2) + def test_decrypt_database_keeps_existing_output_on_hmac_failure(self): good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef") bad_key_hex = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210"