mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-02-02 05:50:50 +08:00
chore: 更新配置和其他小改动
- 更新nuxt配置 - 优化首页样式 - 调整主程序和解密逻辑 - 添加数据库分析脚本
This commit is contained in:
3
.gitignore
vendored
3
.gitignore
vendored
@@ -13,3 +13,6 @@ wheels/
|
||||
/.history/
|
||||
/.augment/
|
||||
/CLAUDE.md
|
||||
|
||||
# Local config templates
|
||||
/wechat_db_config_template.json
|
||||
1590
analyze_wechat_databases.py
Normal file
1590
analyze_wechat_databases.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -39,6 +39,11 @@ export default defineNuxtConfig({
|
||||
'@nuxtjs/tailwindcss',
|
||||
'@pinia/nuxt'
|
||||
],
|
||||
|
||||
// 启用组件自动导入
|
||||
components: [
|
||||
{ path: '~/components', pathPrefix: false }
|
||||
],
|
||||
|
||||
// Tailwind配置
|
||||
tailwindcss: {
|
||||
|
||||
@@ -41,6 +41,14 @@
|
||||
</svg>
|
||||
<span>直接解密</span>
|
||||
</NuxtLink>
|
||||
|
||||
<NuxtLink to="/chat"
|
||||
class="group inline-flex items-center px-12 py-4 bg-white text-[#10AEEF] border border-[#10AEEF] rounded-lg text-lg font-medium hover:bg-[#F7F7F7] transform hover:scale-105 transition-all duration-200">
|
||||
<svg class="w-6 h-6 mr-3 transition-transform duration-200" fill="none" stroke="currentColor" viewBox="0 0 24 24">
|
||||
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2.5" d="M8 10h8M8 14h5M4 6h16v12a2 2 0 01-2 2H6a2 2 0 01-2-2V6z"/>
|
||||
</svg>
|
||||
<span>聊天预览</span>
|
||||
</NuxtLink>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
380
generate_config_template.py
Normal file
380
generate_config_template.py
Normal file
@@ -0,0 +1,380 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
生成微信数据库字段配置模板
|
||||
基于实际数据库结构生成JSON模板,供人工填写字段含义
|
||||
"""
|
||||
|
||||
import json
import re
import sqlite3
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional
|
||||
|
||||
class ConfigTemplateGenerator:
    """Build a JSON template describing the schema of decrypted WeChat databases.

    The generator walks ``<databases_path>/<account>/*.db``, reads every table's
    column list and writes a JSON file in which the ``meaning`` / ``description``
    entries are left empty for a human to fill in.
    """

    # File-name patterns (matched with ``re.match``) of databases that are
    # skipped, mapped to the description printed when a file is skipped.
    EXCLUDED_PATTERNS = {
        r'biz_message_\d+\.db$': '企业微信聊天记录数据库',
        r'bizchat\.db$': '企业微信联系人数据库',
        r'contact_fts\.db$': '搜索联系人数据库',
        r'favorite_fts\.db$': '搜索收藏数据库'
    }

    # Stem of the numbered message databases (message_0, message_1, ...).
    _MESSAGE_DB_STEM = re.compile(r'message_(\d+)$')

    _HEX_DIGITS = frozenset('0123456789abcdefABCDEF')

    def __init__(self, databases_path: str = "output/databases"):
        """Initialise the generator.

        Args:
            databases_path: Directory that contains one sub-directory per
                account, each holding ``*.db`` files.
        """
        self.databases_path = Path(databases_path)
        self.template_structure = {}

    @staticmethod
    def _quote_identifier(name: str) -> str:
        """Return *name* as a double-quoted SQL identifier.

        Table names come straight from ``sqlite_master`` and may contain
        characters (``-``, spaces, leading digits, keywords) that are not valid
        in a bare identifier, so they must be quoted before interpolation.
        """
        return '"' + name.replace('"', '""') + '"'

    def _read_fields(self, cursor: sqlite3.Cursor, table_name: str) -> Dict[str, Dict[str, str]]:
        """Return the template ``fields`` mapping for one table.

        Each column becomes ``{"type", "meaning", "notes"}`` with ``meaning``
        left empty for the user to fill in.
        """
        cursor.execute(f"PRAGMA table_info({self._quote_identifier(table_name)})")
        fields = {}
        for col in cursor.fetchall():
            field_name, field_type = col[1], col[2]
            fields[field_name] = {
                "type": field_type,
                "meaning": "",  # left empty for the user
                "notes": f"字段类型: {field_type}"
            }
        return fields

    def connect_database(self, db_path: Path) -> Optional[sqlite3.Connection]:
        """Open *db_path* with sqlite3.

        Returns:
            The open connection, or ``None`` (after printing the error) when
            the connection could not be established.
        """
        try:
            return sqlite3.connect(str(db_path))
        except sqlite3.Error as e:
            print(f"连接数据库失败 {db_path}: {e}")
            return None

    def detect_similar_table_patterns(self, table_names: List[str]) -> Dict[str, List[str]]:
        """Group tables named ``<prefix>_<hex hash>`` by their prefix.

        A table belongs to a group when the text after its first underscore is
        at least 16 characters long and consists only of hexadecimal digits.

        Returns:
            ``{prefix: [table names]}`` containing only prefixes shared by more
            than one table.
        """
        patterns = defaultdict(list)
        for table_name in table_names:
            # Split on the first underscore only.
            prefix, separator, suffix = table_name.partition('_')
            if not separator:
                continue
            if len(suffix) >= 16 and all(c in self._HEX_DIGITS for c in suffix):
                patterns[prefix].append(table_name)

        return {prefix: tables for prefix, tables in patterns.items() if len(tables) > 1}

    def compare_table_structures(self, conn: sqlite3.Connection, table_names: List[str]) -> Dict[str, Any]:
        """Check whether all *table_names* share the same column layout.

        Columns are compared by name, upper-cased declared type, NOT NULL flag
        and primary-key position. Tables whose structure cannot be read are
        reported and left out of the comparison.

        Returns:
            A dict with ``are_identical`` and ``representative_table``; on
            success it additionally carries ``structure``, ``table_count`` and
            ``table_names`` for the tables that were actually compared. When
            nothing could be compared, ``are_identical`` is ``False`` and
            ``representative_table`` is ``None``.
        """
        nothing_comparable = {'are_identical': False, 'representative_table': None}
        if not table_names:
            return nothing_comparable

        try:
            cursor = conn.cursor()
            structures = {}

            for table_name in table_names:
                try:
                    cursor.execute(f"PRAGMA table_info({self._quote_identifier(table_name)})")
                    structures[table_name] = [
                        {
                            'name': col[1],
                            'type': col[2].upper(),  # declared types are case-insensitive
                            'notnull': col[3],
                            'pk': col[5]
                        }
                        for col in cursor.fetchall()
                    ]
                except sqlite3.Error as e:
                    print(f"获取表结构失败 {table_name}: {e}")

            if not structures:
                return nothing_comparable

            first_table = next(iter(structures))
            first_structure = structures[first_table]
            are_identical = all(
                structure == first_structure for structure in structures.values()
            )

            return {
                'are_identical': are_identical,
                'representative_table': first_table,
                'structure': first_structure,
                'table_count': len(structures),
                'table_names': list(structures.keys())
            }

        except sqlite3.Error as e:
            print(f"比较表结构失败: {e}")
            return nothing_comparable

    def analyze_database_structure(self, db_path: Path) -> Dict[str, Any]:
        """Describe every table of one database file.

        Tables that follow a ``<prefix>_<hash>`` naming pattern and have
        identical structures are collapsed into a single ``"<prefix>_*"`` entry
        of type ``similar_group``; every other table gets its own entry of
        type ``table``.

        Returns:
            ``{table key: table description}``, or an empty dict when the
            database cannot be opened or read.
        """
        db_name = db_path.stem
        print(f"分析数据库结构: {db_name}")

        conn = self.connect_database(db_path)
        if conn is None:
            return {}

        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            table_names = [row[0] for row in cursor.fetchall()]

            similar_patterns = self.detect_similar_table_patterns(table_names)
            processed_tables = set()
            db_structure = {}

            # Collapse groups of identically structured hash-suffixed tables.
            for prefix, pattern_tables in similar_patterns.items():
                print(f" 检测到相似表模式 {prefix}_*: {len(pattern_tables)} 个表")

                comparison = self.compare_table_structures(conn, pattern_tables)
                if not comparison['are_identical']:
                    print(f" → 表结构不同,保持独立处理")
                    continue

                representative_table = comparison['representative_table']
                print(f" → 表结构完全相同,使用代表表: {representative_table}")

                db_structure[f"{prefix}_*"] = {
                    "type": "similar_group",
                    "pattern": f"{prefix}_{{hash}}",
                    "table_count": comparison['table_count'],
                    "representative_table": representative_table,
                    "description": "",  # left empty for the user
                    "fields": self._read_fields(cursor, representative_table)
                }
                processed_tables.update(pattern_tables)

            # Everything that was not folded into a group is listed on its own.
            for table_name in table_names:
                if table_name in processed_tables:
                    continue
                try:
                    db_structure[table_name] = {
                        "type": "table",
                        "description": "",  # left empty for the user
                        "fields": self._read_fields(cursor, table_name)
                    }
                except sqlite3.Error as e:
                    print(f" 处理表 {table_name} 失败: {e}")

            return db_structure

        except sqlite3.Error as e:
            print(f"分析数据库失败 {db_name}: {e}")
            return {}
        finally:
            conn.close()

    def _split_excluded(self, all_db_files: List[Path]):
        """Partition files into (kept files, [(excluded file, reason), ...])."""
        db_files = []
        excluded_files = []
        for db_file in all_db_files:
            reason = next(
                (
                    description
                    for pattern, description in self.EXCLUDED_PATTERNS.items()
                    if re.match(pattern, db_file.name)
                ),
                None,
            )
            if reason is None:
                db_files.append(db_file)
            else:
                excluded_files.append((db_file, reason))
        return db_files, excluded_files

    def _select_message_databases(self, db_files: List[Path]) -> List[Path]:
        """Keep only the second-to-last numbered ``message_<n>.db`` file.

        Other ``message_*`` databases (e.g. ``message_fts``) are left alone.
        When fewer than two numbered databases exist the list is returned
        unchanged.
        """
        numbered = [db for db in db_files if self._MESSAGE_DB_STEM.match(db.stem)]
        if len(numbered) <= 1:
            return db_files

        numbered.sort(key=lambda db: int(self._MESSAGE_DB_STEM.match(db.stem).group(1)))
        selected_message_db = numbered[-2]
        print(f"检测到 {len(numbered)} 个message_{{数字}}.db数据库")
        print(f"选择倒数第二个: {selected_message_db.name}")

        remaining = [db for db in db_files if not self._MESSAGE_DB_STEM.match(db.stem)]
        remaining.append(selected_message_db)
        return remaining

    @staticmethod
    def _print_summary(template: Dict[str, Any], output_file: str, excluded_count: int) -> None:
        """Print table/field statistics for the generated template."""
        print(f"\n配置模板生成完成: {output_file}")
        print(f" - 数据库数量: {len(template['databases'])}")

        total_tables = 0
        total_fields = 0
        similar_groups = 0
        for db_info in template["databases"].values():
            total_tables += len(db_info["tables"])
            for table_info in db_info["tables"].values():
                if table_info["type"] == "similar_group":
                    similar_groups += 1
                total_fields += len(table_info["fields"])

        print(f" - 表数量: {total_tables}")
        print(f" - 相似表组: {similar_groups}")
        print(f" - 字段总数: {total_fields}")

        if excluded_count:
            print(f"\n生成完成统计:")
            print(f" - 成功处理: {len(template['databases'])} 个数据库")
            print(f" - 排除数据库: {excluded_count} 个")
            print(f" - 排除原因: 个人微信数据分析不需要企业微信和搜索索引数据")

        print(f"\n请编辑 {output_file} 文件,填入准确的字段含义和描述")

    def generate_template(self, output_file: str = "wechat_db_config_template.json"):
        """Scan all account databases and write the JSON configuration template.

        Args:
            output_file: Path of the JSON file to write.

        If ``databases_path`` is not an existing directory, a message is
        printed and nothing is written.
        """
        print("开始生成微信数据库配置模板...")

        if not self.databases_path.is_dir():
            # Previously this surfaced as an unhandled FileNotFoundError.
            print(f"数据库目录不存在: {self.databases_path}")
            return

        all_db_files = [
            db_file
            for account_dir in self.databases_path.iterdir()
            if account_dir.is_dir()
            for db_file in account_dir.glob("*.db")
        ]
        print(f"找到 {len(all_db_files)} 个数据库文件")

        db_files, excluded_files = self._split_excluded(all_db_files)
        if excluded_files:
            print(f"\n排除以下数据库文件({len(excluded_files)} 个):")
            for excluded_file, description in excluded_files:
                print(f" - {excluded_file.name} ({description})")

        print(f"\n实际处理 {len(db_files)} 个数据库文件")

        db_files = self._select_message_databases(db_files)
        print(f"实际分析 {len(db_files)} 个数据库文件")

        template = {
            "_metadata": {
                "description": "微信数据库字段配置模板",
                "version": "1.0",
                "instructions": {
                    "zh": "请为每个字段的 'meaning' 填入准确的中文含义,'description' 填入数据库/表的功能描述",
                    "en": "Please fill in accurate Chinese meanings for each field's 'meaning' and functional descriptions for 'description'"
                },
                "database_count": len(db_files),
                "generated_time": datetime.now().isoformat()
            },
            "databases": {}
        }

        # NOTE(review): entries are keyed by file stem only, so databases with
        # the same name in different account directories overwrite each other.
        for db_file in db_files:
            db_structure = self.analyze_database_structure(db_file)
            if db_structure:
                template["databases"][db_file.stem] = {
                    "description": "",  # left empty for the user
                    "file_size": db_file.stat().st_size,
                    "tables": db_structure
                }

        template["message_types"] = {
            "_instructions": "消息类型映射 - 格式: 'Type,SubType': '含义描述'",
            "examples": {
                "1,0": "文本消息",
                "3,0": "图片消息",
                "34,0": "语音消息"
            }
        }

        template["friend_types"] = {
            "_instructions": "好友类型映射 - 格式: 'TypeCode': '类型描述'",
            "examples": {
                "1": "好友",
                "2": "微信群",
                "3": "好友"
            }
        }

        with open(Path(output_file), 'w', encoding='utf-8') as f:
            json.dump(template, f, ensure_ascii=False, indent=2)

        self._print_summary(template, output_file, len(excluded_files))
|
||||
|
||||
def main():
    """Print the tool banner and generate the template with default settings."""
    banner_rule = "=" * 50
    print("微信数据库配置模板生成器")
    print(banner_rule)

    # Defaults: read from output/databases, write wechat_db_config_template.json.
    ConfigTemplateGenerator().generate_template()


if __name__ == "__main__":
    main()
|
||||
16
main.py
16
main.py
@@ -9,6 +9,8 @@
|
||||
"""
|
||||
|
||||
import uvicorn
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
def main():
|
||||
"""启动微信解密工具API服务"""
|
||||
@@ -21,12 +23,24 @@ def main():
|
||||
print("按 Ctrl+C 停止服务")
|
||||
print("=" * 60)
|
||||
|
||||
repo_root = Path(__file__).resolve().parent
|
||||
enable_reload = os.environ.get("WECHAT_TOOL_RELOAD", "0") == "1"
|
||||
|
||||
# 启动API服务
|
||||
uvicorn.run(
|
||||
"wechat_decrypt_tool.api:app",
|
||||
host="0.0.0.0",
|
||||
port=8000,
|
||||
reload=True,
|
||||
reload=enable_reload,
|
||||
reload_dirs=[str(repo_root / "src")] if enable_reload else None,
|
||||
reload_excludes=[
|
||||
"output/*",
|
||||
"output/**",
|
||||
"frontend/*",
|
||||
"frontend/**",
|
||||
".venv/*",
|
||||
".venv/**",
|
||||
] if enable_reload else None,
|
||||
log_level="info"
|
||||
)
|
||||
|
||||
|
||||
@@ -12,6 +12,7 @@ python wechat_decrypt.py
|
||||
import hashlib
|
||||
import hmac
|
||||
import os
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
@@ -383,6 +384,29 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
|
||||
account_output_dir.mkdir(parents=True, exist_ok=True)
|
||||
logger.info(f"账号 {account_name} 输出目录: {account_output_dir}")
|
||||
|
||||
try:
|
||||
source_db_storage_path = str(db_storage_path or "")
|
||||
wxid_dir = ""
|
||||
if db_storage_path:
|
||||
sp = Path(db_storage_path)
|
||||
if sp.name.lower() == "db_storage":
|
||||
wxid_dir = str(sp.parent)
|
||||
else:
|
||||
wxid_dir = str(sp)
|
||||
(account_output_dir / "_source.json").write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"db_storage_path": source_db_storage_path,
|
||||
"wxid_dir": wxid_dir,
|
||||
},
|
||||
ensure_ascii=False,
|
||||
indent=2,
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
account_success = 0
|
||||
account_processed = []
|
||||
account_failed = []
|
||||
|
||||
111
test_databases.py
Normal file
111
test_databases.py
Normal file
@@ -0,0 +1,111 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
测试数据库文件的可读性和数据内容
|
||||
"""
|
||||
|
||||
import sqlite3
|
||||
import os
|
||||
from pathlib import Path
|
||||
|
||||
def test_database(db_path):
    """Check that one SQLite file can be opened and actually contains rows.

    Prints the file size, the table count and a per-table row count.

    Args:
        db_path: ``pathlib.Path`` of the database file to inspect.

    Returns:
        ``True`` when at least one table holds at least one row; ``False`` when
        the file is empty, has no tables, has no rows, or cannot be read.
    """
    db_name = db_path.name
    print(f"\n=== 测试数据库: {db_name} ===")

    try:
        file_size = db_path.stat().st_size
        print(f"文件大小: {file_size:,} 字节")

        if file_size == 0:
            print("❌ 文件为空")
            return False

        conn = sqlite3.connect(str(db_path))
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
            tables = cursor.fetchall()
            print(f"表数量: {len(tables)}")

            if not tables:
                print("❌ 没有表")
                return False

            table_with_data = 0
            total_rows = 0

            for (table_name,) in tables:
                # Quote the identifier: names taken from sqlite_master may not
                # be valid as bare identifiers (e.g. contain '-' or spaces).
                quoted = '"' + table_name.replace('"', '""') + '"'
                try:
                    cursor.execute(f"SELECT COUNT(*) FROM {quoted}")
                    row_count = cursor.fetchone()[0]
                except sqlite3.Error as e:
                    print(f" ⚠️ {table_name}: 查询失败 - {e}")
                    continue

                total_rows += row_count
                if row_count > 0:
                    table_with_data += 1
                    print(f" ✅ {table_name}: {row_count:,} 行")
                else:
                    print(f" ❌ {table_name}: 0 行")

            print(f"有数据的表: {table_with_data}/{len(tables)}")
            print(f"总数据行数: {total_rows:,}")
        finally:
            # Always release the handle, including on unexpected errors.
            conn.close()

        if total_rows > 0:
            print("✅ 数据库可用")
            return True

        print("❌ 数据库无数据")
        return False

    except (OSError, sqlite3.Error) as e:
        print(f"❌ 数据库连接失败: {e}")
        return False


# This is a diagnostic helper, not a unit test. Its name and the file name
# match pytest's collection patterns, so opt out explicitly to stop pytest
# from trying to run it with a missing ``db_path`` fixture.
test_database.__test__ = False
|
||||
|
||||
def main():
    """Run ``test_database`` on every ``*.db`` under output/databases and summarise.

    Each file lands in one of three buckets: usable (the check returned a truthy
    result), empty (zero-byte file) or problematic (everything else).
    """
    print("微信数据库文件测试工具")
    print("=" * 50)

    root = Path("output/databases")
    if not root.exists():
        print("❌ 数据库目录不存在")
        return

    # One sub-directory per account; collect the .db files directly inside each.
    db_files = [
        candidate
        for account_dir in root.iterdir()
        if account_dir.is_dir()
        for candidate in account_dir.glob("*.db")
    ]
    print(f"找到 {len(db_files)} 个数据库文件")

    available_dbs, empty_dbs, error_dbs = [], [], []
    for db_file in sorted(db_files):
        if test_database(db_file):
            bucket = available_dbs
        elif db_file.stat().st_size == 0:
            bucket = empty_dbs
        else:
            bucket = error_dbs
        bucket.append(db_file.name)

    def _listing(names):
        return ', '.join(names) if names else '无'

    print("\n" + "=" * 50)
    print("测试结果总结:")
    print(f"✅ 可用数据库 ({len(available_dbs)}): {_listing(available_dbs)}")
    print(f"❌ 空数据库 ({len(empty_dbs)}): {_listing(empty_dbs)}")
    print(f"⚠️ 问题数据库 ({len(error_dbs)}): {_listing(error_dbs)}")


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user