fix(wx_key): 删掉时间排序获取主pid的逻辑,强制使用更稳定的最短命令行方案

This commit is contained in:
H3CoF6
2026-04-29 01:55:59 +08:00
Unverified
parent 1ac7cd4cc1
commit 5a2b9076c4
+34 -22
View File
@@ -51,10 +51,10 @@ def _summarize_key_payload(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
def _resolve_wxid_dir_for_image_key(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Path:
explicit_wxid_dir = str(wxid_dir or "").strip()
if explicit_wxid_dir:
@@ -193,15 +193,26 @@ class WeChatKeyFetcher:
process = subprocess.Popen(normalized_exe_path)
time.sleep(2)
candidates = []
target_process_name = Path(normalized_exe_path).name.lower()
for proc in psutil.process_iter(['pid', 'name', 'create_time']):
proc_name = str(proc.info.get('name') or "").strip().lower()
if proc_name == target_process_name or self._is_wechat_process(proc_name):
candidates.append(proc)
for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']):
try:
p_name = proc.info.get('name')
if p_name and p_name.lower() in self.process_names:
cmdline_list = proc.info.get('cmdline') or []
cmdline_str = " ".join(cmdline_list).lower()
if any(target.lower() in cmdline_str for target in WECHAT_EXECUTABLE_NAMES):
candidates.append({
"pid": proc.info['pid'],
"cmd_len": len(cmdline_str)
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if candidates:
candidates.sort(key=lambda x: x.info['create_time'], reverse=True)
target_pid = candidates[0].info['pid']
# 选择命令行最短的一个作为主进程
main_proc = min(candidates, key=lambda x: x['cmd_len'])
target_pid = main_proc["pid"]
return target_pid
return process.pid
@@ -275,6 +286,7 @@ class WeChatKeyFetcher:
"db_key": found_db_key
}
def get_db_key_workflow(wechat_install_path: Optional[str] = None):
fetcher = WeChatKeyFetcher()
return fetcher.fetch_db_key(wechat_install_path=wechat_install_path)
@@ -295,13 +307,13 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
if wx_key is None or not hasattr(wx_key, 'get_image_key'):
logger.info("[image_key] 本地算法不可用:wx_key.get_image_key 缺失")
return []
try:
res_json = wx_key.get_image_key()
if not res_json:
logger.info("[image_key] 本地算法返回空结果")
return []
data = json.loads(res_json)
accounts = data.get('accounts', [])
results = []
@@ -329,10 +341,10 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
async def get_image_key_integrated_workflow(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
集成图片密钥获取流程:
@@ -341,7 +353,7 @@ async def get_image_key_integrated_workflow(
"""
# 1. 尝试本地提取
local_keys = try_get_local_image_keys()
target_account_wxid = None
if account or wxid_dir or db_storage_path:
try:
@@ -409,10 +421,10 @@ async def get_image_key_integrated_workflow(
async def fetch_and_save_remote_keys(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
wx_id_dir = _resolve_wxid_dir_for_image_key(
account,