chore(tools): 添加解密与资源调试脚本

- 增加解密/资源/表情/媒体定位等调试脚本,便于本地排查与验证
This commit is contained in:
2977094657
2025-12-17 16:59:49 +08:00
parent 1583c28ebe
commit ebc68de8a8
10 changed files with 1274 additions and 0 deletions

145
tools/debug_decrypt_file.py Normal file
View File

@@ -0,0 +1,145 @@
#!/usr/bin/env python3
"""直接测试文件解密逻辑"""
import sys
sys.path.insert(0, "src")
import json
import struct
from pathlib import Path
# Test parameters: machine-specific absolute paths for one account and one sample file.
ACCOUNT_DIR = Path(r"d:\abc\PycharmProjects\WeChatDataAnalysis\output\databases\wxid_v4mbduwqtzpt22")
TEST_FILE = Path(r"D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a\msg\attach\0d6a4127daada32c5e407ae7201e785a\2025-12\Img\0923ad357c321cf286b794f8e5a66333.dat")
WXID_DIR = Path(r"D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a")

# ========== 1. Load the media keys ==========
print("[1] 读取密钥文件")
keys_file = ACCOUNT_DIR / "_media_keys.json"
if not keys_file.exists():
    # Nothing can be decrypted without the key file, so stop right away.
    print(" [ERROR] 密钥文件不存在")
    sys.exit(1)
keys = json.loads(keys_file.read_text(encoding="utf-8"))
print(f" keys = {keys}")
xor_key = keys.get("xor")
aes_str = str(keys.get("aes") or "").strip()
# Only the first 16 ASCII bytes of the stored AES string are kept;
# an empty string naturally yields b"".
aes_key16 = aes_str.encode("ascii", errors="ignore")[:16]
print(f" xor_key = {xor_key}")
print(f" aes_key16 = {aes_key16}")

# ========== 2. Read the sample file ==========
print(f"\n[2] 读取测试文件: {TEST_FILE}")
data = TEST_FILE.read_bytes()
print(f" 文件大小: {len(data)} bytes")
print(f" 前 16 字节: {data[:16].hex()}")

# ========== 3. Detect the container version from the 6-byte signature ==========
print("\n[3] 检测文件版本")
_VERSION_BY_SIGNATURE = {
    b"\x07\x08V1\x08\x07": (1, " 版本: V1"),
    b"\x07\x08V2\x08\x07": (2, " 版本: V2"),
}
sig = data[:6]
# Any other prefix is treated as version 0 (plain XOR).
version, _version_label = _VERSION_BY_SIGNATURE.get(sig, (0, " 版本: V0 (纯 XOR)"))
print(_version_label)

# ========== 4. Attempt decryption ==========
print("\n[4] 尝试解密")
from Crypto.Cipher import AES
from Crypto.Util import Padding
def decrypt_v4(data: bytes, xor_key: int, aes_key: bytes) -> bytes:
    """Decrypt a signed ``.dat`` payload while printing every intermediate value.

    The first 15 bytes are parsed as a header: a 6-byte signature, two
    little-endian unsigned 32-bit sizes (``aes_size`` and ``xor_size``) and
    one pad byte. The remainder is split into an AES-ECB encrypted prefix,
    an untouched middle part, and a tail of ``xor_size`` bytes that is XORed
    with ``xor_key``.

    Args:
        data: Complete file content, header included.
        xor_key: Single-byte XOR key applied to the tail segment.
        aes_key: AES key; only its first 16 bytes are used.

    Returns:
        Decrypted AES part + untouched middle part + XOR-decoded tail.
    """
    header = data[:0xF]
    rest = data[0xF:]
    print(f" 头部 (15 bytes): {header.hex()}")

    signature, aes_size, xor_size = struct.unpack("<6sLLx", header)
    print(f" signature: {signature}")
    print(f" aes_size: {aes_size}")
    print(f" xor_size: {xor_size}")

    # Round aes_size up to the next AES block boundary; an already aligned
    # size is left unchanged.
    # NOTE(review): PKCS#7 normally appends a whole extra block when the
    # plaintext is already aligned - confirm this rule against the reference
    # decryptor this function is meant to mirror.
    remainder = aes_size % AES.block_size
    if remainder != 0:
        aes_size_aligned = aes_size + (AES.block_size - remainder)
    else:
        aes_size_aligned = aes_size
    print(f" aes_size_aligned: {aes_size_aligned}")

    aes_data = rest[:aes_size_aligned]
    print(f" aes_data 长度: {len(aes_data)}")
    print(f" aes_data 前 16 字节: {aes_data[:16].hex()}")

    decrypted_aes_raw = AES.new(aes_key[:16], AES.MODE_ECB).decrypt(aes_data)
    print(f" 解密后 (带 padding) 前 16 字节: {decrypted_aes_raw[:16].hex()}")

    try:
        decrypted_data = Padding.unpad(decrypted_aes_raw, AES.block_size)
    except Exception as e:
        # Best effort: keep the still-padded plaintext so the output can be inspected.
        print(f" [WARN] unpad 失败: {e}, 使用原始数据")
        decrypted_data = decrypted_aes_raw
    else:
        print(f" 去 padding 后长度: {len(decrypted_data)}")

    has_xor_tail = xor_size > 0
    if has_xor_tail:
        raw_data = rest[aes_size_aligned:-xor_size]
        xor_data = rest[-xor_size:]
        xored_data = bytes(byte ^ xor_key for byte in xor_data)
    else:
        raw_data = rest[aes_size_aligned:]
        xored_data = b""
    print(f" raw_data 长度: {len(raw_data)}")
    if has_xor_tail:
        print(f" xor_data 长度: {len(xor_data)}")

    result = decrypted_data + raw_data + xored_data
    print(f" 最终结果长度: {len(result)}")
    print(f" 结果前 16 字节: {result[:16].hex()}")

    # Sanity check: does the output start with a known image magic number?
    if result.startswith(b"\xff\xd8\xff"):
        print(" [OK] 解密成功! 是 JPEG 图片")
    elif result.startswith(b"\x89PNG\r\n\x1a\n"):
        print(" [OK] 解密成功! 是 PNG 图片")
    else:
        print(" [WARN] 解密后不是有效图片头")
    return result
def _report_image_header(blob: bytes) -> None:
    """Print whether ``blob`` starts with a JPEG or PNG magic number."""
    if blob.startswith(b"\xff\xd8\xff"):
        print(" [OK] 解密成功! 是 JPEG 图片")
    elif blob.startswith(b"\x89PNG\r\n\x1a\n"):
        print(" [OK] 解密成功! 是 PNG 图片")
    else:
        print(" [WARN] 解密后不是有效图片头")


# Only V2 files with both keys available are handled by this script.
_can_decrypt = version == 2 and xor_key is not None and bool(aes_key16)
if not _can_decrypt:
    print(" [ERROR] 无法解密: 缺少必要参数")
else:
    print("\n[4.1] 使用本地 decrypt_v4 函数:")
    decrypted = decrypt_v4(data, xor_key, aes_key16)
    # Save the locally decrypted bytes for manual inspection.
    output_file = Path("test_decrypted_manual.jpg")
    output_file.write_bytes(decrypted)
    print(f" 已保存: {output_file} ({len(decrypted)} bytes)")

    # Run the same file through the WxDatDecrypt implementation for comparison.
    print("\n[4.2] 使用 WxDatDecrypt 的 decrypt_dat_v4:")
    sys.path.insert(0, "WxDatDecrypt")
    from decrypt import decrypt_dat_v4 as wx_decrypt_v4
    decrypted_wx = wx_decrypt_v4(TEST_FILE, xor_key, aes_key16)
    print(f" 结果长度: {len(decrypted_wx)}")
    print(f" 结果前 16 字节: {decrypted_wx[:16].hex()}")
    _report_image_header(decrypted_wx)
    output_file2 = Path("test_decrypted_wxdat.jpg")
    output_file2.write_bytes(decrypted_wx)
    print(f" 已保存: {output_file2} ({len(decrypted_wx)} bytes)")
print("\n[Done]")

164
tools/debug_decrypt_keys.py Normal file
View File

@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""调试媒体文件解密密钥检测"""
import sys
sys.path.insert(0, "src")
from pathlib import Path
from collections import Counter
import re
# Machine-specific account data directory that is searched for template files below.
WXID_DIR = Path(r"D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a")
# One sample encrypted image (.dat) under WXID_DIR used for the checks in this script.
TEST_FILE = WXID_DIR / "msg" / "attach" / "0d6a4127daada32c5e407ae7201e785a" / "2025-12" / "Img" / "0923ad357c321cf286b794f8e5a66333.dat"
def extract_yyyymm_for_sort(p: Path) -> str:
    """Return the first ``YYYY-MM`` token found in ``p`` as a sort key.

    Paths without such a token yield ``"0000-00"`` so they sort as the oldest.
    """
    found = re.search(r"(\d{4}-\d{2})", str(p))
    if found is None:
        return "0000-00"
    return found.group(1)
# ========== Inspect the sample file ==========
print(f"[1] 检查测试文件: {TEST_FILE}")
if not TEST_FILE.exists():
    print(" [ERROR] 文件不存在")
else:
    with open(TEST_FILE, "rb") as _sample:
        head = _sample.read(64)
    print(f" 存在, 大小: {TEST_FILE.stat().st_size} bytes")
    print(f" 前 16 字节: {head[:16].hex()}")
    # The first six bytes identify the container version.
    sig = head[:6]
    _version_labels = {
        b"\x07\x08V1\x08\x07": " 版本: V1",
        b"\x07\x08V2\x08\x07": " 版本: V2",
    }
    print(_version_labels.get(sig, " 版本: V0 (XOR only) 或未知"))
# ========== Locate *_t.dat template files ==========
print("\n[2] 查找 _t.dat 模板文件")
try:
    _found = list(WXID_DIR.rglob("*_t.dat"))
    print(f" 找到 {len(_found)} 个模板文件")
    # Newest month first, based on the YYYY-MM token in each path.
    template_files = sorted(_found, key=extract_yyyymm_for_sort, reverse=True)
    for tf in template_files[:5]:
        print(f" - {tf}")
except Exception as e:
    print(f" [ERROR] {e}")
    template_files = []


def _read_last_two_bytes(path):
    """Return the final two bytes of ``path``, or None if they cannot be read."""
    try:
        with open(path, "rb") as handle:
            handle.seek(-2, 2)  # two bytes before end-of-file
            tail = handle.read(2)
    except Exception:
        return None
    return tail if len(tail) == 2 else None


# ========== Most common trailing byte pair ==========
print("\n[3] 计算模板文件末尾 2 字节的众数")
# Only the first 16 template files are sampled.
last_bytes_list = [
    tail
    for tail in map(_read_last_two_bytes, template_files[:16])
    if tail is not None
]
if not last_bytes_list:
    most_common = None
    print(" [ERROR] 没有有效的模板文件")
else:
    most_common = Counter(last_bytes_list).most_common(1)[0][0]
    print(f" 众数: {most_common.hex()} ({most_common})")
# ========== Derive the XOR key ==========
print("\n[4] 计算 XOR key")
xor_key = None
if not (most_common and len(most_common) == 2):
    print(" [ERROR] 无法计算")
else:
    # The two trailing bytes are XORed against 0xFF and 0xD9 respectively;
    # the key is accepted only when both give the same value.
    _first, _second = most_common
    _candidate = _first ^ 0xFF
    check = _second ^ 0xD9
    print(f" x=0x{_first:02x}, y=0x{_second:02x}")
    print(f" xor_key = x ^ 0xFF = 0x{_candidate:02x} ({_candidate})")
    print(f" check = y ^ 0xD9 = 0x{check:02x} ({check})")
    if _candidate == check:
        print(f" [OK] XOR key 验证通过: {_candidate}")
        xor_key = _candidate
    else:
        print(" [ERROR] XOR key 验证失败")
# ========== Find a V2 ciphertext block (for AES key extraction) ==========
print("\n[5] 查找 V2 密文 (用于 AES key 提取)")
ciphertext = None
sig = b"\x07\x08V2\x08\x07"


def _first_cipher_block(path):
    """Return the 16 bytes at offset 0xF of a matching V2 template, else None."""
    with open(path, "rb") as handle:
        if handle.read(6) != sig:
            return None
        handle.seek(-2, 2)
        # When a trailing-byte mode is known, only files ending with it qualify.
        if most_common and handle.read(2) != most_common:
            return None
        handle.seek(0xF)
        block = handle.read(16)
    return block if len(block) == 16 else None


for file in template_files:
    try:
        ct = _first_cipher_block(file)
    except Exception:
        continue
    if ct is None:
        continue
    ciphertext = ct
    print(f" 找到密文: {ct.hex()}")
    print(f" 来自文件: {file}")
    break
if not ciphertext:
    print(" [ERROR] 未找到 V2 密文")

# ========== Check that pycryptodome is importable ==========
print("\n[6] 检查 pycryptodome")
try:
    from Crypto.Cipher import AES
except ImportError:
    print(" [ERROR] pycryptodome 未安装, 运行: uv add pycryptodome")
else:
    print(" [OK] pycryptodome 已安装")
# ========== Try to decrypt the sample file manually ==========
# NOTE(review): the original indentation of this section was lost; the nesting
# below (XOR decryption and magic-number check inside the V0 branch) is the
# most plausible reading - confirm against the repository copy.
print(f"\n[7] 尝试解密测试文件 (如果有 xor_key)")
if xor_key is not None and TEST_FILE.exists():
    with open(TEST_FILE, "rb") as f:
        data = f.read()
    sig = data[:6]
    print(f" 文件签名: {sig}")
    if sig == b"\x07\x08V2\x08\x07":
        print(" 这是 V2 文件, 需要 AES key")
        # Check whether extracting the AES key from process memory is feasible.
        try:
            import psutil
            print(" psutil 已安装")
            # Look for a running WeChat process by executable name.
            weixin_pid = None
            for p in psutil.process_iter(["name"]):
                name = (p.info.get("name") or "").lower()
                if name in {"weixin.exe", "wechat.exe"}:
                    weixin_pid = p.pid
                    break
            if weixin_pid:
                print(f" 找到微信进程: PID={weixin_pid}")
                print(" 需要从进程内存提取 AES key (需要管理员权限)")
            else:
                print(" [WARN] 未找到微信进程, 无法自动提取 AES key")
                print(" 请确保微信正在运行")
        except ImportError:
            print(" [ERROR] psutil 未安装")
    elif sig == b"\x07\x08V1\x08\x07":
        # Only a message is printed for V1; no decryption is performed here.
        print(" 这是 V1 文件, 尝试使用 xor_key + 固定 AES key 解密")
    else:
        print(" 这是 V0 文件, 尝试纯 XOR 解密")
        # XOR every byte of the file with the derived single-byte key.
        decrypted = bytes(b ^ xor_key for b in data)
        # Check the magic number of the decrypted data.
        if decrypted[:3] == b"\xff\xd8\xff":
            print(" [OK] 解密成功! 是 JPEG 图片")
        elif decrypted[:8] == b"\x89PNG\r\n\x1a\n":
            print(" [OK] 解密成功! 是 PNG 图片")
        else:
            print(f" 解密后前 16 字节: {decrypted[:16].hex()}")
            print(" [WARN] 解密后不是有效图片")
print("\n[Done]")

View File

@@ -0,0 +1,66 @@
#!/usr/bin/env python3
"""调试表情消息内容"""
import sqlite3
from pathlib import Path
import re
from contextlib import closing

# Directory holding the decrypted databases of one account (machine-specific path).
db_path = Path(r'd:\abc\PycharmProjects\WeChatDataAnalysis\output\databases\wxid_v4mbduwqtzpt22')
msg_dbs = list(db_path.glob('message_*.db'))
print(f'Found {len(msg_dbs)} message databases')

# Only the first message database is inspected.
for db in msg_dbs[:1]:
    print(f'\nDatabase: {db.name}')
    # closing() guarantees the connection is released even when a statement
    # outside the inner try block raises.
    with closing(sqlite3.connect(str(db))) as conn:
        conn.row_factory = sqlite3.Row
        # List the tables first.
        tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
        print(f'Tables: {[t[0] for t in tables]}')
        # Walk the tables whose name contains "msg".
        for t in tables:
            tname = t[0]
            if 'msg' not in tname.lower():
                continue
            # Bracket-quote the identifier so unusual table names do not break the SQL.
            cols = conn.execute(f"PRAGMA table_info([{tname}])").fetchall()
            col_names = [c[1] for c in cols]
            print(f'Table {tname} columns: {col_names}')
            # Emoji messages are those whose type column equals 47.
            type_col = 'local_type' if 'local_type' in col_names else 'type'
            query = f"SELECT * FROM [{tname}] WHERE {type_col} = 47 LIMIT 3"
            try:
                rows = conn.execute(query).fetchall()
                print(f'Found {len(rows)} emoji messages')
                # Imported inside the try so a missing zstandard package is
                # reported as a failed query instead of aborting the script.
                import zstandard as zstd
                for r in rows:
                    d = dict(r)
                    content = d.get('message_content') or d.get('content') or b''
                    # Payloads starting with the zstd frame magic are decompressed first.
                    if isinstance(content, bytes) and content.startswith(b'\x28\xb5\x2f\xfd'):
                        try:
                            dctx = zstd.ZstdDecompressor()
                            content = dctx.decompress(content).decode('utf-8', errors='replace')
                        except Exception as e:
                            print(f' zstd decompress message_content failed: {e}')
                    text = str(content)
                    print(' Decompressed content (first 800):')
                    print(f' {text[:800]}')
                    # Pull the md5 / cdnurl / thumburl attributes out of the XML text.
                    md5_match = re.search(r'md5="([^"]+)"', text)
                    cdnurl_match = re.search(r'cdnurl="([^"]+)"', text)
                    thumburl_match = re.search(r'thumburl="([^"]+)"', text)
                    print(f' md5: {md5_match.group(1) if md5_match else "NOT FOUND"}')
                    print(f' cdnurl: {cdnurl_match.group(1)[:80] if cdnurl_match else "NOT FOUND"}')
                    print(f' thumburl: {thumburl_match.group(1)[:80] if thumburl_match else "NOT FOUND"}')
                # NOTE(review): the original indentation was lost; this break is
                # read as "stop after the first message table that was queried
                # successfully" - confirm against the repository copy.
                break
            except Exception as e:
                print(f'Query failed: {e}')

View File

@@ -0,0 +1,74 @@
#!/usr/bin/env python3
"""调试图片查找"""
import sqlite3
from pathlib import Path
# Account, image md5 and database location under investigation (machine-specific).
account = 'wxid_v4mbduwqtzpt22'
md5 = '8753fcd3b1f8c4470b53551e13c5fbc1'
db_dir = Path(r'd:\abc\PycharmProjects\WeChatDataAnalysis\output\databases') / account
hardlink_db = db_dir / 'hardlink.db'

_db_present = hardlink_db.exists()
print(f'Hardlink DB exists: {_db_present}')
if _db_present:
    conn = sqlite3.connect(str(hardlink_db))
    conn.row_factory = sqlite3.Row

    # Enumerate every table in the database.
    table_names = [entry[0] for entry in conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()]
    print(f'Tables: {table_names}')

    for tname in table_names:
        lowered = tname.lower()
        # Only tables whose name mentions both "image" and "hardlink" are examined.
        if not ('image' in lowered and 'hardlink' in lowered):
            continue
        print(f'\nChecking table: {tname}')
        column_info = conn.execute(f"PRAGMA table_info({tname})").fetchall()
        print(f'Columns: {[c[1] for c in column_info]}')

        # Look the md5 up in this table.
        row = conn.execute(f"SELECT * FROM [{tname}] WHERE md5 = ? LIMIT 1", (md5,)).fetchone()
        if not row:
            print(f'MD5 {md5} not found in {tname}')
            # Show a few records so the stored format can be compared by eye.
            sample = conn.execute(f"SELECT md5, dir1, dir2, file_name FROM [{tname}] LIMIT 3").fetchall()
            print('Sample data:')
            for s in sample:
                print(f' md5={s[0]}, dir1={s[1]}, dir2={s[2]}, file_name={s[3]}')
            continue

        print(f'Found: {dict(row)}')
        dir1, dir2, file_name = row['dir1'], row['dir2'], row['file_name']

        # Dump the structure and a few rows of the dir2id table.
        dir2id_cols = conn.execute("PRAGMA table_info(dir2id)").fetchall()
        print(f'dir2id columns: {[c[1] for c in dir2id_cols]}')
        dir2id_sample = conn.execute("SELECT * FROM dir2id LIMIT 3").fetchall()
        print(f'dir2id sample: {[dict(r) for r in dir2id_sample]}')

        # Resolve dir2 through the rowid of dir2id.
        dir2id_row = conn.execute("SELECT rowid, username FROM dir2id WHERE rowid = ? LIMIT 1", (dir2,)).fetchone()
        print(f'dir2id lookup for rowid={dir2}: {dict(dir2id_row) if dir2id_row else "NOT FOUND"}')

        # Build the candidate path; the username column is used as the folder
        # name when the lookup succeeded, otherwise the raw dir2 value.
        weixin_root = Path(r'D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a')
        dir_name = dir2id_row['username'] if dir2id_row else str(dir2)
        possible_path = weixin_root.joinpath(str(dir1), dir_name, file_name)
        print(f'Possible path: {possible_path}')
        print(f'Path exists: {possible_path.exists()}')

        # Also probe the "<stem>_h.dat" sibling of the candidate.
        h_path = possible_path.with_name(possible_path.stem + '_h.dat')
        print(f'_h.dat path: {h_path}')
        print(f'_h.dat exists: {h_path.exists()}')
    conn.close()

159
tools/debug_media_lookup.py Normal file
View File

@@ -0,0 +1,159 @@
#!/usr/bin/env python3
"""调试媒体文件查找逻辑"""
import sqlite3
from pathlib import Path
import json

# ========== Configuration ==========
ACCOUNT = "wxid_v4mbduwqtzpt22"
MD5 = "0923ad357c321cf286b794f8e5a66333"
USERNAME = "wxid_qmzc7q0xfm0j22"
REPO_ROOT = Path(__file__).resolve().parents[1]
OUTPUT_DB_DIR = REPO_ROOT / "output" / "databases" / ACCOUNT

# ========== Read _source.json ==========
source_json = OUTPUT_DB_DIR / "_source.json"
print(f"[1] 检查 _source.json: {source_json}")
# Both values stay empty when the file is missing.
wxid_dir = ""
db_storage_path = ""
if not source_json.exists():
    print(" [ERROR] _source.json 不存在!")
else:
    source = json.loads(source_json.read_text(encoding="utf-8"))
    wxid_dir = source.get("wxid_dir", "")
    db_storage_path = source.get("db_storage_path", "")
    print(f" wxid_dir: {wxid_dir}")
    print(f" db_storage_path: {db_storage_path}")
# ========== Inspect hardlink.db ==========
hardlink_db = OUTPUT_DB_DIR / "hardlink.db"
print(f"\n[2] 检查 hardlink.db: {hardlink_db}")
# Matched hardlink records, each stored as a {column_name: value} dict so later
# steps can read fields by name instead of relying on column count and order.
rows = []
dir2id_map = {}
if not hardlink_db.exists():
    print(" [ERROR] hardlink.db 不存在!")
else:
    print(" [OK] 文件存在")
    conn = sqlite3.connect(str(hardlink_db))
    try:
        # List every table together with its columns.
        print("\n[2.1] 列出所有表:")
        tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
        for t in tables:
            print(f" - {t[0]}")
            # Bracket-quote the identifier so unusual table names do not break the PRAGMA.
            cols = conn.execute(f"PRAGMA table_info([{t[0]}])").fetchall()
            col_names = [c[1] for c in cols]
            print(f" 列: {col_names}")

        # Query each candidate hardlink table for the md5.
        print(f"\n[3] 查询 hardlink 表 (md5={MD5})")
        possible_tables = ["image_hardlink_info", "HardLinkImageAttribute", "HardLinkImageAttribute2"]
        for tbl in possible_tables:
            try:
                # Skip candidates that do not exist in this database.
                exists = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name=?", (tbl,)).fetchone()
                if not exists:
                    continue
                print(f" 尝试表: {tbl}")
                cols = conn.execute(f"PRAGMA table_info({tbl})").fetchall()
                col_names = [c[1] for c in cols]
                print(f" 列: {col_names}")
                # Accept the md5 column regardless of its capitalisation.
                md5_col = next((c for c in col_names if c.lower() == "md5"), None)
                if md5_col is None:
                    print(" [WARN] 没有 md5 列")
                    continue
                cursor = conn.execute(f"SELECT * FROM {tbl} WHERE {md5_col} = ? LIMIT 5", (MD5,))
                # cursor.description carries the column names of exactly this result set.
                result_cols = [desc[0] for desc in cursor.description]
                matched = [dict(zip(result_cols, values)) for values in cursor.fetchall()]
                if matched:
                    print(f" 找到 {len(matched)} 条记录:")
                    for i, record in enumerate(matched):
                        print(f" [{i}] {record}")
                    # Accumulate, so a later table without a match cannot wipe
                    # out the records found in an earlier one.
                    rows.extend(matched)
                else:
                    print(" [WARN] 没有匹配记录")
            except Exception as e:
                print(f" [ERROR] 查询 {tbl} 失败: {e}")

        # Load the dir2id mapping.
        print("\n[4] 查询 dir2id 表")
        try:
            cols = conn.execute("PRAGMA table_info(dir2id)").fetchall()
            col_names = [c[1] for c in cols]
            print(f" 列: {col_names}")
            dir2id_rows = conn.execute("SELECT * FROM dir2id").fetchall()
            # Only the first ten rows are displayed ...
            preview = dir2id_rows[:10]
            print(f"{len(preview)} 条(最多显示10条):")
            for row in preview:
                print(f" {dict(zip(col_names, row))}")
            # ... but the mapping uses every row; a map built from the preview
            # alone would miss most directory ids.
            if len(col_names) >= 2:
                dir2id_map = {row[0]: row[1] for row in dir2id_rows}
        except Exception as e:
            print(f" [ERROR] 查询失败: {e}")
            dir2id_map = {}
    finally:
        conn.close()

# ========== Build candidate paths and check whether the files exist ==========
print("\n[5] 尝试拼接路径并检查文件")
if wxid_dir and rows:
    wxid_path = Path(wxid_dir)
    # Root directories that are tried for every record.
    roots = [
        wxid_path,
        wxid_path / "msg" / "attach",
        wxid_path / "msg" / "file",
        wxid_path / "msg" / "video",
        wxid_path / "cache",
    ]
    for i, row in enumerate(rows):
        fields = {str(key).lower(): value for key, value in row.items()}
        dir1 = fields.get("dir1")
        dir2 = fields.get("dir2")
        file_name = fields.get("file_name")
        if dir1 is None or dir2 is None or not file_name:
            # Tables using other column names cannot be turned into a path here.
            print(f" [{i}] [WARN] 记录缺少 dir1/dir2/file_name 列, 跳过")
            continue
        # Path segments must be strings; the directory columns may hold integers.
        dir_name = str(dir2id_map.get(dir2, dir2))
        for root in roots:
            candidate = root / str(dir1) / dir_name / str(file_name)
            exists = candidate.exists()
            print(f" [{i}] {candidate}")
            print(f" 存在: {exists}")
            if exists:
                print(f" [FOUND!] 大小: {candidate.stat().st_size} bytes")
# ========== Search for files named after the md5 directly ==========
print("\n[6] 直接在 wxid_dir 下搜索 md5 文件")
if wxid_dir:
    wxid_path = Path(wxid_dir)
    search_dirs = [
        wxid_path.joinpath(*parts)
        for parts in (("msg", "attach"), ("msg", "file"), ("msg", "video"), ("cache",))
    ]
    patterns = [f"{MD5}*{suffix}" for suffix in (".dat", ".jpg", ".png")]
    found_any = False
    for base in search_dirs:
        if not base.exists():
            print(f" [SKIP] {base} 不存在")
            continue
        for pat in patterns:
            try:
                # Materialise the matches first so a walking error is reported
                # before anything for this pattern is printed.
                hits = list(base.rglob(pat))
                for hit in hits:
                    print(f" [FOUND] {hit} ({hit.stat().st_size} bytes)")
                    found_any = True
            except Exception as e:
                print(f" [ERROR] 搜索 {base}/{pat} 失败: {e}")
    if not found_any:
        print(" [WARN] 没有找到任何匹配文件")
print("\n[Done]")

View File

@@ -0,0 +1,30 @@
#!/usr/bin/env python3
"""调试消息类型返回值"""
import requests
# Fetch up to 100 messages of one conversation from the locally running backend.
# A timeout is set so the script cannot hang forever when the service is down
# or unresponsive.
resp = requests.get(
    'http://localhost:8000/api/chat/messages',
    params={
        'account': 'wxid_v4mbduwqtzpt22',
        'username': 'wxid_qmzc7q0xfm0j22',
        'limit': 100
    },
    timeout=30,
)
data = resp.json()
messages = data.get('messages', [])

# Keep the first message seen for every renderType (missing values count as 'text').
types_found = {}
for m in messages:
    types_found.setdefault(m.get('renderType', 'text'), m)

print('找到的消息类型:')
for rt, m in types_found.items():
    content = str(m.get('content') or '')[:50]
    print(f" {rt}: type={m.get('type')}, content={content}")
    # Emoji and image messages carry extra fields worth showing.
    if rt == 'emoji':
        print(f" emojiMd5={m.get('emojiMd5')}")
        print(f" emojiUrl={m.get('emojiUrl')}")
    if rt == 'image':
        print(f" imageMd5={m.get('imageMd5')}")
        print(f" imageUrl={str(m.get('imageUrl') or '')[:80]}")

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
导出微信数据库分析结果为 JSON
- 基于 analyze_wechat_databases.WeChatDatabaseAnalyzer
- 联合 wechat_db_config.json(含 ohmywechat 常见类型与启发式)补全字段含义
- 生成汇总 JSON 与按库拆分的 JSON 文件
用法:
python tools/export_database_schema_json.py \
--databases-path output/databases \
--output-dir output/schema_json \
--config wechat_db_config.json
"""
from __future__ import annotations
import argparse
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict
import sys
# Project root directory: the parent of the directory that contains this script.
ROOT = Path(__file__).resolve().parents[1]
# Make sure analyze_wechat_databases.py in the project root can be imported.
if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))
def export_analysis(databases_path: Path, output_dir: Path, config_file: Path) -> int:
    """Analyze all databases and write the schema description as JSON files.

    One ``<db_name>.schema.json`` file is written per database, plus a combined
    ``all_databases.schema.json`` that also carries generation metadata.

    Args:
        databases_path: Root directory handed to the analyzer.
        output_dir: Directory receiving the JSON files (created when missing).
        config_file: Field-meaning configuration handed to the analyzer.

    Returns:
        Number of databases that were exported.
    """
    # Imported lazily so the module can be loaded without the analyzer present.
    from analyze_wechat_databases import WeChatDatabaseAnalyzer

    def _write_json(target: Path, payload: Any) -> None:
        """Serialize ``payload`` to ``target`` as indented UTF-8 JSON."""
        with target.open("w", encoding="utf-8") as handle:
            json.dump(payload, handle, ensure_ascii=False, indent=2)

    def _describe_column(col: Dict[str, Any], table_name: str) -> Dict[str, Any]:
        """Copy the column attributes and attach the analyzer's field meaning."""
        name = col.get("name")
        return {
            "name": name,
            "type": col.get("type"),
            "notnull": col.get("notnull"),
            "default": col.get("dflt_value"),
            "pk": col.get("pk"),
            # Unnamed columns get an empty meaning instead of a lookup.
            "meaning": analyzer.get_field_meaning(name, table_name) if name else "",
        }

    output_dir.mkdir(parents=True, exist_ok=True)
    analyzer = WeChatDatabaseAnalyzer(databases_path=str(databases_path), config_file=str(config_file))
    results = analyzer.analyze_all_databases()  # dict[db_name] = db_info

    combined: Dict[str, Any] = {
        "_metadata": {
            "generated_time": datetime.now().isoformat(),
            "source": "analyze_wechat_databases.py",
            "config_used": str(config_file),
            "databases_root": str(databases_path),
            "note": "字段含义来自 wechat_db_config.json 与启发式推断(结合 ohmywechat 常见类型)",
        },
        "databases": {},
    }

    for db_name, db_info in results.items():
        tables_out: Dict[str, Any] = {}
        for table_name, table in db_info.get("tables", {}).items():
            tables_out[table_name] = {
                "row_count": table.get("row_count", 0),
                "columns": [_describe_column(col, table_name) for col in table.get("columns", [])],
                "indexes": table.get("indexes", []),
                "foreign_keys": table.get("foreign_keys", []),
                "create_sql": table.get("create_sql"),
                "sample_data": table.get("sample_data", []),
                # Markers for groups of similar tables (e.g. merged Msg_* tables).
                "is_representative": table.get("is_representative", False),
                "similar_group": table.get("similar_group", {}),
            }
        db_out: Dict[str, Any] = {
            "database_name": db_info.get("database_name", db_name),
            "database_path": db_info.get("database_path"),
            "database_size": db_info.get("database_size"),
            "description": db_info.get("description"),
            "table_count": db_info.get("table_count"),
            "tables": tables_out,
        }

        # Per-database JSON file.
        single_path = output_dir / f"{db_name}.schema.json"
        _write_json(single_path, db_out)
        combined["databases"][db_name] = db_out
        print(f"[OK] 写出数据库JSON: {single_path.name}")

    count_dbs = len(combined["databases"])

    # Combined file covering every database.
    combined_path = output_dir / "all_databases.schema.json"
    _write_json(combined_path, combined)
    print(f"[OK] 汇总JSON: {combined_path} (数据库数: {count_dbs}")
    return count_dbs
def main():
    """Parse the command line and run the schema export.

    Raises:
        FileNotFoundError: If the configuration file does not exist.
    """
    parser = argparse.ArgumentParser()
    option_specs = (
        ("--databases-path", ROOT / "output" / "databases", "解密后的数据库根目录(按账号分目录)"),
        ("--output-dir", ROOT / "output" / "schema_json", "JSON 输出目录"),
        ("--config", ROOT / "wechat_db_config.json", "字段含义配置 JSON由 tools/generate_wechat_db_config.py 生成)"),
    )
    for flag, default_path, help_text in option_specs:
        parser.add_argument(flag, default=str(default_path), help=help_text)
    args = parser.parse_args()

    db_root = Path(args.databases_path)
    out_dir = Path(args.output_dir)
    cfg = Path(args.config)

    # The configuration is mandatory; a missing database directory only warns.
    if not cfg.exists():
        raise FileNotFoundError(f"未找到配置文件: {cfg},请先运行 tools/generate_wechat_db_config.py")
    if not db_root.exists():
        print(f"[WARN] 数据库目录不存在: {db_root},仍将生成空汇总文件。")

    exported = export_analysis(db_root, out_dir, cfg)
    if exported == 0:
        print("[INFO] 未检测到可分析数据库(可先运行解密流程或确认路径)")


if __name__ == "__main__":
    main()

100
tools/extract_media_keys.py Normal file
View File

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
提取微信 4.x 媒体解密密钥 (需要管理员权限运行)
用法:
1. 确保微信正在运行
2. 以管理员身份运行 PowerShell
3. cd 到项目目录
4. 运行: uv run python tools/extract_media_keys.py
"""
import sys
sys.path.insert(0, "src")
sys.path.insert(0, "WxDatDecrypt")
import json
from pathlib import Path
# find_key comes from the `key` module; exit with a hint when it cannot be imported.
try:
    from key import find_key
except ImportError as e:
    print(f"[ERROR] 无法导入 WxDatDecrypt: {e}")
    print("请确保 pymem, yara-python, pycryptodome 已安装")
    sys.exit(1)
# ========== Configuration ==========
# Project root: the parent of the directory that contains this script.
REPO_ROOT = Path(__file__).resolve().parents[1]
# Directory that is scanned for per-account sub-directories.
OUTPUT_DB_DIR = REPO_ROOT / "output" / "databases"
def main():
    """Extract the media keys of every account and store them next to its databases.

    An account is any sub-directory of ``OUTPUT_DB_DIR`` that contains a
    ``_source.json`` file. For each account whose ``wxid_dir`` exists,
    ``find_key`` is called and the result is written to ``_media_keys.json``.
    A failure for one account is reported and does not stop the others.
    """
    banner = "=" * 60
    print(banner)
    print("微信 4.x 媒体解密密钥提取工具")
    print(banner)

    # 1. Collect the accounts.
    print("\n[1] 列出已解密账号...")
    if not OUTPUT_DB_DIR.exists():
        print("[ERROR] output/databases 目录不存在")
        sys.exit(1)
    accounts = [
        entry.name
        for entry in OUTPUT_DB_DIR.iterdir()
        if entry.is_dir() and (entry / "_source.json").exists()
    ]
    if not accounts:
        print("[ERROR] 没有找到已解密的账号")
        sys.exit(1)
    print(f" 找到 {len(accounts)} 个账号")

    # 2. Process every account.
    for account in accounts:
        print(f"\n[2] 处理账号: {account}")
        account_dir = OUTPUT_DB_DIR / account

        # _source.json records where the account data directory lives.
        source = json.loads((account_dir / "_source.json").read_text(encoding="utf-8"))
        wxid_dir_str = source.get("wxid_dir", "")
        if not wxid_dir_str:
            print(" [SKIP] 没有 wxid_dir")
            continue
        wxid_dir = Path(wxid_dir_str)
        if not wxid_dir.exists():
            print(f" [SKIP] wxid_dir 不存在: {wxid_dir}")
            continue

        print(f" wxid_dir: {wxid_dir}")
        print(" 正在提取密钥 (需要微信正在运行且有管理员权限)...")
        try:
            xor_key, aes_key = find_key(wxid_dir, version=4)
            keys_file = account_dir / "_media_keys.json"
            # The AES key is stored as text; bytes are decoded as ASCII.
            aes_text = aes_key.decode("ascii") if isinstance(aes_key, bytes) else str(aes_key)
            keys_data = {"xor": xor_key, "aes": aes_text}
            with open(keys_file, "w", encoding="utf-8") as handle:
                json.dump(keys_data, handle, indent=2)
            print(f" [OK] 密钥已保存到: {keys_file}")
            print(f" XOR key: {xor_key}")
            print(f" AES key: {keys_data['aes']}")
        except Exception as e:
            print(f" [ERROR] 提取失败: {e}")

    print("\n" + banner)
    print("完成!请重启后端服务以使密钥生效。")
    print(banner)


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,381 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
生成 wechat_db_config.json:
- 读取 wechat_db_config_template.json
- 融合本项目 analyze_wechat_databases 的启发式 + ohmywechat 常见字段/消息类型
- 批量为每个表字段补全中文含义,并写出 wechat_db_config.json
"""
from __future__ import annotations
import json
import re
from pathlib import Path
from datetime import datetime
# Project root (parent of this script's directory) and the files derived from it.
ROOT = Path(__file__).resolve().parents[1]
TEMPLATE_PATH = ROOT / "wechat_db_config_template.json"
OUTPUT_MAIN = ROOT / "wechat_db_config.json"
OUTPUT_DIR = ROOT / "output" / "configs"
OUTPUT_COPY = OUTPUT_DIR / "wechat_db_config.generated.json"

# Try to import the analyzer so its heuristics can be reused.
# When this file is run as a script, only its own directory is on sys.path, so
# ROOT is added first; without it the import below fails and the analyzer
# heuristics are silently skipped.
# NOTE(review): assumes analyze_wechat_databases.py sits directly in ROOT - confirm.
import sys

if str(ROOT) not in sys.path:
    sys.path.insert(0, str(ROOT))

AnalyzerCls = None
try:
    from analyze_wechat_databases import WeChatDatabaseAnalyzer  # type: ignore
    AnalyzerCls = WeChatDatabaseAnalyzer
except Exception:
    # Best effort: the generator still works with its own fallback heuristics.
    AnalyzerCls = None
def build_db_descriptions() -> dict[str, str]:
    """Return the built-in database-name -> description mapping."""
    return dict(
        message="聊天记录核心数据库",
        message_3="聊天消息分表数据库(示例或分片)",
        message_fts="聊天消息全文索引数据库FTS",
        message_resource="消息资源索引数据库(图片/文件/视频等)",
        contact="联系人数据库(好友/群/公众号基础信息)",
        session="会话数据库(会话列表与未读统计)",
        sns="朋友圈数据库(动态与互动)",
        favorite="收藏数据库",
        emoticon="表情包数据库",
        head_image="头像数据数据库",
        hardlink="硬链接索引数据库(资源去重/快速定位)",
        media_0="媒体数据数据库含语音SILK等",
        unspportmsg="不支持消息数据库(客户端不支持的消息类型)",
        general="通用/系统数据库(新消息通知/支付等)",
    )
def build_message_types_from_ohmywechat() -> dict[str, str]:
    """Return the known message-type -> description mapping.

    Keys are strings of the form ``"<type>,<subtype>"``; every entry defined
    here uses ``0`` as the subtype.
    """
    known_types = (
        (1, "文本消息"),
        (3, "图片消息"),
        (34, "语音消息"),
        (42, "名片消息"),
        (43, "视频消息"),
        (47, "动画表情"),
        (48, "位置消息"),
        (244813135921, "引用消息"),
        (17179869233, "卡片式链接(带描述)"),
        (21474836529, "卡片式链接/图文消息公众号mmreader XML"),
        (154618822705, "小程序分享"),
        (12884901937, "音乐卡片"),
        (8594229559345, "红包卡片"),
        (81604378673, "聊天记录合并转发消息"),
        (266287972401, "拍一拍消息"),
        (8589934592049, "转账卡片"),
        (270582939697, "视频号直播卡片"),
        (25769803825, "文件消息"),
        (10000, "系统消息(撤回/入群提示等)"),
    )
    return {f"{type_id},0": meaning for type_id, meaning in known_types}
# Exact field-name -> meaning table.
KNOWN_FIELD_MEANINGS = {
    # Generic primary keys / identifiers
    "id": "标识符字段(主键/索引)",
    "local_id": "本地自增ID主键/定位用)",
    "server_id": "服务器消息ID唯一且全局递增",
    "svr_id": "服务器消息ID同server_id",
    "message_id": "消息ID表内主键或消息级索引",
    "resource_id": "资源ID资源明细主键",
    "history_id": "历史消息ID系统消息/历史消息关联键)",
    # Sessions / users / group chats
    "username": "用户名/会话标识wxid_xxx 或 xxx@chatroom",
    "user_name": "用户名/会话标识wxid_xxx 或 xxx@chatroom",
    "sender_id": "发送者内部ID与Name2Id映射",
    "real_sender_id": "真实发送者ID群聊内消息具体成员",
    "chat_id": "会话内部ID与ChatName2Id映射",
    "chat_name_id": "会话内部ID与ChatName2Id映射",
    "session_id": "会话IDFTS/资源维度的会话映射)",
    "session_name": "会话名username 文本值)",
    "session_name_id": "会话内部IDusername 的数值映射)",
    "talker_id": "会话/房间IDName2Id 对照)",
    # Message structure / state
    "local_type": "本地消息类型local_type",
    "type": "类型标识(上下文相关:消息/表情/配置)",
    "sub_type": "子类型标识(同一主类型细分)",
    "status": "状态标志位(发送/接收/已读/撤回等)",
    "upload_status": "上传状态(媒体/资源上行状态)",
    "download_status": "下载状态(媒体/资源下行状态)",
    "server_seq": "服务器序列号(消息顺序校验)",
    "origin_source": "消息来源标识(客户端/转发/系统)",
    "source": "来源附加信息XML/JSON 等)",
    "msg_status": "消息状态(扩展)",
    # Message content
    "message_content": "消息内容部分类型为zstd压缩的XMLmmreader",
    "compress_content": "压缩内容多见zstd可能存放富文本XML",
    "packed_info_data": "打包扩展信息(二进制,消息元数据)",
    "packed_info": "打包扩展信息(二进制/文本混合)",
    "data_index": "数据分片/索引(媒体片段定位)",
    # Time
    "create_time": "创建时间Unix时间戳",
    "last_update_time": "最后更新时间Unix时间戳",
    "last_modified_time": "最后修改时间Unix时间戳",
    "update_time": "更新时间Unix时间戳",
    "invalid_time": "失效时间Unix时间戳",
    "access_time": "访问时间Unix时间戳",
    "last_timestamp": "最后消息时间(会话)",
    "sort_timestamp": "排序时间(会话排序)",
    "timestamp": "时间戳Unix时间戳",
    # Ordering / de-duplication
    "sort_seq": "排序序列(单会话内消息排序/去重)",
    "server_seq_": "服务器序列号(扩展)",
    # Contacts / group chats
    "alias": "别名(用户自定义标识)",
    "encrypt_username": "加密用户名",
    "flag": "标志位(多用途:联系人/公众号/配置)",
    "delete_flag": "删除标志(软删除)",
    "verify_flag": "认证标志(公众号/企业认证等)",
    "remark": "备注名",
    "remark_quan_pin": "备注名全拼",
    "remark_pin_yin_initial": "备注名拼音首字母",
    "nick_name": "昵称",
    "pin_yin_initial": "昵称拼音首字母",
    "quan_pin": "昵称全拼",
    "description": "描述/个性签名/备注",
    "extra_buffer": "扩展缓冲区(二进制/序列化)",
    "ext_buffer": "扩展缓冲区(二进制/序列化)",
    "ext_buffer_": "扩展缓冲区(二进制/序列化)",
    "chat_room_type": "群类型标志",
    "owner": "群主 username",
    # Avatars / media
    "big_head_url": "头像大图URL",
    "small_head_url": "头像小图URL",
    "head_img_md5": "头像MD5",
    "image_buffer": "头像二进制数据",
    "voice_data": "语音二进制数据多为SILK",
    # FTS / internal tables
    "acontent": "FTS检索内容分词后文本",
    "block": "FTS内部块数据二进制",
    "segid": "FTS分段ID",
    "term": "FTS分词条目",
    "pgno": "FTS页号",
    "c0": "FTS列c0内部结构",
    "c1": "FTS列c1内部结构",
    "c2": "FTS列c2内部结构",
    "c3": "FTS列c3内部结构",
    "c4": "FTS列c4内部结构",
    "c5": "FTS列c5内部结构",
    "c6": "FTS列c6内部结构",
    "sz": "FTS文档大小信息",
    "_rowid_": "SQLite内部行ID",
    # Resources / hardlinks
    "md5": "资源MD5",
    "md5_hash": "MD5哈希整数映射快速索引",
    "file_name": "文件名(相对/逻辑名)",
    "file_size": "文件大小(字节)",
    "dir1": "资源路径一级目录编号(分桶)",
    "dir2": "资源路径二级目录编号(分桶)",
    "modify_time": "文件修改时间戳",
    # Session statistics
    "unread_count": "未读计数",
    "unread_first_msg_srv_id": "会话未读区间首个消息SvrID",
    "is_hidden": "会话隐藏标志",
    "summary": "会话摘要(最近消息摘要)",
    "draft": "草稿内容",
    "status_": "状态/标志(上下文)",
    "last_clear_unread_timestamp": "上次清空未读时间",
    # The key below is spelled "locald" on purpose, as its description string states.
    "last_msg_locald_id": "最后一条消息的本地ID拼写原样保留",
    "last_msg_type": "最后一条消息类型",
    "last_msg_sub_type": "最后一条消息子类型",
    "last_msg_sender": "最后一条消息发送者username",
    "last_sender_display_name": "最后一条消息发送者显示名",
    "last_msg_ext_type": "最后一条消息扩展类型",
    # WCDB compression control
    "WCDB_CT_message_content": "WCDB压缩标记message_content列",
    "WCDB_CT_source": "WCDB压缩标记source列",
}
def simple_heuristic(field_name: str, table_name: str) -> str:
    """Guess a field meaning from its name so that no field is left blank.

    The rules are evaluated in order and the first match wins. Two extra
    rules apply only to tables whose name starts with ``msg_``. Matching is
    case-insensitive.
    """
    name = field_name.lower()
    table = table_name.lower()

    ordered_rules = (
        (name.endswith("id") or name == "_rowid_", "标识符字段"),
        ("time" in name, "时间戳字段"),
        (name in ("name", "user_name", "username"), "用户名/会话名"),
        (name in ("content", "message_content", "compress_content"), "内容/正文字段"),
        ("md5" in name, "MD5哈希字段"),
        ("status" in name, "状态位/状态码"),
        (name.startswith("is_"), "布尔标志字段"),
        (name.startswith("wcdb_ct_"), "WCDB压缩控制字段"),
        ("buf" in name or "blob" in name, "二进制缓冲数据"),
        ("url" in name, "URL链接"),
        ("size" in name or "count" in name, "数量/大小字段"),
        ("seq" in name, "序列号/排序字段"),
    )
    for matched, label in ordered_rules:
        if matched:
            return label

    # Columns that are only recognised inside Msg_* tables.
    if table.startswith("msg_"):
        msg_columns = {
            "source": "消息来源附加信息XML/JSON",
            "local_type": "本地消息类型local_type",
        }
        if name in msg_columns:
            return msg_columns[name]
    return "未知用途字段"
def compute_field_meaning(analyzer, table_name: str, field_name: str) -> str:
    """Resolve the meaning of a field, trying the most specific source first.

    Lookup order: exact name in ``KNOWN_FIELD_MEANINGS``, lower-cased name in
    the same table, a fixed hint for the two content columns, the analyzer's
    ``get_field_meaning`` (when an analyzer is given and does not raise), and
    finally ``simple_heuristic``.
    """
    lowered = field_name.lower()

    # Known mappings win: exact spelling first, then the lower-cased spelling.
    for candidate in (field_name, lowered):
        if candidate in KNOWN_FIELD_MEANINGS:
            return KNOWN_FIELD_MEANINGS[candidate]

    # Extra hint for the mmreader / zstd content columns.
    if lowered in ("message_content", "compress_content"):
        return "消息内容部分类型为zstd压缩XMLmmreader"

    # Reuse the project analyzer's heuristics; any failure falls through.
    if analyzer is not None:
        try:
            return analyzer.get_field_meaning(field_name, table_name)
        except Exception:
            pass

    # Last resort.
    return simple_heuristic(field_name, table_name)
def guess_table_desc(analyzer, table_name: str) -> str:
    """Describe what a table is used for.

    The analyzer's ``guess_table_function`` is preferred; when no analyzer is
    given, or the call raises, a keyword match on the lower-cased table name
    is used instead.
    """
    if analyzer is not None:
        try:
            return analyzer.guess_table_function(table_name)
        except Exception:
            pass  # fall back to the keyword rules below

    lowered = table_name.lower()
    if lowered == "msg" or lowered.startswith("msg_"):
        return "某会话的消息表(聊天消息数据)"

    # First keyword contained in the table name wins.
    keyword_descriptions = (
        ("name2id", "用户名到内部ID映射表"),
        ("contact", "联系人/群聊信息表"),
        ("session", "会话信息/未读统计表"),
        ("fts", "全文检索FTS内部表"),
        ("resource", "消息资源/附件索引表"),
    )
    for keyword, description in keyword_descriptions:
        if keyword in lowered:
            return description
    return "未知功能表"
def fill_config(template: dict) -> dict:
    """Fill the missing descriptions and field meanings of a config template.

    The template is modified in place and also returned. Existing non-empty
    descriptions and meanings are kept. The built-in message types are merged
    into ``message_types`` and ``_metadata`` is refreshed.
    """
    # An analyzer instance is created only for its heuristics; it is given a
    # config path named "nonexistent_config.json".
    analyzer = None
    if AnalyzerCls is not None:
        try:
            analyzer = AnalyzerCls(databases_path=str(ROOT / "output" / "databases"),
                                   config_file="nonexistent_config.json")
        except Exception:
            analyzer = None

    known_db_descriptions = build_db_descriptions()
    for db_name, db in template.get("databases", {}).items():
        if not isinstance(db, dict):
            continue

        # Database-level description: known mapping first, generic text otherwise.
        if not db.get("description"):
            db["description"] = known_db_descriptions.get(db_name) or "未知用途数据库"

        for table_name, table in db.get("tables", {}).items():
            if not isinstance(table, dict):
                continue

            # Table-level description.
            if not table.get("description"):
                table["description"] = guess_table_desc(analyzer, table_name)

            # Field meanings.
            fields = table.get("fields", {})
            if not isinstance(fields, dict):
                continue
            for field_name, field_meta in fields.items():
                if isinstance(field_meta, dict) and not field_meta.get("meaning", ""):
                    field_meta["meaning"] = compute_field_meaning(analyzer, table_name, field_name)

    # Message types: keep whatever the template holds and add the real keys.
    message_types = template.setdefault("message_types", {})
    message_types.update(build_message_types_from_ohmywechat())

    # Refresh the metadata.
    meta = template.get("_metadata", {})
    meta["version"] = "1.1"
    meta["generated_time"] = datetime.now().isoformat()
    meta["description"] = "微信数据库字段配置由模板自动补全融合启发式与ohmywechat常见类型"
    template["_metadata"] = meta
    return template
def main():
    """Generate the config from the template and write both output files.

    Raises:
        FileNotFoundError: If the template file does not exist.
    """
    if not TEMPLATE_PATH.exists():
        raise FileNotFoundError(f"Template not found: {TEMPLATE_PATH}")

    template = json.loads(TEMPLATE_PATH.read_text(encoding="utf-8"))
    filled = fill_config(template)

    # Both files receive exactly the same JSON text.
    serialized = json.dumps(filled, ensure_ascii=False, indent=2)

    # Main config (the file the analyzer loads by default).
    OUTPUT_MAIN.write_text(serialized, encoding="utf-8")

    # Backup copy under output/configs.
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
    OUTPUT_COPY.write_text(serialized, encoding="utf-8")

    print("[OK] 生成完成")
    print(f"- 主配置: {OUTPUT_MAIN}")
    print(f"- 备份: {OUTPUT_COPY}")

    # Short summary.
    dbs = filled.get("databases", {})
    tbl_count = 0
    for entry in dbs.values():
        if isinstance(entry, dict):
            tbl_count += len(entry.get("tables", {}))
    print(f"- 数据库数: {len(dbs)}, 表数: {tbl_count}")
    print(f"- 消息类型键数: {len(filled.get('message_types', {}))}")


if __name__ == "__main__":
    main()

17
tools/test_image_api.py Normal file
View File

@@ -0,0 +1,17 @@
#!/usr/bin/env python3
"""测试图片 API"""
import requests
# Request one image from the locally running backend and print the response summary.
# A timeout is set so the script cannot hang forever when the service is down
# or unresponsive.
r = requests.get(
    'http://localhost:8000/api/chat/media/image',
    params={
        'account': 'wxid_v4mbduwqtzpt22',
        'md5': '8753fcd3b1f8c4470b53551e13c5fbc1',
        'username': 'wxid_qmzc7q0xfm0j22'
    },
    timeout=30,
)
print(f'Status: {r.status_code}')
print(f'Content-Type: {r.headers.get("content-type")}')
print(f'Content-Length: {len(r.content)}')
# On failure, show the beginning of the body to help diagnose the error.
if r.status_code != 200:
    print(f'Response: {r.text[:500]}')