Compare commits

...

6 Commits

21 changed files with 407 additions and 148 deletions
+2
View File
@@ -227,6 +227,8 @@ export const createMessageNormalizer = ({ apiBase, getSelectedAccount, getSelect
_quoteThumbError: false,
amount: msg.amount || '',
coverUrl: msg.coverUrl || '',
objectId: String(msg.objectId || '').trim(),
objectNonceId: String(msg.objectNonceId || '').trim(),
fileSize: msg.fileSize || '',
fileMd5: msg.fileMd5 || '',
paySubType: msg.paySubType || '',
+4 -11
View File
@@ -58,7 +58,7 @@
<svg v-else class="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M4 4v5h.582m15.356 2A8.001 8.001 0 004.582 9m0 0H9m11 11v-5h-.581m0 0a8.003 8.003 0 01-15.357-2m15.357 2H15" />
</svg>
{{ isGettingDbKey ? '获取中...' : '一键获取全部密钥' }}
{{ isGettingDbKey ? '获取中...' : '一键获取数据库密钥' }}
</button>
</div>
<p v-if="formErrors.key" class="mt-1 text-sm text-red-600 flex items-center">
@@ -71,7 +71,7 @@
<svg class="w-4 h-4 mr-1 text-[#10AEEF]" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"/>
</svg>
点击按钮将自动获取数据库图片双重密钥您也可以手动输入已知的64位密钥使用<a href="https://github.com/ycccccccy/wx_key" target="_blank" class="text-[#07C160] hover:text-[#06AD56]">wx_key</a>等工具获取
点击按钮将自动获取数据库解密密钥您也可以手动输入已知的64位密钥
</p>
</div>
@@ -189,7 +189,7 @@
<svg class="w-4 h-4 mr-1 text-[#10AEEF]" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M13 16h-1v-4h-1m1-4h.01M21 12a9 9 0 11-18 0 9 9 0 0118 0z"/>
</svg>
如果您在第一步使用了一键获取触发了云端解析下方输入框已被自动填充您也可可以使用<a href="https://github.com/ycccccccy/wx_key" target="_blank" class="text-[#07C160] hover:text-[#06AD56]">wx_key</a>等工具手动获取
系统已为您尝试通过本地算法云端解析自动获取图片密钥如果输入框为空请手动填写
</p>
<div class="grid grid-cols-1 md:grid-cols-2 gap-4">
@@ -547,14 +547,7 @@ const handleGetDbKey = async () => {
if (res.data?.db_key) {
formData.key = res.data.db_key
}
// 直接把图片密钥也存好
if (res.data?.xor_key) {
manualKeys.xor_key = res.data.xor_key
}
if (res.data?.aes_key) {
manualKeys.aes_key = res.data.aes_key
}
warning.value = '🎉 数据库与图片密钥均已获取成功!'
warning.value = '🎉 数据库解密密钥已获取成功!'
// 3秒后清除成功提示,保持 UI 干净
setTimeout(() => { if(warning.value.includes('获取成功')) warning.value = '' }, 3000)
} else {
+1 -1
View File
@@ -20,7 +20,7 @@ dependencies = [
"pilk>=0.2.4",
"pypinyin>=0.53.0",
"jieba>=0.42.1",
"wx_key>=1.1.0",
"wx_key>=2.0.0",
"packaging",
"httpx",
]
@@ -3435,6 +3435,8 @@ def _parse_message_for_export(
quote_voice_length = ""
quote_title = ""
quote_content = ""
object_id = ""
object_nonce_id = ""
amount = ""
cover_url = ""
file_size = ""
@@ -3472,6 +3474,8 @@ def _parse_message_for_export(
from_username = str(parsed.get("fromUsername") or "")
link_type = str(parsed.get("linkType") or "")
link_style = str(parsed.get("linkStyle") or "")
object_id = str(parsed.get("objectId") or "")
object_nonce_id = str(parsed.get("objectNonceId") or "")
record_item = str(parsed.get("recordItem") or "")
quote_username = str(parsed.get("quoteUsername") or "")
quote_server_id = str(parsed.get("quoteServerId") or "")
@@ -3699,6 +3703,8 @@ def _parse_message_for_export(
from_username = str(parsed.get("fromUsername") or from_username)
link_type = str(parsed.get("linkType") or link_type)
link_style = str(parsed.get("linkStyle") or link_style)
object_id = str(parsed.get("objectId") or object_id)
object_nonce_id = str(parsed.get("objectNonceId") or object_nonce_id)
record_item = str(parsed.get("recordItem") or record_item)
quote_username = str(parsed.get("quoteUsername") or quote_username)
quote_server_id = str(parsed.get("quoteServerId") or quote_server_id)
@@ -3765,6 +3771,8 @@ def _parse_message_for_export(
"fromUsername": from_username,
"linkType": link_type,
"linkStyle": link_style,
"objectId": object_id,
"objectNonceId": object_nonce_id,
"recordItem": record_item,
"thumbUrl": thumb_url,
"imageMd5": image_md5,
+18
View File
@@ -1238,6 +1238,14 @@ def _parse_app_message(text: str) -> dict[str, Any]:
or (_extract_xml_tag_text(finder_feed, "username") if finder_feed else "")
or (_extract_xml_tag_text(finder_feed, "finderusername") if finder_feed else "")
)
object_id = (
(_extract_xml_tag_or_attr(finder_feed, "objectid") if finder_feed else "")
or _extract_xml_tag_or_attr(text, "objectid")
)
object_nonce_id = (
(_extract_xml_tag_or_attr(finder_feed, "objectnonceid") if finder_feed else "")
or _extract_xml_tag_or_attr(text, "objectnonceid")
)
thumb_url = _normalize_xml_url(
_extract_xml_tag_or_attr(text, "thumburl")
@@ -1277,6 +1285,8 @@ def _parse_app_message(text: str) -> dict[str, Any]:
"fromUsername": from_u,
"linkType": "finder",
"linkStyle": "finder",
"objectId": str(object_id or "").strip(),
"objectNonceId": str(object_nonce_id or "").strip(),
}
if app_type in (33, 36):
@@ -2418,6 +2428,8 @@ def _row_to_search_hit(
quote_thumb_url = ""
link_type = ""
link_style = ""
object_id = ""
object_nonce_id = ""
amount = ""
pay_sub_type = ""
transfer_status = ""
@@ -2441,6 +2453,8 @@ def _row_to_search_hit(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or "")
link_type = str(parsed.get("linkType") or "")
link_style = str(parsed.get("linkStyle") or "")
object_id = str(parsed.get("objectId") or "")
object_nonce_id = str(parsed.get("objectNonceId") or "")
quote_username = str(parsed.get("quoteUsername") or "")
amount = str(parsed.get("amount") or "")
pay_sub_type = str(parsed.get("paySubType") or "")
@@ -2526,6 +2540,8 @@ def _row_to_search_hit(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or quote_thumb_url)
link_type = str(parsed.get("linkType") or link_type)
link_style = str(parsed.get("linkStyle") or link_style)
object_id = str(parsed.get("objectId") or object_id)
object_nonce_id = str(parsed.get("objectNonceId") or object_nonce_id)
amount = str(parsed.get("amount") or amount)
pay_sub_type = str(parsed.get("paySubType") or pay_sub_type)
quote_username = str(parsed.get("quoteUsername") or quote_username)
@@ -2567,6 +2583,8 @@ def _row_to_search_hit(
"url": url,
"linkType": link_type,
"linkStyle": link_style,
"objectId": object_id,
"objectNonceId": object_nonce_id,
"quoteUsername": quote_username,
"quoteTitle": quote_title,
"quoteContent": quote_content,
+86 -102
View File
@@ -32,81 +32,11 @@ logger = logging.getLogger(__name__)
# ====================== 以下是hook逻辑 ======================================
@dataclass
class HookConfig:
min_version: str
pattern: str
mask: str
offset: int
md5_pattern: str = ""
md5_mask: str = ""
md5_offset: int = 0
class WeChatKeyFetcher:
def __init__(self):
self.process_name = "Weixin.exe"
self.timeout_seconds = 60
@staticmethod
def _hex_array_to_str(hex_array: List[int]) -> str:
return " ".join([f"{b:02X}" for b in hex_array])
def _get_hook_config(self, version_str: str) -> Optional[HookConfig]:
try:
v_curr = pkg_version.parse(version_str)
except Exception as e:
logger.error(f"版本号解析失败: {version_str} || {e}")
return None
if v_curr > pkg_version.parse("4.1.6.14"):
return HookConfig(
min_version=">4.1.6.14",
pattern=self._hex_array_to_str([
0x24, 0x50, 0x48, 0xC7, 0x45, 0x00, 0xFE, 0xFF, 0xFF, 0xFF,
0x44, 0x89, 0xCF, 0x44, 0x89, 0xC3, 0x49, 0x89, 0xD6, 0x48,
0x89, 0xCE, 0x48, 0x89
]),
mask="xxxxxxxxxxxxxxxxxxxxxxxx",
offset=-3,
md5_pattern="48 8D 4D 00 48 89 4D B0 48 89 45 B8 48 8D 7D 00 48 8D 55 B0 48 89 F9",
md5_mask="xxx?xxxxxxxxxxx?xxxxxxx",
md5_offset=4
)
if pkg_version.parse("4.1.4") <= v_curr <= pkg_version.parse("4.1.6.14"):
return HookConfig(
min_version="4.1.4-4.1.6.14",
pattern=self._hex_array_to_str([
0x24, 0x08, 0x48, 0x89, 0x6c, 0x24, 0x10, 0x48, 0x89, 0x74,
0x00, 0x18, 0x48, 0x89, 0x7c, 0x00, 0x20, 0x41, 0x56, 0x48,
0x83, 0xec, 0x50, 0x41
]),
mask="xxxxxxxxxx?xxxx?xxxxxxxx",
offset=-3,
md5_pattern="48 8D 4D 00 48 89 4D B0 48 89 45 B8 48 8D 7D 00 48 8D 55 B0 48 89 F9",
md5_mask="xxx?xxxxxxxxxxx?xxxxxxx",
md5_offset=4
)
if v_curr < pkg_version.parse("4.1.4"):
"""图片密钥可能是错的,版本过低没有测试"""
return HookConfig(
min_version="<4.1.4",
pattern=self._hex_array_to_str([
0x24, 0x50, 0x48, 0xc7, 0x45, 0x00, 0xfe, 0xff, 0xff, 0xff,
0x44, 0x89, 0xcf, 0x44, 0x89, 0xc3, 0x49, 0x89, 0xd6, 0x48,
0x89, 0xce, 0x48, 0x89
]),
mask="xxxxxxxxxxxxxxxxxxxxxxxx",
offset=-15, # -0xf
md5_pattern="48 8D 4D 00 48 89 4D B0 48 89 45 B8 48 8D 7D 00 48 8D 55 B0 48 89 F9",
md5_mask="xxx?xxxxxxxxxxx?xxxxxxx",
md5_offset=4
)
return None
def kill_wechat(self):
"""检测并查杀微信进程"""
killed = False
@@ -125,9 +55,7 @@ class WeChatKeyFetcher:
def launch_wechat(self, exe_path: str) -> int:
"""启动微信并返回 PID"""
try:
process = subprocess.Popen(exe_path)
time.sleep(2)
candidates = []
for proc in psutil.process_iter(['pid', 'name', 'create_time']):
@@ -135,7 +63,6 @@ class WeChatKeyFetcher:
candidates.append(proc)
if candidates:
candidates.sort(key=lambda x: x.info['create_time'], reverse=True)
target_pid = candidates[0].info['pid']
return target_pid
@@ -146,8 +73,8 @@ class WeChatKeyFetcher:
logger.error(f"启动微信失败: {e}")
raise RuntimeError(f"无法启动微信: {e}")
def fetch_key(self) -> dict:
"""调用 wx_key 获取双密钥"""
def fetch_db_key(self) -> dict:
"""调用 wx_key 获取数据库密钥 (Hook 模式)"""
if wx_key is None:
raise RuntimeError("wx_key 模块未安装或加载失败")
@@ -160,36 +87,26 @@ class WeChatKeyFetcher:
logger.info(f"Detect WeChat: {version} at {exe_path}")
config = self._get_hook_config(version)
if not config:
raise RuntimeError(f"原生获取失败:当前微信版本 ({version}) 过低,为保证稳定性,仅支持 4.1.5 及以上版本使用原生获取。")
self.kill_wechat()
pid = self.launch_wechat(exe_path)
logger.info(f"WeChat launched, PID: {pid}")
if not wx_key.initialize_hook(pid, "", config.pattern, config.mask, config.offset,
config.md5_pattern, config.md5_mask, config.md5_offset):
# 仅传入 PID,触发数据库密钥自动 Hook
if not wx_key.initialize_hook(pid):
err = wx_key.get_last_error_msg()
raise RuntimeError(f"Hook初始化失败: {err}")
raise RuntimeError(f"数据库 Hook 初始化失败: {err}")
start_time = time.time()
found_db_key = None
found_md5_data = None
try:
while True:
if time.time() - start_time > self.timeout_seconds:
raise TimeoutError("获取密钥超时 (60s),请确保在弹出的微信中完成登录。")
raise TimeoutError("获取数据库密钥超时 (60s),请确保在弹出的微信中完成登录。")
key_data = wx_key.poll_key_data()
if key_data:
if 'key' in key_data:
found_db_key = key_data['key']
if 'md5' in key_data:
found_md5_data = key_data['md5']
if found_db_key and found_md5_data:
if key_data and 'key' in key_data:
found_db_key = key_data['key']
break
while True:
@@ -204,22 +121,13 @@ class WeChatKeyFetcher:
logger.info("Cleaning up hook...")
wx_key.cleanup_hook()
aes_key = None # gemini !!! ???
xor_key = None
if found_md5_data and "|" in found_md5_data:
aes_key, xor_key_dec = found_md5_data.split("|")
xor_key = f"0x{int(xor_key_dec):02X}"
return {
"db_key": found_db_key,
"aes_key": aes_key,
"xor_key": xor_key
"db_key": found_db_key
}
def get_db_key_workflow():
fetcher = WeChatKeyFetcher()
return fetcher.fetch_key()
return fetcher.fetch_db_key()
# ============================== 以下是图片密钥逻辑 =====================================
@@ -232,6 +140,82 @@ def get_wechat_internal_global_config(wx_dir: Path, file_name1) -> bytes:
return Path(target_path).read_bytes()
def try_get_local_image_keys() -> List[Dict[str, Any]]:
"""尝试通过本地算法提取图片密钥 (无需 Hook)"""
if wx_key is None or not hasattr(wx_key, 'get_image_key'):
return []
try:
res_json = wx_key.get_image_key()
if not res_json:
return []
data = json.loads(res_json)
accounts = data.get('accounts', [])
results = []
for acc in accounts:
wxid = acc.get('wxid')
keys = acc.get('keys', [])
for k in keys:
xor_key = k.get('xorKey')
aes_key = k.get('aesKey')
if xor_key is not None:
results.append({
"wxid": wxid,
"xor_key": f"0x{int(xor_key):02X}",
"aes_key": aes_key
})
return results
except Exception as e:
logger.error(f"本地提取图片密钥失败: {e}")
return []
async def get_image_key_integrated_workflow(account: Optional[str] = None) -> Dict[str, Any]:
"""
集成图片密钥获取流程:
1. 优先尝试本地算法提取
2. 如果本地提取失败或未匹配到指定账号,尝试远程 API 解析
"""
# 1. 尝试本地提取
local_keys = try_get_local_image_keys()
target_account_wxid = None
if account:
try:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
target_account_wxid = wx_id_dir.name
except:
target_account_wxid = account
if local_keys:
# 如果指定了账号,尝试在本地结果中找匹配的
if target_account_wxid:
for k in local_keys:
if k['wxid'] == target_account_wxid:
logger.info(f"成功通过本地算法匹配到账号 {target_account_wxid} 的图片密钥")
upsert_account_keys_in_store(
account=k['wxid'],
image_xor_key=k['xor_key'],
image_aes_key=k['aes_key']
)
return k
else:
# 如果没指定账号,返回第一个发现的并存入 store (如果有的话)
k = local_keys[0]
logger.info(f"本地算法提取成功 (未指定账号,返回首个): {k['wxid']}")
upsert_account_keys_in_store(
account=k['wxid'],
image_xor_key=k['xor_key'],
image_aes_key=k['aes_key']
)
return k
# 2. 本地提取失败或不匹配,尝试远程解析
logger.info("本地算法提取未命中,尝试远程 API 解析...")
return await fetch_and_save_remote_keys(account)
async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str, Any]:
account_dir = _resolve_account_dir(account)
+26
View File
@@ -3049,6 +3049,8 @@ def _append_full_messages_from_rows(
quote_thumb_url = ""
link_type = ""
link_style = ""
object_id = ""
object_nonce_id = ""
quote_server_id = ""
quote_type = ""
quote_voice_length = ""
@@ -3082,6 +3084,8 @@ def _append_full_messages_from_rows(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or "")
link_type = str(parsed.get("linkType") or "")
link_style = str(parsed.get("linkStyle") or "")
object_id = str(parsed.get("objectId") or "")
object_nonce_id = str(parsed.get("objectNonceId") or "")
quote_username = str(parsed.get("quoteUsername") or "")
quote_server_id = str(parsed.get("quoteServerId") or "")
quote_type = str(parsed.get("quoteType") or "")
@@ -3324,6 +3328,8 @@ def _append_full_messages_from_rows(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or quote_thumb_url)
link_type = str(parsed.get("linkType") or link_type)
link_style = str(parsed.get("linkStyle") or link_style)
object_id = str(parsed.get("objectId") or object_id)
object_nonce_id = str(parsed.get("objectNonceId") or object_nonce_id)
amount = str(parsed.get("amount") or amount)
cover_url = str(parsed.get("coverUrl") or cover_url)
thumb_url = str(parsed.get("thumbUrl") or thumb_url)
@@ -3382,6 +3388,8 @@ def _append_full_messages_from_rows(
"url": url,
"linkType": link_type,
"linkStyle": link_style,
"objectId": object_id,
"objectNonceId": object_nonce_id,
"from": from_name,
"fromUsername": from_username,
"recordItem": record_item,
@@ -4584,6 +4592,8 @@ def _collect_chat_messages(
quote_thumb_url = ""
link_type = ""
link_style = ""
object_id = ""
object_nonce_id = ""
quote_server_id = ""
quote_type = ""
quote_voice_length = ""
@@ -4617,6 +4627,8 @@ def _collect_chat_messages(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or "")
link_type = str(parsed.get("linkType") or "")
link_style = str(parsed.get("linkStyle") or "")
object_id = str(parsed.get("objectId") or "")
object_nonce_id = str(parsed.get("objectNonceId") or "")
quote_username = str(parsed.get("quoteUsername") or "")
quote_server_id = str(parsed.get("quoteServerId") or "")
quote_type = str(parsed.get("quoteType") or "")
@@ -4838,6 +4850,8 @@ def _collect_chat_messages(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or quote_thumb_url)
link_type = str(parsed.get("linkType") or link_type)
link_style = str(parsed.get("linkStyle") or link_style)
object_id = str(parsed.get("objectId") or object_id)
object_nonce_id = str(parsed.get("objectNonceId") or object_nonce_id)
amount = str(parsed.get("amount") or amount)
cover_url = str(parsed.get("coverUrl") or cover_url)
thumb_url = str(parsed.get("thumbUrl") or thumb_url)
@@ -4901,6 +4915,8 @@ def _collect_chat_messages(
"url": url,
"linkType": link_type,
"linkStyle": link_style,
"objectId": object_id,
"objectNonceId": object_nonce_id,
"from": from_name,
"fromUsername": from_username,
"recordItem": record_item,
@@ -5502,6 +5518,8 @@ def list_chat_messages(
quote_thumb_url = ""
link_type = ""
link_style = ""
object_id = ""
object_nonce_id = ""
quote_server_id = ""
quote_type = ""
quote_voice_length = ""
@@ -5531,6 +5549,8 @@ def list_chat_messages(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or "")
link_type = str(parsed.get("linkType") or "")
link_style = str(parsed.get("linkStyle") or "")
object_id = str(parsed.get("objectId") or "")
object_nonce_id = str(parsed.get("objectNonceId") or "")
quote_username = str(parsed.get("quoteUsername") or "")
quote_server_id = str(parsed.get("quoteServerId") or "")
quote_type = str(parsed.get("quoteType") or "")
@@ -5736,6 +5756,8 @@ def list_chat_messages(
quote_thumb_url = str(parsed.get("quoteThumbUrl") or quote_thumb_url)
link_type = str(parsed.get("linkType") or link_type)
link_style = str(parsed.get("linkStyle") or link_style)
object_id = str(parsed.get("objectId") or object_id)
object_nonce_id = str(parsed.get("objectNonceId") or object_nonce_id)
amount = str(parsed.get("amount") or amount)
cover_url = str(parsed.get("coverUrl") or cover_url)
thumb_url = str(parsed.get("thumbUrl") or thumb_url)
@@ -5788,6 +5810,8 @@ def list_chat_messages(
"url": url,
"linkType": link_type,
"linkStyle": link_style,
"objectId": object_id,
"objectNonceId": object_nonce_id,
"from": from_name,
"fromUsername": from_username,
"recordItem": record_item,
@@ -7796,6 +7820,8 @@ async def resolve_app_message(
"fromUsername": str(parsed.get("fromUsername") or "").strip(),
"linkType": str(parsed.get("linkType") or "").strip(),
"linkStyle": str(parsed.get("linkStyle") or "").strip(),
"objectId": str(parsed.get("objectId") or "").strip(),
"objectNonceId": str(parsed.get("objectNonceId") or "").strip(),
"size": str(parsed.get("size") or "").strip(),
"baseUrl": base_url,
}
+4 -4
View File
@@ -3,7 +3,7 @@ from typing import Optional
from fastapi import APIRouter
from ..key_store import get_account_keys_from_store
from ..key_service import get_db_key_workflow, fetch_and_save_remote_keys
from ..key_service import get_db_key_workflow, get_image_key_integrated_workflow
from ..media_helpers import _load_media_keys, _resolve_account_dir
from ..path_fix import PathFixRoute
@@ -97,7 +97,7 @@ async def get_image_key(account: Optional[str] = None):
4. 解析返回流,自动存入本地数据库
"""
try:
result = await fetch_and_save_remote_keys(account)
result = await get_image_key_integrated_workflow(account)
return {
"status": 0,
@@ -105,8 +105,8 @@ async def get_image_key(account: Optional[str] = None):
"data": {
"xor_key": result["xor_key"],
"aes_key": result["aes_key"],
"nick_name": result.get("nick_name"),
"account": result["wxid"]
"nick_name": result.get("nick_name", ""),
"account": result.get("wxid", "")
}
}
except FileNotFoundError as e:
+231 -23
View File
@@ -4,6 +4,8 @@ import binascii
import json
import os
import re
import socket
import subprocess
import sys
import threading
import time
@@ -128,6 +130,10 @@ _loaded_wcdb_api_dll: Optional[Path] = None
_preloaded_native_libs: list[ctypes.CDLL] = []
_protection_checked = False
_protection_result: Optional[tuple[int, str]] = None
_AUTO_SIDECAR_LOCK = threading.Lock()
_AUTO_SIDECAR_PROC: Optional[subprocess.Popen] = None
_AUTO_SIDECAR_URL = ""
_AUTO_SIDECAR_TOKEN = ""
def _is_windows() -> bool:
@@ -238,6 +244,197 @@ def _sidecar_enabled() -> bool:
return bool(_sidecar_url())
def _repo_root() -> Path:
return Path(__file__).resolve().parents[2]
def _source_sidecar_assets() -> tuple[Path, Path, Path] | None:
if getattr(sys, "frozen", False):
return None
repo_root = _repo_root()
electron_exe = repo_root / "desktop" / "node_modules" / "electron" / "dist" / "electron.exe"
sidecar_script = repo_root / "desktop" / "src" / "wcdb-sidecar.cjs"
koffi_dir = repo_root / "desktop" / "vendor" / "koffi"
try:
if electron_exe.is_file() and sidecar_script.is_file() and koffi_dir.exists():
return electron_exe, sidecar_script, koffi_dir
except Exception:
return None
return None
def _auto_sidecar_started_here() -> bool:
with _AUTO_SIDECAR_LOCK:
return bool(_AUTO_SIDECAR_URL and _AUTO_SIDECAR_TOKEN)
def _parse_port(value: object) -> Optional[int]:
try:
raw = str(value or "").strip()
if not raw:
return None
port = int(raw, 10)
except Exception:
return None
if 1 <= port <= 65535:
return port
return None
def _pick_free_port() -> int:
requested = _parse_port(os.environ.get("WECHAT_TOOL_WCDB_SIDECAR_PORT"))
if requested is not None:
return requested
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
sock.bind(("127.0.0.1", 0))
sock.listen(1)
return int(sock.getsockname()[1])
def _build_auto_sidecar_resource_paths(wcdb_api_dll: Path) -> list[str]:
items: list[str] = []
seen: set[str] = set()
def add(path: str | Path | None) -> None:
if not path:
return
try:
resolved = Path(path).resolve()
except Exception:
resolved = Path(path)
key = str(resolved).replace("/", "\\").rstrip("\\").lower()
if not key or key in seen:
return
seen.add(key)
items.append(str(resolved))
repo_root = _repo_root()
dll_dir = wcdb_api_dll.parent
add(dll_dir)
add(dll_dir.parent)
add(repo_root)
add(repo_root / "resources")
data_dir = str(os.environ.get("WECHAT_TOOL_DATA_DIR", "") or "").strip()
if data_dir:
add(data_dir)
add(Path(data_dir) / "resources")
else:
add(Path.cwd())
add(Path.cwd() / "resources")
return items
def _stop_auto_sidecar() -> None:
global _AUTO_SIDECAR_PROC, _AUTO_SIDECAR_URL, _AUTO_SIDECAR_TOKEN
with _AUTO_SIDECAR_LOCK:
proc = _AUTO_SIDECAR_PROC
owned_url = _AUTO_SIDECAR_URL
owned_token = _AUTO_SIDECAR_TOKEN
_AUTO_SIDECAR_PROC = None
_AUTO_SIDECAR_URL = ""
_AUTO_SIDECAR_TOKEN = ""
if owned_url and os.environ.get("WECHAT_TOOL_WCDB_SIDECAR_URL") == owned_url:
os.environ.pop("WECHAT_TOOL_WCDB_SIDECAR_URL", None)
if owned_token and os.environ.get("WECHAT_TOOL_WCDB_SIDECAR_TOKEN") == owned_token:
os.environ.pop("WECHAT_TOOL_WCDB_SIDECAR_TOKEN", None)
if proc is None:
return
try:
if proc.poll() is None:
proc.terminate()
try:
proc.wait(timeout=5.0)
except Exception:
proc.kill()
except Exception:
pass
def _maybe_start_auto_sidecar() -> bool:
global _AUTO_SIDECAR_PROC, _AUTO_SIDECAR_URL, _AUTO_SIDECAR_TOKEN
if _sidecar_enabled() or not _is_windows():
return False
assets = _source_sidecar_assets()
if not assets:
return False
wcdb_api_dll = _resolve_wcdb_api_dll_path()
try:
if not wcdb_api_dll.exists():
return False
except Exception:
return False
electron_exe, sidecar_script, koffi_dir = assets
repo_root = _repo_root()
with _AUTO_SIDECAR_LOCK:
proc = _AUTO_SIDECAR_PROC
if proc is not None and proc.poll() is None and _AUTO_SIDECAR_URL and _AUTO_SIDECAR_TOKEN:
os.environ["WECHAT_TOOL_WCDB_SIDECAR_URL"] = _AUTO_SIDECAR_URL
os.environ["WECHAT_TOOL_WCDB_SIDECAR_TOKEN"] = _AUTO_SIDECAR_TOKEN
return True
if proc is not None and proc.poll() is not None:
_AUTO_SIDECAR_PROC = None
_AUTO_SIDECAR_URL = ""
_AUTO_SIDECAR_TOKEN = ""
port = _pick_free_port()
token = os.urandom(24).hex()
url = f"http://127.0.0.1:{port}"
env = os.environ.copy()
env.update(
{
"ELECTRON_RUN_AS_NODE": "1",
"WECHAT_TOOL_WCDB_SIDECAR_HOST": "127.0.0.1",
"WECHAT_TOOL_WCDB_SIDECAR_PORT": str(port),
"WECHAT_TOOL_WCDB_SIDECAR_TOKEN": token,
"WECHAT_TOOL_WCDB_API_DLL_PATH": str(wcdb_api_dll),
"WECHAT_TOOL_WCDB_DLL_DIR": str(wcdb_api_dll.parent),
"WECHAT_TOOL_WCDB_RESOURCE_PATHS": json.dumps(
_build_auto_sidecar_resource_paths(wcdb_api_dll), ensure_ascii=False
),
"WECHAT_TOOL_KOFFI_DIR": str(koffi_dir),
}
)
creationflags = int(getattr(subprocess, "CREATE_NO_WINDOW", 0) or 0)
try:
proc = subprocess.Popen(
[str(electron_exe), str(sidecar_script)],
cwd=str(repo_root),
env=env,
stdin=subprocess.DEVNULL,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
creationflags=creationflags,
)
except Exception as exc:
logger.warning("[wcdb] auto sidecar start failed: %s", exc)
return False
_AUTO_SIDECAR_PROC = proc
_AUTO_SIDECAR_URL = url
_AUTO_SIDECAR_TOKEN = token
os.environ["WECHAT_TOOL_WCDB_SIDECAR_URL"] = url
os.environ["WECHAT_TOOL_WCDB_SIDECAR_TOKEN"] = token
logger.info("[wcdb] auto-started electron sidecar url=%s dll=%s", _AUTO_SIDECAR_URL, wcdb_api_dll)
return True
def _sidecar_call(action: str, payload: Optional[dict[str, Any]] = None, *, timeout: float = 30.0) -> dict[str, Any]:
base_url = _sidecar_url()
if not base_url:
@@ -476,30 +673,37 @@ def _load_wcdb_lib() -> ctypes.CDLL:
def _ensure_initialized() -> None:
global _initialized, _loaded_wcdb_api_dll, _protection_result
_maybe_start_auto_sidecar()
if _sidecar_enabled():
with _lib_lock:
if _initialized:
return
result = _sidecar_call("init", timeout=30.0)
dll_path = str(result.get("dllPath") or "").strip()
if dll_path:
try:
_loaded_wcdb_api_dll = Path(dll_path)
except Exception:
pass
protection = result.get("protection")
if isinstance(protection, list):
for item in protection:
if isinstance(item, dict) and "rc" in item:
try:
_protection_result = (int(item.get("rc")), str(item.get("path") or ""))
if int(item.get("rc")) == 0:
break
except Exception:
continue
with _lib_lock:
_initialized = True
return
try:
result = _sidecar_call("init", timeout=30.0)
dll_path = str(result.get("dllPath") or "").strip()
if dll_path:
try:
_loaded_wcdb_api_dll = Path(dll_path)
except Exception:
pass
protection = result.get("protection")
if isinstance(protection, list):
for item in protection:
if isinstance(item, dict) and "rc" in item:
try:
_protection_result = (int(item.get("rc")), str(item.get("path") or ""))
if int(item.get("rc")) == 0:
break
except Exception:
continue
with _lib_lock:
_initialized = True
return
except Exception:
if not _auto_sidecar_started_here():
raise
logger.warning("[wcdb] auto sidecar init failed, fallback to in-process wcdb")
_stop_auto_sidecar()
lib = _load_wcdb_lib()
with _lib_lock:
@@ -1188,13 +1392,15 @@ def shutdown() -> None:
global _initialized
if _sidecar_enabled():
with _lib_lock:
if not _initialized:
return
should_shutdown = bool(_initialized)
try:
_sidecar_call("shutdown", timeout=5.0)
if should_shutdown:
_sidecar_call("shutdown", timeout=5.0)
finally:
with _lib_lock:
_initialized = False
if _auto_sidecar_started_here():
_stop_auto_sidecar()
return
lib = _load_wcdb_lib()
@@ -1205,6 +1411,8 @@ def shutdown() -> None:
lib.wcdb_shutdown()
finally:
_initialized = False
if _auto_sidecar_started_here():
_stop_auto_sidecar()
def _resolve_session_db_path(db_storage_dir: Path) -> Path:
+20
View File
@@ -148,6 +148,26 @@ class TestParseAppMessage(unittest.TestCase):
self.assertEqual(parsed.get("thumbUrl"), "https://finder.video.qq.com/cover.jpg")
self.assertEqual(parsed.get("url"), "https://channels.weixin.qq.com/web/pages/feed?feedid=abc")
def test_finder_type_51_exposes_object_fields(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
'<title>当前版本不支持展示该内容,请升级至最新版本。</title>'
'<des></des>'
'<type>51</type>'
'<finderFeed>'
'<nickname><![CDATA[央视新闻]]></nickname>'
'<objectId><![CDATA[1234567890]]></objectId>'
'<objectNonceId><![CDATA[nonce-abc]]></objectNonceId>'
'</finderFeed>'
'</appmsg></msg>'
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("linkType"), "finder")
self.assertEqual(parsed.get("objectId"), "1234567890")
self.assertEqual(parsed.get("objectNonceId"), "nonce-abc")
def test_quote_type_5_nested_xml_refermsg_uses_inner_title(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
Generated
+7 -7
View File
@@ -1,5 +1,5 @@
version = 1
revision = 2
revision = 3
requires-python = ">=3.11"
[[package]]
@@ -919,7 +919,7 @@ requires-dist = [
{ name = "requests", specifier = ">=2.32.4" },
{ name = "typing-extensions", specifier = ">=4.8.0" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.24.0" },
{ name = "wx-key", specifier = ">=1.1.0" },
{ name = "wx-key", specifier = ">=2.0.0" },
{ name = "zstandard", specifier = ">=0.23.0" },
]
provides-extras = ["build"]
@@ -935,13 +935,13 @@ wheels = [
[[package]]
name = "wx-key"
version = "1.1.0"
version = "2.0.0"
source = { registry = "tools/key_wheels" }
wheels = [
{ path = "wx_key-1.1.0-cp311-cp311-win_amd64.whl" },
{ path = "wx_key-1.1.0-cp312-cp312-win_amd64.whl" },
{ path = "wx_key-1.1.0-cp313-cp313-win_amd64.whl" },
{ path = "wx_key-1.1.0-cp314-cp314-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp311-cp311-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp312-cp312-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp313-cp313-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp314-cp314-win_amd64.whl" },
]
[[package]]