diff --git a/frontend/pages/decrypt.vue b/frontend/pages/decrypt.vue index a13ffa0..4e90eed 100644 --- a/frontend/pages/decrypt.vue +++ b/frontend/pages/decrypt.vue @@ -73,9 +73,6 @@ 点击按钮将自动获取【数据库】与【图片】双重密钥。您也可以手动输入已知的64位密钥(使用wx_key等工具获取)。

-
- 提示:数据库密钥跟随“账号 + 设备”下发。同一账号在另一台电脑生成的聊天记录,复制到当前设备后,通常无法在当前设备重新获取原设备对应的密钥,因此也无法直接解密。 -
diff --git a/src/wechat_decrypt_tool/routers/decrypt.py b/src/wechat_decrypt_tool/routers/decrypt.py index 433c209..d797441 100644 --- a/src/wechat_decrypt_tool/routers/decrypt.py +++ b/src/wechat_decrypt_tool/routers/decrypt.py @@ -14,12 +14,7 @@ from ..app_paths import get_output_databases_dir from ..logging_config import get_logger from ..path_fix import PathFixRoute from ..key_store import upsert_account_keys_in_store -from ..wechat_decrypt import ( - WeChatDatabaseDecryptor, - build_decrypt_result_message, - decrypt_wechat_databases, - scan_account_databases_from_path, -) +from ..wechat_decrypt import WeChatDatabaseDecryptor, decrypt_wechat_databases, scan_account_databases_from_path logger = get_logger(__name__) @@ -81,7 +76,6 @@ async def decrypt_databases(request: DecryptRequest): "message": results["message"], "processed_files": results["processed_files"], "failed_files": results["failed_files"], - "failure_details": results.get("failure_details", []), "account_results": results.get("account_results", {}), } @@ -165,7 +159,6 @@ async def decrypt_databases_stream( fail_count = 0 processed_files: list[str] = [] failed_files: list[str] = [] - failure_details: list[dict] = [] account_results: dict = {} overall_current = 0 @@ -188,7 +181,6 @@ async def decrypt_databases_stream( account_success = 0 account_processed: list[str] = [] account_failed: list[str] = [] - account_failure_details: list[dict] = [] for db_info in dbs: if await request.is_disconnected(): @@ -240,20 +232,11 @@ async def decrypt_databases_stream( status = "success" msg = "解密成功" else: - failure_detail = { - "account": account, - "file": db_path, - "name": db_name, - "code": str(decryptor.last_error_code or "").strip(), - "reason": str(decryptor.last_error_message or "").strip() or "解密失败", - } account_failed.append(db_path) - account_failure_details.append(failure_detail) failed_files.append(db_path) - failure_details.append(failure_detail) fail_count += 1 status = "fail" - msg = failure_detail["reason"] + msg = "解密失败" yield _sse( { @@ -278,7 +261,6 @@ async def decrypt_databases_stream( "output_dir": str(account_output_dir), "processed_files": account_processed, "failed_files": account_failed, - "failure_details": account_failure_details, } # Build cache table (keep behavior consistent with the POST endpoint). @@ -325,15 +307,9 @@ async def decrypt_databases_stream( "success_count": success_count, "failure_count": total_databases - success_count, "output_directory": str(base_output_dir.absolute()), - "message": build_decrypt_result_message( - total_databases=total_databases, - success_count=success_count, - failed_count=total_databases - success_count, - failure_details=failure_details, - ), + "message": f"解密完成: 成功 {success_count}/{total_databases}", "processed_files": processed_files, "failed_files": failed_files, - "failure_details": failure_details, "account_results": account_results, } diff --git a/src/wechat_decrypt_tool/wechat_decrypt.py b/src/wechat_decrypt_tool/wechat_decrypt.py index a8fd440..8afcb13 100644 --- a/src/wechat_decrypt_tool/wechat_decrypt.py +++ b/src/wechat_decrypt_tool/wechat_decrypt.py @@ -13,12 +13,12 @@ import hashlib import hmac import os import json -import shutil -import tempfile from pathlib import Path from cryptography.hazmat.backends import default_backend +from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from .app_paths import get_output_databases_dir @@ -26,94 +26,6 @@ from .app_paths import get_output_databases_dir # SQLite文件头 SQLITE_HEADER = b"SQLite format 3\x00" -PAGE_SIZE = 4096 -KEY_SIZE = 32 -SALT_SIZE = 16 -IV_SIZE = 16 -HMAC_SIZE = 64 -RESERVE_SIZE = 80 -KEY_MISMATCH_GUIDANCE = ( - "请在当前设备登录该账号后重新获取密钥;" - "如果聊天记录是从另一台设备复制过来的,当前设备通常无法获取原设备对应的密钥。" -) - - -def _derive_mac_key(raw_key: bytes, salt: bytes) -> bytes: - mac_salt = bytes(b ^ 0x3A for b in salt) - return hashlib.pbkdf2_hmac("sha512", raw_key, mac_salt, 2, dklen=KEY_SIZE) - - -def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes: - offset = SALT_SIZE if page_num == 1 else 0 - data_end = PAGE_SIZE - RESERVE_SIZE + IV_SIZE - mac = hmac.new(mac_key, digestmod=hashlib.sha512) - mac.update(page[offset:data_end]) - mac.update(page_num.to_bytes(4, "little")) - return mac.digest() - - -def _decrypt_page(raw_key: bytes, page: bytes, page_num: int) -> bytes: - iv = page[PAGE_SIZE - RESERVE_SIZE : PAGE_SIZE - RESERVE_SIZE + IV_SIZE] - offset = SALT_SIZE if page_num == 1 else 0 - encrypted = page[offset : PAGE_SIZE - RESERVE_SIZE] - - cipher = Cipher( - algorithms.AES(raw_key), - modes.CBC(iv), - backend=default_backend(), - ) - decryptor = cipher.decryptor() - decrypted = decryptor.update(encrypted) + decryptor.finalize() - - if page_num == 1: - return SQLITE_HEADER + decrypted + (b"\x00" * RESERVE_SIZE) - return decrypted + (b"\x00" * RESERVE_SIZE) - - -def _failure_matches_key_mismatch(detail: dict | None) -> bool: - if not isinstance(detail, dict): - return False - code = str(detail.get("code") or "").strip().lower() - reason = str(detail.get("reason") or "").strip() - if code == "key_mismatch": - return True - return ("密钥" in reason and "不匹配" in reason) or ("当前数据库密钥不正确" in reason) - - -def build_decrypt_result_message( - total_databases: int, - success_count: int, - failed_count: int, - failure_details: list[dict] | None = None, -) -> str: - total = max(int(total_databases or 0), 0) - success = max(int(success_count or 0), 0) - failed = max(int(failed_count or 0), 0) - details = list(failure_details or []) - - if total == 0: - return "未找到可解密的数据库文件" - - if failed == 0: - return f"解密完成: 成功 {success}/{total}" - - key_mismatch_count = sum(1 for item in details if _failure_matches_key_mismatch(item)) - - if success == 0 and failed == total: - if key_mismatch_count == failed: - return ( - f"解密失败:当前数据库密钥不正确,或该密钥不属于当前账号/当前设备(0/{total} 成功)。" - + KEY_MISMATCH_GUIDANCE - ) - return f"解密失败:0/{total} 个数据库解密成功,请检查密钥、账号与数据库路径是否匹配。" - - if key_mismatch_count > 0: - return ( - f"解密完成:成功 {success}/{total},失败 {failed}/{total}。" - "失败文件中包含密钥不匹配的数据库,请确认使用的是当前账号在当前设备上的密钥。" - ) - - return f"解密完成:成功 {success}/{total},失败 {failed}/{total}。" def _normalize_account_name(name: str) -> str: @@ -309,123 +221,153 @@ class WeChatDatabaseDecryptor: self.key_bytes = bytes.fromhex(key_hex) except ValueError: raise ValueError("密钥必须是有效的十六进制字符串") - self.last_error_code = "" - self.last_error_message = "" - - def _set_last_error(self, code: str, message: str) -> None: - self.last_error_code = str(code or "").strip() - self.last_error_message = str(message or "").strip() - - def _clear_last_error(self) -> None: - self.last_error_code = "" - self.last_error_message = "" def decrypt_database(self, db_path: str, output_path: str) -> bool: """解密微信4.x版本数据库 - 这里传入的 key 已经是从微信进程内存提取出的 raw enc_key, - 不是 SQLCipher 的口令,因此不能再做一轮 PBKDF2。 + 使用SQLCipher 4.0参数: + - PBKDF2-SHA512, 256000轮迭代 + - AES-256-CBC加密 + - HMAC-SHA512验证 + - 页面大小4096字节 """ from .logging_config import get_logger logger = get_logger(__name__) logger.info(f"开始解密数据库: {db_path}") - - tmp_output_path = "" - self._clear_last_error() + try: - file_size = os.path.getsize(db_path) - logger.info(f"读取文件大小: {file_size} bytes") + with open(db_path, 'rb') as f: + encrypted_data = f.read() + + logger.info(f"读取文件大小: {len(encrypted_data)} bytes") - if file_size < PAGE_SIZE: - message = f"数据库文件过小,无法解密: {db_path}" - self._set_last_error("file_too_small", message) - logger.warning(message) - return False - - output_dir = Path(output_path).parent - output_dir.mkdir(parents=True, exist_ok=True) - - with open(db_path, "rb") as source: - page1 = source.read(PAGE_SIZE) - - if len(page1) < PAGE_SIZE: - message = f"数据库首页大小不足,无法解密: {db_path}" - self._set_last_error("page_too_small", message) - logger.warning(message) + if len(encrypted_data) < 4096: + logger.warning(f"文件太小,跳过解密: {db_path}") return False # 检查是否已经是解密的数据库 - if page1.startswith(SQLITE_HEADER): + if encrypted_data.startswith(SQLITE_HEADER): logger.info(f"文件已是SQLite格式,直接复制: {db_path}") - fd, tmp_output_path = tempfile.mkstemp( - prefix=f".{Path(output_path).name}.", - suffix=".tmp", - dir=str(output_dir), - ) - os.close(fd) - with open(db_path, "rb") as src, open(tmp_output_path, "wb") as dst: - shutil.copyfileobj(src, dst, length=1024 * 1024) - os.replace(tmp_output_path, output_path) - tmp_output_path = "" + with open(output_path, 'wb') as f: + f.write(encrypted_data) return True - - salt = page1[:SALT_SIZE] - mac_key = _derive_mac_key(self.key_bytes, salt) - expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1) - stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE] - if stored_page1_hmac != expected_page1_hmac: - message = f"当前数据库密钥不正确,或该密钥不属于当前账号/当前设备: {db_path}" - self._set_last_error("key_mismatch", message) - logger.error(f"页面 1 HMAC验证失败,密钥与数据库不匹配: {db_path}") - return False - - total_pages = (file_size + PAGE_SIZE - 1) // PAGE_SIZE - successful_pages = 0 - fd, tmp_output_path = tempfile.mkstemp( - prefix=f".{Path(output_path).name}.", - suffix=".tmp", - dir=str(output_dir), + + # 提取salt (前16字节) + salt = encrypted_data[:16] + + # 计算mac_salt (salt XOR 0x3a) + mac_salt = bytes(b ^ 0x3a for b in salt) + + # 使用PBKDF2-SHA512派生密钥 + kdf = PBKDF2HMAC( + algorithm=hashes.SHA512(), + length=32, + salt=salt, + iterations=256000, + backend=default_backend() ) - os.close(fd) - - with open(db_path, "rb") as source, open(tmp_output_path, "wb") as target: - for page_num in range(1, total_pages + 1): - page = source.read(PAGE_SIZE) - if not page: - break - if len(page) < PAGE_SIZE: - logger.warning(f"页面 {page_num} 大小不足: {len(page)} bytes,自动补齐到 {PAGE_SIZE} bytes") - page = page + (b"\x00" * (PAGE_SIZE - len(page))) - - stored_hmac = page[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE] - expected_hmac = _compute_page_hmac(mac_key, page, page_num) - if stored_hmac != expected_hmac: - message = f"数据库校验失败,文件可能损坏或密钥不匹配: {db_path}" - self._set_last_error("page_hmac_mismatch", message) - logger.error(f"页面 {page_num} HMAC验证失败,终止解密: {db_path}") - return False - - target.write(_decrypt_page(self.key_bytes, page, page_num)) + derived_key = kdf.derive(self.key_bytes) + + # 派生MAC密钥 + mac_kdf = PBKDF2HMAC( + algorithm=hashes.SHA512(), + length=32, + salt=mac_salt, + iterations=2, + backend=default_backend() + ) + mac_key = mac_kdf.derive(derived_key) + + # 解密数据 + decrypted_data = bytearray() + decrypted_data.extend(SQLITE_HEADER) + + page_size = 4096 + iv_size = 16 + hmac_size = 64 # SHA512的HMAC是64字节 + + # 计算保留区域大小 (对齐到AES块大小) + reserve_size = iv_size + hmac_size + if reserve_size % 16 != 0: + reserve_size = ((reserve_size // 16) + 1) * 16 + + total_pages = len(encrypted_data) // page_size + successful_pages = 0 + failed_pages = 0 + + # 逐页解密 + for cur_page in range(total_pages): + start = cur_page * page_size + end = start + page_size + page = encrypted_data[start:end] + + page_num = cur_page + 1 # 页面编号从1开始 + + if len(page) < page_size: + logger.warning(f"页面 {page_num} 大小不足: {len(page)} bytes") + break + + # 确定偏移量:第一页(cur_page == 0)需要跳过salt + offset = 16 if cur_page == 0 else 0 # SALT_SIZE = 16 + + # 提取存储的HMAC + hmac_start = page_size - reserve_size + iv_size + hmac_end = hmac_start + hmac_size + stored_hmac = page[hmac_start:hmac_end] + + # 按照wechat-dump-rs的方式验证HMAC + data_end = page_size - reserve_size + iv_size + hmac_data = page[offset:data_end] + + # 分步计算HMAC:先更新数据,再更新页面编号 + mac = hmac.new(mac_key, digestmod=hashlib.sha512) + mac.update(hmac_data) # 包含加密数据+IV + mac.update(page_num.to_bytes(4, 'little')) # 页面编号(小端序) + expected_hmac = mac.digest() + + if stored_hmac != expected_hmac: + logger.warning(f"页面 {page_num} HMAC验证失败") + failed_pages += 1 + continue + + # 提取IV和加密数据用于AES解密 + iv = page[page_size - reserve_size:page_size - reserve_size + iv_size] + encrypted_page = page[offset:page_size - reserve_size] + + # AES-CBC解密 + try: + cipher = Cipher( + algorithms.AES(derived_key), + modes.CBC(iv), + backend=default_backend() + ) + decryptor = cipher.decryptor() + decrypted_page = decryptor.update(encrypted_page) + decryptor.finalize() + + # 按照wechat-dump-rs的方式重组页面数据 + decrypted_data.extend(decrypted_page) + decrypted_data.extend(page[page_size - reserve_size:]) # 保留区域 + successful_pages += 1 + + except Exception as e: + logger.error(f"页面 {page_num} AES解密失败: {e}") + failed_pages += 1 + continue - logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 0 页") - os.replace(tmp_output_path, output_path) - tmp_output_path = "" - logger.info(f"解密文件大小: {os.path.getsize(output_path)} bytes") - self._clear_last_error() + logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages} 页") + + # 写入解密后的文件 + with open(output_path, 'wb') as f: + f.write(decrypted_data) + + logger.info(f"解密文件大小: {len(decrypted_data)} bytes") return True except Exception as e: - self._set_last_error("exception", f"解密过程中发生异常: {e}") logger.error(f"解密失败: {db_path}, 错误: {e}") return False - finally: - if tmp_output_path: - try: - os.remove(tmp_output_path) - except OSError: - pass def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> dict: """ @@ -550,7 +492,6 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di success_count = 0 processed_files = [] failed_files = [] - failure_details = [] account_results = {} for account_name, databases in account_databases.items(): @@ -582,7 +523,6 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di account_success = 0 account_processed = [] account_failed = [] - account_failure_details = [] for db_info in databases: db_path = db_info['path'] @@ -602,16 +542,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di else: account_failed.append(db_path) failed_files.append(db_path) - failure_detail = { - "account": account_name, - "file": db_path, - "name": db_name, - "code": str(decryptor.last_error_code or "").strip(), - "reason": str(decryptor.last_error_message or "").strip() or "解密失败", - } - account_failure_details.append(failure_detail) - failure_details.append(failure_detail) - logger.error(f"解密失败: {account_name}/{db_name} reason={failure_detail['reason']}") + logger.error(f"解密失败: {account_name}/{db_name}") # 记录账号解密结果 account_results[account_name] = { @@ -620,8 +551,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di "failed": len(databases) - account_success, "output_dir": str(account_output_dir), "processed_files": account_processed, - "failed_files": account_failed, - "failure_details": account_failure_details, + "failed_files": account_failed } # 构建“会话最后一条消息”缓存表:把耗时挪到解密阶段,后续会话列表直接查表 @@ -645,23 +575,15 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di logger.info(f"账号 {account_name} 解密完成: 成功 {account_success}/{len(databases)}") # 返回结果 - failed_count = total_databases - success_count - message = build_decrypt_result_message( - total_databases=total_databases, - success_count=success_count, - failed_count=failed_count, - failure_details=failure_details, - ) result = { "status": "success" if success_count > 0 else "error", - "message": message, + "message": f"解密完成: 成功 {success_count}/{total_databases}", "total_databases": total_databases, "successful_count": success_count, - "failed_count": failed_count, + "failed_count": total_databases - success_count, "output_directory": str(base_output_dir.absolute()), "processed_files": processed_files, "failed_files": failed_files, - "failure_details": failure_details, "account_results": account_results, # 新增:按账号的详细结果 "detected_accounts": detected_accounts, } @@ -669,9 +591,8 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di logger.info("=" * 60) logger.info("解密任务完成!") logger.info(f"成功: {success_count}/{total_databases}") - logger.info(f"失败: {failed_count}/{total_databases}") + logger.info(f"失败: {total_databases - success_count}/{total_databases}") logger.info(f"输出目录: {base_output_dir.absolute()}") - logger.info(f"结果说明: {message}") logger.info("=" * 60) return result diff --git a/tests/test_decrypt_stream_sse.py b/tests/test_decrypt_stream_sse.py index f39d04c..c041630 100644 --- a/tests/test_decrypt_stream_sse.py +++ b/tests/test_decrypt_stream_sse.py @@ -3,44 +3,14 @@ import os import sys import unittest import importlib -import hashlib -import hmac from pathlib import Path from tempfile import TemporaryDirectory -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - ROOT = Path(__file__).resolve().parents[1] sys.path.insert(0, str(ROOT / "src")) -def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes) -> bytes: - from wechat_decrypt_tool.wechat_decrypt import PAGE_SIZE, RESERVE_SIZE, SALT_SIZE, _derive_mac_key - - if page_num == 1: - encrypted_input = plain_page[SALT_SIZE : PAGE_SIZE - RESERVE_SIZE] - prefix = salt - else: - encrypted_input = plain_page[: PAGE_SIZE - RESERVE_SIZE] - prefix = b"" - - cipher = Cipher( - algorithms.AES(raw_key), - modes.CBC(iv), - backend=default_backend(), - ) - encryptor = cipher.encryptor() - encrypted = encryptor.update(encrypted_input) + encryptor.finalize() - - page_without_hmac = prefix + encrypted + iv - mac = hmac.new(_derive_mac_key(raw_key, salt), digestmod=hashlib.sha512) - mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0 :]) - mac.update(page_num.to_bytes(4, "little")) - return page_without_hmac + mac.digest() - - class TestDecryptStreamSSE(unittest.TestCase): def test_decrypt_stream_reports_progress(self): from fastapi import FastAPI @@ -115,76 +85,6 @@ class TestDecryptStreamSSE(unittest.TestCase): else: os.environ["WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE"] = prev_build_cache - def test_decrypt_stream_reports_key_scope_error_for_wrong_key(self): - from fastapi import FastAPI - from fastapi.testclient import TestClient - - from wechat_decrypt_tool.wechat_decrypt import PAGE_SIZE, RESERVE_SIZE, SQLITE_HEADER - - good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef") - bad_key = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210" - salt = bytes.fromhex("11223344556677889900aabbccddeeff") - iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10") - plain_page = SQLITE_HEADER + (b"A" * (PAGE_SIZE - RESERVE_SIZE - len(SQLITE_HEADER))) + (b"\x00" * RESERVE_SIZE) - encrypted_db = _encrypt_page(good_key, plain_page, 1, salt, iv1) - - with TemporaryDirectory() as td: - root = Path(td) - - prev_data_dir = os.environ.get("WECHAT_TOOL_DATA_DIR") - prev_build_cache = os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE") - try: - os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) - os.environ["WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE"] = "0" - - import wechat_decrypt_tool.app_paths as app_paths - import wechat_decrypt_tool.routers.decrypt as decrypt_router - - importlib.reload(app_paths) - importlib.reload(decrypt_router) - - db_storage = root / "xwechat_files" / "wxid_wrong_key_user" / "db_storage" - db_storage.mkdir(parents=True, exist_ok=True) - (db_storage / "MSG0.db").write_bytes(encrypted_db) - - app = FastAPI() - app.include_router(decrypt_router.router) - client = TestClient(app) - - events: list[dict] = [] - with client.stream( - "GET", - "/api/decrypt_stream", - params={"key": bad_key, "db_storage_path": str(db_storage)}, - ) as resp: - self.assertEqual(resp.status_code, 200) - for line in resp.iter_lines(): - if not line: - continue - if isinstance(line, bytes): - line = line.decode("utf-8", errors="ignore") - line = str(line) - if line.startswith(":") or not line.startswith("data: "): - continue - payload = json.loads(line[len("data: ") :]) - events.append(payload) - if payload.get("type") in {"complete", "error"}: - break - - self.assertEqual(events[-1].get("type"), "complete") - self.assertEqual(events[-1].get("status"), "failed") - self.assertIn("当前数据库密钥不正确", events[-1].get("message", "")) - self.assertIn("另一台设备复制", events[-1].get("message", "")) - finally: - if prev_data_dir is None: - os.environ.pop("WECHAT_TOOL_DATA_DIR", None) - else: - os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data_dir - if prev_build_cache is None: - os.environ.pop("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", None) - else: - os.environ["WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE"] = prev_build_cache - if __name__ == "__main__": unittest.main() diff --git a/tests/test_wechat_decrypt_raw_key.py b/tests/test_wechat_decrypt_raw_key.py deleted file mode 100644 index 4588ad2..0000000 --- a/tests/test_wechat_decrypt_raw_key.py +++ /dev/null @@ -1,128 +0,0 @@ -import hashlib -import hmac -import os -import sys -import tempfile -import unittest -from pathlib import Path - -from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes - -ROOT = Path(__file__).resolve().parents[1] -sys.path.insert(0, str(ROOT / "src")) - -from wechat_decrypt_tool.wechat_decrypt import ( - PAGE_SIZE, - RESERVE_SIZE, - SALT_SIZE, - SQLITE_HEADER, - WeChatDatabaseDecryptor, - _derive_mac_key, - decrypt_wechat_databases, -) - - -def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes) -> bytes: - if page_num == 1: - encrypted_input = plain_page[SALT_SIZE : PAGE_SIZE - RESERVE_SIZE] - prefix = salt - else: - encrypted_input = plain_page[: PAGE_SIZE - RESERVE_SIZE] - prefix = b"" - - cipher = Cipher( - algorithms.AES(raw_key), - modes.CBC(iv), - backend=default_backend(), - ) - encryptor = cipher.encryptor() - encrypted = encryptor.update(encrypted_input) + encryptor.finalize() - - page_without_hmac = prefix + encrypted + iv - mac = hmac.new(_derive_mac_key(raw_key, salt), digestmod=hashlib.sha512) - mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0 :]) - mac.update(page_num.to_bytes(4, "little")) - return page_without_hmac + mac.digest() - - -def _build_plain_page(body_byte: int, *, first_page: bool) -> bytes: - if first_page: - payload = SQLITE_HEADER + bytes([body_byte]) * (PAGE_SIZE - RESERVE_SIZE - len(SQLITE_HEADER)) - else: - payload = bytes([body_byte]) * (PAGE_SIZE - RESERVE_SIZE) - return payload + (b"\x00" * RESERVE_SIZE) - - -class WeChatDecryptRawKeyTests(unittest.TestCase): - def test_decrypt_database_uses_raw_enc_key(self): - raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef") - salt = bytes.fromhex("11223344556677889900aabbccddeeff") - iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10") - iv2 = bytes.fromhex("1112131415161718191a1b1c1d1e1f20") - - page1 = _build_plain_page(0x41, first_page=True) - page2 = _build_plain_page(0x42, first_page=False) - encrypted_db = _encrypt_page(raw_key, page1, 1, salt, iv1) + _encrypt_page(raw_key, page2, 2, salt, iv2) - - with tempfile.TemporaryDirectory() as tmpdir: - src = Path(tmpdir) / "source.db" - dst = Path(tmpdir) / "out.db" - src.write_bytes(encrypted_db) - - decryptor = WeChatDatabaseDecryptor(raw_key.hex()) - self.assertTrue(decryptor.decrypt_database(str(src), str(dst))) - self.assertEqual(dst.read_bytes(), page1 + page2) - - def test_decrypt_database_keeps_existing_output_on_hmac_failure(self): - good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef") - bad_key_hex = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210" - salt = bytes.fromhex("11223344556677889900aabbccddeeff") - iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10") - - page1 = _build_plain_page(0x41, first_page=True) - encrypted_db = _encrypt_page(good_key, page1, 1, salt, iv1) - - with tempfile.TemporaryDirectory() as tmpdir: - src = Path(tmpdir) / "source.db" - dst = Path(tmpdir) / "out.db" - src.write_bytes(encrypted_db) - dst.write_bytes(b"keep-existing-output") - - decryptor = WeChatDatabaseDecryptor(bad_key_hex) - self.assertFalse(decryptor.decrypt_database(str(src), str(dst))) - self.assertEqual(dst.read_bytes(), b"keep-existing-output") - - def test_decrypt_wechat_databases_reports_key_scope_message(self): - good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef") - bad_key_hex = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210" - salt = bytes.fromhex("11223344556677889900aabbccddeeff") - iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10") - - page1 = _build_plain_page(0x41, first_page=True) - encrypted_db = _encrypt_page(good_key, page1, 1, salt, iv1) - - with tempfile.TemporaryDirectory() as tmpdir: - root = Path(tmpdir) - db_storage = root / "xwechat_files" / "wxid_scope_user" / "db_storage" - db_storage.mkdir(parents=True, exist_ok=True) - (db_storage / "MSG0.db").write_bytes(encrypted_db) - - prev_data_dir = os.environ.get("WECHAT_TOOL_DATA_DIR") - try: - os.environ["WECHAT_TOOL_DATA_DIR"] = str(root) - result = decrypt_wechat_databases(str(db_storage), bad_key_hex) - finally: - if prev_data_dir is None: - os.environ.pop("WECHAT_TOOL_DATA_DIR", None) - else: - os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data_dir - - self.assertEqual(result["status"], "error") - self.assertIn("当前数据库密钥不正确", result["message"]) - self.assertIn("账号/当前设备", result["message"]) - self.assertIn("另一台设备复制", result["message"]) - - -if __name__ == "__main__": - unittest.main()