diff --git a/.gitignore b/.gitignore index 1e1db13..ba0176e 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ wheels/ /output/ /.idea /.history/ +/.augment/ diff --git a/src/wechat_decrypt_tool/api.py b/src/wechat_decrypt_tool/api.py index a6bd21c..e21ce04 100644 --- a/src/wechat_decrypt_tool/api.py +++ b/src/wechat_decrypt_tool/api.py @@ -1,12 +1,19 @@ """微信解密工具的FastAPI Web服务器""" +import time from typing import Optional -from fastapi import FastAPI, HTTPException + +from fastapi import FastAPI, HTTPException, Request from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel +from .logging_config import setup_logging, get_logger from .wechat_decrypt import decrypt_wechat_databases +# 初始化日志系统 +setup_logging() +logger = get_logger(__name__) + app = FastAPI( title="微信数据库解密工具", @@ -24,6 +31,26 @@ app.add_middleware( ) +@app.middleware("http") +async def log_requests(request: Request, call_next): + """记录所有HTTP请求的中间件""" + start_time = time.time() + + # 记录请求开始 + logger.info(f"请求开始: {request.method} {request.url}") + + # 处理请求 + response = await call_next(request) + + # 计算处理时间 + process_time = time.time() - start_time + + # 记录请求完成 + logger.info(f"请求完成: {request.method} {request.url} - 状态码: {response.status_code} - 耗时: {process_time:.3f}s") + + return response + + class DecryptRequest(BaseModel): """解密请求模型""" key: str @@ -36,6 +63,7 @@ class DecryptRequest(BaseModel): @app.get("/", summary="根端点") async def root(): """根端点""" + logger.info("访问根端点") return {"message": "微信数据库解密工具 API"} @@ -45,10 +73,11 @@ async def root(): @app.get("/api/wechat-detection", summary="详细检测微信安装信息") async def detect_wechat_detailed(): """详细检测微信安装信息,包括版本、路径、消息目录等。""" + logger.info("开始执行微信检测") try: from .wechat_detection import detect_wechat_installation info = detect_wechat_installation() - + # 添加一些统计信息 stats = { 'total_databases': len(info['databases']), @@ -57,13 +86,16 @@ async def detect_wechat_detailed(): 'has_wechat_installed': info['wechat_install_path'] is not None, 'detection_time': __import__('datetime').datetime.now().isoformat() } - + + logger.info(f"微信检测完成: 检测到 {stats['total_user_accounts']} 个账户, {stats['total_databases']} 个数据库") + return { 'status': 'success', 'data': info, 'statistics': stats } except Exception as e: + logger.error(f"微信检测失败: {str(e)}") return { 'status': 'error', 'error': str(e), @@ -85,9 +117,11 @@ async def decrypt_databases(request: DecryptRequest): 如果不提供db_storage_path,将自动检测所有微信数据库 """ + logger.info(f"开始解密请求: db_storage_path={request.db_storage_path}") try: # 验证密钥格式 if not request.key or len(request.key) != 64: + logger.warning(f"密钥格式无效: 长度={len(request.key) if request.key else 0}") raise HTTPException(status_code=400, detail="密钥格式无效,必须是64位十六进制字符串") # 使用新的解密API @@ -97,8 +131,11 @@ async def decrypt_databases(request: DecryptRequest): ) if results["status"] == "error": + logger.error(f"解密失败: {results['message']}") raise HTTPException(status_code=400, detail=results["message"]) + logger.info(f"解密完成: 成功 {results['successful_count']}/{results['total_databases']} 个数据库") + return { "status": "completed" if results["status"] == "success" else "failed", "total_databases": results["total_databases"], @@ -111,6 +148,7 @@ async def decrypt_databases(request: DecryptRequest): } except Exception as e: + logger.error(f"解密API异常: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) @@ -120,6 +158,7 @@ async def decrypt_databases(request: DecryptRequest): @app.get("/api/health", summary="健康检查端点") async def health_check(): """健康检查端点""" + logger.debug("健康检查请求") return {"status": "healthy", "service": "微信解密工具"} diff --git a/src/wechat_decrypt_tool/logging_config.py b/src/wechat_decrypt_tool/logging_config.py new file mode 100644 index 0000000..4635353 --- /dev/null +++ b/src/wechat_decrypt_tool/logging_config.py @@ -0,0 +1,159 @@ +""" +统一的日志配置模块 +""" + +import logging +import sys +from datetime import datetime +from pathlib import Path +from typing import Optional + + +class ColoredFormatter(logging.Formatter): + """彩色日志格式化器""" + + # ANSI颜色代码 + COLORS = { + 'DEBUG': '\033[36m', # 青色 + 'INFO': '\033[32m', # 绿色 + 'WARNING': '\033[33m', # 黄色 + 'ERROR': '\033[31m', # 红色 + 'CRITICAL': '\033[35m', # 紫色 + 'RESET': '\033[0m' # 重置 + } + + def format(self, record): + # 获取原始格式化的消息 + formatted = super().format(record) + + # 只在控制台输出时添加颜色 + if hasattr(sys.stderr, 'isatty') and sys.stderr.isatty(): + level_color = self.COLORS.get(record.levelname, self.COLORS['RESET']) + reset_color = self.COLORS['RESET'] + + # 为日志级别添加颜色 + formatted = formatted.replace( + f' | {record.levelname} | ', + f' | {level_color}{record.levelname}{reset_color} | ' + ) + + return formatted + + +class WeChatLogger: + """微信解密工具统一日志管理器""" + + _instance: Optional['WeChatLogger'] = None + _initialized = False + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + return cls._instance + + def __init__(self): + if not self._initialized: + self.setup_logging() + WeChatLogger._initialized = True + + def setup_logging(self, log_level: str = "INFO"): + """设置日志配置""" + # 创建日志目录 + now = datetime.now() + log_dir = Path("output/logs") / str(now.year) / f"{now.month:02d}" / f"{now.day:02d}" + log_dir.mkdir(parents=True, exist_ok=True) + + # 设置日志文件名 + date_str = now.strftime("%d") + self.log_file = log_dir / f"{date_str}_wechat_tool.log" + + # 清除现有的处理器 + root_logger = logging.getLogger() + for handler in root_logger.handlers[:]: + root_logger.removeHandler(handler) + + # 配置日志格式 + # 文件格式(无颜色) + file_formatter = logging.Formatter( + '%(asctime)s | %(levelname)s | %(name)s | %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 控制台格式(有颜色) + console_formatter = ColoredFormatter( + '%(asctime)s | %(levelname)s | %(name)s | %(message)s', + datefmt='%Y-%m-%d %H:%M:%S' + ) + + # 文件处理器 + file_handler = logging.FileHandler(self.log_file, encoding='utf-8') + file_handler.setFormatter(file_formatter) + file_handler.setLevel(getattr(logging, log_level.upper())) + + # 控制台处理器 + console_handler = logging.StreamHandler(sys.stdout) + console_handler.setFormatter(console_formatter) + console_handler.setLevel(getattr(logging, log_level.upper())) + + # 配置根日志器 + root_logger.setLevel(getattr(logging, log_level.upper())) + root_logger.addHandler(file_handler) + root_logger.addHandler(console_handler) + + # 只为uvicorn日志器添加文件处理器,保持其原有的控制台处理器(带颜色) + uvicorn_logger = logging.getLogger("uvicorn") + uvicorn_logger.addHandler(file_handler) + uvicorn_logger.setLevel(getattr(logging, log_level.upper())) + + # 只为uvicorn.access日志器添加文件处理器 + uvicorn_access_logger = logging.getLogger("uvicorn.access") + uvicorn_access_logger.addHandler(file_handler) + uvicorn_access_logger.setLevel(getattr(logging, log_level.upper())) + + # 只为uvicorn.error日志器添加文件处理器 + uvicorn_error_logger = logging.getLogger("uvicorn.error") + uvicorn_error_logger.addHandler(file_handler) + uvicorn_error_logger.setLevel(getattr(logging, log_level.upper())) + + # 配置FastAPI日志器 + fastapi_logger = logging.getLogger("fastapi") + fastapi_logger.handlers = [] + fastapi_logger.addHandler(file_handler) + fastapi_logger.addHandler(console_handler) + fastapi_logger.setLevel(getattr(logging, log_level.upper())) + + # 记录初始化信息 + logger = logging.getLogger(__name__) + logger.info("=" * 60) + logger.info("微信解密工具日志系统初始化完成") + logger.info(f"日志文件: {self.log_file}") + logger.info(f"日志级别: {log_level}") + logger.info("=" * 60) + + return self.log_file + + def get_logger(self, name: str) -> logging.Logger: + """获取指定名称的日志器""" + return logging.getLogger(name) + + def get_log_file_path(self) -> Path: + """获取当前日志文件路径""" + return self.log_file + + +def setup_logging(log_level: str = "INFO") -> Path: + """设置日志配置的便捷函数""" + logger_manager = WeChatLogger() + return logger_manager.setup_logging(log_level) + + +def get_logger(name: str) -> logging.Logger: + """获取日志器的便捷函数""" + logger_manager = WeChatLogger() + return logger_manager.get_logger(name) + + +def get_log_file_path() -> Path: + """获取当前日志文件路径的便捷函数""" + logger_manager = WeChatLogger() + return logger_manager.get_log_file_path() diff --git a/src/wechat_decrypt_tool/wechat_decrypt.py b/src/wechat_decrypt_tool/wechat_decrypt.py index 5285712..f7a9f8b 100644 --- a/src/wechat_decrypt_tool/wechat_decrypt.py +++ b/src/wechat_decrypt_tool/wechat_decrypt.py @@ -26,29 +26,16 @@ WECHAT_KEY = None SQLITE_HEADER = b"SQLite format 3\x00" def setup_logging(): - """设置日志配置""" + """设置日志配置 - 已弃用,使用统一的日志配置""" + from .logging_config import setup_logging as unified_setup_logging, get_log_file_path + + # 使用统一的日志配置 + log_file = unified_setup_logging() + log_dir = log_file.parent + import logging - - # 创建日志目录 - now = datetime.now() - log_dir = Path("output/logs") / str(now.year) / f"{now.month:02d}" / f"{now.day:02d}" - log_dir.mkdir(parents=True, exist_ok=True) - - # 设置日志文件名 - date_str = now.strftime("%d") - log_file = log_dir / f"{date_str}_decrypt.log" - - # 配置日志 - logging.basicConfig( - level=logging.INFO, - format='%(asctime)s | %(levelname)s | %(message)s', - handlers=[ - logging.FileHandler(log_file, encoding='utf-8'), - logging.StreamHandler() - ] - ) - - logging.info(f"日志系统初始化完成,日志文件: {log_file}") + logger = logging.getLogger(__name__) + logger.info(f"解密模块日志系统初始化完成,日志文件: {log_file}") return log_dir @@ -72,30 +59,31 @@ class WeChatDatabaseDecryptor: def decrypt_database(self, db_path: str, output_path: str) -> bool: """解密微信4.x版本数据库 - + 使用SQLCipher 4.0参数: - PBKDF2-SHA512, 256000轮迭代 - AES-256-CBC加密 - HMAC-SHA512验证 - 页面大小4096字节 """ - import logging - - logging.info(f"开始解密数据库: {db_path}") + from .logging_config import get_logger + logger = get_logger(__name__) + + logger.info(f"开始解密数据库: {db_path}") try: with open(db_path, 'rb') as f: encrypted_data = f.read() - logging.info(f"读取文件大小: {len(encrypted_data)} bytes") - + logger.info(f"读取文件大小: {len(encrypted_data)} bytes") + if len(encrypted_data) < 4096: - logging.warning(f"文件太小,跳过解密: {db_path}") + logger.warning(f"文件太小,跳过解密: {db_path}") return False - + # 检查是否已经是解密的数据库 if encrypted_data.startswith(SQLITE_HEADER): - logging.info(f"文件已是SQLite格式,直接复制: {db_path}") + logger.info(f"文件已是SQLite格式,直接复制: {db_path}") with open(output_path, 'wb') as f: f.write(encrypted_data) return True @@ -152,7 +140,7 @@ class WeChatDatabaseDecryptor: page_num = cur_page + 1 # 页面编号从1开始 if len(page) < page_size: - logging.warning(f"页面 {page_num} 大小不足: {len(page)} bytes") + logger.warning(f"页面 {page_num} 大小不足: {len(page)} bytes") break # 确定偏移量:第一页(cur_page == 0)需要跳过salt @@ -174,7 +162,7 @@ class WeChatDatabaseDecryptor: expected_hmac = mac.digest() if stored_hmac != expected_hmac: - logging.warning(f"页面 {page_num} HMAC验证失败") + logger.warning(f"页面 {page_num} HMAC验证失败") failed_pages += 1 continue @@ -199,21 +187,21 @@ class WeChatDatabaseDecryptor: successful_pages += 1 except Exception as e: - logging.error(f"页面 {page_num} AES解密失败: {e}") + logger.error(f"页面 {page_num} AES解密失败: {e}") failed_pages += 1 continue - - logging.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages} 页") - + + logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages} 页") + # 写入解密后的文件 with open(output_path, 'wb') as f: f.write(decrypted_data) - - logging.info(f"解密文件大小: {len(decrypted_data)} bytes") + + logger.info(f"解密文件大小: {len(decrypted_data)} bytes") return True - + except Exception as e: - logging.error(f"解密失败: {db_path}, 错误: {e}") + logger.error(f"解密失败: {db_path}, 错误: {e}") return False def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> dict: @@ -238,10 +226,10 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di "failed_files": ["解密失败的文件列表"] } """ - import logging + from .logging_config import get_logger - # 初始化日志系统 - setup_logging() + # 获取日志器 + logger = get_logger(__name__) # 验证密钥是否提供 if not key: @@ -258,25 +246,58 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di decrypt_key = key - logging.info("=" * 60) - logging.info("微信4.x数据库解密工具 - API模式") - logging.info("=" * 60) + logger.info("=" * 60) + logger.info("微信4.x数据库解密工具 - API模式") + logger.info("=" * 60) - # 创建输出目录 - output_dir = Path("output/databases") - output_dir.mkdir(parents=True, exist_ok=True) - logging.info(f"输出目录: {output_dir.absolute()}") + # 创建基础输出目录 + base_output_dir = Path("output/databases") + base_output_dir.mkdir(parents=True, exist_ok=True) + logger.info(f"基础输出目录: {base_output_dir.absolute()}") + + # 查找数据库文件并按账号组织 + account_databases = {} # {account_name: [db_info, ...]} - # 查找数据库文件 if db_storage_path: # 使用指定路径查找数据库 - database_paths = [] storage_path = Path(db_storage_path) if storage_path.exists(): + # 尝试从路径中提取账号名 + account_name = "unknown_account" + path_parts = storage_path.parts + + # 常见的微信账号格式模式 + account_patterns = ['wxid_'] + + for part in path_parts: + # 检查是否匹配已知的账号格式 + for pattern in account_patterns: + if part.startswith(pattern): + account_name = part.split('_')[0] if '_' in part else part + break + if account_name != "unknown_account": + break + + # 如果没有匹配到已知格式,使用包含数据库的目录名 + if account_name == "unknown_account": + # 查找包含db_storage的父目录作为账号名 + for part in reversed(path_parts): + if part != "db_storage" and len(part) > 3: + account_name = part + break + + databases = [] for db_file in storage_path.glob("*.db"): if db_file.is_file() and db_file.name != 'key_info.db': - database_paths.append(str(db_file)) - logging.info(f"在指定路径找到 {len(database_paths)} 个数据库文件") + databases.append({ + 'path': str(db_file), + 'name': db_file.name, + 'account': account_name + }) + + if databases: + account_databases[account_name] = databases + logger.info(f"在指定路径找到账号 {account_name} 的 {len(databases)} 个数据库文件") else: return { "status": "error", @@ -284,90 +305,135 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di "total_databases": 0, "successful_count": 0, "failed_count": 0, - "output_directory": str(output_dir.absolute()), + "output_directory": str(base_output_dir.absolute()), "processed_files": [], "failed_files": [] } else: - # 使用检测函数获取数据库列表 + # 使用检测函数获取数据库列表,按账号组织 try: from .wechat_detection import detect_wechat_installation wechat_info = detect_wechat_installation() - if wechat_info and wechat_info.get('databases'): - database_paths = [db['path'] for db in wechat_info['databases']] - logging.info(f"通过检测函数找到 {len(database_paths)} 个数据库文件") - else: - database_paths = [] - logging.warning("检测函数未找到数据库文件") - except Exception as e: - logging.error(f"检测函数调用失败: {e}") - database_paths = [] + if wechat_info and wechat_info.get('accounts'): + for account in wechat_info['accounts']: + account_name = account['account_name'] + databases = [] + for db in account['databases']: + databases.append({ + 'path': db['path'], + 'name': db['name'], + 'account': account_name + }) + if databases: + account_databases[account_name] = databases - if not database_paths: + total_dbs = sum(len(dbs) for dbs in account_databases.values()) + logger.info(f"通过检测函数找到 {len(account_databases)} 个账号的 {total_dbs} 个数据库文件") + else: + logger.warning("检测函数未找到数据库文件") + except Exception as e: + logger.error(f"检测函数调用失败: {e}") + + if not account_databases: return { "status": "error", "message": "未找到微信数据库文件!请确保微信已安装并有数据,或提供正确的db_storage路径", "total_databases": 0, "successful_count": 0, "failed_count": 0, - "output_directory": str(output_dir.absolute()), + "output_directory": str(base_output_dir.absolute()), "processed_files": [], "failed_files": [] } + # 计算总数据库数量 + total_databases = sum(len(dbs) for dbs in account_databases.values()) + # 创建解密器 try: decryptor = WeChatDatabaseDecryptor(decrypt_key) - logging.info("解密器初始化成功") + logger.info("解密器初始化成功") except ValueError as e: return { "status": "error", "message": f"密钥错误: {e}", - "total_databases": len(database_paths), + "total_databases": total_databases, "successful_count": 0, "failed_count": 0, - "output_directory": str(output_dir.absolute()), + "output_directory": str(base_output_dir.absolute()), "processed_files": [], "failed_files": [] } - # 批量解密 + # 按账号批量解密 success_count = 0 - total_count = len(database_paths) processed_files = [] failed_files = [] + account_results = {} - for db_path in database_paths: - # 生成输出文件名 - db_name = os.path.basename(db_path) - output_path = output_dir / f"decrypted_{db_name}" + for account_name, databases in account_databases.items(): + logger.info(f"开始解密账号 {account_name} 的 {len(databases)} 个数据库") - # 解密数据库 - if decryptor.decrypt_database(db_path, str(output_path)): - success_count += 1 - processed_files.append(str(output_path)) - else: - failed_files.append(db_path) - logging.error(f"解密失败: {db_path}") + # 为每个账号创建专门的输出目录 + account_output_dir = base_output_dir / account_name + account_output_dir.mkdir(parents=True, exist_ok=True) + logger.info(f"账号 {account_name} 输出目录: {account_output_dir}") + + account_success = 0 + account_processed = [] + account_failed = [] + + for db_info in databases: + db_path = db_info['path'] + db_name = db_info['name'] + + # 生成输出文件名(保持原始文件名,不添加前缀) + output_path = account_output_dir / db_name + + # 解密数据库 + logger.info(f"解密 {account_name}/{db_name}") + if decryptor.decrypt_database(db_path, str(output_path)): + account_success += 1 + success_count += 1 + account_processed.append(str(output_path)) + processed_files.append(str(output_path)) + logger.info(f"解密成功: {account_name}/{db_name}") + else: + account_failed.append(db_path) + failed_files.append(db_path) + logger.error(f"解密失败: {account_name}/{db_name}") + + # 记录账号解密结果 + account_results[account_name] = { + "total": len(databases), + "success": account_success, + "failed": len(databases) - account_success, + "output_dir": str(account_output_dir), + "processed_files": account_processed, + "failed_files": account_failed + } + + logger.info(f"账号 {account_name} 解密完成: 成功 {account_success}/{len(databases)}") # 返回结果 result = { "status": "success" if success_count > 0 else "error", - "message": f"解密完成: 成功 {success_count}/{total_count}", - "total_databases": total_count, + "message": f"解密完成: 成功 {success_count}/{total_databases}", + "total_databases": total_databases, "successful_count": success_count, - "failed_count": total_count - success_count, - "output_directory": str(output_dir.absolute()), + "failed_count": total_databases - success_count, + "output_directory": str(base_output_dir.absolute()), "processed_files": processed_files, - "failed_files": failed_files + "failed_files": failed_files, + "account_results": account_results # 新增:按账号的详细结果 } - logging.info("=" * 60) - logging.info("解密任务完成!") - logging.info(f"成功: {success_count}/{total_count}") - logging.info(f"失败: {total_count - success_count}/{total_count}") - logging.info(f"输出目录: {output_dir.absolute()}") - logging.info("=" * 60) + logger.info("=" * 60) + logger.info("解密任务完成!") + logger.info(f"成功: {success_count}/{total_databases}") + logger.info(f"失败: {total_databases - success_count}/{total_databases}") + logger.info(f"输出目录: {output_dir.absolute()}") + logger.info("=" * 60) return result diff --git a/src/wechat_decrypt_tool/wechat_detection.py b/src/wechat_decrypt_tool/wechat_detection.py index 3e4b9ba..c336f3f 100644 --- a/src/wechat_decrypt_tool/wechat_detection.py +++ b/src/wechat_decrypt_tool/wechat_detection.py @@ -201,10 +201,10 @@ def auto_detect_wechat_data_dirs(): # 策略1:常见驱动器扫描微信相关目录 common_wechat_patterns = [ - "WeChat Files", "wechat_files", "xwechat_files", "wechatMSG", + "WeChat Files", "wechat_files", "xwechat_files", "wechatMSG", "WeChat", "微信", "Weixin", "wechat" ] - + # 扫描常见驱动器 drives = ['C:', 'D:', 'E:', 'F:'] for drive in drives: @@ -326,30 +326,181 @@ def get_wx_dir_by_reg(wxid="all"): return wx_dir if os.path.exists(wx_dir) else None +def detect_wechat_accounts_from_backup(backup_base_path: str = None) -> List[Dict[str, Any]]: + """ + 从指定的备份路径检测微信账号 + + Args: + backup_base_path: 微信文件基础路径,如果为None则自动检测 + + Returns: + 账号信息列表,每个账号包含: + - account_name: 账号名 + - backup_dir: 备份目录路径 + - data_dir: 实际数据目录路径 + - databases: 数据库文件列表 + """ + accounts = [] + + # 如果没有指定路径,尝试自动检测 + if backup_base_path is None: + # 使用自动检测找到包含Backup的路径 + detected_dirs = auto_detect_wechat_data_dirs() + for detected_dir in detected_dirs: + # 首先检查直接的Backup目录 + backup_test_dir = os.path.join(detected_dir, "Backup") + if os.path.exists(backup_test_dir): + backup_base_path = detected_dir + break + + # 然后检查子目录中的Backup目录(如xwechat_files/Backup) + try: + for subdir in os.listdir(detected_dir): + subdir_path = os.path.join(detected_dir, subdir) + if os.path.isdir(subdir_path): + backup_test_dir = os.path.join(subdir_path, "Backup") + if os.path.exists(backup_test_dir): + backup_base_path = subdir_path + break + if backup_base_path: + break + except (PermissionError, OSError): + continue + + # 如果还是没找到,返回空列表 + if backup_base_path is None: + return accounts + + # 检查备份目录 + backup_dir = os.path.join(backup_base_path, "Backup") + if not os.path.exists(backup_dir): + return accounts + + try: + # 遍历备份目录下的所有子文件夹(每个代表一个账号) + for item in os.listdir(backup_dir): + account_backup_path = os.path.join(backup_dir, item) + if not os.path.isdir(account_backup_path): + continue + + account_name = item + + # 在上级目录中查找对应的实际数据文件夹 + # 命名规则:{账号名}_{随机字符} + data_dir = None + try: + for data_item in os.listdir(backup_base_path): + data_item_path = os.path.join(backup_base_path, data_item) + if (os.path.isdir(data_item_path) and + data_item.startswith(f"{account_name}_") and + data_item != "Backup"): + data_dir = data_item_path + break + except (PermissionError, OSError): + continue + + # 收集该账号的数据库文件 + databases = [] + if data_dir and os.path.exists(data_dir): + databases = collect_account_databases(data_dir, account_name) + + account_info = { + "account_name": account_name, + "backup_dir": account_backup_path, + "data_dir": data_dir, + "databases": databases, + "database_count": len(databases) + } + + accounts.append(account_info) + + except (PermissionError, OSError) as e: + # 如果无法访问备份目录,返回空列表 + pass + + return accounts + + +def collect_account_databases(data_dir: str, account_name: str) -> List[Dict[str, Any]]: + """ + 收集指定账号数据目录下的所有数据库文件 + + Args: + data_dir: 账号数据目录 + account_name: 账号名 + + Returns: + 数据库文件信息列表 + """ + databases = [] + + if not os.path.exists(data_dir): + return databases + + try: + # 递归查找所有.db文件 + for root, dirs, files in os.walk(data_dir): + for file_name in files: + if not file_name.endswith('.db'): + continue + + # 排除不需要解密的数据库 + if file_name in ["key_info.db"]: + continue + + db_path = os.path.join(root, file_name) + + # 确定数据库类型 + db_type = re.sub(r'\d*\.db$', '', file_name) + + try: + file_size = os.path.getsize(db_path) + except OSError: + file_size = 0 + + db_info = { + "path": db_path, + "name": file_name, + "type": db_type, + "size": file_size, + "relative_path": os.path.relpath(db_path, data_dir) + } + + databases.append(db_info) + + except (PermissionError, OSError): + pass + + return databases + + def detect_wechat_installation() -> Dict[str, Any]: """ - 检测微信安装情况 - 完全按照PyWxDump的逻辑实现 + 检测微信安装情况 - 改进的多账户检测逻辑 """ result = { "wechat_version": None, "wechat_install_path": None, "wechat_exe_path": None, + "is_running": False, + "accounts": [], + "total_accounts": 0, + "total_databases": 0, + "detection_errors": [], + "detection_methods": [], + # 保持向后兼容性的字段 "wechat_data_dirs": [], "message_dirs": [], "databases": [], - "version_detected": None, - "is_running": False, - "user_accounts": [], - "detection_errors": [], - "detection_methods": [] + "user_accounts": [] } - # 进程检测 - 只检测Weixin.exe(按照用户要求) + # 1. 进程检测 - 检测微信是否运行 result["detection_methods"].append("进程检测") process_list = get_process_list() for pid, process_name in process_list: - # 只检查Weixin.exe进程 + # 检查Weixin.exe进程 if process_name.lower() == 'weixin.exe': try: exe_path = get_process_exe_path(pid) @@ -377,63 +528,98 @@ def detect_wechat_installation() -> Dict[str, Any]: if not result["is_running"]: result["detection_methods"].append("未检测到微信进程") - # 2. 使用自动检测逻辑获取微信目录和数据库 - result["detection_methods"].append("目录自动检测") + # 2. 使用新的账号检测逻辑 + result["detection_methods"].append("多账户检测") try: - wx_dir = get_wx_dir_by_reg() - if wx_dir and os.path.exists(wx_dir): - result["wechat_data_dirs"].append(wx_dir) - result["detection_methods"].append(f"通过自动检测找到微信目录: {wx_dir}") + # 检测指定路径下的微信账号 + accounts = detect_wechat_accounts_from_backup() + result["accounts"] = accounts + result["total_accounts"] = len(accounts) - # 使用PyWxDump的get_wx_db函数获取数据库信息 - db_list = get_wx_db(msg_dir=wx_dir) # 移除db_types限制,获取所有.db文件 + # 统计总数据库数量 + total_db_count = sum(account["database_count"] for account in accounts) + result["total_databases"] = total_db_count - # 统计用户账户和消息目录 - user_accounts_set = set() - message_dirs_set = set() + if accounts: + result["detection_methods"].append(f"在指定路径检测到 {len(accounts)} 个微信账户") + result["detection_methods"].append(f"总计 {total_db_count} 个数据库文件") - for db_info in db_list: - wxid = db_info["wxid"] - wxid_dir = db_info["wxid_dir"] - db_path = db_info["db_path"] - db_type = db_info["db_type"] - - # 添加用户账户 - user_accounts_set.add(wxid) - message_dirs_set.add(wxid_dir) - - # 添加数据库信息 - if os.path.exists(db_path): - result["databases"].append({ - "path": db_path, - "name": os.path.basename(db_path), - "type": db_type, - "size": os.path.getsize(db_path), - "user": wxid, - "user_dir": wxid_dir - }) - - # 转换为列表 - result["user_accounts"] = list(user_accounts_set) - result["message_dirs"] = list(message_dirs_set) - - result["detection_methods"].append(f"检测到 {len(result['user_accounts'])} 个用户账户") - result["detection_methods"].append(f"检测到 {len(result['databases'])} 个数据库文件") - - # 按数据库类型统计 - db_type_count = {} - for db in result["databases"]: - db_type = db["type"] - db_type_count[db_type] = db_type_count.get(db_type, 0) + 1 - - if db_type_count: - type_summary = ", ".join([f"{k}({v})" for k, v in db_type_count.items()]) - result["detection_methods"].append(f"数据库类型分布: {type_summary}") + # 为每个账户添加详细信息 + for account in accounts: + account_name = account["account_name"] + db_count = account["database_count"] + data_dir_status = "已找到" if account["data_dir"] else "未找到" + result["detection_methods"].append(f"账户 {account_name}: {db_count} 个数据库, 数据目录{data_dir_status}") else: - result["detection_methods"].append("自动检测未找到微信目录") - except Exception as e: - result["detection_errors"].append(f"目录检测失败: {str(e)}") + result["detection_methods"].append("未在指定路径检测到微信账户") + # 填充向后兼容性字段 + for account in accounts: + if account["data_dir"]: + result["wechat_data_dirs"].append(account["data_dir"]) + result["message_dirs"].append(account["data_dir"]) + result["user_accounts"].append(account["account_name"]) + + # 添加数据库到兼容性列表 + for db in account["databases"]: + result["databases"].append({ + "path": db["path"], + "name": db["name"], + "type": db["type"], + "size": db["size"], + "user": account["account_name"], + "user_dir": account["data_dir"] + }) + + except Exception as e: + result["detection_errors"].append(f"账户检测失败: {str(e)}") + + # 3. 如果新检测方法没有找到账户,尝试旧的检测方法作为备用 + if not result["accounts"]: + result["detection_methods"].append("备用检测方法") + try: + wx_dir = get_wx_dir_by_reg() + if wx_dir and os.path.exists(wx_dir): + result["wechat_data_dirs"].append(wx_dir) + result["detection_methods"].append(f"通过备用方法找到微信目录: {wx_dir}") + + # 使用旧的检测逻辑 + db_list = get_wx_db(msg_dir=wx_dir) + + # 按账户组织数据库 + account_db_map = {} + for db_info in db_list: + wxid = db_info["wxid"] + if wxid not in account_db_map: + account_db_map[wxid] = { + "account_name": wxid, + "backup_dir": None, + "data_dir": db_info["wxid_dir"], + "databases": [], + "database_count": 0 + } + + if os.path.exists(db_info["db_path"]): + db_entry = { + "path": db_info["db_path"], + "name": os.path.basename(db_info["db_path"]), + "type": db_info["db_type"], + "size": os.path.getsize(db_info["db_path"]), + "relative_path": os.path.relpath(db_info["db_path"], db_info["wxid_dir"]) + } + account_db_map[wxid]["databases"].append(db_entry) + account_db_map[wxid]["database_count"] += 1 + + result["accounts"] = list(account_db_map.values()) + result["total_accounts"] = len(result["accounts"]) + result["total_databases"] = sum(account["database_count"] for account in result["accounts"]) + + result["detection_methods"].append(f"备用方法检测到 {result['total_accounts']} 个账户") + result["detection_methods"].append(f"总计 {result['total_databases']} 个数据库文件") + else: + result["detection_methods"].append("备用检测方法未找到微信目录") + except Exception as e: + result["detection_errors"].append(f"备用检测失败: {str(e)}") return result