mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
Revert "fix(decrypt): 修复 SQLCipher 密钥兼容性回归"
This reverts commit 22399f4110.
This commit is contained in:
@@ -43,29 +43,6 @@ def _derive_mac_key(raw_key: bytes, salt: bytes) -> bytes:
|
||||
return hashlib.pbkdf2_hmac("sha512", raw_key, mac_salt, 2, dklen=KEY_SIZE)
|
||||
|
||||
|
||||
def _derive_sqlcipher_enc_key(key_material: bytes, salt: bytes) -> bytes:
|
||||
return hashlib.pbkdf2_hmac("sha512", key_material, salt, 256000, dklen=KEY_SIZE)
|
||||
|
||||
|
||||
def _resolve_page1_key_material(key_material: bytes, page1: bytes) -> tuple[bytes, bytes, str] | None:
|
||||
salt = page1[:SALT_SIZE]
|
||||
stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE]
|
||||
|
||||
candidates = [
|
||||
("raw_enc_key", key_material, _derive_mac_key(key_material, salt)),
|
||||
]
|
||||
|
||||
derived_key = _derive_sqlcipher_enc_key(key_material, salt)
|
||||
candidates.append(("sqlcipher_passphrase", derived_key, _derive_mac_key(derived_key, salt)))
|
||||
|
||||
for mode, enc_key, mac_key in candidates:
|
||||
expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1)
|
||||
if stored_page1_hmac == expected_page1_hmac:
|
||||
return enc_key, mac_key, mode
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes:
|
||||
offset = SALT_SIZE if page_num == 1 else 0
|
||||
data_end = PAGE_SIZE - RESERVE_SIZE + IV_SIZE
|
||||
@@ -346,9 +323,8 @@ class WeChatDatabaseDecryptor:
|
||||
def decrypt_database(self, db_path: str, output_path: str) -> bool:
|
||||
"""解密微信4.x版本数据库
|
||||
|
||||
兼容两种输入形态:
|
||||
- raw enc_key(部分内存扫描/工具直接返回)
|
||||
- SQLCipher 口令/基础 key(需先用数据库 salt 做一轮 PBKDF2)
|
||||
这里传入的 key 已经是从微信进程内存提取出的 raw enc_key,
|
||||
不是 SQLCipher 的口令,因此不能再做一轮 PBKDF2。
|
||||
"""
|
||||
from .logging_config import get_logger
|
||||
logger = get_logger(__name__)
|
||||
@@ -394,14 +370,15 @@ class WeChatDatabaseDecryptor:
|
||||
tmp_output_path = ""
|
||||
return True
|
||||
|
||||
resolved_key_material = _resolve_page1_key_material(self.key_bytes, page1)
|
||||
if resolved_key_material is None:
|
||||
salt = page1[:SALT_SIZE]
|
||||
mac_key = _derive_mac_key(self.key_bytes, salt)
|
||||
expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1)
|
||||
stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE]
|
||||
if stored_page1_hmac != expected_page1_hmac:
|
||||
message = f"当前数据库密钥不正确,或该密钥不属于当前账号/当前设备: {db_path}"
|
||||
self._set_last_error("key_mismatch", message)
|
||||
logger.error(f"页面 1 HMAC验证失败,密钥与数据库不匹配: {db_path}")
|
||||
return False
|
||||
enc_key, mac_key, key_mode = resolved_key_material
|
||||
logger.info(f"页面 1 HMAC验证通过: mode={key_mode} path={db_path}")
|
||||
|
||||
total_pages = (file_size + PAGE_SIZE - 1) // PAGE_SIZE
|
||||
successful_pages = 0
|
||||
@@ -429,7 +406,7 @@ class WeChatDatabaseDecryptor:
|
||||
logger.error(f"页面 {page_num} HMAC验证失败,终止解密: {db_path}")
|
||||
return False
|
||||
|
||||
target.write(_decrypt_page(enc_key, page, page_num))
|
||||
target.write(_decrypt_page(self.key_bytes, page, page_num))
|
||||
successful_pages += 1
|
||||
|
||||
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 0 页")
|
||||
|
||||
@@ -19,22 +19,11 @@ from wechat_decrypt_tool.wechat_decrypt import (
|
||||
SQLITE_HEADER,
|
||||
WeChatDatabaseDecryptor,
|
||||
_derive_mac_key,
|
||||
_derive_sqlcipher_enc_key,
|
||||
decrypt_wechat_databases,
|
||||
)
|
||||
|
||||
|
||||
def _encrypt_page(
|
||||
raw_key: bytes,
|
||||
plain_page: bytes,
|
||||
page_num: int,
|
||||
salt: bytes,
|
||||
iv: bytes,
|
||||
*,
|
||||
sqlcipher_passphrase: bool = False,
|
||||
) -> bytes:
|
||||
enc_key = _derive_sqlcipher_enc_key(raw_key, salt) if sqlcipher_passphrase else raw_key
|
||||
|
||||
def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes) -> bytes:
|
||||
if page_num == 1:
|
||||
encrypted_input = plain_page[SALT_SIZE : PAGE_SIZE - RESERVE_SIZE]
|
||||
prefix = salt
|
||||
@@ -43,7 +32,7 @@ def _encrypt_page(
|
||||
prefix = b""
|
||||
|
||||
cipher = Cipher(
|
||||
algorithms.AES(enc_key),
|
||||
algorithms.AES(raw_key),
|
||||
modes.CBC(iv),
|
||||
backend=default_backend(),
|
||||
)
|
||||
@@ -51,7 +40,7 @@ def _encrypt_page(
|
||||
encrypted = encryptor.update(encrypted_input) + encryptor.finalize()
|
||||
|
||||
page_without_hmac = prefix + encrypted + iv
|
||||
mac = hmac.new(_derive_mac_key(enc_key, salt), digestmod=hashlib.sha512)
|
||||
mac = hmac.new(_derive_mac_key(raw_key, salt), digestmod=hashlib.sha512)
|
||||
mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0 :])
|
||||
mac.update(page_num.to_bytes(4, "little"))
|
||||
return page_without_hmac + mac.digest()
|
||||
@@ -85,39 +74,6 @@ class WeChatDecryptRawKeyTests(unittest.TestCase):
|
||||
self.assertTrue(decryptor.decrypt_database(str(src), str(dst)))
|
||||
self.assertEqual(dst.read_bytes(), page1 + page2)
|
||||
|
||||
def test_decrypt_database_falls_back_to_sqlcipher_passphrase_mode(self):
|
||||
passphrase_key = bytes.fromhex("9f5dd0d3b6d0477ea5045c9e380ee272e53927993eb548dd98a022e842d5f7bd")
|
||||
salt = bytes.fromhex("50f4090ef6897e146f94109f13743e34")
|
||||
iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
|
||||
iv2 = bytes.fromhex("1112131415161718191a1b1c1d1e1f20")
|
||||
|
||||
page1 = _build_plain_page(0x41, first_page=True)
|
||||
page2 = _build_plain_page(0x42, first_page=False)
|
||||
encrypted_db = _encrypt_page(
|
||||
passphrase_key,
|
||||
page1,
|
||||
1,
|
||||
salt,
|
||||
iv1,
|
||||
sqlcipher_passphrase=True,
|
||||
) + _encrypt_page(
|
||||
passphrase_key,
|
||||
page2,
|
||||
2,
|
||||
salt,
|
||||
iv2,
|
||||
sqlcipher_passphrase=True,
|
||||
)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
src = Path(tmpdir) / "source.db"
|
||||
dst = Path(tmpdir) / "out.db"
|
||||
src.write_bytes(encrypted_db)
|
||||
|
||||
decryptor = WeChatDatabaseDecryptor(passphrase_key.hex())
|
||||
self.assertTrue(decryptor.decrypt_database(str(src), str(dst)))
|
||||
self.assertEqual(dst.read_bytes(), page1 + page2)
|
||||
|
||||
def test_decrypt_database_keeps_existing_output_on_hmac_failure(self):
|
||||
good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
|
||||
bad_key_hex = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210"
|
||||
|
||||
Reference in New Issue
Block a user