mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-06-18 15:54:08 +08:00
Compare commits
2 Commits
@@ -186,6 +186,178 @@ def _sql_literal(value: Any) -> str:
|
||||
return "'" + s.replace("'", "''") + "'"
|
||||
|
||||
|
||||
def _pick_case_insensitive_value(item: Any, *keys: str) -> Any:
|
||||
if not isinstance(item, dict):
|
||||
return None
|
||||
for key in keys:
|
||||
if key in item and item[key] is not None:
|
||||
return item[key]
|
||||
key_lc = str(key or "").strip().lower()
|
||||
for actual_key, actual_value in item.items():
|
||||
if str(actual_key or "").strip().lower() == key_lc and actual_value is not None:
|
||||
return actual_value
|
||||
return None
|
||||
|
||||
|
||||
def _table_exists_case_insensitive(conn: sqlite3.Connection, table_name: str) -> bool:
|
||||
try:
|
||||
row = conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower(?) LIMIT 1",
|
||||
(str(table_name or "").strip(),),
|
||||
).fetchone()
|
||||
return bool(row)
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _ensure_output_name2id_table(conn: sqlite3.Connection) -> bool:
|
||||
if _table_exists_case_insensitive(conn, "Name2Id"):
|
||||
return True
|
||||
try:
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE IF NOT EXISTS Name2Id (
|
||||
user_name TEXT,
|
||||
is_session INTEGER DEFAULT 1
|
||||
)
|
||||
"""
|
||||
)
|
||||
conn.commit()
|
||||
return True
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _best_effort_upsert_output_name2id_rows(
|
||||
conn: sqlite3.Connection,
|
||||
*,
|
||||
account_name: str,
|
||||
rows: list[dict[str, Any]],
|
||||
) -> bool:
|
||||
if not rows:
|
||||
return _table_exists_case_insensitive(conn, "Name2Id")
|
||||
if not _ensure_output_name2id_table(conn):
|
||||
return False
|
||||
try:
|
||||
conn.execute(
|
||||
"INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)",
|
||||
(str(account_name or "").strip(), 1),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
wrote = False
|
||||
for row in rows:
|
||||
try:
|
||||
rid = int(row.get("real_sender_id") or 0)
|
||||
except Exception:
|
||||
rid = 0
|
||||
username = str(row.get("sender_username") or "").strip()
|
||||
if rid <= 0 or not username:
|
||||
continue
|
||||
try:
|
||||
conn.execute(
|
||||
"INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)",
|
||||
(rid, username, 1),
|
||||
)
|
||||
wrote = True
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
if wrote:
|
||||
try:
|
||||
conn.commit()
|
||||
except Exception:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def _sync_output_name2id_from_live(
|
||||
conn: sqlite3.Connection,
|
||||
*,
|
||||
rt_conn: Any,
|
||||
msg_db_path_real: Path,
|
||||
) -> dict[str, Any]:
|
||||
if not _ensure_output_name2id_table(conn):
|
||||
return {"status": "missing_local_table", "rows": 0}
|
||||
|
||||
local_row = conn.execute("SELECT COUNT(1) AS c, COALESCE(MAX(rowid), 0) AS mx FROM Name2Id").fetchone()
|
||||
try:
|
||||
local_count = int((local_row["c"] if isinstance(local_row, sqlite3.Row) else local_row[0]) or 0)
|
||||
except Exception:
|
||||
local_count = 0
|
||||
try:
|
||||
local_max = int((local_row["mx"] if isinstance(local_row, sqlite3.Row) else local_row[1]) or 0)
|
||||
except Exception:
|
||||
local_max = 0
|
||||
|
||||
sql_stats = "SELECT COUNT(1) AS c, COALESCE(MAX(rowid), 0) AS mx FROM Name2Id"
|
||||
with rt_conn.lock:
|
||||
live_stats_rows = _wcdb_exec_query(rt_conn.handle, kind="message", path=str(msg_db_path_real), sql=sql_stats)
|
||||
|
||||
live_stats = live_stats_rows[0] if live_stats_rows and isinstance(live_stats_rows[0], dict) else {}
|
||||
try:
|
||||
live_count = int(_pick_case_insensitive_value(live_stats, "c", "count") or 0)
|
||||
except Exception:
|
||||
live_count = 0
|
||||
try:
|
||||
live_max = int(_pick_case_insensitive_value(live_stats, "mx", "max_rowid", "max") or 0)
|
||||
except Exception:
|
||||
live_max = 0
|
||||
|
||||
if local_count == live_count and local_max == live_max:
|
||||
return {
|
||||
"status": "up_to_date",
|
||||
"rows": int(local_count),
|
||||
"localCount": int(local_count),
|
||||
"liveCount": int(live_count),
|
||||
"localMax": int(local_max),
|
||||
"liveMax": int(live_max),
|
||||
}
|
||||
|
||||
sql_rows = "SELECT rowid AS rowid, user_name AS user_name, COALESCE(is_session, 1) AS is_session FROM Name2Id ORDER BY rowid ASC"
|
||||
with rt_conn.lock:
|
||||
live_rows = _wcdb_exec_query(rt_conn.handle, kind="message", path=str(msg_db_path_real), sql=sql_rows)
|
||||
|
||||
values: list[tuple[int, str, int]] = []
|
||||
seen_rowids: set[int] = set()
|
||||
for item in live_rows:
|
||||
if not isinstance(item, dict):
|
||||
continue
|
||||
try:
|
||||
rid = int(_pick_case_insensitive_value(item, "rowid") or 0)
|
||||
except Exception:
|
||||
rid = 0
|
||||
username = str(_pick_case_insensitive_value(item, "user_name", "username") or "").strip()
|
||||
try:
|
||||
is_session = int(_pick_case_insensitive_value(item, "is_session") or 0)
|
||||
except Exception:
|
||||
is_session = 0
|
||||
if rid <= 0 or not username or rid in seen_rowids:
|
||||
continue
|
||||
seen_rowids.add(rid)
|
||||
values.append((rid, username, is_session))
|
||||
|
||||
if live_count > 0 and not values:
|
||||
raise ValueError("Live Name2Id rows could not be decoded.")
|
||||
|
||||
conn.execute("DELETE FROM Name2Id")
|
||||
if values:
|
||||
conn.executemany(
|
||||
"INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)",
|
||||
values,
|
||||
)
|
||||
conn.commit()
|
||||
return {
|
||||
"status": "refreshed",
|
||||
"rows": int(len(values)),
|
||||
"localCount": int(local_count),
|
||||
"liveCount": int(live_count),
|
||||
"localMax": int(local_max),
|
||||
"liveMax": int(live_max),
|
||||
}
|
||||
|
||||
|
||||
def _normalize_edit_value(col: str, value: Any, *, from_snapshot: bool = False) -> Any:
|
||||
c = str(col or "").strip().lower()
|
||||
if value is None:
|
||||
@@ -1271,6 +1443,7 @@ def sync_chat_realtime_messages(
|
||||
# Some sessions may not exist in the decrypted snapshot yet; create the missing Msg_<md5> table
|
||||
# so we can insert the realtime rows and make `/api/chat/messages` work after switching off realtime.
|
||||
msg_db_path, table_name = _ensure_decrypted_message_table(account_dir, username)
|
||||
msg_db_path_real, _res_db_path_real = _resolve_db_storage_message_paths(account_dir, msg_db_path.stem)
|
||||
logger.info(
|
||||
"[%s] resolved decrypted table account=%s username=%s db=%s table=%s",
|
||||
trace_id,
|
||||
@@ -1283,6 +1456,34 @@ def sync_chat_realtime_messages(
|
||||
msg_conn = sqlite3.connect(str(msg_db_path))
|
||||
msg_conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
name2id_synced = False
|
||||
try:
|
||||
sync_t0 = time.perf_counter()
|
||||
name2id_result = _sync_output_name2id_from_live(
|
||||
msg_conn,
|
||||
rt_conn=rt_conn,
|
||||
msg_db_path_real=msg_db_path_real,
|
||||
)
|
||||
sync_ms = (time.perf_counter() - sync_t0) * 1000.0
|
||||
name2id_synced = str(name2id_result.get("status") or "") in {"up_to_date", "refreshed"}
|
||||
logger.info(
|
||||
"[%s] Name2Id sync account=%s db=%s status=%s rows=%s ms=%.1f",
|
||||
trace_id,
|
||||
account_dir.name,
|
||||
msg_db_path.stem,
|
||||
str(name2id_result.get("status") or ""),
|
||||
int(name2id_result.get("rows") or 0),
|
||||
sync_ms,
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"[%s] Name2Id sync failed account=%s db=%s error=%s",
|
||||
trace_id,
|
||||
account_dir.name,
|
||||
msg_db_path.stem,
|
||||
str(e),
|
||||
)
|
||||
|
||||
quoted_table = _quote_ident(table_name)
|
||||
row = msg_conn.execute(f"SELECT MAX(local_id) AS mx FROM {quoted_table}").fetchone()
|
||||
try:
|
||||
@@ -1425,42 +1626,12 @@ def sync_chat_realtime_messages(
|
||||
|
||||
inserted = 0
|
||||
backfilled = 0
|
||||
if new_rows:
|
||||
# Best-effort: keep Name2Id updated so decrypted queries can resolve sender usernames.
|
||||
# Rowid mapping is important (message.real_sender_id joins Name2Id.rowid).
|
||||
try:
|
||||
has_name2id = bool(
|
||||
msg_conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower('Name2Id') LIMIT 1"
|
||||
).fetchone()
|
||||
)
|
||||
except Exception:
|
||||
has_name2id = False
|
||||
|
||||
if has_name2id:
|
||||
try:
|
||||
msg_conn.execute(
|
||||
"INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)",
|
||||
(str(account_dir.name), 1),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for r in new_rows:
|
||||
try:
|
||||
rid = int(r.get("real_sender_id") or 0)
|
||||
except Exception:
|
||||
rid = 0
|
||||
su = str(r.get("sender_username") or "").strip()
|
||||
if rid <= 0 or not su:
|
||||
continue
|
||||
try:
|
||||
msg_conn.execute(
|
||||
"INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)",
|
||||
(rid, su, 1),
|
||||
)
|
||||
except Exception:
|
||||
continue
|
||||
if new_rows and (not name2id_synced):
|
||||
_best_effort_upsert_output_name2id_rows(
|
||||
msg_conn,
|
||||
account_name=account_dir.name,
|
||||
rows=new_rows,
|
||||
)
|
||||
|
||||
# Insert older -> newer to keep sqlite btree locality similar to existing data.
|
||||
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
|
||||
@@ -1658,6 +1829,30 @@ def _sync_chat_realtime_messages_for_table(
|
||||
msg_conn = sqlite3.connect(str(msg_db_path))
|
||||
msg_conn.row_factory = sqlite3.Row
|
||||
try:
|
||||
msg_db_path_real, _res_db_path_real = _resolve_db_storage_message_paths(account_dir, msg_db_path.stem)
|
||||
name2id_synced = False
|
||||
try:
|
||||
name2id_result = _sync_output_name2id_from_live(
|
||||
msg_conn,
|
||||
rt_conn=rt_conn,
|
||||
msg_db_path_real=msg_db_path_real,
|
||||
)
|
||||
name2id_synced = str(name2id_result.get("status") or "") in {"up_to_date", "refreshed"}
|
||||
logger.info(
|
||||
"[realtime] Name2Id sync account=%s db=%s status=%s rows=%s",
|
||||
account_dir.name,
|
||||
msg_db_path.stem,
|
||||
str(name2id_result.get("status") or ""),
|
||||
int(name2id_result.get("rows") or 0),
|
||||
)
|
||||
except Exception as e:
|
||||
logger.warning(
|
||||
"[realtime] Name2Id sync failed account=%s db=%s error=%s",
|
||||
account_dir.name,
|
||||
msg_db_path.stem,
|
||||
str(e),
|
||||
)
|
||||
|
||||
quoted_table = _quote_ident(table_name)
|
||||
row = msg_conn.execute(f"SELECT MAX(local_id) AS mx FROM {quoted_table}").fetchone()
|
||||
try:
|
||||
@@ -1796,40 +1991,12 @@ def _sync_chat_realtime_messages_for_table(
|
||||
|
||||
inserted = 0
|
||||
backfilled = 0
|
||||
if new_rows:
|
||||
try:
|
||||
has_name2id = bool(
|
||||
msg_conn.execute(
|
||||
"SELECT name FROM sqlite_master WHERE type='table' AND lower(name)=lower('Name2Id') LIMIT 1"
|
||||
).fetchone()
|
||||
)
|
||||
except Exception:
|
||||
has_name2id = False
|
||||
|
||||
if has_name2id:
|
||||
try:
|
||||
msg_conn.execute(
|
||||
"INSERT OR IGNORE INTO Name2Id(user_name, is_session) VALUES (?, ?)",
|
||||
(str(account_dir.name), 1),
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for r in new_rows:
|
||||
try:
|
||||
rid = int(r.get("real_sender_id") or 0)
|
||||
except Exception:
|
||||
rid = 0
|
||||
su = str(r.get("sender_username") or "").strip()
|
||||
if rid <= 0 or not su:
|
||||
continue
|
||||
try:
|
||||
msg_conn.execute(
|
||||
"INSERT OR IGNORE INTO Name2Id(rowid, user_name, is_session) VALUES (?, ?, ?)",
|
||||
(rid, su, 1),
|
||||
)
|
||||
except Exception:
|
||||
continue
|
||||
if new_rows and (not name2id_synced):
|
||||
_best_effort_upsert_output_name2id_rows(
|
||||
msg_conn,
|
||||
account_name=account_dir.name,
|
||||
rows=new_rows,
|
||||
)
|
||||
|
||||
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
|
||||
insert_t0 = time.perf_counter()
|
||||
|
||||
@@ -43,6 +43,29 @@ def _derive_mac_key(raw_key: bytes, salt: bytes) -> bytes:
|
||||
return hashlib.pbkdf2_hmac("sha512", raw_key, mac_salt, 2, dklen=KEY_SIZE)
|
||||
|
||||
|
||||
def _derive_sqlcipher_enc_key(key_material: bytes, salt: bytes) -> bytes:
|
||||
return hashlib.pbkdf2_hmac("sha512", key_material, salt, 256000, dklen=KEY_SIZE)
|
||||
|
||||
|
||||
def _resolve_page1_key_material(key_material: bytes, page1: bytes) -> tuple[bytes, bytes, str] | None:
|
||||
salt = page1[:SALT_SIZE]
|
||||
stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE]
|
||||
|
||||
candidates = [
|
||||
("raw_enc_key", key_material, _derive_mac_key(key_material, salt)),
|
||||
]
|
||||
|
||||
derived_key = _derive_sqlcipher_enc_key(key_material, salt)
|
||||
candidates.append(("sqlcipher_passphrase", derived_key, _derive_mac_key(derived_key, salt)))
|
||||
|
||||
for mode, enc_key, mac_key in candidates:
|
||||
expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1)
|
||||
if stored_page1_hmac == expected_page1_hmac:
|
||||
return enc_key, mac_key, mode
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes:
|
||||
offset = SALT_SIZE if page_num == 1 else 0
|
||||
data_end = PAGE_SIZE - RESERVE_SIZE + IV_SIZE
|
||||
@@ -323,8 +346,9 @@ class WeChatDatabaseDecryptor:
|
||||
def decrypt_database(self, db_path: str, output_path: str) -> bool:
|
||||
"""解密微信4.x版本数据库
|
||||
|
||||
这里传入的 key 已经是从微信进程内存提取出的 raw enc_key,
|
||||
不是 SQLCipher 的口令,因此不能再做一轮 PBKDF2。
|
||||
兼容两种输入形态:
|
||||
- raw enc_key(部分内存扫描/工具直接返回)
|
||||
- SQLCipher 口令/基础 key(需先用数据库 salt 做一轮 PBKDF2)
|
||||
"""
|
||||
from .logging_config import get_logger
|
||||
logger = get_logger(__name__)
|
||||
@@ -370,15 +394,14 @@ class WeChatDatabaseDecryptor:
|
||||
tmp_output_path = ""
|
||||
return True
|
||||
|
||||
salt = page1[:SALT_SIZE]
|
||||
mac_key = _derive_mac_key(self.key_bytes, salt)
|
||||
expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1)
|
||||
stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE]
|
||||
if stored_page1_hmac != expected_page1_hmac:
|
||||
resolved_key_material = _resolve_page1_key_material(self.key_bytes, page1)
|
||||
if resolved_key_material is None:
|
||||
message = f"当前数据库密钥不正确,或该密钥不属于当前账号/当前设备: {db_path}"
|
||||
self._set_last_error("key_mismatch", message)
|
||||
logger.error(f"页面 1 HMAC验证失败,密钥与数据库不匹配: {db_path}")
|
||||
return False
|
||||
enc_key, mac_key, key_mode = resolved_key_material
|
||||
logger.info(f"页面 1 HMAC验证通过: mode={key_mode} path={db_path}")
|
||||
|
||||
total_pages = (file_size + PAGE_SIZE - 1) // PAGE_SIZE
|
||||
successful_pages = 0
|
||||
@@ -406,7 +429,7 @@ class WeChatDatabaseDecryptor:
|
||||
logger.error(f"页面 {page_num} HMAC验证失败,终止解密: {db_path}")
|
||||
return False
|
||||
|
||||
target.write(_decrypt_page(self.key_bytes, page, page_num))
|
||||
target.write(_decrypt_page(enc_key, page, page_num))
|
||||
successful_pages += 1
|
||||
|
||||
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 0 页")
|
||||
|
||||
@@ -0,0 +1,117 @@
|
||||
import hashlib
|
||||
import sqlite3
|
||||
import sys
|
||||
import threading
|
||||
import unittest
|
||||
from pathlib import Path
|
||||
from tempfile import TemporaryDirectory
|
||||
from unittest.mock import patch
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parents[1]
|
||||
sys.path.insert(0, str(ROOT / "src"))
|
||||
|
||||
from wechat_decrypt_tool.routers import chat as chat_router
|
||||
|
||||
|
||||
class _DummyConn:
|
||||
def __init__(self) -> None:
|
||||
self.handle = 1
|
||||
self.lock = threading.Lock()
|
||||
|
||||
|
||||
class TestChatRealtimeName2IdSync(unittest.TestCase):
|
||||
def test_sync_repairs_name2id_even_without_new_messages(self):
|
||||
with TemporaryDirectory() as td:
|
||||
account_dir = Path(td) / "acc"
|
||||
account_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
username = "wxid_friend"
|
||||
table_name = f"Msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
|
||||
msg_db_path = account_dir / "message_0.db"
|
||||
|
||||
conn = sqlite3.connect(str(msg_db_path))
|
||||
try:
|
||||
conn.execute("CREATE TABLE Name2Id (user_name TEXT, is_session INTEGER DEFAULT 1)")
|
||||
conn.execute(
|
||||
"""
|
||||
CREATE TABLE "{table_name}" (
|
||||
local_id INTEGER PRIMARY KEY,
|
||||
server_id INTEGER,
|
||||
local_type INTEGER,
|
||||
sort_seq INTEGER,
|
||||
real_sender_id INTEGER,
|
||||
create_time INTEGER,
|
||||
message_content TEXT,
|
||||
compress_content BLOB,
|
||||
packed_info_data BLOB
|
||||
)
|
||||
""".format(table_name=table_name)
|
||||
)
|
||||
conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (1, ?, 1)", ("acc",))
|
||||
conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (2, ?, 1)", ("wxid_old",))
|
||||
conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (5, ?, 1)", ("wxid_gap_tail",))
|
||||
conn.execute(
|
||||
f'INSERT INTO "{table_name}" '
|
||||
"(local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content, packed_info_data) "
|
||||
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||||
(10, 10010, 1, 10, 3, 1710000010, "hello", None, None),
|
||||
)
|
||||
conn.commit()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
live_rows = [
|
||||
{"rowid": 1, "user_name": "acc", "is_session": 1},
|
||||
{"rowid": 2, "user_name": "wxid_old", "is_session": 1},
|
||||
{"rowid": 3, "user_name": "wxid_missing_a", "is_session": 1},
|
||||
{"rowid": 4, "user_name": "wxid_missing_b", "is_session": 1},
|
||||
{"rowid": 5, "user_name": "wxid_gap_tail", "is_session": 1},
|
||||
]
|
||||
|
||||
def _fake_exec_query(_handle, *, kind, path, sql):
|
||||
self.assertEqual(kind, "message")
|
||||
self.assertTrue(str(path).endswith("message_0.db"))
|
||||
if "COUNT(1)" in sql:
|
||||
return [{"c": len(live_rows), "mx": 5}]
|
||||
if "ORDER BY rowid ASC" in sql:
|
||||
return list(live_rows)
|
||||
raise AssertionError(f"Unexpected SQL: {sql}")
|
||||
|
||||
with (
|
||||
patch.object(chat_router, "_resolve_db_storage_message_paths", return_value=(Path(td) / "live_message_0.db", Path(td) / "message_resource.db")),
|
||||
patch.object(chat_router, "_wcdb_exec_query", side_effect=_fake_exec_query),
|
||||
patch.object(chat_router, "_wcdb_get_messages", return_value=[]),
|
||||
):
|
||||
result = chat_router._sync_chat_realtime_messages_for_table(
|
||||
account_dir=account_dir,
|
||||
rt_conn=_DummyConn(),
|
||||
username=username,
|
||||
msg_db_path=msg_db_path,
|
||||
table_name=table_name,
|
||||
max_scan=50,
|
||||
backfill_limit=0,
|
||||
)
|
||||
|
||||
self.assertEqual(result.get("inserted"), 0)
|
||||
|
||||
conn = sqlite3.connect(str(msg_db_path))
|
||||
try:
|
||||
rows = conn.execute("SELECT rowid, user_name FROM Name2Id ORDER BY rowid ASC").fetchall()
|
||||
finally:
|
||||
conn.close()
|
||||
|
||||
self.assertEqual(
|
||||
rows,
|
||||
[
|
||||
(1, "acc"),
|
||||
(2, "wxid_old"),
|
||||
(3, "wxid_missing_a"),
|
||||
(4, "wxid_missing_b"),
|
||||
(5, "wxid_gap_tail"),
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -19,11 +19,22 @@ from wechat_decrypt_tool.wechat_decrypt import (
|
||||
SQLITE_HEADER,
|
||||
WeChatDatabaseDecryptor,
|
||||
_derive_mac_key,
|
||||
_derive_sqlcipher_enc_key,
|
||||
decrypt_wechat_databases,
|
||||
)
|
||||
|
||||
|
||||
def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes) -> bytes:
|
||||
def _encrypt_page(
|
||||
raw_key: bytes,
|
||||
plain_page: bytes,
|
||||
page_num: int,
|
||||
salt: bytes,
|
||||
iv: bytes,
|
||||
*,
|
||||
sqlcipher_passphrase: bool = False,
|
||||
) -> bytes:
|
||||
enc_key = _derive_sqlcipher_enc_key(raw_key, salt) if sqlcipher_passphrase else raw_key
|
||||
|
||||
if page_num == 1:
|
||||
encrypted_input = plain_page[SALT_SIZE : PAGE_SIZE - RESERVE_SIZE]
|
||||
prefix = salt
|
||||
@@ -32,7 +43,7 @@ def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes,
|
||||
prefix = b""
|
||||
|
||||
cipher = Cipher(
|
||||
algorithms.AES(raw_key),
|
||||
algorithms.AES(enc_key),
|
||||
modes.CBC(iv),
|
||||
backend=default_backend(),
|
||||
)
|
||||
@@ -40,7 +51,7 @@ def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes,
|
||||
encrypted = encryptor.update(encrypted_input) + encryptor.finalize()
|
||||
|
||||
page_without_hmac = prefix + encrypted + iv
|
||||
mac = hmac.new(_derive_mac_key(raw_key, salt), digestmod=hashlib.sha512)
|
||||
mac = hmac.new(_derive_mac_key(enc_key, salt), digestmod=hashlib.sha512)
|
||||
mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0 :])
|
||||
mac.update(page_num.to_bytes(4, "little"))
|
||||
return page_without_hmac + mac.digest()
|
||||
@@ -74,6 +85,39 @@ class WeChatDecryptRawKeyTests(unittest.TestCase):
|
||||
self.assertTrue(decryptor.decrypt_database(str(src), str(dst)))
|
||||
self.assertEqual(dst.read_bytes(), page1 + page2)
|
||||
|
||||
def test_decrypt_database_falls_back_to_sqlcipher_passphrase_mode(self):
|
||||
passphrase_key = bytes.fromhex("9f5dd0d3b6d0477ea5045c9e380ee272e53927993eb548dd98a022e842d5f7bd")
|
||||
salt = bytes.fromhex("50f4090ef6897e146f94109f13743e34")
|
||||
iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
|
||||
iv2 = bytes.fromhex("1112131415161718191a1b1c1d1e1f20")
|
||||
|
||||
page1 = _build_plain_page(0x41, first_page=True)
|
||||
page2 = _build_plain_page(0x42, first_page=False)
|
||||
encrypted_db = _encrypt_page(
|
||||
passphrase_key,
|
||||
page1,
|
||||
1,
|
||||
salt,
|
||||
iv1,
|
||||
sqlcipher_passphrase=True,
|
||||
) + _encrypt_page(
|
||||
passphrase_key,
|
||||
page2,
|
||||
2,
|
||||
salt,
|
||||
iv2,
|
||||
sqlcipher_passphrase=True,
|
||||
)
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
src = Path(tmpdir) / "source.db"
|
||||
dst = Path(tmpdir) / "out.db"
|
||||
src.write_bytes(encrypted_db)
|
||||
|
||||
decryptor = WeChatDatabaseDecryptor(passphrase_key.hex())
|
||||
self.assertTrue(decryptor.decrypt_database(str(src), str(dst)))
|
||||
self.assertEqual(dst.read_bytes(), page1 + page2)
|
||||
|
||||
def test_decrypt_database_keeps_existing_output_on_hmac_failure(self):
|
||||
good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
|
||||
bad_key_hex = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210"
|
||||
|
||||
Reference in New Issue
Block a user