From ebc68de8a86814ac0f8e787d0c17ced0bb868d08 Mon Sep 17 00:00:00 2001 From: 2977094657 <2977094657@qq.com> Date: Wed, 17 Dec 2025 16:59:49 +0800 Subject: [PATCH] =?UTF-8?q?chore(tools):=20=E6=B7=BB=E5=8A=A0=E8=A7=A3?= =?UTF-8?q?=E5=AF=86=E4=B8=8E=E8=B5=84=E6=BA=90=E8=B0=83=E8=AF=95=E8=84=9A?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 增加解密/资源/表情/媒体定位等调试脚本,便于本地排查与验证 --- tools/debug_decrypt_file.py | 145 ++++++++++ tools/debug_decrypt_keys.py | 164 ++++++++++++ tools/debug_emoji_content.py | 66 +++++ tools/debug_image_lookup.py | 74 ++++++ tools/debug_media_lookup.py | 159 +++++++++++ tools/debug_message_types.py | 30 +++ tools/export_database_schema_json.py | 138 ++++++++++ tools/extract_media_keys.py | 100 +++++++ tools/generate_wechat_db_config.py | 381 +++++++++++++++++++++++++++ tools/test_image_api.py | 17 ++ 10 files changed, 1274 insertions(+) create mode 100644 tools/debug_decrypt_file.py create mode 100644 tools/debug_decrypt_keys.py create mode 100644 tools/debug_emoji_content.py create mode 100644 tools/debug_image_lookup.py create mode 100644 tools/debug_media_lookup.py create mode 100644 tools/debug_message_types.py create mode 100644 tools/export_database_schema_json.py create mode 100644 tools/extract_media_keys.py create mode 100644 tools/generate_wechat_db_config.py create mode 100644 tools/test_image_api.py diff --git a/tools/debug_decrypt_file.py b/tools/debug_decrypt_file.py new file mode 100644 index 0000000..6be95f6 --- /dev/null +++ b/tools/debug_decrypt_file.py @@ -0,0 +1,145 @@ +#!/usr/bin/env python3 +"""直接测试文件解密逻辑""" + +import sys +sys.path.insert(0, "src") + +import json +import struct +from pathlib import Path + +# 测试参数 +ACCOUNT_DIR = Path(r"d:\abc\PycharmProjects\WeChatDataAnalysis\output\databases\wxid_v4mbduwqtzpt22") +TEST_FILE = Path(r"D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a\msg\attach\0d6a4127daada32c5e407ae7201e785a\2025-12\Img\0923ad357c321cf286b794f8e5a66333.dat") +WXID_DIR = Path(r"D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a") + +# ========== 1. 读取密钥 ========== +print("[1] 读取密钥文件") +keys_file = ACCOUNT_DIR / "_media_keys.json" +if keys_file.exists(): + with open(keys_file, "r", encoding="utf-8") as f: + keys = json.load(f) + print(f" keys = {keys}") + xor_key = keys.get("xor") + aes_str = str(keys.get("aes") or "").strip() + aes_key16 = aes_str.encode("ascii", errors="ignore")[:16] if aes_str else b"" + print(f" xor_key = {xor_key}") + print(f" aes_key16 = {aes_key16}") +else: + print(" [ERROR] 密钥文件不存在") + sys.exit(1) + +# ========== 2. 读取测试文件 ========== +print(f"\n[2] 读取测试文件: {TEST_FILE}") +with open(TEST_FILE, "rb") as f: + data = f.read() +print(f" 文件大小: {len(data)} bytes") +print(f" 前 16 字节: {data[:16].hex()}") + +# ========== 3. 检测版本 ========== +print("\n[3] 检测文件版本") +sig = data[:6] +if sig == b"\x07\x08V1\x08\x07": + version = 1 + print(" 版本: V1") +elif sig == b"\x07\x08V2\x08\x07": + version = 2 + print(" 版本: V2") +else: + version = 0 + print(" 版本: V0 (纯 XOR)") + +# ========== 4. 尝试解密 ========== +print("\n[4] 尝试解密") + +from Crypto.Cipher import AES +from Crypto.Util import Padding + +def decrypt_v4(data: bytes, xor_key: int, aes_key: bytes) -> bytes: + """使用 api.py 相同的解密逻辑""" + header, rest = data[:0xF], data[0xF:] + print(f" 头部 (15 bytes): {header.hex()}") + + signature, aes_size, xor_size = struct.unpack("<6sLLx", header) + print(f" signature: {signature}") + print(f" aes_size: {aes_size}") + print(f" xor_size: {xor_size}") + + # 对齐到 AES 块大小 + aes_size_aligned = aes_size + (AES.block_size - aes_size % AES.block_size) if aes_size % AES.block_size != 0 else aes_size + print(f" aes_size_aligned: {aes_size_aligned}") + + aes_data = rest[:aes_size_aligned] + print(f" aes_data 长度: {len(aes_data)}") + print(f" aes_data 前 16 字节: {aes_data[:16].hex()}") + + cipher = AES.new(aes_key[:16], AES.MODE_ECB) + decrypted_aes_raw = cipher.decrypt(aes_data) + print(f" 解密后 (带 padding) 前 16 字节: {decrypted_aes_raw[:16].hex()}") + + try: + decrypted_data = Padding.unpad(decrypted_aes_raw, AES.block_size) + print(f" 去 padding 后长度: {len(decrypted_data)}") + except Exception as e: + print(f" [WARN] unpad 失败: {e}, 使用原始数据") + decrypted_data = decrypted_aes_raw + + if xor_size > 0: + raw_data = rest[aes_size_aligned:-xor_size] + xor_data = rest[-xor_size:] + xored_data = bytes(b ^ xor_key for b in xor_data) + print(f" raw_data 长度: {len(raw_data)}") + print(f" xor_data 长度: {len(xor_data)}") + else: + raw_data = rest[aes_size_aligned:] + xored_data = b"" + print(f" raw_data 长度: {len(raw_data)}") + + result = decrypted_data + raw_data + xored_data + print(f" 最终结果长度: {len(result)}") + print(f" 结果前 16 字节: {result[:16].hex()}") + + # 检查是否是有效图片 + if result[:3] == b"\xff\xd8\xff": + print(" [OK] 解密成功! 是 JPEG 图片") + elif result[:8] == b"\x89PNG\r\n\x1a\n": + print(" [OK] 解密成功! 是 PNG 图片") + else: + print(" [WARN] 解密后不是有效图片头") + + return result + +if version == 2 and xor_key is not None and aes_key16: + print("\n[4.1] 使用本地 decrypt_v4 函数:") + decrypted = decrypt_v4(data, xor_key, aes_key16) + + # 保存解密后的文件 + output_file = Path("test_decrypted_manual.jpg") + with open(output_file, "wb") as f: + f.write(decrypted) + print(f" 已保存: {output_file} ({len(decrypted)} bytes)") + + # 使用 WxDatDecrypt 的函数 + print("\n[4.2] 使用 WxDatDecrypt 的 decrypt_dat_v4:") + sys.path.insert(0, "WxDatDecrypt") + from decrypt import decrypt_dat_v4 as wx_decrypt_v4 + + decrypted_wx = wx_decrypt_v4(TEST_FILE, xor_key, aes_key16) + print(f" 结果长度: {len(decrypted_wx)}") + print(f" 结果前 16 字节: {decrypted_wx[:16].hex()}") + + if decrypted_wx[:3] == b"\xff\xd8\xff": + print(" [OK] 解密成功! 是 JPEG 图片") + elif decrypted_wx[:8] == b"\x89PNG\r\n\x1a\n": + print(" [OK] 解密成功! 是 PNG 图片") + else: + print(" [WARN] 解密后不是有效图片头") + + output_file2 = Path("test_decrypted_wxdat.jpg") + with open(output_file2, "wb") as f: + f.write(decrypted_wx) + print(f" 已保存: {output_file2} ({len(decrypted_wx)} bytes)") +else: + print(" [ERROR] 无法解密: 缺少必要参数") + +print("\n[Done]") diff --git a/tools/debug_decrypt_keys.py b/tools/debug_decrypt_keys.py new file mode 100644 index 0000000..ef8d21f --- /dev/null +++ b/tools/debug_decrypt_keys.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3 +"""调试媒体文件解密密钥检测""" + +import sys +sys.path.insert(0, "src") + +from pathlib import Path +from collections import Counter +import re + +WXID_DIR = Path(r"D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a") +TEST_FILE = WXID_DIR / "msg" / "attach" / "0d6a4127daada32c5e407ae7201e785a" / "2025-12" / "Img" / "0923ad357c321cf286b794f8e5a66333.dat" + +def extract_yyyymm_for_sort(p: Path) -> str: + m = re.search(r"(\d{4}-\d{2})", str(p)) + return m.group(1) if m else "0000-00" + +# ========== 检查测试文件 ========== +print(f"[1] 检查测试文件: {TEST_FILE}") +if TEST_FILE.exists(): + with open(TEST_FILE, "rb") as f: + head = f.read(64) + print(f" 存在, 大小: {TEST_FILE.stat().st_size} bytes") + print(f" 前 16 字节: {head[:16].hex()}") + sig = head[:6] + if sig == b"\x07\x08V1\x08\x07": + print(" 版本: V1") + elif sig == b"\x07\x08V2\x08\x07": + print(" 版本: V2") + else: + print(" 版本: V0 (XOR only) 或未知") +else: + print(" [ERROR] 文件不存在") + +# ========== 查找 _t.dat 模板文件 ========== +print(f"\n[2] 查找 _t.dat 模板文件") +try: + template_files = list(WXID_DIR.rglob("*_t.dat")) + print(f" 找到 {len(template_files)} 个模板文件") + template_files.sort(key=extract_yyyymm_for_sort, reverse=True) + for tf in template_files[:5]: + print(f" - {tf}") +except Exception as e: + print(f" [ERROR] {e}") + template_files = [] + +# ========== 计算 most_common_last2 ========== +print(f"\n[3] 计算模板文件末尾 2 字节的众数") +last_bytes_list = [] +for file in template_files[:16]: + try: + with open(file, "rb") as f: + f.seek(-2, 2) + b2 = f.read(2) + if b2 and len(b2) == 2: + last_bytes_list.append(b2) + except Exception: + continue + +if last_bytes_list: + most_common = Counter(last_bytes_list).most_common(1)[0][0] + print(f" 众数: {most_common.hex()} ({most_common})") +else: + most_common = None + print(" [ERROR] 没有有效的模板文件") + +# ========== 计算 XOR key ========== +print(f"\n[4] 计算 XOR key") +if most_common and len(most_common) == 2: + x, y = most_common[0], most_common[1] + xor_key = x ^ 0xFF + check = y ^ 0xD9 + print(f" x=0x{x:02x}, y=0x{y:02x}") + print(f" xor_key = x ^ 0xFF = 0x{xor_key:02x} ({xor_key})") + print(f" check = y ^ 0xD9 = 0x{check:02x} ({check})") + if xor_key == check: + print(f" [OK] XOR key 验证通过: {xor_key}") + else: + print(f" [ERROR] XOR key 验证失败") + xor_key = None +else: + xor_key = None + print(" [ERROR] 无法计算") + +# ========== 查找 V2 密文 ========== +print(f"\n[5] 查找 V2 密文 (用于 AES key 提取)") +ciphertext = None +sig = b"\x07\x08V2\x08\x07" +for file in template_files: + try: + with open(file, "rb") as f: + if f.read(6) != sig: + continue + f.seek(-2, 2) + if most_common and f.read(2) != most_common: + continue + f.seek(0xF) + ct = f.read(16) + if ct and len(ct) == 16: + ciphertext = ct + print(f" 找到密文: {ct.hex()}") + print(f" 来自文件: {file}") + break + except Exception: + continue + +if not ciphertext: + print(" [ERROR] 未找到 V2 密文") + +# ========== 检查 pycryptodome ========== +print(f"\n[6] 检查 pycryptodome") +try: + from Crypto.Cipher import AES + print(" [OK] pycryptodome 已安装") +except ImportError: + print(" [ERROR] pycryptodome 未安装, 运行: uv add pycryptodome") + +# ========== 尝试手动解密 ========== +print(f"\n[7] 尝试解密测试文件 (如果有 xor_key)") +if xor_key is not None and TEST_FILE.exists(): + with open(TEST_FILE, "rb") as f: + data = f.read() + + sig = data[:6] + print(f" 文件签名: {sig}") + + if sig == b"\x07\x08V2\x08\x07": + print(" 这是 V2 文件, 需要 AES key") + # 检查是否可以从内存提取 AES key + try: + import psutil + print(" psutil 已安装") + + # 查找微信进程 + weixin_pid = None + for p in psutil.process_iter(["name"]): + name = (p.info.get("name") or "").lower() + if name in {"weixin.exe", "wechat.exe"}: + weixin_pid = p.pid + break + + if weixin_pid: + print(f" 找到微信进程: PID={weixin_pid}") + print(" 需要从进程内存提取 AES key (需要管理员权限)") + else: + print(" [WARN] 未找到微信进程, 无法自动提取 AES key") + print(" 请确保微信正在运行") + except ImportError: + print(" [ERROR] psutil 未安装") + elif sig == b"\x07\x08V1\x08\x07": + print(" 这是 V1 文件, 尝试使用 xor_key + 固定 AES key 解密") + else: + print(" 这是 V0 文件, 尝试纯 XOR 解密") + decrypted = bytes(b ^ xor_key for b in data) + # 检查解密后的魔数 + if decrypted[:3] == b"\xff\xd8\xff": + print(" [OK] 解密成功! 是 JPEG 图片") + elif decrypted[:8] == b"\x89PNG\r\n\x1a\n": + print(" [OK] 解密成功! 是 PNG 图片") + else: + print(f" 解密后前 16 字节: {decrypted[:16].hex()}") + print(" [WARN] 解密后不是有效图片") + +print("\n[Done]") diff --git a/tools/debug_emoji_content.py b/tools/debug_emoji_content.py new file mode 100644 index 0000000..3a08d6b --- /dev/null +++ b/tools/debug_emoji_content.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3 +"""调试表情消息内容""" + +import sqlite3 +from pathlib import Path + +db_path = Path(r'd:\abc\PycharmProjects\WeChatDataAnalysis\output\databases\wxid_v4mbduwqtzpt22') +msg_dbs = list(db_path.glob('message_*.db')) +print(f'Found {len(msg_dbs)} message databases') + +for db in msg_dbs[:1]: + print(f'\nDatabase: {db.name}') + conn = sqlite3.connect(str(db)) + conn.row_factory = sqlite3.Row + + # 先查看表结构 + tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + print(f'Tables: {[t[0] for t in tables]}') + + # 找到消息表 + for t in tables: + tname = t[0] + if 'msg' in tname.lower(): + # 查看列名 + cols = conn.execute(f"PRAGMA table_info({tname})").fetchall() + col_names = [c[1] for c in cols] + print(f'Table {tname} columns: {col_names}') + + # 查找 type=47 的消息 + type_col = 'local_type' if 'local_type' in col_names else 'type' + content_col = 'message_content' if 'message_content' in col_names else 'content' + compress_col = 'compress_content' if 'compress_content' in col_names else None + + query = f"SELECT * FROM {tname} WHERE {type_col} = 47 LIMIT 3" + try: + rows = conn.execute(query).fetchall() + print(f'Found {len(rows)} emoji messages') + import zstandard as zstd + for r in rows: + d = dict(r) + content = d.get('message_content') or d.get('content') or b'' + + # 尝试解压 message_content + if isinstance(content, bytes) and content.startswith(b'\x28\xb5\x2f\xfd'): + try: + dctx = zstd.ZstdDecompressor() + content = dctx.decompress(content).decode('utf-8', errors='replace') + except Exception as e: + print(f' zstd decompress message_content failed: {e}') + + print(f' Decompressed content (first 800):') + print(f' {str(content)[:800]}') + + # 提取 md5 和 cdnurl + import re + md5_match = re.search(r'md5="([^"]+)"', str(content)) + cdnurl_match = re.search(r'cdnurl="([^"]+)"', str(content)) + thumburl_match = re.search(r'thumburl="([^"]+)"', str(content)) + + print(f' md5: {md5_match.group(1) if md5_match else "NOT FOUND"}') + print(f' cdnurl: {cdnurl_match.group(1)[:80] if cdnurl_match else "NOT FOUND"}') + print(f' thumburl: {thumburl_match.group(1)[:80] if thumburl_match else "NOT FOUND"}') + break + except Exception as e: + print(f'Query failed: {e}') + conn.close() diff --git a/tools/debug_image_lookup.py b/tools/debug_image_lookup.py new file mode 100644 index 0000000..03f66d6 --- /dev/null +++ b/tools/debug_image_lookup.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +"""调试图片查找""" +import sqlite3 +from pathlib import Path + +account = 'wxid_v4mbduwqtzpt22' +md5 = '8753fcd3b1f8c4470b53551e13c5fbc1' + +db_dir = Path(r'd:\abc\PycharmProjects\WeChatDataAnalysis\output\databases') / account +hardlink_db = db_dir / 'hardlink.db' + +print(f'Hardlink DB exists: {hardlink_db.exists()}') + +if hardlink_db.exists(): + conn = sqlite3.connect(str(hardlink_db)) + conn.row_factory = sqlite3.Row + + # List tables + tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + print(f'Tables: {[t[0] for t in tables]}') + + # Find image hardlink table + for t in tables: + tname = t[0] + if 'image' in tname.lower() and 'hardlink' in tname.lower(): + print(f'\nChecking table: {tname}') + cols = conn.execute(f"PRAGMA table_info({tname})").fetchall() + print(f'Columns: {[c[1] for c in cols]}') + + # Search for the md5 + row = conn.execute(f"SELECT * FROM [{tname}] WHERE md5 = ? LIMIT 1", (md5,)).fetchone() + if row: + print(f'Found: {dict(row)}') + dir1 = row['dir1'] + dir2 = row['dir2'] + file_name = row['file_name'] + + # Check dir2id table structure + dir2id_cols = conn.execute("PRAGMA table_info(dir2id)").fetchall() + print(f'dir2id columns: {[c[1] for c in dir2id_cols]}') + + # Get sample from dir2id + dir2id_sample = conn.execute("SELECT * FROM dir2id LIMIT 3").fetchall() + print(f'dir2id sample: {[dict(r) for r in dir2id_sample]}') + + # Try to find matching dir2 value using rowid + dir2id_row = conn.execute("SELECT rowid, username FROM dir2id WHERE rowid = ? LIMIT 1", (dir2,)).fetchone() + print(f'dir2id lookup for rowid={dir2}: {dict(dir2id_row) if dir2id_row else "NOT FOUND"}') + + # Try to construct the path + weixin_root = Path(r'D:\abc\wechatMSG\xwechat_files\wxid_v4mbduwqtzpt22_1e7a') + if dir2id_row: + dir_name = dir2id_row['username'] # In WeChat 4.x, username column is the folder name + else: + dir_name = str(dir2) + + possible_path = weixin_root / str(dir1) / dir_name / file_name + print(f'Possible path: {possible_path}') + print(f'Path exists: {possible_path.exists()}') + + # Also try _h.dat variant + h_path = possible_path.with_name(possible_path.stem + '_h.dat') + print(f'_h.dat path: {h_path}') + print(f'_h.dat exists: {h_path.exists()}') + else: + print(f'MD5 {md5} not found in {tname}') + + # Show sample data + sample = conn.execute(f"SELECT md5, dir1, dir2, file_name FROM [{tname}] LIMIT 3").fetchall() + print(f'Sample data:') + for s in sample: + print(f' md5={s[0]}, dir1={s[1]}, dir2={s[2]}, file_name={s[3]}') + + conn.close() diff --git a/tools/debug_media_lookup.py b/tools/debug_media_lookup.py new file mode 100644 index 0000000..d15184f --- /dev/null +++ b/tools/debug_media_lookup.py @@ -0,0 +1,159 @@ +#!/usr/bin/env python3 +"""调试媒体文件查找逻辑""" + +import sqlite3 +from pathlib import Path + +# ========== 配置 ========== +ACCOUNT = "wxid_v4mbduwqtzpt22" +MD5 = "0923ad357c321cf286b794f8e5a66333" +USERNAME = "wxid_qmzc7q0xfm0j22" + +REPO_ROOT = Path(__file__).resolve().parents[1] +OUTPUT_DB_DIR = REPO_ROOT / "output" / "databases" / ACCOUNT + +# ========== 读取 _source.json ========== +import json + +source_json = OUTPUT_DB_DIR / "_source.json" +print(f"[1] 检查 _source.json: {source_json}") +if source_json.exists(): + with open(source_json, "r", encoding="utf-8") as f: + source = json.load(f) + wxid_dir = source.get("wxid_dir", "") + db_storage_path = source.get("db_storage_path", "") + print(f" wxid_dir: {wxid_dir}") + print(f" db_storage_path: {db_storage_path}") +else: + print(" [ERROR] _source.json 不存在!") + wxid_dir = "" + db_storage_path = "" + +# ========== 检查 hardlink.db ========== +hardlink_db = OUTPUT_DB_DIR / "hardlink.db" +print(f"\n[2] 检查 hardlink.db: {hardlink_db}") +rows = [] +dir2id_map = {} + +if not hardlink_db.exists(): + print(" [ERROR] hardlink.db 不存在!") +else: + print(" [OK] 文件存在") + conn = sqlite3.connect(str(hardlink_db)) + + # 先列出所有表 + print(f"\n[2.1] 列出所有表:") + tables = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall() + for t in tables: + print(f" - {t[0]}") + # 列出表的列 + cols = conn.execute(f"PRAGMA table_info({t[0]})").fetchall() + col_names = [c[1] for c in cols] + print(f" 列: {col_names}") + + # 尝试不同的表名查询 + print(f"\n[3] 查询 hardlink 表 (md5={MD5})") + possible_tables = ["image_hardlink_info", "HardLinkImageAttribute", "HardLinkImageAttribute2"] + for tbl in possible_tables: + try: + # 先检查表是否存在 + exists = conn.execute(f"SELECT name FROM sqlite_master WHERE type='table' AND name=?", (tbl,)).fetchone() + if not exists: + continue + print(f" 尝试表: {tbl}") + # 获取列名 + cols = conn.execute(f"PRAGMA table_info({tbl})").fetchall() + col_names = [c[1] for c in cols] + print(f" 列: {col_names}") + # 查询 md5 + if "Md5" in col_names: + rows = conn.execute(f"SELECT * FROM {tbl} WHERE Md5 = ? LIMIT 5", (MD5,)).fetchall() + elif "md5" in col_names: + rows = conn.execute(f"SELECT * FROM {tbl} WHERE md5 = ? LIMIT 5", (MD5,)).fetchall() + else: + print(f" [WARN] 没有 md5 列") + continue + if rows: + print(f" 找到 {len(rows)} 条记录:") + for i, row in enumerate(rows): + print(f" [{i}] {dict(zip(col_names, row))}") + else: + print(f" [WARN] 没有匹配记录") + except Exception as e: + print(f" [ERROR] 查询 {tbl} 失败: {e}") + + # 查询 dir2id 映射 + print(f"\n[4] 查询 dir2id 表") + try: + # 先检查列名 + cols = conn.execute("PRAGMA table_info(dir2id)").fetchall() + col_names = [c[1] for c in cols] + print(f" 列: {col_names}") + dir2id_rows = conn.execute("SELECT * FROM dir2id LIMIT 10").fetchall() + print(f" 共 {len(dir2id_rows)} 条(最多显示10条):") + for row in dir2id_rows: + print(f" {dict(zip(col_names, row))}") + # 构建映射 + if len(col_names) >= 2: + dir2id_map = {row[0]: row[1] for row in dir2id_rows} + except Exception as e: + print(f" [ERROR] 查询失败: {e}") + dir2id_map = {} + + conn.close() + +# ========== 尝试拼接路径并检查文件是否存在 ========== +print(f"\n[5] 尝试拼接路径并检查文件") +if wxid_dir and rows: + wxid_path = Path(wxid_dir) + for i, row in enumerate(rows): + dir1, dir2, file_name, _ = row + dir_name = dir2id_map.get(dir2, str(dir2)) + + # 尝试多个根目录 + roots = [ + wxid_path, + wxid_path / "msg" / "attach", + wxid_path / "msg" / "file", + wxid_path / "msg" / "video", + wxid_path / "cache", + ] + + for root in roots: + candidate = root / dir1 / dir_name / file_name + exists = candidate.exists() + print(f" [{i}] {candidate}") + print(f" 存在: {exists}") + if exists: + print(f" [FOUND!] 大小: {candidate.stat().st_size} bytes") + +# ========== 直接搜索 md5 文件 ========== +print(f"\n[6] 直接在 wxid_dir 下搜索 md5 文件") +if wxid_dir: + wxid_path = Path(wxid_dir) + search_dirs = [ + wxid_path / "msg" / "attach", + wxid_path / "msg" / "file", + wxid_path / "msg" / "video", + wxid_path / "cache", + ] + patterns = [f"{MD5}*.dat", f"{MD5}*.jpg", f"{MD5}*.png"] + + found_any = False + for d in search_dirs: + if not d.exists(): + print(f" [SKIP] {d} 不存在") + continue + for pat in patterns: + try: + matches = list(d.rglob(pat)) + for m in matches: + print(f" [FOUND] {m} ({m.stat().st_size} bytes)") + found_any = True + except Exception as e: + print(f" [ERROR] 搜索 {d}/{pat} 失败: {e}") + + if not found_any: + print(" [WARN] 没有找到任何匹配文件") + +print("\n[Done]") diff --git a/tools/debug_message_types.py b/tools/debug_message_types.py new file mode 100644 index 0000000..5002a57 --- /dev/null +++ b/tools/debug_message_types.py @@ -0,0 +1,30 @@ +#!/usr/bin/env python3 +"""调试消息类型返回值""" + +import requests + +resp = requests.get('http://localhost:8000/api/chat/messages', params={ + 'account': 'wxid_v4mbduwqtzpt22', + 'username': 'wxid_qmzc7q0xfm0j22', + 'limit': 100 +}) +data = resp.json() +messages = data.get('messages', []) + +# 找出不同类型的消息 +types_found = {} +for m in messages: + rt = m.get('renderType', 'text') + if rt not in types_found: + types_found[rt] = m + +print('找到的消息类型:') +for rt, m in types_found.items(): + content = str(m.get('content') or '')[:50] + print(f" {rt}: type={m.get('type')}, content={content}") + if rt == 'emoji': + print(f" emojiMd5={m.get('emojiMd5')}") + print(f" emojiUrl={m.get('emojiUrl')}") + if rt == 'image': + print(f" imageMd5={m.get('imageMd5')}") + print(f" imageUrl={str(m.get('imageUrl') or '')[:80]}") diff --git a/tools/export_database_schema_json.py b/tools/export_database_schema_json.py new file mode 100644 index 0000000..261e8fd --- /dev/null +++ b/tools/export_database_schema_json.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +导出微信数据库分析结果为 JSON: +- 基于 analyze_wechat_databases.WeChatDatabaseAnalyzer +- 联合 wechat_db_config.json(含 ohmywechat 常见类型与启发式)补全字段含义 +- 生成汇总 JSON 与按库拆分的 JSON 文件 + +用法: + python tools/export_database_schema_json.py \ + --databases-path output/databases \ + --output-dir output/schema_json \ + --config wechat_db_config.json +""" + +from __future__ import annotations + +import argparse +import json +from datetime import datetime +from pathlib import Path +from typing import Any, Dict +import sys + +# 项目根目录 +ROOT = Path(__file__).resolve().parents[1] +# 确保能导入项目根目录下的 analyze_wechat_databases.py +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) + + +def export_analysis(databases_path: Path, output_dir: Path, config_file: Path) -> int: + # 延迟导入分析器 + from analyze_wechat_databases import WeChatDatabaseAnalyzer + + output_dir.mkdir(parents=True, exist_ok=True) + + analyzer = WeChatDatabaseAnalyzer(databases_path=str(databases_path), config_file=str(config_file)) + results = analyzer.analyze_all_databases() # dict[db_name] = db_info + + meta = { + "generated_time": datetime.now().isoformat(), + "source": "analyze_wechat_databases.py", + "config_used": str(config_file), + "databases_root": str(databases_path), + "note": "字段含义来自 wechat_db_config.json 与启发式推断(结合 ohmywechat 常见类型)", + } + + combined: Dict[str, Any] = {"_metadata": meta, "databases": {}} + + count_dbs = 0 + for db_name, db_info in results.items(): + count_dbs += 1 + db_out: Dict[str, Any] = { + "database_name": db_info.get("database_name", db_name), + "database_path": db_info.get("database_path"), + "database_size": db_info.get("database_size"), + "description": db_info.get("description"), + "table_count": db_info.get("table_count"), + "tables": {}, + } + + tables = db_info.get("tables", {}) + for table_name, table in tables.items(): + # 列增强:补充 meaning + cols_out = [] + for col in table.get("columns", []): + name = col.get("name") + meaning = analyzer.get_field_meaning(name, table_name) if name else "" + cols_out.append({ + "name": name, + "type": col.get("type"), + "notnull": col.get("notnull"), + "default": col.get("dflt_value"), + "pk": col.get("pk"), + "meaning": meaning, + }) + + tbl_out = { + "row_count": table.get("row_count", 0), + "columns": cols_out, + "indexes": table.get("indexes", []), + "foreign_keys": table.get("foreign_keys", []), + "create_sql": table.get("create_sql"), + "sample_data": table.get("sample_data", []), + # 相似组标记(如 Msg_* 合并) + "is_representative": table.get("is_representative", False), + "similar_group": table.get("similar_group", {}), + } + + db_out["tables"][table_name] = tbl_out + + # 写入单库 JSON + single_path = output_dir / f"{db_name}.schema.json" + with single_path.open("w", encoding="utf-8") as f: + json.dump(db_out, f, ensure_ascii=False, indent=2) + + combined["databases"][db_name] = db_out + + print(f"[OK] 写出数据库JSON: {single_path.name}") + + # 汇总文件 + combined_path = output_dir / "all_databases.schema.json" + with combined_path.open("w", encoding="utf-8") as f: + json.dump(combined, f, ensure_ascii=False, indent=2) + + print(f"[OK] 汇总JSON: {combined_path} (数据库数: {count_dbs})") + return count_dbs + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument("--databases-path", default=str(ROOT / "output" / "databases"), + help="解密后的数据库根目录(按账号分目录)") + parser.add_argument("--output-dir", default=str(ROOT / "output" / "schema_json"), + help="JSON 输出目录") + parser.add_argument("--config", default=str(ROOT / "wechat_db_config.json"), + help="字段含义配置 JSON(由 tools/generate_wechat_db_config.py 生成)") + args = parser.parse_args() + + db_root = Path(args.databases_path) + out_dir = Path(args.output_dir) + cfg = Path(args.config) + + if not cfg.exists(): + raise FileNotFoundError(f"未找到配置文件: {cfg},请先运行 tools/generate_wechat_db_config.py") + + if not db_root.exists(): + print(f"[WARN] 数据库目录不存在: {db_root},仍将生成空汇总文件。") + + count = export_analysis(db_root, out_dir, cfg) + if count == 0: + print("[INFO] 未检测到可分析数据库(可先运行解密流程或确认路径)") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tools/extract_media_keys.py b/tools/extract_media_keys.py new file mode 100644 index 0000000..8a3add5 --- /dev/null +++ b/tools/extract_media_keys.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python3 +""" +提取微信 4.x 媒体解密密钥 (需要管理员权限运行) + +用法: +1. 确保微信正在运行 +2. 以管理员身份运行 PowerShell +3. cd 到项目目录 +4. 运行: uv run python tools/extract_media_keys.py +""" + +import sys +sys.path.insert(0, "src") +sys.path.insert(0, "WxDatDecrypt") + +import json +from pathlib import Path + +try: + from key import find_key +except ImportError as e: + print(f"[ERROR] 无法导入 WxDatDecrypt: {e}") + print("请确保 pymem, yara-python, pycryptodome 已安装") + sys.exit(1) + +# ========== 配置 ========== +REPO_ROOT = Path(__file__).resolve().parents[1] +OUTPUT_DB_DIR = REPO_ROOT / "output" / "databases" + + +def main(): + print("=" * 60) + print("微信 4.x 媒体解密密钥提取工具") + print("=" * 60) + + # 1. 列出所有账号 + print("\n[1] 列出已解密账号...") + if not OUTPUT_DB_DIR.exists(): + print("[ERROR] output/databases 目录不存在") + sys.exit(1) + + accounts = [] + for p in OUTPUT_DB_DIR.iterdir(): + if p.is_dir() and (p / "_source.json").exists(): + accounts.append(p.name) + + if not accounts: + print("[ERROR] 没有找到已解密的账号") + sys.exit(1) + + print(f" 找到 {len(accounts)} 个账号") + + # 2. 处理每个账号 + for account in accounts: + print(f"\n[2] 处理账号: {account}") + account_dir = OUTPUT_DB_DIR / account + + # 读取 _source.json + source_json = account_dir / "_source.json" + with open(source_json, "r", encoding="utf-8") as f: + source = json.load(f) + + wxid_dir_str = source.get("wxid_dir", "") + if not wxid_dir_str: + print(" [SKIP] 没有 wxid_dir") + continue + + wxid_dir = Path(wxid_dir_str) + if not wxid_dir.exists(): + print(f" [SKIP] wxid_dir 不存在: {wxid_dir}") + continue + + # 使用 WxDatDecrypt 的 find_key 函数 + print(f" wxid_dir: {wxid_dir}") + print(" 正在提取密钥 (需要微信正在运行且有管理员权限)...") + + try: + xor_key, aes_key = find_key(wxid_dir, version=4) + + # 保存到 _media_keys.json + keys_file = account_dir / "_media_keys.json" + keys_data = { + "xor": xor_key, + "aes": aes_key.decode("ascii") if isinstance(aes_key, bytes) else str(aes_key), + } + with open(keys_file, "w", encoding="utf-8") as f: + json.dump(keys_data, f, indent=2) + print(f" [OK] 密钥已保存到: {keys_file}") + print(f" XOR key: {xor_key}") + print(f" AES key: {keys_data['aes']}") + except Exception as e: + print(f" [ERROR] 提取失败: {e}") + + print("\n" + "=" * 60) + print("完成!请重启后端服务以使密钥生效。") + print("=" * 60) + + +if __name__ == "__main__": + main() diff --git a/tools/generate_wechat_db_config.py b/tools/generate_wechat_db_config.py new file mode 100644 index 0000000..d622368 --- /dev/null +++ b/tools/generate_wechat_db_config.py @@ -0,0 +1,381 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +生成 wechat_db_config.json: +- 读取 wechat_db_config_template.json +- 融合本项目 analyze_wechat_databases 的启发式 + ohmywechat 常见字段/消息类型 +- 批量为每个表字段补全中文含义,并写出 wechat_db_config.json +""" + +from __future__ import annotations + +import json +import re +from pathlib import Path +from datetime import datetime + +ROOT = Path(__file__).resolve().parents[1] +TEMPLATE_PATH = ROOT / "wechat_db_config_template.json" +OUTPUT_MAIN = ROOT / "wechat_db_config.json" +OUTPUT_DIR = ROOT / "output" / "configs" +OUTPUT_COPY = OUTPUT_DIR / "wechat_db_config.generated.json" + +# 尝试导入分析器以复用其启发式 +AnalyzerCls = None +try: + from analyze_wechat_databases import WeChatDatabaseAnalyzer # type: ignore + AnalyzerCls = WeChatDatabaseAnalyzer +except Exception: + AnalyzerCls = None + + +def build_db_descriptions() -> dict[str, str]: + return { + "message": "聊天记录核心数据库", + "message_3": "聊天消息分表数据库(示例或分片)", + "message_fts": "聊天消息全文索引数据库(FTS)", + "message_resource": "消息资源索引数据库(图片/文件/视频等)", + "contact": "联系人数据库(好友/群/公众号基础信息)", + "session": "会话数据库(会话列表与未读统计)", + "sns": "朋友圈数据库(动态与互动)", + "favorite": "收藏数据库", + "emoticon": "表情包数据库", + "head_image": "头像数据数据库", + "hardlink": "硬链接索引数据库(资源去重/快速定位)", + "media_0": "媒体数据数据库(含语音SILK等)", + "unspportmsg": "不支持消息数据库(客户端不支持的消息类型)", + "general": "通用/系统数据库(新消息通知/支付等)", + } + + +def build_message_types_from_ohmywechat() -> dict[str, str]: + """ + 参考 ohmywechat 等资料补充 PC/公众号常见 local_type → 含义 + 使用 (Type,SubType) 形式的字符串键;子类型未知时置 0 + """ + return { + "1,0": "文本消息", + "3,0": "图片消息", + "34,0": "语音消息", + "42,0": "名片消息", + "43,0": "视频消息", + "47,0": "动画表情", + "48,0": "位置消息", + "244813135921,0": "引用消息", + "17179869233,0": "卡片式链接(带描述)", + "21474836529,0": "卡片式链接/图文消息(公众号,mmreader XML)", + "154618822705,0": "小程序分享", + "12884901937,0": "音乐卡片", + "8594229559345,0": "红包卡片", + "81604378673,0": "聊天记录合并转发消息", + "266287972401,0": "拍一拍消息", + "8589934592049,0": "转账卡片", + "270582939697,0": "视频号直播卡片", + "25769803825,0": "文件消息", + "10000,0": "系统消息(撤回/入群提示等)", + } + + +KNOWN_FIELD_MEANINGS = { + # 通用主键/标识 + "id": "标识符字段(主键/索引)", + "local_id": "本地自增ID(主键/定位用)", + "server_id": "服务器消息ID(唯一且全局递增)", + "svr_id": "服务器消息ID(同server_id)", + "message_id": "消息ID(表内主键或消息级索引)", + "resource_id": "资源ID(资源明细主键)", + "history_id": "历史消息ID(系统消息/历史消息关联键)", + + # 会话/用户/群聊 + "username": "用户名/会话标识(wxid_xxx 或 xxx@chatroom)", + "user_name": "用户名/会话标识(wxid_xxx 或 xxx@chatroom)", + "sender_id": "发送者内部ID(与Name2Id映射)", + "real_sender_id": "真实发送者ID(群聊内消息具体成员)", + "chat_id": "会话内部ID(与ChatName2Id映射)", + "chat_name_id": "会话内部ID(与ChatName2Id映射)", + "session_id": "会话ID(FTS/资源维度的会话映射)", + "session_name": "会话名(username 文本值)", + "session_name_id": "会话内部ID(username 的数值映射)", + "talker_id": "会话/房间ID(Name2Id 对照)", + + # 消息结构/状态 + "local_type": "本地消息类型(local_type)", + "type": "类型标识(上下文相关:消息/表情/配置)", + "sub_type": "子类型标识(同一主类型细分)", + "status": "状态标志位(发送/接收/已读/撤回等)", + "upload_status": "上传状态(媒体/资源上行状态)", + "download_status": "下载状态(媒体/资源下行状态)", + "server_seq": "服务器序列号(消息顺序校验)", + "origin_source": "消息来源标识(客户端/转发/系统)", + "source": "来源附加信息(XML/JSON 等)", + "msg_status": "消息状态(扩展)", + + # 消息内容 + "message_content": "消息内容(部分类型为zstd压缩的XML:mmreader)", + "compress_content": "压缩内容(多见zstd,可能存放富文本XML)", + "packed_info_data": "打包扩展信息(二进制,消息元数据)", + "packed_info": "打包扩展信息(二进制/文本混合)", + "data_index": "数据分片/索引(媒体片段定位)", + + # 时间 + "create_time": "创建时间(Unix时间戳,秒)", + "last_update_time": "最后更新时间(Unix时间戳)", + "last_modified_time": "最后修改时间(Unix时间戳)", + "update_time": "更新时间(Unix时间戳)", + "invalid_time": "失效时间(Unix时间戳)", + "access_time": "访问时间(Unix时间戳)", + "last_timestamp": "最后消息时间(会话)", + "sort_timestamp": "排序时间(会话排序)", + "timestamp": "时间戳(Unix时间戳)", + + # 排序/去重 + "sort_seq": "排序序列(单会话内消息排序/去重)", + "server_seq_": "服务器序列号(扩展)", + + # 联系人/群聊 + "alias": "别名(用户自定义标识)", + "encrypt_username": "加密用户名", + "flag": "标志位(多用途:联系人/公众号/配置)", + "delete_flag": "删除标志(软删除)", + "verify_flag": "认证标志(公众号/企业认证等)", + "remark": "备注名", + "remark_quan_pin": "备注名全拼", + "remark_pin_yin_initial": "备注名拼音首字母", + "nick_name": "昵称", + "pin_yin_initial": "昵称拼音首字母", + "quan_pin": "昵称全拼", + "description": "描述/个性签名/备注", + "extra_buffer": "扩展缓冲区(二进制/序列化)", + "ext_buffer": "扩展缓冲区(二进制/序列化)", + "ext_buffer_": "扩展缓冲区(二进制/序列化)", + "chat_room_type": "群类型标志", + "owner": "群主 username", + + # 头像/媒体 + "big_head_url": "头像大图URL", + "small_head_url": "头像小图URL", + "head_img_md5": "头像MD5", + "image_buffer": "头像二进制数据", + "voice_data": "语音二进制数据(多为SILK)", + + # FTS / 内部表 + "acontent": "FTS检索内容(分词后文本)", + "block": "FTS内部块数据(二进制)", + "segid": "FTS分段ID", + "term": "FTS分词条目", + "pgno": "FTS页号", + "c0": "FTS列c0(内部结构)", + "c1": "FTS列c1(内部结构)", + "c2": "FTS列c2(内部结构)", + "c3": "FTS列c3(内部结构)", + "c4": "FTS列c4(内部结构)", + "c5": "FTS列c5(内部结构)", + "c6": "FTS列c6(内部结构)", + "sz": "FTS文档大小信息", + "_rowid_": "SQLite内部行ID", + + # 资源/硬链接 + "md5": "资源MD5", + "md5_hash": "MD5哈希整数映射(快速索引)", + "file_name": "文件名(相对/逻辑名)", + "file_size": "文件大小(字节)", + "dir1": "资源路径一级目录编号(分桶)", + "dir2": "资源路径二级目录编号(分桶)", + "modify_time": "文件修改时间戳", + + # 会话统计 + "unread_count": "未读计数", + "unread_first_msg_srv_id": "会话未读区间首个消息SvrID", + "is_hidden": "会话隐藏标志", + "summary": "会话摘要(最近消息摘要)", + "draft": "草稿内容", + "status_": "状态/标志(上下文)", + "last_clear_unread_timestamp": "上次清空未读时间", + "last_msg_locald_id": "最后一条消息的本地ID(拼写原样保留)", + "last_msg_type": "最后一条消息类型", + "last_msg_sub_type": "最后一条消息子类型", + "last_msg_sender": "最后一条消息发送者username", + "last_sender_display_name": "最后一条消息发送者显示名", + "last_msg_ext_type": "最后一条消息扩展类型", + + # WCDB 压缩控制 + "WCDB_CT_message_content": "WCDB压缩标记(message_content列)", + "WCDB_CT_source": "WCDB压缩标记(source列)", +} + + +def simple_heuristic(field_name: str, table_name: str) -> str: + """简易兜底启发式,避免完全空白""" + f = field_name.lower() + t = table_name.lower() + if f.endswith("id") or f in {"_rowid_", "rowid"} or f == "id": + return "标识符字段" + if "time" in f or "timestamp" in f: + return "时间戳字段" + if f in {"name", "user_name", "username"}: + return "用户名/会话名" + if f in {"content", "message_content", "compress_content"}: + return "内容/正文字段" + if "md5" in f: + return "MD5哈希字段" + if "status" in f: + return "状态位/状态码" + if f.startswith("is_"): + return "布尔标志字段" + if f.startswith("wcdb_ct_"): + return "WCDB压缩控制字段" + if "buf" in f or "buffer" in f or "blob" in f: + return "二进制缓冲数据" + if "url" in f: + return "URL链接" + if "size" in f or "count" in f: + return "数量/大小字段" + if "seq" in f: + return "序列号/排序字段" + # 针对 Msg_* 常见列 + if t.startswith("msg_"): + if f == "source": + return "消息来源附加信息(XML/JSON)" + if f == "local_type": + return "本地消息类型(local_type)" + return "未知用途字段" + + +def compute_field_meaning(analyzer, table_name: str, field_name: str) -> str: + # 优先精确已知映射 + if field_name in KNOWN_FIELD_MEANINGS: + return KNOWN_FIELD_MEANINGS[field_name] + lf = field_name.lower() + if lf in KNOWN_FIELD_MEANINGS: + return KNOWN_FIELD_MEANINGS[lf] + + # 额外针对 mmreader/zstd 提示 + if lf in {"message_content", "compress_content"}: + return "消息内容(部分类型为zstd压缩XML:mmreader)" + + # 借用项目内启发式 + if analyzer is not None: + try: + return analyzer.get_field_meaning(field_name, table_name) + except Exception: + pass + + # 简易兜底 + return simple_heuristic(field_name, table_name) + + +def guess_table_desc(analyzer, table_name: str) -> str: + if analyzer is not None: + try: + return analyzer.guess_table_function(table_name) + except Exception: + pass + # 简易猜测 + tl = table_name.lower() + if tl == "msg" or tl.startswith("msg_"): + return "某会话的消息表(聊天消息数据)" + if "name2id" in tl: + return "用户名到内部ID映射表" + if "contact" in tl: + return "联系人/群聊信息表" + if "session" in tl: + return "会话信息/未读统计表" + if "fts" in tl: + return "全文检索(FTS)内部表" + if "resource" in tl: + return "消息资源/附件索引表" + return "未知功能表" + + +def fill_config(template: dict) -> dict: + # 创建一个分析器实例,仅用于启发式(使用默认配置) + analyzer = None + if AnalyzerCls is not None: + try: + analyzer = AnalyzerCls(databases_path=str(ROOT / "output" / "databases"), + config_file="nonexistent_config.json") + except Exception: + analyzer = None + + # 数据库描述补齐 + db_desc_map = build_db_descriptions() + + databases = template.get("databases", {}) + for db_name, db in databases.items(): + if isinstance(db, dict): + # 数据库级描述 + if not db.get("description"): + # 用已知映射或尝试推断 + db["description"] = db_desc_map.get(db_name, db.get("description", "")) or "未知用途数据库" + + # 遍历表 + tables = db.get("tables", {}) + for table_name, table in tables.items(): + if not isinstance(table, dict): + continue + + # 表功能描述 + if not table.get("description"): + table["description"] = guess_table_desc(analyzer, table_name) + + # 字段含义补齐 + fields = table.get("fields", {}) + if isinstance(fields, dict): + for field_name, field_meta in fields.items(): + if not isinstance(field_meta, dict): + continue + meaning = field_meta.get("meaning", "") + if not meaning: + field_meta["meaning"] = compute_field_meaning(analyzer, table_name, field_name) + + # 消息类型映射补充(保留模板 instructional 字段,另外插入真实映射键) + mt_real = build_message_types_from_ohmywechat() + message_types = template.get("message_types", {}) + # 合并:新增真实键 + for k, v in mt_real.items(): + message_types[k] = v + template["message_types"] = message_types + + # 元数据刷新 + meta = template.get("_metadata", {}) + meta["version"] = "1.1" + meta["generated_time"] = datetime.now().isoformat() + meta["description"] = "微信数据库字段配置(由模板自动补全,融合启发式与ohmywechat常见类型)" + template["_metadata"] = meta + + return template + + +def main(): + if not TEMPLATE_PATH.exists(): + raise FileNotFoundError(f"Template not found: {TEMPLATE_PATH}") + + with TEMPLATE_PATH.open("r", encoding="utf-8") as f: + template = json.load(f) + + filled = fill_config(template) + + # 写主配置(供分析器默认加载) + with OUTPUT_MAIN.open("w", encoding="utf-8") as f: + json.dump(filled, f, ensure_ascii=False, indent=2) + + # 备份写入 output/configs + OUTPUT_DIR.mkdir(parents=True, exist_ok=True) + with OUTPUT_COPY.open("w", encoding="utf-8") as f: + json.dump(filled, f, ensure_ascii=False, indent=2) + + print("[OK] 生成完成") + print(f"- 主配置: {OUTPUT_MAIN}") + print(f"- 备份: {OUTPUT_COPY}") + + # 简要统计 + dbs = filled.get("databases", {}) + db_count = len(dbs) + tbl_count = sum(len(d.get("tables", {})) for d in dbs.values() if isinstance(d, dict)) + print(f"- 数据库数: {db_count}, 表数: {tbl_count}") + print(f"- 消息类型键数: {len(filled.get('message_types', {}))}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/tools/test_image_api.py b/tools/test_image_api.py new file mode 100644 index 0000000..cac32d9 --- /dev/null +++ b/tools/test_image_api.py @@ -0,0 +1,17 @@ +#!/usr/bin/env python3 +"""测试图片 API""" +import requests + +r = requests.get( + 'http://localhost:8000/api/chat/media/image', + params={ + 'account': 'wxid_v4mbduwqtzpt22', + 'md5': '8753fcd3b1f8c4470b53551e13c5fbc1', + 'username': 'wxid_qmzc7q0xfm0j22' + } +) +print(f'Status: {r.status_code}') +print(f'Content-Type: {r.headers.get("content-type")}') +print(f'Content-Length: {len(r.content)}') +if r.status_code != 200: + print(f'Response: {r.text[:500]}')