fix(api): 修复解密校验逻辑

This commit is contained in:
2977094657
2025-07-24 18:03:38 +08:00
parent 701a05e041
commit 0b12e31c96
2 changed files with 256 additions and 47 deletions

View File

@@ -1,11 +1,15 @@
"""微信解密工具的FastAPI Web服务器"""
import time
from typing import Optional
import re
import json
import os
from typing import Optional, Callable
from fastapi import FastAPI, HTTPException, Request
from fastapi.routing import APIRoute
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from pydantic import BaseModel, Field
from .logging_config import setup_logging, get_logger
from .wechat_decrypt import decrypt_wechat_databases
@@ -15,12 +19,210 @@ setup_logging()
logger = get_logger(__name__)
class PathFixRequest(Request):
"""自定义Request类自动修复JSON中的路径问题并检测相对路径"""
def _is_absolute_path(self, path: str) -> bool:
"""检测是否为绝对路径支持Windows、macOS、Linux"""
if not path:
return False
# Windows绝对路径以盘符开头 (C:\, D:\, etc.)
if re.match(r'^[A-Za-z]:[/\\]', path):
return True
# Unix-like系统绝对路径以 / 开头
if path.startswith('/'):
return True
return False
def _validate_paths_in_json(self, json_data: dict) -> Optional[str]:
"""验证JSON中的路径返回错误信息如果有"""
logger.info(f"开始验证路径JSON数据: {json_data}")
# 检查db_storage_path字段现在是必需的
if 'db_storage_path' not in json_data:
return "缺少必需的db_storage_path参数请提供具体的数据库存储路径。"
if 'db_storage_path' in json_data:
path = json_data['db_storage_path']
# 检查路径是否为空
if not path or not path.strip():
return "db_storage_path参数不能为空请提供具体的数据库存储路径。"
logger.info(f"检查路径: {path}")
is_absolute = self._is_absolute_path(path)
logger.info(f"是否为绝对路径: {is_absolute}")
if not is_absolute:
error_msg = f"请提供绝对路径,当前输入的是相对路径: {path}\n" \
f"Windows绝对路径示例: D:\\wechatMSG\\xwechat_files\\wxid_xxx\\db_storage"
return error_msg
# 检查路径是否存在
logger.info(f"检查路径是否存在: {path}")
path_exists = os.path.exists(path)
logger.info(f"路径存在性: {path_exists}")
if not path_exists:
# 检查父目录
parent_path = os.path.dirname(path)
logger.info(f"检查父目录: {parent_path}")
parent_exists = os.path.exists(parent_path)
logger.info(f"父目录存在性: {parent_exists}")
if parent_exists:
try:
files = os.listdir(parent_path)
logger.info(f"父目录内容: {files}")
error_msg = f"指定的路径不存在: {path}\n" \
f"父目录存在但不包含 'db_storage' 文件夹。\n" \
f"请检查路径是否正确,或确保微信数据已生成。"
except PermissionError:
logger.info(f"无法访问父目录,权限不足")
error_msg = f"指定的路径不存在: {path}\n" \
f"无法访问父目录,可能是权限问题。"
else:
error_msg = f"指定的路径不存在: {path}\n" \
f"父目录也不存在,请检查路径是否正确。"
logger.info(f"返回路径错误: {error_msg}")
return error_msg
else:
logger.info(f"路径存在,使用递归方式检查数据库文件")
try:
# 使用与自动检测相同的逻辑:递归查找.db文件
db_files = []
for root, dirs, files in os.walk(path):
# 只处理db_storage目录下的数据库文件与自动检测逻辑一致
if "db_storage" not in root:
continue
for file_name in files:
if not file_name.endswith(".db"):
continue
# 排除不需要解密的数据库(与自动检测逻辑一致)
if file_name in ["key_info.db"]:
continue
db_path = os.path.join(root, file_name)
db_files.append(db_path)
logger.info(f"递归查找到的数据库文件: {db_files}")
if not db_files:
error_msg = f"路径存在但没有找到有效的数据库文件: {path}\n" \
f"请确保该目录或其子目录包含微信数据库文件(.db文件)。\n" \
f"注意key_info.db文件会被自动排除。"
logger.info(f"返回错误: 递归查找未找到有效.db文件")
return error_msg
logger.info(f"路径验证通过,递归找到{len(db_files)}个有效数据库文件")
except PermissionError:
error_msg = f"无法访问路径: {path}\n" \
f"权限不足,请检查文件夹权限。"
return error_msg
except Exception as e:
logger.warning(f"检查路径内容时出错: {e}")
# 如果无法检查内容,继续执行,让后续逻辑处理
return None
async def body(self) -> bytes:
"""重写body方法预处理JSON中的路径问题"""
body = await super().body()
# 只处理JSON请求
content_type = self.headers.get("content-type", "")
if "application/json" not in content_type:
return body
try:
# 将bytes转换为字符串
body_str = body.decode('utf-8')
# 首先尝试解析JSON以验证路径
try:
json_data = json.loads(body_str)
path_error = self._validate_paths_in_json(json_data)
if path_error:
logger.info(f"检测到路径错误: {path_error}")
# 我们将错误信息存储在请求中,稍后在路由处理器中检查
self.state.path_validation_error = path_error
return body
except json.JSONDecodeError as e:
# JSON格式错误继续尝试修复
logger.info(f"JSON解析失败尝试修复: {e}")
pass
# 使用正则表达式安全地处理Windows路径中的反斜杠
# 需要处理两种情况:
# 1. 以盘符开头的绝对路径D:\path\to\file
# 2. 不以盘符开头的相对路径wechatMSG\xwechat_files\...
# 匹配引号内包含反斜杠的路径(不管是否以盘符开头)
pattern = r'"([^"]*?\\[^"]*?)"'
def fix_path(match):
path = match.group(1)
# 将单个反斜杠替换为双反斜杠,但避免替换已经转义的反斜杠
fixed_path = re.sub(r'(?<!\\)\\(?!\\)', r'\\\\', path)
return f'"{fixed_path}"'
# 应用修复
fixed_body_str = re.sub(pattern, fix_path, body_str)
# 记录修复信息(仅在有修改时)
if fixed_body_str != body_str:
logger.info(f"自动修复JSON路径格式: {body_str[:100]}... -> {fixed_body_str[:100]}...")
# 修复后重新验证路径
try:
json_data = json.loads(fixed_body_str)
logger.info(f"修复后解析JSON成功开始验证路径")
path_error = self._validate_paths_in_json(json_data)
if path_error:
logger.info(f"修复后检测到路径错误: {path_error}")
self.state.path_validation_error = path_error
return fixed_body_str.encode('utf-8')
else:
logger.info(f"修复后路径验证通过")
except json.JSONDecodeError as e:
logger.warning(f"修复后JSON仍然解析失败: {e}")
return fixed_body_str.encode('utf-8')
except Exception as e:
# 如果处理失败返回原始body
logger.warning(f"JSON路径修复失败使用原始请求体: {e}")
return body
class PathFixRoute(APIRoute):
"""自定义APIRoute类使用PathFixRequest并处理路径验证错误"""
def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()
async def custom_route_handler(request: Request) -> any:
# 将Request替换为我们的自定义Request
custom_request = PathFixRequest(request.scope, request.receive)
# 检查是否有路径验证错误
if hasattr(custom_request.state, 'path_validation_error'):
from fastapi import HTTPException
raise HTTPException(
status_code=400,
detail=custom_request.state.path_validation_error
)
return await original_route_handler(custom_request)
return custom_route_handler
app = FastAPI(
title="微信数据库解密工具",
description="现代化的微信数据库解密工具,支持微信信息检测和数据库解密功能",
version="0.1.0"
)
# 设置自定义路由类
app.router.route_class = PathFixRoute
# Enable CORS for React frontend
app.add_middleware(
CORSMiddleware,
@@ -53,8 +255,8 @@ async def log_requests(request: Request, call_next):
class DecryptRequest(BaseModel):
"""解密请求模型"""
key: str
db_storage_path: Optional[str] = None # 可选的数据库存储路径,如 ......\{微信id}\db_storage
key: str = Field(..., description="解密密钥64位十六进制字符串")
db_storage_path: str = Field(..., description="数据库存储路径,必须是绝对路径")
@@ -109,13 +311,16 @@ async def detect_wechat_detailed():
@app.post("/api/decrypt", summary="解密微信数据库")
async def decrypt_databases(request: DecryptRequest):
"""使用提供的密钥解密微信数据库
"""使用提供的密钥解密指定账户的微信数据库
参数:
- key: 解密密钥(必选)
- db_storage_path: 数据库存储路径(选),如 ......\\{微信id}\\db_storage
- key: 解密密钥(必选)- 64位十六进制字符串
- db_storage_path: 数据库存储路径(选),如 D:\\wechatMSG\\xwechat_files\\{微信id}\\db_storage
如果不提供db_storage_path将自动检测所有微信数据库
注意:
- 一个密钥只能解密对应账户的数据库
- 必须提供具体的db_storage_path不支持自动检测多账户
- 支持自动处理Windows路径中的反斜杠转义问题
"""
logger.info(f"开始解密请求: db_storage_path={request.db_storage_path}")
try:

View File

@@ -9,25 +9,24 @@ python wechat_decrypt.py
密钥: 请通过参数传入您的解密密钥
"""
import os
import hashlib
import hmac
import os
from pathlib import Path
from datetime import datetime
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
# 默认密钥已移除,请通过参数传入
WECHAT_KEY = None
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# 注意:不再支持默认密钥,所有密钥必须通过参数传入
# SQLite文件头
SQLITE_HEADER = b"SQLite format 3\x00"
def setup_logging():
"""设置日志配置 - 已弃用,使用统一的日志配置"""
from .logging_config import setup_logging as unified_setup_logging, get_log_file_path
from .logging_config import setup_logging as unified_setup_logging
# 使用统一的日志配置
log_file = unified_setup_logging()
@@ -211,7 +210,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
参数:
db_storage_path: 数据库存储路径,如 ......\\{微信id}\\db_storage
如果为None将自动搜索数据库文件
key: 解密密钥如果为None将使用默认密钥
key: 解密密钥必需参数64位十六进制字符串
返回值:
dict: 解密结果统计信息
@@ -261,6 +260,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
if db_storage_path:
# 使用指定路径查找数据库
storage_path = Path(db_storage_path)
if storage_path.exists():
# 尝试从路径中提取账号名
account_name = "unknown_account"
@@ -273,7 +273,13 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
# 检查是否匹配已知的账号格式
for pattern in account_patterns:
if part.startswith(pattern):
account_name = part.split('_')[0] if '_' in part else part
# 提取主要部分,去掉后面的随机后缀
# 例如wxid_v4mbduwqtzpt22_1e7a -> wxid_v4mbduwqtzpt22
parts = part.split('_')
if len(parts) >= 3: # wxid_主要部分_随机后缀
account_name = '_'.join(parts[:-1]) # 去掉最后一个随机部分
else:
account_name = part # 如果格式不符合预期,保留原名
break
if account_name != "unknown_account":
break
@@ -287,11 +293,21 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
break
databases = []
for db_file in storage_path.glob("*.db"):
if db_file.is_file() and db_file.name != 'key_info.db':
# 使用递归查找,与自动检测逻辑一致
for root, dirs, files in os.walk(storage_path):
# 只处理db_storage目录下的数据库文件
if "db_storage" not in str(root):
continue
for file_name in files:
if not file_name.endswith(".db"):
continue
# 排除不需要解密的数据库
if file_name in ["key_info.db"]:
continue
db_path = os.path.join(root, file_name)
databases.append({
'path': str(db_file),
'name': db_file.name,
'path': db_path,
'name': file_name,
'account': account_name
})
@@ -310,29 +326,17 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
"failed_files": []
}
else:
# 使用检测函数获取数据库列表,按账号组织
try:
from .wechat_detection import detect_wechat_installation
wechat_info = detect_wechat_installation()
if wechat_info and wechat_info.get('accounts'):
for account in wechat_info['accounts']:
account_name = account['account_name']
databases = []
for db in account['databases']:
databases.append({
'path': db['path'],
'name': db['name'],
'account': account_name
})
if databases:
account_databases[account_name] = databases
total_dbs = sum(len(dbs) for dbs in account_databases.values())
logger.info(f"通过检测函数找到 {len(account_databases)} 个账号的 {total_dbs} 个数据库文件")
else:
logger.warning("检测函数未找到数据库文件")
except Exception as e:
logger.error(f"检测函数调用失败: {e}")
# 不再支持自动检测要求用户提供具体的db_storage_path
return {
"status": "error",
"message": "请提供具体的db_storage_path参数。由于一个密钥只能对应一个账户不支持自动检测多账户。",
"total_databases": 0,
"successful_count": 0,
"failed_count": 0,
"output_directory": str(base_output_dir.absolute()),
"processed_files": [],
"failed_files": []
}
if not account_databases:
return {
@@ -432,7 +436,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
logger.info("解密任务完成!")
logger.info(f"成功: {success_count}/{total_databases}")
logger.info(f"失败: {total_databases - success_count}/{total_databases}")
logger.info(f"输出目录: {output_dir.absolute()}")
logger.info(f"输出目录: {base_output_dir.absolute()}")
logger.info("=" * 60)
return result