feat: parse last login account info

This commit is contained in:
H3CoF6
2026-04-09 01:30:35 +08:00
Unverified
parent 73f69f6f14
commit 4b47c1e69c
3 changed files with 233 additions and 99 deletions
+60 -35
View File
@@ -132,40 +132,58 @@
</div>
<div class="divide-y divide-[#EDEDED] max-h-96 overflow-y-auto">
<div v-for="(account, index) in sortedAccounts" :key="index"
:class="['p-5 hover:bg-[#F9F9F9] transition-all duration-200', isCurrentAccount(account.account_name) ? 'bg-[#07C160]/5' : '']">
<div class="flex items-center justify-between">
:class="['p-5 transition-all duration-200 relative overflow-hidden', isCurrentAccount(account.account_name) ? 'bg-[#07C160]/5 border border-[#07C160]/20' : 'hover:bg-[#F9F9F9]']">
<div v-if="isCurrentAccount(account.account_name)" class="absolute top-0 right-0 bg-gradient-to-l from-[#07C160]/20 to-transparent px-4 py-1 rounded-bl-xl flex items-center">
<span class="text-xs text-[#07C160] font-bold flex items-center">
<svg class="w-3 h-3 mr-1" fill="none" stroke="currentColor" viewBox="0 0 24 24"><path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M5 13l4 4L19 7"/></svg>
最近登录账户
</span>
</div>
<div class="flex items-center justify-between mt-1">
<div class="flex-1">
<div class="flex items-center">
<div class="w-12 h-12 bg-gradient-to-br from-[#07C160]/10 to-[#91D300]/10 rounded-full flex items-center justify-center mr-4 shadow-inner">
<span class="text-[#07C160] font-bold text-lg">{{ account.account_name?.charAt(0)?.toUpperCase() || 'U' }}</span>
</div>
<div>
<div class="flex items-center">
<p class="text-lg font-bold text-[#000000e6]">{{ account.account_name || '未知账户' }}</p>
<span v-if="isCurrentAccount(account.account_name)"
class="ml-2 inline-flex items-center px-2.5 py-0.5 bg-[#07C160]/10 text-[#07C160] rounded-full text-xs font-bold">
当前登录
</span>
<template v-if="isCurrentAccount(account.account_name) && currentAccountInfo?.avatar">
<img :src="currentAccountInfo.avatar" class="w-14 h-14 rounded-xl border-2 border-[#07C160]/30 mr-4 shadow-sm object-cover bg-white" alt=""/>
</template>
<template v-else>
<div class="w-14 h-14 bg-gradient-to-br from-[#07C160]/10 to-[#91D300]/10 rounded-xl flex items-center justify-center mr-4 shadow-inner">
<span class="text-[#07C160] font-bold text-xl">{{ account.account_name?.charAt(0)?.toUpperCase() || 'U' }}</span>
</div>
<div class="flex items-center mt-1.5 space-x-4 text-sm text-[#7F7F7F]">
<span class="flex items-center">
<svg class="w-4 h-4 mr-1 text-gray-400" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 7v10c0 2.21 3.582 4 8 4s8-1.79 8-4V7M4 7c0 2.21 3.582 4 8 4s8-1.79 8-4M4 7c0-2.21 3.582-4 8-4s8 1.79 8 4"/>
</svg>
{{ account.database_count }} 个库文件
</span>
</template>
<div>
<div class="flex flex-col">
<template v-if="isCurrentAccount(account.account_name) && currentAccountInfo?.nickname">
<p class="text-xl font-extrabold text-[#000000e6] leading-tight">{{ currentAccountInfo.nickname }}</p>
<p class="text-xs text-[#7F7F7F] mt-1 font-mono">wxid: {{ account.account_name }}</p>
</template>
<template v-else>
<p class="text-lg font-bold text-[#000000e6]">{{ account.account_name || '未知账户' }}</p>
</template>
</div>
<div class="flex items-center mt-2 space-x-4 text-sm text-[#7F7F7F]">
<span class="flex items-center">
<svg class="w-4 h-4 mr-1 text-gray-400" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 7v10c0 2.21 3.582 4 8 4s8-1.79 8-4V7M4 7c0 2.21 3.582 4 8 4s8-1.79 8-4M4 7c0-2.21 3.582-4 8-4s8 1.79 8 4"/>
</svg>
{{ account.database_count }} 个库文件
</span>
<span v-if="account.data_dir" class="flex items-center">
<svg class="w-4 h-4 mr-1 text-[#07C160]" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M5 13l4 4L19 7"/>
</svg>
路径已确认
</span>
<svg class="w-4 h-4 mr-1 text-[#07C160]" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M5 13l4 4L19 7"/>
</svg>
路径已确认
</span>
</div>
</div>
</div>
</div>
<button @click="goToDecrypt(account)"
class="inline-flex items-center px-6 py-2.5 bg-[#07C160] text-white rounded-xl font-bold hover:bg-[#06AD56] hover:shadow-md hover:-translate-y-0.5 transition-all duration-200 text-sm shrink-0">
class="inline-flex items-center px-6 py-2.5 bg-[#07C160] text-white rounded-xl font-bold hover:bg-[#06AD56] hover:shadow-md hover:-translate-y-0.5 transition-all duration-200 text-sm shrink-0 z-10">
解密提取
<svg class="w-4 h-4 ml-1" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M9 5l7 7-7 7"/>
@@ -173,7 +191,6 @@
</button>
</div>
<!-- 展开更多信息 -->
<div class="mt-4 pt-3 border-t border-dashed border-gray-200 text-sm text-gray-400">
<p v-if="account.data_dir" class="font-mono text-xs truncate" title="复制路径">
📂 {{ account.data_dir }}
@@ -254,20 +271,26 @@ const handlePickDirectory = async () => {
// 计算属性:将当前登录账号排在第一位
const sortedAccounts = computed(() => {
if (!detectionResult.value?.data?.accounts) return []
const accounts = [...detectionResult.value.data.accounts]
const currentAccountName = detectionResult.value.data?.current_account?.current_account
if (!currentAccountName) return accounts
// 将当前登录账号移到第一位
const current = detectionResult.value.data?.current_account
const currentTargetName = current?.matched_folder || current?.current_account
if (!currentTargetName) return accounts
// 置顶最近登录账号
return accounts.sort((a, b) => {
if (a.account_name === currentAccountName) return -1
if (b.account_name === currentAccountName) return 1
if (a.account_name === currentTargetName) return -1
if (b.account_name === currentTargetName) return 1
return 0
})
})
const currentAccountInfo = computed(() => {
return detectionResult.value?.data?.current_account || null
})
// 开始检测
const startDetection = async () => {
loading.value = true
@@ -357,7 +380,9 @@ const isCurrentAccount = (accountName) => {
if (!detectionResult.value?.data?.current_account) {
return false
}
return detectionResult.value.data.current_account.current_account === accountName
const current = detectionResult.value.data.current_account
// 支持严格匹配或通过后缀兼容的匹配
return accountName === current.matched_folder || accountName === current.current_account
}
// 页面加载时自动检测
@@ -21,7 +21,21 @@ async def detect_wechat_detailed(data_root_path: Optional[str] = None):
# 检测当前登录账号
current_account_info = detect_current_logged_in_account(data_root_path)
# 【新增特性】目录匹配校验:处理目录名 wxid_xxxx_yyyy 与真实 wxid_xxxx 的适配
if current_account_info and current_account_info.get("current_account"):
base_wxid = current_account_info["current_account"]
current_account_info["matched_folder"] = base_wxid # 默认兜底
# 遍历寻找以该 wxid 开头的用户文件夹(支持后缀匹配)
for acc in info.get("accounts", []):
acc_name = acc["account_name"]
if acc_name == base_wxid or acc_name.startswith(f"{base_wxid}_"):
current_account_info["matched_folder"] = acc_name
break
info['current_account'] = current_account_info
# logger.info(current_account_info)
# 添加一些统计信息
stats = {
+159 -64
View File
@@ -14,7 +14,6 @@ from ctypes import wintypes
from datetime import datetime
def get_wx_db(msg_dir: str = None,
db_types: Union[List[str], str] = None,
wxids: Union[List[str], str] = None) -> List[dict]:
@@ -49,7 +48,7 @@ def get_wx_db(msg_dir: str = None,
wxid_dirs[os.path.basename(sub_dir)] = os.path.join(msg_dir, sub_dir)
else:
wxid_dirs[os.path.basename(msg_dir)] = msg_dir
for wxid, wxid_dir in wxid_dirs.items():
if wxids and wxid not in wxids: # 如果指定wxid,则过滤掉其他wxid
continue
@@ -70,6 +69,7 @@ def get_wx_db(msg_dir: str = None,
result.append({"wxid": wxid, "db_type": db_type, "db_path": db_path, "wxid_dir": wxid_dir})
return result
# Windows API 常量和结构
PROCESS_QUERY_INFORMATION = 0x0400
PROCESS_VM_READ = 0x0010
@@ -87,6 +87,7 @@ CreateToolhelp32Snapshot = kernel32.CreateToolhelp32Snapshot
Process32FirstW = kernel32.Process32FirstW
Process32NextW = kernel32.Process32NextW
class PROCESSENTRY32W(ctypes.Structure):
_fields_ = [
('dwSize', wintypes.DWORD),
@@ -105,6 +106,98 @@ class PROCESSENTRY32W(ctypes.Structure):
# 删除了WeChatDecryptor类,解密功能已移至独立的wechat_decrypt.py脚本
def parse_global_config(base_path: str) -> dict:
"""
解析 all_users/config/global_config 获取最近登录用户信息
基于 AES-128-CFB 解密,并解析 MMKV 的 Varint 格式
"""
try:
import os
config_path = os.path.join(base_path, 'all_users', 'config', 'global_config')
if not os.path.exists(config_path):
return None
with open(config_path, 'rb') as f:
full_data = f.read()
if len(full_data) <= 4:
return None
encrypted_data = full_data[4:]
# 核心修复 1:强制截断取前 16 字节,等同于 Rust 中的 b"xwechat_crypt_ke"
key = b'xwechat_crypt_key'[:16]
iv = b'\0' * 16
# 尝试主流的两种密码库
try:
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.backends import default_backend
cipher = Cipher(algorithms.AES(key), modes.CFB(iv), backend=default_backend())
decryptor = cipher.decryptor()
decrypted = decryptor.update(encrypted_data) + decryptor.finalize()
except ImportError:
from Crypto.Cipher import AES
# PyCryptodome 中 CFB 模式默认 segment_size 是 8,需要指定为 128
cipher = AES.new(key, AES.MODE_CFB, iv=iv, segment_size=128)
decrypted = cipher.decrypt(encrypted_data)
# MMKV Varint 长度解码
def decode_varint(data, offset):
result = 0
shift = 0
while offset < len(data):
byte = data[offset]
offset += 1
result |= (byte & 0x7f) << shift
if not (byte & 0x80):
break
shift += 7
return result, offset
def extract_mmkv_string(data: bytes, key_str: str) -> str:
key_bytes = key_str.encode('utf-8')
idx = data.find(key_bytes)
if idx == -1: return None
offset = idx + len(key_bytes)
try:
value_len, offset = decode_varint(data, offset)
if value_len <= 0 or offset >= len(data):
return None
str_len, offset = decode_varint(data, offset)
if str_len > 0 and offset + str_len <= len(data):
return data[offset:offset + str_len].decode('utf-8', errors='ignore')
except Exception:
pass
return None
wxid = extract_mmkv_string(decrypted, 'mmkv_key_user_name')
nickname = extract_mmkv_string(decrypted, 'mmkv_key_nick_name')
avatar_url = extract_mmkv_string(decrypted, 'mmkv_key_head_img_url')
# 核心修复 2:参考 Rust 逻辑,头像链接往往以 "/0" 结尾(微信头像的尺寸标识)
if not avatar_url and b'http' in decrypted:
http_idx = decrypted.find(b'http')
slash_zero_idx = decrypted.find(b'/0', http_idx)
if slash_zero_idx != -1:
# 包含 "/0" 这两个字符本身,所以是 +2
avatar_url = decrypted[http_idx:slash_zero_idx + 2].decode('utf-8', errors='ignore')
if wxid or nickname:
return {
"wxid": wxid,
"nickname": nickname,
"avatar": avatar_url
}
return None
except Exception as e:
print(f"[DEBUG] 解析 global_config 失败: {e}")
return None
def find_wechat_databases() -> List[str]:
"""在新的xwechat_files目录中查找微信数据库文件
@@ -119,13 +212,13 @@ def find_wechat_databases() -> List[str]:
# 检查新的微信4.0+目录结构
wechat_dirs = [
documents_dir / "xwechat_files", # 新版微信4.0+
documents_dir / "WeChat Files" # 旧版微信
documents_dir / "WeChat Files" # 旧版微信
]
for wechat_dir in wechat_dirs:
if not wechat_dir.exists():
continue
# 查找用户目录(wxid_*模式)
for user_dir in wechat_dir.iterdir():
if not user_dir.is_dir():
@@ -149,7 +242,7 @@ def find_wechat_databases() -> List[str]:
for db_file in multi_dir.glob("*.db"):
if db_file.is_file():
db_files.append(str(db_file))
return db_files
@@ -158,7 +251,7 @@ def get_process_exe_path(process_id):
h_process = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, process_id)
if not h_process:
return None
exe_path = ctypes.create_unicode_buffer(MAX_PATH)
if GetModuleFileNameExW(h_process, None, exe_path, MAX_PATH) > 0:
CloseHandle(h_process)
@@ -167,35 +260,37 @@ def get_process_exe_path(process_id):
CloseHandle(h_process)
return None
def get_process_list():
"""获取系统进程列表"""
h_process_snap = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0)
if h_process_snap == ctypes.wintypes.HANDLE(-1).value:
return []
pe32 = PROCESSENTRY32W()
pe32.dwSize = ctypes.sizeof(PROCESSENTRY32W)
process_list = []
if not Process32FirstW(h_process_snap, ctypes.byref(pe32)):
CloseHandle(h_process_snap)
return []
while True:
process_list.append((pe32.th32ProcessID, pe32.szExeFile))
if not Process32NextW(h_process_snap, ctypes.byref(pe32)):
break
CloseHandle(h_process_snap)
return process_list
def auto_detect_wechat_data_dirs():
"""
自动检测微信数据目录 - 多策略组合检测
:return: 检测到的微信数据目录列表
"""
detected_dirs = []
# 策略1:注册表检测已移除
# 策略2和策略3:注册表相关检测已移除
@@ -211,24 +306,24 @@ def auto_detect_wechat_data_dirs():
for drive in drives:
if not os.path.exists(drive):
continue
try:
# 扫描驱动器根目录和常见目录
scan_paths = [
drive + os.sep,
os.path.join(drive + os.sep, "Users"),
]
for scan_path in scan_paths:
if not os.path.exists(scan_path):
continue
try:
for item in os.listdir(scan_path):
item_path = os.path.join(scan_path, item)
if not os.path.isdir(item_path):
continue
# 检查是否匹配微信目录模式
for pattern in common_wechat_patterns:
if pattern.lower() in item.lower():
@@ -242,7 +337,7 @@ def auto_detect_wechat_data_dirs():
continue
except (PermissionError, OSError):
continue
# 策略2:进程内存分析(简化版)
try:
process_list = get_process_list()
@@ -263,7 +358,7 @@ def auto_detect_wechat_data_dirs():
current = parent
else:
break
for parent_dir in parent_dirs:
for pattern in common_wechat_patterns:
potential_dir = os.path.join(parent_dir, pattern)
@@ -275,7 +370,7 @@ def auto_detect_wechat_data_dirs():
pass
except:
pass
return detected_dirs
@@ -300,6 +395,7 @@ def has_wxid_directories(directory):
except:
return False
def get_wx_dir_by_reg(wxid="all"):
"""
通过多种方法获取微信目录 - 改进的自动检测
@@ -327,6 +423,7 @@ def get_wx_dir_by_reg(wxid="all"):
return wx_dir if os.path.exists(wx_dir) else None
def detect_wechat_accounts_from_backup(backup_base_path: str = None) -> List[Dict[str, Any]]:
"""
从指定的备份路径检测微信账号
@@ -393,8 +490,8 @@ def detect_wechat_accounts_from_backup(backup_base_path: str = None) -> List[Dic
for data_item in os.listdir(backup_base_path):
data_item_path = os.path.join(backup_base_path, data_item)
if (os.path.isdir(data_item_path) and
data_item.startswith(f"{account_name}_") and
data_item != "Backup"):
data_item.startswith(f"{account_name}_") and
data_item != "Backup"):
data_dir = data_item_path
break
except (PermissionError, OSError):
@@ -525,9 +622,9 @@ def detect_wechat_accounts_from_login(login_base_path: str = None) -> List[Dict[
for data_item in os.listdir(base_path):
data_item_path = os.path.join(base_path, data_item)
if (
os.path.isdir(data_item_path)
and data_item.startswith(f"{account_name}_")
and data_item not in ["Backup", "all_users"]
os.path.isdir(data_item_path)
and data_item.startswith(f"{account_name}_")
and data_item not in ["Backup", "all_users"]
):
data_dir = data_item_path
break
@@ -551,6 +648,7 @@ def detect_wechat_accounts_from_login(login_base_path: str = None) -> List[Dict[
return accounts
def collect_account_databases(data_dir: str, account_name: str) -> List[Dict[str, Any]]:
"""
收集指定账号数据目录下的所有数据库文件
@@ -801,40 +899,37 @@ def detect_wechat_installation(data_root_path: str | None = None) -> Dict[str, A
def detect_current_logged_in_account(base_path: str = None) -> Dict[str, Any]:
"""
通过key_info.db文件时间检测当前登录的微信账号
Args:
base_path: 微信数据根目录,如果为None则自动检测
Returns:
当前登录账号信息
通过 global_config 解析 或 key_info.db 时间检测当前登录的微信账号
"""
current_account = None
latest_time = None
# 添加调试信息
print(f"[DEBUG] 开始检测当前登录账号,提供的base_path: {base_path}")
# 如果没有指定路径,尝试自动检测
# print(f"[DEBUG] 开始检测当前登录账号,提供的base_path: {base_path}")
if base_path is None:
detected_dirs = auto_detect_wechat_data_dirs()
print(f"[DEBUG] 自动检测到的目录: {detected_dirs}")
if not detected_dirs:
return {
"current_account": None,
"latest_time": None,
"message": "未检测到微信数据目录"
}
return {"current_account": None, "message": "未检测到微信数据目录"}
base_path = detected_dirs[0]
print(f"[DEBUG] 使用的base_path: {base_path}")
# 查找登录信息目录 - 尝试多个可能的路径
# 1. 新特性:优先尝试从 global_config 解析完整用户信息
parsed_config = parse_global_config(base_path)
if parsed_config and parsed_config.get('wxid'):
print(f"[DEBUG] 从 global_config 成功解析出账号: {parsed_config['wxid']}")
return {
"current_account": parsed_config["wxid"], # 不带校验位的 wxid
"nickname": parsed_config.get("nickname"),
"avatar": parsed_config.get("avatar"),
"latest_time": None,
"message": f"通过 global_config 检测到最近登录账号: {parsed_config['wxid']}"
}
# 2. 降级回退机制:原先基于 key_info.db 的时间探测逻辑
latest_time = None
current_account = None
possible_login_paths = [
os.path.join(base_path, "all_users", "login"), # 标准路径
os.path.join(base_path, "login"), # 备选路径1
os.path.join(base_path, "all_users", "login"),
os.path.join(base_path, "login"),
]
# 也尝试在子目录中查找
try:
for item in os.listdir(base_path):
@@ -842,11 +937,11 @@ def detect_current_logged_in_account(base_path: str = None) -> Dict[str, Any]:
if os.path.isdir(item_path):
possible_login_paths.extend([
os.path.join(item_path, "all_users", "login"), # 子目录中的标准路径
os.path.join(item_path, "login"), # 子目录中的备选路径
os.path.join(item_path, "login"), # 子目录中的备选路径
])
except (PermissionError, OSError):
pass
login_dir = None
for path in possible_login_paths:
print(f"[DEBUG] 检查路径: {path}")
@@ -854,49 +949,49 @@ def detect_current_logged_in_account(base_path: str = None) -> Dict[str, Any]:
login_dir = path
print(f"[DEBUG] 找到登录目录: {login_dir}")
break
if not login_dir:
return {
"current_account": None,
"latest_time": None,
"message": f"未找到登录信息目录,尝试的路径: {possible_login_paths}"
}
try:
# 遍历登录目录下的所有账号文件夹
items = os.listdir(login_dir)
print(f"[DEBUG] 登录目录内容: {items}")
for item in items:
item_path = os.path.join(login_dir, item)
print(f"[DEBUG] 检查项目: {item}, 路径: {item_path}, 是否为目录: {os.path.isdir(item_path)}")
if not os.path.isdir(item_path):
continue
# 检查key_info.db文件
key_info_path = os.path.join(item_path, "key_info.db")
print(f"[DEBUG] 检查key_info.db文件: {key_info_path}, 是否存在: {os.path.exists(key_info_path)}")
if not os.path.exists(key_info_path):
continue
# 获取文件修改时间
try:
file_time = os.path.getmtime(key_info_path)
file_datetime = datetime.fromtimestamp(file_time)
print(f"[DEBUG] 找到key_info.db文件: {key_info_path}, 修改时间: {file_datetime}")
# 更新最新登录的账号
if latest_time is None or file_time > latest_time:
latest_time = file_time
current_account = item
print(f"[DEBUG] 更新最新登录账号: {current_account}, 时间: {file_datetime}")
except OSError as e:
print(f"[DEBUG] 无法获取文件时间: {key_info_path}, 错误: {e}")
continue
except (PermissionError, OSError) as e:
print(f"[DEBUG] 无法访问登录目录: {login_dir}, 错误: {e}")
return {
@@ -904,7 +999,7 @@ def detect_current_logged_in_account(base_path: str = None) -> Dict[str, Any]:
"latest_time": None,
"message": f"无法访问登录目录: {e}"
}
if current_account:
print(f"[DEBUG] 最终结果: 当前登录账号 {current_account}, 时间 {latest_time}")
return {