mirror of
https://github.com/LifeArchiveProject/WeChatDataAnalysis.git
synced 2026-02-19 14:20:51 +08:00
feat: support get image key!!
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
# import sys
|
||||
# import requests
|
||||
|
||||
try:
|
||||
import wx_key
|
||||
@@ -10,15 +11,25 @@ except ImportError:
|
||||
import time
|
||||
import psutil
|
||||
import subprocess
|
||||
import hashlib
|
||||
import os
|
||||
import json
|
||||
import random
|
||||
import logging
|
||||
from typing import Optional, List
|
||||
import httpx
|
||||
from pathlib import Path
|
||||
from typing import Optional, List, Dict, Any
|
||||
from dataclasses import dataclass
|
||||
from packaging import version as pkg_version # 建议使用 packaging 库处理版本比较
|
||||
from .wechat_detection import detect_wechat_installation
|
||||
from .key_store import upsert_account_keys_in_store
|
||||
from .media_helpers import _resolve_account_dir, _resolve_account_wxid_dir
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ====================== 以下是hook逻辑 ======================================
|
||||
|
||||
@dataclass
|
||||
class HookConfig:
|
||||
min_version: str
|
||||
@@ -185,3 +196,169 @@ class WeChatKeyFetcher:
|
||||
def get_db_key_workflow():
|
||||
fetcher = WeChatKeyFetcher()
|
||||
return fetcher.fetch_key()
|
||||
|
||||
|
||||
# ============================== 以下是图片密钥逻辑 =====================================
|
||||
|
||||
|
||||
# 远程 API 配置
|
||||
REMOTE_URL = "https://view.free.c3o.re/dashboard"
|
||||
NEXT_ACTION_ID = "7c8f99280c70626ccf5960cc4a68f368197e15f8e9"
|
||||
|
||||
|
||||
def get_wechat_internal_global_config(wx_dir: Path, file_name1) -> bytes:
|
||||
"""
|
||||
获取 Blob 1: 微信内部的 global_config 文件
|
||||
路径逻辑: account_dir (wxid_xxx) -> parent (xwechat_files) -> all_users -> config -> global_config
|
||||
"""
|
||||
xwechat_files_root = wx_dir.parent
|
||||
|
||||
target_path = os.path.join(xwechat_files_root, "all_users", "config", file_name1)
|
||||
|
||||
if not os.path.exists(target_path):
|
||||
logger.error(f"未找到微信内部 global_config: {target_path}")
|
||||
raise FileNotFoundError(f"找不到文件: {target_path},请确认微信数据目录结构是否完整")
|
||||
|
||||
return Path(target_path).read_bytes()
|
||||
|
||||
|
||||
# def get_local_config_sha3_224() -> bytes:
|
||||
# """
|
||||
# 不要在意,抽象的实现 哈哈哈
|
||||
# """
|
||||
# content = json.dumps({
|
||||
# "wxfile_dir": "C:\\Users\\17078\\xwechat_files",
|
||||
# "weixin_id_folder": "wxid_lnyf4hdo9csb12_f1c4",
|
||||
# "cache_dir": "C:\\Users\\17078\\Desktop\\wxDBHook\\test\\wx-dat\\wx-dat\\.cache",
|
||||
# "db_key": "",
|
||||
# "port": 8001
|
||||
# }, indent=4).encode("utf-8")
|
||||
#
|
||||
# # 计算 SHA3-224
|
||||
# digest = hashlib.sha3_224(content).digest()
|
||||
# return digest
|
||||
|
||||
# async def log_request(request):
|
||||
# print(f"--- Request Raw ---")
|
||||
# print(f"{request.method} {request.url} {request.extensions.get('http_version', b'HTTP/1.1').decode()}")
|
||||
# for name, value in request.headers.items():
|
||||
# print(f"{name}: {value}")
|
||||
#
|
||||
# print()
|
||||
#
|
||||
# body = request.read()
|
||||
# if body:
|
||||
# print(body.decode(errors='replace'))
|
||||
# print(f"-------------------\n")
|
||||
|
||||
|
||||
async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str, Any]:
|
||||
# 1. 确定账号目录和 WXID
|
||||
account_dir = _resolve_account_dir(account)
|
||||
wx_id_dir = _resolve_account_wxid_dir(account_dir)
|
||||
wxid = wx_id_dir.name
|
||||
|
||||
logger.info(f"正在为账号 {wxid} 获取密钥...")
|
||||
|
||||
try:
|
||||
blob1_bytes = get_wechat_internal_global_config(wx_id_dir, file_name1= "global_config") # 估计这是唯一有效的数据!!
|
||||
logger.info(f"获取微信内部配置成功,大小: {len(blob1_bytes)} bytes")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"读取微信内部文件失败: {e}")
|
||||
|
||||
try:
|
||||
blob2_bytes = get_wechat_internal_global_config(wx_id_dir, file_name1= "global_config.crc")
|
||||
logger.info(f"获取微信内部配置成功,大小: {len(blob2_bytes)} bytes")
|
||||
except Exception as e:
|
||||
raise RuntimeError(f"读取微信内部文件失败: {e}")
|
||||
|
||||
# Blob 3: 空 (联系人)
|
||||
blob3_bytes = b""
|
||||
|
||||
|
||||
# 3. 构造请求
|
||||
headers = {
|
||||
"Accept": "text/x-component",
|
||||
"Next-Action": NEXT_ACTION_ID,
|
||||
"Next-Router-State-Tree": "%5B%22%22%2C%7B%22children%22%3A%5B%22dashboard%22%2C%7B%22children%22%3A%5B%22__PAGE__%22%2C%7B%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%5D%7D%2Cnull%2Cnull%2Ctrue%5D",
|
||||
"Origin": "https://view.free.c3o.re",
|
||||
"Referer": "https://view.free.c3o.re/dashboard",
|
||||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36"
|
||||
}
|
||||
|
||||
files = {
|
||||
'1': ('blob', blob1_bytes, 'application/octet-stream'),
|
||||
'2': ('blob', blob2_bytes, 'application/octet-stream'),
|
||||
'3': ('blob', blob3_bytes, 'application/octet-stream'),
|
||||
'0': (None, json.dumps([wxid, "$A1", "$A2", "$A3", 0],separators=(",",":")).encode('utf-8')),
|
||||
}
|
||||
|
||||
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
logger.info("向远程服务器发送请求...")
|
||||
response = await client.post(REMOTE_URL, headers=headers, files=files)
|
||||
|
||||
if response.status_code != 200:
|
||||
raise RuntimeError(f"远程服务器错误: {response.status_code} - {response.text[:100]}")
|
||||
|
||||
|
||||
result_data = {}
|
||||
lines = response.text.split('\n')
|
||||
|
||||
found_config = False
|
||||
for line in lines:
|
||||
line = line.strip()
|
||||
if not line:
|
||||
continue
|
||||
|
||||
if line.startswith('1:'):
|
||||
try:
|
||||
json_part = line[2:] # 去掉 "1:"
|
||||
data_obj = json.loads(json_part)
|
||||
|
||||
if "config" in data_obj:
|
||||
config = data_obj["config"]
|
||||
result_data = {
|
||||
"xor_key": config.get("xor_key", ""),
|
||||
"aes_key": config.get("aes_key", ""),
|
||||
"nick_name": config.get("nick_name", ""),
|
||||
"avatar_url": config.get("avatar_url", "")
|
||||
}
|
||||
found_config = True
|
||||
break
|
||||
except Exception as e:
|
||||
logger.warning(f"解析响应行失败: {e}")
|
||||
continue
|
||||
|
||||
if not found_config or not result_data.get("aes_key"):
|
||||
logger.error(f"响应中未找到密钥信息。Full Response: {response.text[:500]}")
|
||||
raise RuntimeError("解析失败: 服务器未返回 config 数据")
|
||||
|
||||
# 6. 处理并保存密钥
|
||||
xor_raw = str(result_data["xor_key"])
|
||||
aes_val = str(result_data["aes_key"])
|
||||
|
||||
# 转换 XOR Key (服务器返回的是十进制字符串 "178")
|
||||
try:
|
||||
if xor_raw.startswith("0x"):
|
||||
xor_int = int(xor_raw, 16)
|
||||
else:
|
||||
xor_int = int(xor_raw)
|
||||
xor_hex_str = f"0x{xor_int:02X}" # 格式化为 0xB2
|
||||
except:
|
||||
xor_hex_str = xor_raw # 转换失败则原样保留
|
||||
|
||||
# 保存到数据库/Store
|
||||
upsert_account_keys_in_store(
|
||||
account=wxid,
|
||||
image_xor_key=xor_hex_str,
|
||||
image_aes_key=aes_val
|
||||
)
|
||||
|
||||
return {
|
||||
"wxid": wxid,
|
||||
"xor_key": xor_hex_str,
|
||||
"aes_key": aes_val,
|
||||
"nick_name": result_data["nick_name"]
|
||||
}
|
||||
|
||||
|
||||
@@ -3,7 +3,7 @@ from typing import Optional
|
||||
from fastapi import APIRouter
|
||||
|
||||
from ..key_store import get_account_keys_from_store
|
||||
from ..key_service import get_db_key_workflow
|
||||
from ..key_service import get_db_key_workflow, fetch_and_save_remote_keys
|
||||
from ..media_helpers import _load_media_keys, _resolve_account_dir
|
||||
from ..path_fix import PathFixRoute
|
||||
|
||||
@@ -86,3 +86,42 @@ async def get_wechat_db_key():
|
||||
"errmsg": f"获取失败: {str(e)}",
|
||||
"data": {}
|
||||
}
|
||||
|
||||
|
||||
@router.get("/api/get_image_key", summary="获取并保存微信图片密钥")
|
||||
async def get_image_key(account: Optional[str] = None):
|
||||
"""
|
||||
通过模拟 Next.js Server Action 协议,利用本地微信配置文件换取 AES/XOR 密钥。
|
||||
|
||||
1. 读取 [wx_dir]/all_users/config/global_config (Blob 1)
|
||||
2. 计算 本地 global_config (JSON) 的 SHA3-224 (Blob 2)
|
||||
3. 构造 Multipart 包发送至远程服务器
|
||||
4. 解析返回流,自动存入本地数据库
|
||||
"""
|
||||
try:
|
||||
result = await fetch_and_save_remote_keys(account)
|
||||
|
||||
return {
|
||||
"status": 0,
|
||||
"errmsg": "ok",
|
||||
"data": {
|
||||
"xor_key": result["xor_key"],
|
||||
"aes_key": result["aes_key"],
|
||||
"nick_name": result.get("nick_name"),
|
||||
"account": result["wxid"]
|
||||
}
|
||||
}
|
||||
except FileNotFoundError as e:
|
||||
return {
|
||||
"status": -1,
|
||||
"errmsg": f"文件缺失: {str(e)}",
|
||||
"data": {}
|
||||
}
|
||||
except Exception as e:
|
||||
import traceback
|
||||
traceback.print_exc()
|
||||
return {
|
||||
"status": -1,
|
||||
"errmsg": f"获取失败: {str(e)}",
|
||||
"data": {}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user