fix: fix the request error when next_action_id changed

This commit is contained in:
H3CoF6
2026-02-12 22:39:57 +08:00
parent b9e954ef3e
commit 35656540d5

View File

@@ -14,8 +14,10 @@ import subprocess
import hashlib
import os
import json
import re
import random
import logging
import asyncio
import httpx
from pathlib import Path
from typing import Optional, List, Dict, Any
@@ -203,7 +205,56 @@ def get_db_key_workflow():
# 远程 API 配置
REMOTE_URL = "https://view.free.c3o.re/dashboard"
NEXT_ACTION_ID = "7c8f99280c70626ccf5960cc4a68f368197e15f8e9"
BASE_URL = "https://view.free.c3o.re" # 用于拼接js
# NEXT_ACTION_ID = "7c8f99280c70626ccf5960cc4a68f368197e15f8e9" # 不可以硬编码
async def fetch_js_and_scan(client: httpx.AsyncClient, js_path: str) -> Optional[str]:
"""
异步下载单个 JS 文件并匹配 Action ID
"""
full_url = f"{BASE_URL}{js_path}" if js_path.startswith("/") else js_path
try:
response = await client.get(full_url)
if response.status_code != 200:
return None
content = response.text
action_id_pattern = re.compile(r'createServerReference.*?["\']([a-f0-9]{42})["\'].*?["\']getUserConfigFromBytes["\']')
match = action_id_pattern.search(content)
if match:
found_id = match.group(1)
return found_id
except Exception as e:
logger.error(f"Error fetching {js_path}: {e}")
return None
async def _get_next_action_id_async() -> str:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(REMOTE_URL)
html = resp.text
js_file_pattern = re.compile(r'src="(/_next/static/chunks/[^"]+\.js)"')
js_files = set(js_file_pattern.findall(html))
if not js_files:
raise Exception("未找到任何 Next.js chunk 文件,可能页面结构已变动。")
tasks = [fetch_js_and_scan(client, js_path) for js_path in js_files]
results = await asyncio.gather(*tasks)
for res in results:
if res:
return res
raise Exception("遍历了所有 JS 文件,但未找到匹配的 createServerReference ID。")
def get_wechat_internal_global_config(wx_dir: Path, file_name1) -> bytes:
@@ -227,6 +278,14 @@ async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str,
wx_id_dir = _resolve_account_wxid_dir(account_dir)
wxid = wx_id_dir.name
logger.info("尝试获取next_action_id")
try:
next_action_id = await _get_next_action_id_async()
logger.info(f"获取next_action_id成功: {next_action_id}")
except Exception as e:
raise RuntimeError(f"获取next_action_id失败{e}")
logger.info(f"正在为账号 {wxid} 获取密钥...")
try:
@@ -245,7 +304,7 @@ async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str,
headers = {
"Accept": "text/x-component",
"Next-Action": NEXT_ACTION_ID,
"Next-Action": next_action_id,
"Next-Router-State-Tree": "%5B%22%22%2C%7B%22children%22%3A%5B%22dashboard%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D",
"Origin": "https://view.free.c3o.re",
"Referer": "https://view.free.c3o.re/dashboard",