fix(key): 修复数据库密钥模式识别逻辑

- 新增原始加密密钥与 SQLCipher passphrase 两种密钥模式自动识别
- 通过首页 HMAC 校验确认实际密钥模式,避免错误派生导致解密失败
- 解密输出时清理加密页尾部的 IV/HMAC 保留区
- 增加解密失败时的 key_mode、HMAC 与 SQLite 诊断信息
- 新增单测覆盖两种密钥模式的解密路径
- 更新 README 补充数据库解密兼容性说明
This commit is contained in:
2977094657
2026-04-27 13:15:44 +08:00
Unverified
parent ad29c8b251
commit e93423db42
3 changed files with 186 additions and 97 deletions
-4
View File
@@ -176,10 +176,6 @@ npm run dist
## 使用指南
### 导出
侧边栏提供“导出”入口,点击下载图标可打开统一导出弹窗。当前导出界面只保留两个选项:导出数据库、导出资源文件;用户选择导出目录后会生成一个账号归档 ZIP。当两个选项都勾选时,导出会按账号目录直接归档,使用 ZIP 存储模式打包,尽量避免二次压缩带来的耗时。
### 获取解密密钥
在使用本工具之前,您需要先获取微信数据库的解密密钥。推荐使用以下工具:
+104 -93
View File
@@ -16,9 +16,7 @@ import json
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .app_paths import get_output_databases_dir
from .database_filters import should_skip_source_database
@@ -28,6 +26,75 @@ from .sqlite_diagnostics import collect_sqlite_diagnostics, sqlite_diagnostics_s
# SQLite文件头
SQLITE_HEADER = b"SQLite format 3\x00"
PAGE_SIZE = 4096
KEY_SIZE = 32
SALT_SIZE = 16
IV_SIZE = 16
HMAC_SIZE = 64
# WeChat 4.x SQLCipher/WCDB pages reserve IV + HMAC at the tail.
# When exporting to plain SQLite, do not keep encrypted IV/HMAC bytes in output pages.
RESERVE_SIZE = IV_SIZE + HMAC_SIZE
def _derive_mac_key(enc_key: bytes, salt: bytes) -> bytes:
"""Derive SQLCipher/WCDB page HMAC key."""
mac_salt = bytes(b ^ 0x3A for b in salt)
return hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SIZE)
def _derive_sqlcipher_enc_key(key_material: bytes, salt: bytes) -> bytes:
"""Derive AES enc_key from SQLCipher passphrase/base key."""
return hashlib.pbkdf2_hmac("sha512", key_material, salt, 256000, dklen=KEY_SIZE)
def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes:
offset = SALT_SIZE if page_num == 1 else 0
data_end = PAGE_SIZE - RESERVE_SIZE + IV_SIZE
mac = hmac.new(mac_key, digestmod=hashlib.sha512)
mac.update(page[offset:data_end])
mac.update(page_num.to_bytes(4, "little"))
return mac.digest()
def _resolve_page1_key_material(key_material: bytes, page1: bytes) -> tuple[bytes, bytes, str] | None:
"""Detect whether input key is raw enc_key or SQLCipher passphrase by page-1 HMAC."""
if len(page1) < PAGE_SIZE:
return None
salt = page1[:SALT_SIZE]
stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE: PAGE_SIZE]
candidates = [
("raw_enc_key", key_material, _derive_mac_key(key_material, salt)),
]
derived_key = _derive_sqlcipher_enc_key(key_material, salt)
candidates.append(("sqlcipher_passphrase", derived_key, _derive_mac_key(derived_key, salt)))
for mode, enc_key, mac_key in candidates:
if hmac.compare_digest(stored_page1_hmac, _compute_page_hmac(mac_key, page1, 1)):
return enc_key, mac_key, mode
return None
def _decrypt_page(enc_key: bytes, page: bytes, page_num: int) -> bytes:
iv = page[PAGE_SIZE - RESERVE_SIZE: PAGE_SIZE - RESERVE_SIZE + IV_SIZE]
offset = SALT_SIZE if page_num == 1 else 0
encrypted_page = page[offset: PAGE_SIZE - RESERVE_SIZE]
cipher = Cipher(
algorithms.AES(enc_key),
modes.CBC(iv),
backend=default_backend(),
)
decryptor = cipher.decryptor()
decrypted_page = decryptor.update(encrypted_page) + decryptor.finalize()
# Plain SQLite pages do not carry SQLCipher/WCDB IV/HMAC reserve bytes.
# Keep page size stable by zero-filling the reserve tail.
if page_num == 1:
return SQLITE_HEADER + decrypted_page + (b"\x00" * RESERVE_SIZE)
return decrypted_page + (b"\x00" * RESERVE_SIZE)
def _normalize_account_name(name: str) -> str:
@@ -398,113 +465,57 @@ class WeChatDatabaseDecryptor:
result["copied_as_sqlite"] = True
return _finalize(True)
# 提取salt (前16字节)
salt = encrypted_data[:16]
# 计算mac_salt (salt XOR 0x3a)
mac_salt = bytes(b ^ 0x3a for b in salt)
# 使用PBKDF2-SHA512派生密钥
kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=salt,
iterations=256000,
backend=default_backend()
)
derived_key = kdf.derive(self.key_bytes)
# 派生MAC密钥
mac_kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=mac_salt,
iterations=2,
backend=default_backend()
)
mac_key = mac_kdf.derive(derived_key)
# 解密数据
page1 = encrypted_data[:PAGE_SIZE]
resolved_key_material = _resolve_page1_key_material(self.key_bytes, page1)
if resolved_key_material is None:
_append_failed_page(1, "hmac")
result["total_pages"] = int(len(encrypted_data) // PAGE_SIZE)
result["failed_pages"] = 1
logger.warning("Page 1 HMAC verification failed; key does not match database: %s", db_path)
return _finalize(False, "key_mismatch")
enc_key, mac_key, key_mode = resolved_key_material
result["key_mode"] = key_mode
logger.info("Page 1 HMAC verification passed: mode=%s path=%s", key_mode, db_path)
decrypted_data = bytearray()
decrypted_data.extend(SQLITE_HEADER)
page_size = 4096
iv_size = 16
hmac_size = 64 # SHA512的HMAC是64字节
# 计算保留区域大小 (对齐到AES块大小)
reserve_size = iv_size + hmac_size
if reserve_size % 16 != 0:
reserve_size = ((reserve_size // 16) + 1) * 16
total_pages = len(encrypted_data) // page_size
total_pages = (len(encrypted_data) + PAGE_SIZE - 1) // PAGE_SIZE
successful_pages = 0
failed_pages = 0
result["total_pages"] = int(total_pages)
# 逐页解密
for cur_page in range(total_pages):
start = cur_page * page_size
end = start + page_size
page = encrypted_data[start:end]
page_num = cur_page + 1 # 页面编号从1开始
if len(page) < page_size:
logger.warning(f"页面 {page_num} 大小不足: {len(page)} bytes")
page_num = cur_page + 1
start = cur_page * PAGE_SIZE
page = encrypted_data[start:start + PAGE_SIZE]
if not page:
break
# 确定偏移量:第一页(cur_page == 0)需要跳过salt
offset = 16 if cur_page == 0 else 0 # SALT_SIZE = 16
# 提取存储的HMAC
hmac_start = page_size - reserve_size + iv_size
hmac_end = hmac_start + hmac_size
stored_hmac = page[hmac_start:hmac_end]
# 按照wechat-dump-rs的方式验证HMAC
data_end = page_size - reserve_size + iv_size
hmac_data = page[offset:data_end]
# 分步计算HMAC:先更新数据,再更新页面编号
mac = hmac.new(mac_key, digestmod=hashlib.sha512)
mac.update(hmac_data) # 包含加密数据+IV
mac.update(page_num.to_bytes(4, 'little')) # 页面编号(小端序)
expected_hmac = mac.digest()
if stored_hmac != expected_hmac:
logger.warning(f"页面 {page_num} HMAC验证失败")
if len(page) < PAGE_SIZE:
logger.warning(
"Page %s is short: %s bytes; padding to %s bytes",
page_num,
len(page),
PAGE_SIZE,
)
page = page + (b"\x00" * (PAGE_SIZE - len(page)))
stored_hmac = page[PAGE_SIZE - HMAC_SIZE: PAGE_SIZE]
expected_hmac = _compute_page_hmac(mac_key, page, page_num)
if not hmac.compare_digest(stored_hmac, expected_hmac):
logger.warning("Page %s HMAC verification failed", page_num)
failed_pages += 1
_append_failed_page(page_num, "hmac")
continue
# 提取IV和加密数据用于AES解密
iv = page[page_size - reserve_size:page_size - reserve_size + iv_size]
encrypted_page = page[offset:page_size - reserve_size]
# AES-CBC解密
try:
cipher = Cipher(
algorithms.AES(derived_key),
modes.CBC(iv),
backend=default_backend()
)
decryptor = cipher.decryptor()
decrypted_page = decryptor.update(encrypted_page) + decryptor.finalize()
# 按照wechat-dump-rs的方式重组页面数据
decrypted_data.extend(decrypted_page)
decrypted_data.extend(page[page_size - reserve_size:]) # 保留区域
decrypted_data.extend(_decrypt_page(enc_key, page, page_num))
successful_pages += 1
except Exception as e:
logger.error(f"页面 {page_num} AES解密失败: {e}")
logger.error("Page %s AES decryption failed: %s", page_num, e)
failed_pages += 1
_append_failed_page(page_num, "aes", str(e))
continue
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages}")
result["successful_pages"] = int(successful_pages)
result["failed_pages"] = int(failed_pages)
+82
View File
@@ -0,0 +1,82 @@
import hashlib
import hmac
import tempfile
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import wechat_decrypt_tool.wechat_decrypt as wechat_decrypt
from wechat_decrypt_tool.wechat_decrypt import (
HMAC_SIZE,
PAGE_SIZE,
RESERVE_SIZE,
SALT_SIZE,
SQLITE_HEADER,
WeChatDatabaseDecryptor,
_derive_mac_key,
_derive_sqlcipher_enc_key,
)
def _build_plain_page(fill: int, *, first_page: bool) -> bytes:
body = bytes([fill]) * (PAGE_SIZE - RESERVE_SIZE)
if first_page:
body = SQLITE_HEADER + body[len(SQLITE_HEADER):]
return body + (b"\x00" * RESERVE_SIZE)
def _encrypt_page(key_material: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes, *, passphrase: bool = False) -> bytes:
enc_key = _derive_sqlcipher_enc_key(key_material, salt) if passphrase else key_material
if page_num == 1:
encrypted_input = plain_page[SALT_SIZE: PAGE_SIZE - RESERVE_SIZE]
prefix = salt
else:
encrypted_input = plain_page[: PAGE_SIZE - RESERVE_SIZE]
prefix = b""
cipher = Cipher(algorithms.AES(enc_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
encrypted = encryptor.update(encrypted_input) + encryptor.finalize()
page_without_hmac = prefix + encrypted + iv
mac = hmac.new(_derive_mac_key(enc_key, salt), digestmod=hashlib.sha512)
mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0:])
mac.update(page_num.to_bytes(4, "little"))
return page_without_hmac + mac.digest()
def _decrypt_sample(key_hex: str, encrypted_db: bytes, monkeypatch) -> bytes:
with tempfile.TemporaryDirectory() as tmpdir:
src = Path(tmpdir) / "source.db"
dst = Path(tmpdir) / "out.db"
src.write_bytes(encrypted_db)
monkeypatch.setattr(wechat_decrypt, "collect_sqlite_diagnostics", lambda *args, **kwargs: {"quick_check_ok": True})
monkeypatch.setattr(wechat_decrypt, "sqlite_diagnostics_status", lambda diagnostics: "ok")
decryptor = WeChatDatabaseDecryptor(key_hex)
assert decryptor.decrypt_database(str(src), str(dst))
return dst.read_bytes()
def test_decrypt_database_accepts_raw_enc_key_like_weflow(monkeypatch):
raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
salt = bytes.fromhex("50f4090ef6897e146f94109f13743e34")
page1 = _build_plain_page(0x41, first_page=True)
page2 = _build_plain_page(0x42, first_page=False)
encrypted_db = _encrypt_page(raw_key, page1, 1, salt, bytes.fromhex("0102030405060708090a0b0c0d0e0f10"))
encrypted_db += _encrypt_page(raw_key, page2, 2, salt, bytes.fromhex("1112131415161718191a1b1c1d1e1f20"))
assert _decrypt_sample(raw_key.hex(), encrypted_db, monkeypatch) == page1 + page2
def test_decrypt_database_keeps_sqlcipher_passphrase_compatibility(monkeypatch):
passphrase_key = bytes.fromhex("9f5dd0d3b6d0477ea5045c9e380ee272e53927993eb548dd98a022e842d5f7bd")
salt = bytes.fromhex("40f4090ef6897e146f94109f13743e34")
page1 = _build_plain_page(0x51, first_page=True)
page2 = _build_plain_page(0x52, first_page=False)
encrypted_db = _encrypt_page(passphrase_key, page1, 1, salt, bytes.fromhex("2122232425262728292a2b2c2d2e2f30"), passphrase=True)
encrypted_db += _encrypt_page(passphrase_key, page2, 2, salt, bytes.fromhex("3132333435363738393a3b3c3d3e3f40"), passphrase=True)
assert _decrypt_sample(passphrase_key.hex(), encrypted_db, monkeypatch) == page1 + page2