feat(detection): 添加当前登录账号检测功能

This commit is contained in:
2977094657
2025-08-19 17:10:32 +08:00
parent ce06384f0a
commit a86e8f762f
6 changed files with 488 additions and 28 deletions

View File

@@ -277,8 +277,12 @@ async def detect_wechat_detailed(data_root_path: Optional[str] = None):
"""详细检测微信安装信息,包括版本、路径、消息目录等。"""
logger.info("开始执行微信检测")
try:
from .wechat_detection import detect_wechat_installation
from .wechat_detection import detect_wechat_installation, detect_current_logged_in_account
info = detect_wechat_installation(data_root_path=data_root_path)
# 检测当前登录账号
current_account_info = detect_current_logged_in_account(data_root_path)
info['current_account'] = current_account_info
# 添加一些统计信息
stats = {
@@ -306,6 +310,29 @@ async def detect_wechat_detailed(data_root_path: Optional[str] = None):
}
@app.get("/api/current-account", summary="检测当前登录账号")
async def detect_current_account(data_root_path: Optional[str] = None):
"""检测当前登录的微信账号"""
logger.info("开始检测当前登录账号")
try:
from .wechat_detection import detect_current_logged_in_account
result = detect_current_logged_in_account(data_root_path)
logger.info(f"当前账号检测完成: {result.get('message', '无结果')}")
return {
'status': 'success',
'data': result
}
except Exception as e:
logger.error(f"当前账号检测失败: {str(e)}")
return {
'status': 'error',
'error': str(e),
'data': None
}

View File

@@ -11,6 +11,7 @@ import ctypes
from pathlib import Path
from typing import List, Dict, Any, Union
from ctypes import wintypes
from datetime import datetime
@@ -421,6 +422,135 @@ def detect_wechat_accounts_from_backup(backup_base_path: str = None) -> List[Dic
return accounts
def _resolve_login_paths_from_base(provided_path: str) -> Dict[str, str]:
"""
根据用户提供的路径推断 base_path 和 login_dir。
兼容三种传入:
- 直接传 xwechat_files 根目录
- 传 xwechat_files/all_users
- 传 xwechat_files/all_users/login
"""
base_path = provided_path
login_dir = os.path.join(provided_path, "all_users", "login")
try:
norm = os.path.normpath(provided_path)
last = os.path.basename(norm).lower()
parent = os.path.dirname(norm)
parent_last = os.path.basename(parent).lower() if parent else ""
if last == "login" and parent_last == "all_users":
# .../xwechat_files/all_users/login -> base_path 为 xwechat_files
base_path = os.path.dirname(parent)
login_dir = norm
elif last == "all_users":
# .../xwechat_files/all_users -> login_dir 追加 login
base_path = os.path.dirname(norm)
login_dir = os.path.join(norm, "login")
else:
# 认为传的是 xwechat_files 根
base_path = norm
login_dir = os.path.join(norm, "all_users", "login")
except Exception:
# 兜底:保持初始推断
pass
return {"base_path": base_path, "login_dir": login_dir}
def detect_wechat_accounts_from_login(login_base_path: str = None) -> List[Dict[str, Any]]:
"""
通过登录信息目录检测微信账号,并映射到实际数据目录。
Args:
login_base_path: 可选的微信数据根目录。
Returns:
账号信息列表(与 Backup 检测返回结构一致)
"""
accounts: List[Dict[str, Any]] = []
# 若用户提供路径,则优先按该路径推断
if login_base_path:
paths = _resolve_login_paths_from_base(login_base_path)
base_path = paths["base_path"]
login_dir = paths["login_dir"]
if not os.path.exists(login_dir):
return accounts
else:
# 自动检测:遍历候选根目录,寻找登录信息目录
base_path = None
login_dir = None
detected_dirs = auto_detect_wechat_data_dirs()
for detected_dir in detected_dirs:
try:
test_login = os.path.join(detected_dir, "all_users", "login")
if os.path.exists(test_login):
base_path = detected_dir
login_dir = test_login
break
# 也检查一层子目录
for sub in os.listdir(detected_dir):
sub_path = os.path.join(detected_dir, sub)
if not os.path.isdir(sub_path):
continue
test_login = os.path.join(sub_path, "all_users", "login")
if os.path.exists(test_login):
base_path = sub_path
login_dir = test_login
break
if base_path:
break
except (PermissionError, OSError):
continue
if not base_path or not login_dir:
return accounts
# 枚举 login 目录下的子项,每个子项代表一个账号标识(可能是文件或文件夹)
try:
for item in os.listdir(login_dir):
account_name = item
account_login_item_path = os.path.join(login_dir, item)
# 无论是文件还是文件夹,都视为一个账号标识
if not os.path.exists(account_login_item_path):
continue
# 在 base_path 下查找以 {account_name}_ 开头的数据目录(与 Backup 规则一致)
data_dir = None
try:
for data_item in os.listdir(base_path):
data_item_path = os.path.join(base_path, data_item)
if (
os.path.isdir(data_item_path)
and data_item.startswith(f"{account_name}_")
and data_item not in ["Backup", "all_users"]
):
data_dir = data_item_path
break
except (PermissionError, OSError):
pass
databases = collect_account_databases(data_dir, account_name) if data_dir else []
accounts.append(
{
"account_name": account_name,
"backup_dir": None,
"data_dir": data_dir,
"databases": databases,
"database_count": len(databases),
}
)
except (PermissionError, OSError):
# 无权限访问时返回已收集的账号
return accounts
return accounts
def collect_account_databases(data_dir: str, account_name: str) -> List[Dict[str, Any]]:
"""
收集指定账号数据目录下的所有数据库文件
@@ -474,7 +604,7 @@ def collect_account_databases(data_dir: str, account_name: str) -> List[Dict[str
return databases
def detect_wechat_installation() -> Dict[str, Any]:
def detect_wechat_installation(data_root_path: str | None = None) -> Dict[str, Any]:
"""
检测微信安装情况 - 改进的多账户检测逻辑
"""
@@ -528,47 +658,92 @@ def detect_wechat_installation() -> Dict[str, Any]:
if not result["is_running"]:
result["detection_methods"].append("未检测到微信进程")
# 2. 使用新的账号检测逻辑
result["detection_methods"].append("多账户检测")
# 2. 使用新的账号检测逻辑:同时支持 Backup 与登录信息目录,并合并结果
result["detection_methods"].append("多账户检测(多来源合并)")
try:
# 检测指定路径下的微信账号
accounts = detect_wechat_accounts_from_backup()
# 支持前端兜底路径:若提供 data_root_path则两种方式都以该路径为基准
accounts_from_backup = detect_wechat_accounts_from_backup(
backup_base_path=data_root_path
)
accounts_from_login = detect_wechat_accounts_from_login(
login_base_path=data_root_path
)
# 合并账号:按 account_name 去重,优先保留信息更完整者
account_map: Dict[str, Dict[str, Any]] = {}
def _merge_account(acc: Dict[str, Any]):
name = acc.get("account_name")
if not name:
return
if name not in account_map:
account_map[name] = {
"account_name": name,
"backup_dir": acc.get("backup_dir"),
"data_dir": acc.get("data_dir"),
"databases": list(acc.get("databases", [])),
"database_count": int(acc.get("database_count", 0)),
}
else:
existing = account_map[name]
if not existing.get("backup_dir") and acc.get("backup_dir"):
existing["backup_dir"] = acc.get("backup_dir")
if not existing.get("data_dir") and acc.get("data_dir"):
existing["data_dir"] = acc.get("data_dir")
# 合并数据库(按 path 去重)
seen_paths = {d.get("path") for d in existing.get("databases", [])}
for db in acc.get("databases", []):
if db.get("path") not in seen_paths:
existing.setdefault("databases", []).append(db)
seen_paths.add(db.get("path"))
existing["database_count"] = len(existing.get("databases", []))
for acc in accounts_from_backup:
_merge_account(acc)
for acc in accounts_from_login:
_merge_account(acc)
accounts = list(account_map.values())
result["accounts"] = accounts
result["total_accounts"] = len(accounts)
# 统计总数据库数量
total_db_count = sum(account["database_count"] for account in accounts)
total_db_count = sum(account.get("database_count", 0) for account in accounts)
result["total_databases"] = total_db_count
if accounts:
result["detection_methods"].append(f"在指定路径检测到 {len(accounts)} 个微信账户")
result["detection_methods"].append(
f"检测到 {len(accounts)} 个微信账户(已合并两种来源)"
)
result["detection_methods"].append(f"总计 {total_db_count} 个数据库文件")
# 为每个账户添加详细信息
for account in accounts:
account_name = account["account_name"]
db_count = account["database_count"]
data_dir_status = "已找到" if account["data_dir"] else "未找到"
result["detection_methods"].append(f"账户 {account_name}: {db_count} 个数据库, 数据目录{data_dir_status}")
account_name = account.get("account_name")
db_count = account.get("database_count", 0)
data_dir_status = "已找到" if account.get("data_dir") else "未找到"
result["detection_methods"].append(
f"账户 {account_name}: {db_count} 个数据库, 数据目录{data_dir_status}"
)
else:
result["detection_methods"].append("在指定路径检测到微信账户")
result["detection_methods"].append("未检测到微信账户")
# 填充向后兼容性字段
for account in accounts:
if account["data_dir"]:
if account.get("data_dir"):
result["wechat_data_dirs"].append(account["data_dir"])
result["message_dirs"].append(account["data_dir"])
result["user_accounts"].append(account["account_name"])
result["user_accounts"].append(account.get("account_name"))
# 添加数据库到兼容性列表
for db in account["databases"]:
for db in account.get("databases", []):
result["databases"].append({
"path": db["path"],
"name": db["name"],
"type": db["type"],
"size": db["size"],
"user": account["account_name"],
"user_dir": account["data_dir"]
"user": account.get("account_name"),
"user_dir": account.get("data_dir"),
})
except Exception as e:
@@ -624,6 +799,129 @@ def detect_wechat_installation() -> Dict[str, Any]:
return result
def detect_current_logged_in_account(base_path: str = None) -> Dict[str, Any]:
"""
通过key_info.db文件时间检测当前登录的微信账号
Args:
base_path: 微信数据根目录如果为None则自动检测
Returns:
当前登录账号信息
"""
current_account = None
latest_time = None
# 添加调试信息
print(f"[DEBUG] 开始检测当前登录账号提供的base_path: {base_path}")
# 如果没有指定路径,尝试自动检测
if base_path is None:
detected_dirs = auto_detect_wechat_data_dirs()
print(f"[DEBUG] 自动检测到的目录: {detected_dirs}")
if not detected_dirs:
return {
"current_account": None,
"latest_time": None,
"message": "未检测到微信数据目录"
}
base_path = detected_dirs[0]
print(f"[DEBUG] 使用的base_path: {base_path}")
# 查找登录信息目录 - 尝试多个可能的路径
possible_login_paths = [
os.path.join(base_path, "all_users", "login"), # 标准路径
os.path.join(base_path, "login"), # 备选路径1
]
# 也尝试在子目录中查找
try:
for item in os.listdir(base_path):
item_path = os.path.join(base_path, item)
if os.path.isdir(item_path):
possible_login_paths.extend([
os.path.join(item_path, "all_users", "login"), # 子目录中的标准路径
os.path.join(item_path, "login"), # 子目录中的备选路径
])
except (PermissionError, OSError):
pass
login_dir = None
for path in possible_login_paths:
print(f"[DEBUG] 检查路径: {path}")
if os.path.exists(path):
login_dir = path
print(f"[DEBUG] 找到登录目录: {login_dir}")
break
if not login_dir:
return {
"current_account": None,
"latest_time": None,
"message": f"未找到登录信息目录,尝试的路径: {possible_login_paths}"
}
try:
# 遍历登录目录下的所有账号文件夹
items = os.listdir(login_dir)
print(f"[DEBUG] 登录目录内容: {items}")
for item in items:
item_path = os.path.join(login_dir, item)
print(f"[DEBUG] 检查项目: {item}, 路径: {item_path}, 是否为目录: {os.path.isdir(item_path)}")
if not os.path.isdir(item_path):
continue
# 检查key_info.db文件
key_info_path = os.path.join(item_path, "key_info.db")
print(f"[DEBUG] 检查key_info.db文件: {key_info_path}, 是否存在: {os.path.exists(key_info_path)}")
if not os.path.exists(key_info_path):
continue
# 获取文件修改时间
try:
file_time = os.path.getmtime(key_info_path)
file_datetime = datetime.fromtimestamp(file_time)
print(f"[DEBUG] 找到key_info.db文件: {key_info_path}, 修改时间: {file_datetime}")
# 更新最新登录的账号
if latest_time is None or file_time > latest_time:
latest_time = file_time
current_account = item
print(f"[DEBUG] 更新最新登录账号: {current_account}, 时间: {file_datetime}")
except OSError as e:
print(f"[DEBUG] 无法获取文件时间: {key_info_path}, 错误: {e}")
continue
except (PermissionError, OSError) as e:
print(f"[DEBUG] 无法访问登录目录: {login_dir}, 错误: {e}")
return {
"current_account": None,
"latest_time": None,
"message": f"无法访问登录目录: {e}"
}
if current_account:
print(f"[DEBUG] 最终结果: 当前登录账号 {current_account}, 时间 {latest_time}")
return {
"current_account": current_account,
"latest_time": latest_time,
"latest_time_formatted": datetime.fromtimestamp(latest_time).isoformat() if latest_time else None,
"message": f"检测到当前登录账号: {current_account}"
}
else:
print(f"[DEBUG] 最终结果: 未检测到当前登录账号")
return {
"current_account": None,
"latest_time": None,
"message": "未检测到当前登录账号"
}
def get_wechat_info() -> Dict[str, Any]:
"""获取微信安装和数据库信息