Merge pull request #54 from H3CoF6/main

尝试修护两个hook的bug
This commit is contained in:
2977094657
2026-04-29 02:15:06 +08:00
committed by GitHub
Unverified
18 changed files with 94 additions and 61 deletions
+1 -1
View File
@@ -20,7 +20,7 @@ dependencies = [
"pilk>=0.2.4",
"pypinyin>=0.53.0",
"jieba>=0.42.1",
"wx_key>=2.0.0",
"wx_key>=2.0.1",
"packaging",
"httpx",
]
+16 -13
View File
@@ -6,19 +6,21 @@ from .logging_config import get_logger
logger = get_logger(__name__)
class ImgHelper:
def __init__(self):
self._lib: Optional[ctypes.CDLL] = None
self._enabled = False
self._lock = __import__("threading").Lock()
def _resolve_dll_path(self) -> Path:
@staticmethod
def _resolve_dll_path() -> Path:
# 1. Default (source code layout)
base = Path(__file__).resolve().parent
path = base / "native" / "img_helper.dll"
if path.exists():
return path
# 2. Frozen (bundled exe)
import sys
if getattr(sys, "frozen", False):
@@ -27,22 +29,22 @@ class ImgHelper:
for p in [exe_dir / "native" / "img_helper.dll", exe_dir / "img_helper.dll"]:
if p.exists():
return p
# 3. Current working directory
for p in [Path.cwd() / "native" / "img_helper.dll", Path.cwd() / "img_helper.dll"]:
if p.exists():
return p
return path # Fallback to default for error message
return path # Fallback to default for error message
def _load_lib(self):
if self._lib is not None:
return self._lib
dll_path = self._resolve_dll_path()
if not dll_path.exists():
raise FileNotFoundError(f"Missing img_helper.dll at: {dll_path}")
try:
# On Windows, ensure the DLL's directory is in the search path for dependencies
if hasattr(os, 'add_dll_directory'):
@@ -50,18 +52,18 @@ class ImgHelper:
os.add_dll_directory(str(dll_path.parent))
except Exception:
pass
lib = ctypes.CDLL(str(dll_path))
lib.InitImgHelper.argtypes = [ctypes.c_uint32]
lib.InitImgHelper.restype = ctypes.c_bool
lib.UninstallImgHelper.argtypes = []
lib.UninstallImgHelper.restype = None
lib.GetImgHelperError.argtypes = []
lib.GetImgHelperError.restype = ctypes.c_char_p
self._lib = lib
return lib
except Exception as e:
@@ -76,7 +78,7 @@ class ImgHelper:
# If already enabled, we uninstall first to be safe as per DLL docs suggestion
# about being designed to hook one process at a time.
lib.UninstallImgHelper()
if lib.InitImgHelper(pid):
self._enabled = True
logger.info(f"ImgHelper hook applied to PID {pid}")
@@ -108,4 +110,5 @@ class ImgHelper:
def is_enabled(self) -> bool:
return self._enabled
IMG_HELPER = ImgHelper()
+34 -22
View File
@@ -51,10 +51,10 @@ def _summarize_key_payload(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
def _resolve_wxid_dir_for_image_key(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Path:
explicit_wxid_dir = str(wxid_dir or "").strip()
if explicit_wxid_dir:
@@ -193,15 +193,26 @@ class WeChatKeyFetcher:
process = subprocess.Popen(normalized_exe_path)
time.sleep(2)
candidates = []
target_process_name = Path(normalized_exe_path).name.lower()
for proc in psutil.process_iter(['pid', 'name', 'create_time']):
proc_name = str(proc.info.get('name') or "").strip().lower()
if proc_name == target_process_name or self._is_wechat_process(proc_name):
candidates.append(proc)
for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']):
try:
p_name = proc.info.get('name')
if p_name and p_name.lower() in self.process_names:
cmdline_list = proc.info.get('cmdline') or []
cmdline_str = " ".join(cmdline_list).lower()
if any(target.lower() in cmdline_str for target in WECHAT_EXECUTABLE_NAMES):
candidates.append({
"pid": proc.info['pid'],
"cmd_len": len(cmdline_str)
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if candidates:
candidates.sort(key=lambda x: x.info['create_time'], reverse=True)
target_pid = candidates[0].info['pid']
# 选择命令行最短的一个作为主进程
main_proc = min(candidates, key=lambda x: x['cmd_len'])
target_pid = main_proc["pid"]
return target_pid
return process.pid
@@ -275,6 +286,7 @@ class WeChatKeyFetcher:
"db_key": found_db_key
}
def get_db_key_workflow(wechat_install_path: Optional[str] = None):
fetcher = WeChatKeyFetcher()
return fetcher.fetch_db_key(wechat_install_path=wechat_install_path)
@@ -295,13 +307,13 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
if wx_key is None or not hasattr(wx_key, 'get_image_key'):
logger.info("[image_key] 本地算法不可用:wx_key.get_image_key 缺失")
return []
try:
res_json = wx_key.get_image_key()
if not res_json:
logger.info("[image_key] 本地算法返回空结果")
return []
data = json.loads(res_json)
accounts = data.get('accounts', [])
results = []
@@ -329,10 +341,10 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
async def get_image_key_integrated_workflow(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
集成图片密钥获取流程:
@@ -341,7 +353,7 @@ async def get_image_key_integrated_workflow(
"""
# 1. 尝试本地提取
local_keys = try_get_local_image_keys()
target_account_wxid = None
if account or wxid_dir or db_storage_path:
try:
@@ -409,10 +421,10 @@ async def get_image_key_integrated_workflow(
async def fetch_and_save_remote_keys(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
wx_id_dir = _resolve_wxid_dir_for_image_key(
account,
Binary file not shown.
Binary file not shown.
+4 -4
View File
@@ -53,16 +53,16 @@ async def toggle_img_helper(req: ImgHelperToggleRequest):
if not req.enabled:
IMG_HELPER.disable()
return {"status": "success", "enabled": False}
# Attempt to enable
status_res = await check_wechat_status()
wx_status = status_res.get("wx_status", {})
if not wx_status.get("is_running") or not wx_status.get("pid"):
raise HTTPException(status_code=400, detail="未检测到微信正在运行,请先打开微信再尝试!")
pid = wx_status["pid"]
ok, err = IMG_HELPER.enable(pid)
if not ok:
raise HTTPException(status_code=500, detail=f"开启失败: {err}")
return {"status": "success", "enabled": True}
return {"status": "success", "enabled": True}
@@ -90,8 +90,11 @@ async def detect_current_account(data_root_path: Optional[str] = None):
@router.get("/api/wechat/status", summary="检查微信运行状态")
async def check_wechat_status():
"""
检查系统中是否有 Weixin.exe 或 WeChat.exe 进程在运行
返回: status=0 成功, wx_status={is_running: bool, pid: int, ...}
检查系统微信主进程状态
逻辑:
1. 匹配进程名 Weixin.exe 或 WeChat.exe
2. 校验命令行必须包含 exe 名称(排除崩溃后的残留/无效进程)
3. 在有效进程中选择命令行最短的一个作为主进程
"""
process_name_targets = ["Weixin.exe", "WeChat.exe"]
@@ -103,21 +106,37 @@ async def check_wechat_status():
}
try:
for proc in psutil.process_iter(['pid', 'name', 'exe', 'memory_info']):
candidates = []
for proc in psutil.process_iter(['pid', 'name', 'exe', 'memory_info', 'cmdline']):
try:
if proc.info['name'] and proc.info['name'] in process_name_targets:
wx_status["is_running"] = True
wx_status["pid"] = proc.info['pid']
wx_status["exe_path"] = proc.info['exe']
p_name = proc.info.get('name')
if p_name and p_name in process_name_targets:
# 获取命令行并合并为字符串
cmdline_list = proc.info.get('cmdline') or []
cmdline_str = " ".join(cmdline_list).lower()
mem = proc.info['memory_info']
if mem:
wx_status["memory_usage_mb"] = round(mem.rss / (1024 * 1024), 2)
break
if any(target.lower() in cmdline_str for target in process_name_targets):
candidates.append({
"pid": proc.info['pid'],
"exe_path": proc.info['exe'],
"cmd_len": len(cmdline_str),
"memory_info": proc.info['memory_info']
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if candidates:
main_proc = min(candidates, key=lambda x: x['cmd_len'])
wx_status["is_running"] = True
wx_status["pid"] = main_proc["pid"]
wx_status["exe_path"] = main_proc["exe_path"]
mem = main_proc["memory_info"]
if mem:
wx_status["memory_usage_mb"] = round(mem.rss / (1024 * 1024), 2)
return {
"status": 0,
"errmsg": "ok",
@@ -125,9 +144,8 @@ async def check_wechat_status():
}
except Exception as e:
# 即使出错也返回 JSON,但 status 非 0
return {
"status": -1,
"errmsg": f"检查进程失败: {str(e)}",
"errmsg": f"检查微信主进程失败: {str(e)}",
"wx_status": wx_status
}
}
Generated
+6 -6
View File
@@ -919,7 +919,7 @@ requires-dist = [
{ name = "requests", specifier = ">=2.32.4" },
{ name = "typing-extensions", specifier = ">=4.8.0" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.24.0" },
{ name = "wx-key", specifier = ">=2.0.0" },
{ name = "wx-key", specifier = ">=2.0.1" },
{ name = "zstandard", specifier = ">=0.23.0" },
]
provides-extras = ["build"]
@@ -935,13 +935,13 @@ wheels = [
[[package]]
name = "wx-key"
version = "2.0.0"
version = "2.0.1"
source = { registry = "tools/key_wheels" }
wheels = [
{ path = "wx_key-2.0.0-cp311-cp311-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp312-cp312-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp313-cp313-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp314-cp314-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp311-cp311-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp312-cp312-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp313-cp313-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp314-cp314-win_amd64.whl" },
]
[[package]]