fix(key): 支持通过 db_storage_path 精确获取当前账号图片密钥

- 前端获取图片密钥时补充透传 db_storage_path / wxid_dir
- 后端支持通过 db_storage_path 反推出目标 wxid_dir
- 本地图片密钥匹配改为账号精确匹配,避免子串误命中
- 切换账号时重置并重新预填密钥,避免跨账号串用旧密钥
- 增加单测,覆盖精确匹配和未完成数据库解密时的远程获取场景
This commit is contained in:
2977094657
2026-04-13 18:10:53 +08:00
Unverified
parent c4334daf59
commit f5c3fed181
5 changed files with 284 additions and 57 deletions
+2
View File
@@ -578,6 +578,8 @@ export const useApi = () => {
const getImageKey = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
if (params && params.db_storage_path) query.set('db_storage_path', params.db_storage_path)
if (params && params.wxid_dir) query.set('wxid_dir', params.wxid_dir)
const url = '/get_image_key' + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
+46 -41
View File
@@ -448,6 +448,7 @@ const error = ref('')
const warning = ref('') // 警告,用于密钥提示
const currentStep = ref(0)
const mediaAccount = ref('')
const activeKeyAccount = ref('')
const isGettingDbKey = ref(false)
// 步骤定义
@@ -485,6 +486,8 @@ const manualKeyErrors = reactive({
aes_key: ''
})
const normalizeAccountId = (value) => String(value || '').trim()
const normalizeXorKey = (value) => {
const raw = String(value || '').trim()
if (!raw) return { ok: false, value: '', message: '请输入 XOR 密钥' }
@@ -503,7 +506,7 @@ const normalizeAesKey = (value) => {
}
const prefillKeysForAccount = async (account) => {
const acc = String(account || '').trim()
const acc = normalizeAccountId(account)
if (!acc) return
try {
const resp = await getSavedKeys({ account: acc })
@@ -529,6 +532,44 @@ const prefillKeysForAccount = async (account) => {
}
}
const tryAutoFetchImageKeys = async (account) => {
const acc = normalizeAccountId(account)
if (!acc) return
if (String(manualKeys.xor_key || '').trim() || String(manualKeys.aes_key || '').trim()) return
warning.value = '正在通过云端/本地算法自动获取图片密钥,请稍候...'
try {
const imgRes = await getImageKey({
account: acc,
db_storage_path: String(formData.db_storage_path || '').trim()
})
if (imgRes && imgRes.status === 0) {
if (imgRes.data?.xor_key) manualKeys.xor_key = imgRes.data.xor_key
if (imgRes.data?.aes_key) manualKeys.aes_key = imgRes.data.aes_key
warning.value = '已通过云端成功获取图片密钥!'
setTimeout(() => { if (warning.value.includes('成功获取')) warning.value = '' }, 3000)
} else {
warning.value = '云端获取图片密钥失败,您可以尝试手动填写。'
}
} catch (e) {
warning.value = '网络请求失败,请手动填写图片密钥。'
}
}
const ensureKeysForAccount = async (account) => {
const acc = normalizeAccountId(account)
if (!acc) return
if (activeKeyAccount.value && activeKeyAccount.value !== acc) {
clearManualKeys()
}
activeKeyAccount.value = acc
await prefillKeysForAccount(acc)
await tryAutoFetchImageKeys(acc)
}
const handleGetDbKey = async () => {
if (isGettingDbKey.value) return
isGettingDbKey.value = true
@@ -605,6 +646,7 @@ const clearManualKeys = () => {
manualKeyErrors.aes_key = ''
mediaKeys.xor_key = ''
mediaKeys.aes_key = ''
activeKeyAccount.value = ''
}
// 图片解密相关
@@ -759,26 +801,7 @@ const handleDecrypt = async () => {
} catch (e) {}
currentStep.value = 1
await prefillKeysForAccount(mediaAccount.value)
if (!manualKeys.xor_key && !manualKeys.aes_key) {
warning.value = '正在通过云端/本地算法自动获取图片密钥,请稍候...'
try {
const params = mediaAccount.value ? { account: mediaAccount.value } : {}
const imgRes = await getImageKey(params)
if (imgRes && imgRes.status === 0) {
if (imgRes.data?.xor_key) manualKeys.xor_key = imgRes.data.xor_key
if (imgRes.data?.aes_key) manualKeys.aes_key = imgRes.data.aes_key
warning.value = '已通过云端成功获取图片密钥!'
setTimeout(() => { if(warning.value.includes('成功获取')) warning.value = '' }, 3000)
} else {
warning.value = '云端获取图片密钥失败,您可以尝试手动填写。'
}
} catch (e) {
warning.value = '网络请求失败,请手动填写图片密钥。'
}
}
await ensureKeysForAccount(mediaAccount.value)
} else if (result.status === 'failed') {
if (result.failure_count > 0 && result.success_count === 0) {
@@ -863,25 +886,7 @@ const handleDecrypt = async () => {
if (data.status === 'completed') {
currentStep.value = 1
await prefillKeysForAccount(mediaAccount.value)
if (!manualKeys.xor_key && !manualKeys.aes_key) {
warning.value = '正在通过云端/本地算法自动获取图片密钥,请稍候...'
try {
const params = mediaAccount.value ? { account: mediaAccount.value } : {}
const imgRes = await getImageKey(params)
if (imgRes && imgRes.status === 0) {
if (imgRes.data?.xor_key) manualKeys.xor_key = imgRes.data.xor_key
if (imgRes.data?.aes_key) manualKeys.aes_key = imgRes.data.aes_key
warning.value = '已通过云端成功获取图片密钥!'
setTimeout(() => { if(warning.value.includes('成功获取')) warning.value = '' }, 3000)
} else {
warning.value = '云端获取图片密钥失败,您可以尝试手动填写。'
}
} catch (e) {
warning.value = '网络请求失败,请手动填写图片密钥。'
}
}
await ensureKeysForAccount(mediaAccount.value)
} else if (data.status === 'failed') {
error.value = data.message || '所有文件解密失败'
} else {
@@ -1063,7 +1068,7 @@ onMounted(async () => {
}
// 清除sessionStorage
sessionStorage.removeItem('selectedAccount')
await prefillKeysForAccount(mediaAccount.value)
await ensureKeysForAccount(mediaAccount.value)
} catch (e) {
console.error('解析账户信息失败:', e)
}
+72 -14
View File
@@ -30,6 +30,43 @@ from .media_helpers import _resolve_account_dir, _resolve_account_wxid_dir
logger = logging.getLogger(__name__)
def _resolve_wxid_dir_for_image_key(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Path:
explicit_wxid_dir = str(wxid_dir or "").strip()
if explicit_wxid_dir:
candidate = Path(explicit_wxid_dir).expanduser()
if candidate.exists() and candidate.is_dir():
return candidate
raise FileNotFoundError(f"指定的 wxid_dir 不存在或不是目录: {candidate}")
explicit_db_storage_path = str(db_storage_path or "").strip()
if explicit_db_storage_path:
db_storage_dir = Path(explicit_db_storage_path).expanduser()
if db_storage_dir.exists() and db_storage_dir.is_dir():
if db_storage_dir.name.lower() == "db_storage":
candidate = db_storage_dir.parent
if candidate.exists() and candidate.is_dir():
return candidate
nested_db_storage = db_storage_dir / "db_storage"
if nested_db_storage.exists() and nested_db_storage.is_dir():
return db_storage_dir
if account:
try:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
if wx_id_dir:
return wx_id_dir
except Exception:
pass
raise FileNotFoundError("无法定位该账号的 wxid_dir,请传入有效的 db_storage_path 或先完成数据库解密")
# ====================== 以下是hook逻辑 ======================================
class WeChatKeyFetcher:
@@ -171,7 +208,12 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
return []
async def get_image_key_integrated_workflow(account: Optional[str] = None) -> Dict[str, Any]:
async def get_image_key_integrated_workflow(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
集成图片密钥获取流程:
1. 优先尝试本地算法提取
@@ -181,22 +223,26 @@ async def get_image_key_integrated_workflow(account: Optional[str] = None) -> Di
local_keys = try_get_local_image_keys()
target_account_wxid = None
if account:
if account or wxid_dir or db_storage_path:
try:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
target_account_wxid = wx_id_dir.name
except:
resolved_wxid_dir = _resolve_wxid_dir_for_image_key(
account,
wxid_dir=wxid_dir,
db_storage_path=db_storage_path,
)
target_account_wxid = resolved_wxid_dir.name
except Exception:
target_account_wxid = account
target_account_wxid = str(target_account_wxid or "").strip().lower()
if local_keys:
# 如果指定了账号,尝试在本地结果中找匹配的
if target_account_wxid:
for k in local_keys:
if k['wxid'] in target_account_wxid:
logger.info(f"成功通过本地算法匹配到账号 {target_account_wxid} 的图片密钥")
local_wxid = str(k.get("wxid") or "").strip().lower()
if local_wxid and local_wxid == target_account_wxid:
upsert_account_keys_in_store(
account=k['wxid'],
account=str(k.get("wxid") or "").strip(),
image_xor_key=k['xor_key'],
image_aes_key=k['aes_key']
)
@@ -215,12 +261,24 @@ async def get_image_key_integrated_workflow(account: Optional[str] = None) -> Di
# 2. 本地提取失败或不匹配,尝试远程解析
logger.info("本地算法提取未命中,尝试远程 API 解析...")
return await fetch_and_save_remote_keys(account)
return await fetch_and_save_remote_keys(
account,
wxid_dir=wxid_dir,
db_storage_path=db_storage_path,
)
async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str, Any]:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
async def fetch_and_save_remote_keys(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
wx_id_dir = _resolve_wxid_dir_for_image_key(
account,
wxid_dir=wxid_dir,
db_storage_path=db_storage_path,
)
wxid = wx_id_dir.name
url = "https://view.free.c3o.re/api/key"
@@ -274,4 +332,4 @@ async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str,
"xor_key": xor_hex_str,
"aes_key": aes_val,
"nick_name": config.get("nickName", config.get("nick_name", ""))
}
}
+10 -2
View File
@@ -87,7 +87,11 @@ async def get_wechat_db_key():
@router.get("/api/get_image_key", summary="获取并保存微信图片密钥")
async def get_image_key(account: Optional[str] = None):
async def get_image_key(
account: Optional[str] = None,
db_storage_path: Optional[str] = None,
wxid_dir: Optional[str] = None,
):
"""
通过模拟 Next.js Server Action 协议,利用本地微信配置文件换取 AES/XOR 密钥。
@@ -97,7 +101,11 @@ async def get_image_key(account: Optional[str] = None):
4. 解析返回流,自动存入本地数据库
"""
try:
result = await get_image_key_integrated_workflow(account)
result = await get_image_key_integrated_workflow(
account,
db_storage_path=db_storage_path,
wxid_dir=wxid_dir,
)
return {
"status": 0,
@@ -0,0 +1,154 @@
import asyncio
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
import wechat_decrypt_tool.key_service as key_service
class TestKeyServiceImageKeyAccountMatch(unittest.TestCase):
def test_local_image_keys_do_not_match_by_substring(self) -> None:
remote_result = {
"wxid": "wxid_demo_extra",
"xor_key": "0x8A",
"aes_key": "BBBBBBBBBBBBBBBB",
}
with mock.patch.object(
key_service,
"try_get_local_image_keys",
return_value=[
{"wxid": "wxid_demo", "xor_key": "0x01", "aes_key": "AAAAAAAAAAAAAAAA"},
],
), mock.patch.object(
key_service,
"_resolve_account_dir",
return_value=Path("D:/tmp/output/databases/wxid_demo_extra"),
), mock.patch.object(
key_service,
"_resolve_account_wxid_dir",
return_value=Path("D:/tmp/xwechat_files/wxid_demo_extra"),
), mock.patch.object(
key_service,
"upsert_account_keys_in_store",
) as upsert_mock, mock.patch.object(
key_service,
"fetch_and_save_remote_keys",
new=mock.AsyncMock(return_value=remote_result),
) as remote_mock:
result = asyncio.run(key_service.get_image_key_integrated_workflow("wxid_demo_extra"))
self.assertEqual(result, remote_result)
remote_mock.assert_awaited_once_with("wxid_demo_extra", wxid_dir=None, db_storage_path=None)
upsert_mock.assert_not_called()
def test_local_image_keys_require_exact_account_match(self) -> None:
with mock.patch.object(
key_service,
"try_get_local_image_keys",
return_value=[
{"wxid": "wxid_demo", "xor_key": "0x01", "aes_key": "AAAAAAAAAAAAAAAA"},
{"wxid": "wxid_demo_extra", "xor_key": "0x8A", "aes_key": "BBBBBBBBBBBBBBBB"},
],
), mock.patch.object(
key_service,
"_resolve_account_dir",
return_value=Path("D:/tmp/output/databases/wxid_demo_extra"),
), mock.patch.object(
key_service,
"_resolve_account_wxid_dir",
return_value=Path("D:/tmp/xwechat_files/wxid_demo_extra"),
), mock.patch.object(
key_service,
"upsert_account_keys_in_store",
) as upsert_mock, mock.patch.object(
key_service,
"fetch_and_save_remote_keys",
new=mock.AsyncMock(side_effect=AssertionError("remote should not be called")),
):
result = asyncio.run(key_service.get_image_key_integrated_workflow("wxid_demo_extra"))
self.assertEqual(result["wxid"], "wxid_demo_extra")
self.assertEqual(result["xor_key"], "0x8A")
self.assertEqual(result["aes_key"], "BBBBBBBBBBBBBBBB")
upsert_mock.assert_called_once_with(
account="wxid_demo_extra",
image_xor_key="0x8A",
image_aes_key="BBBBBBBBBBBBBBBB",
)
def test_fetch_remote_keys_can_use_db_storage_path_without_decrypted_output(self) -> None:
with TemporaryDirectory() as temp_dir:
wxid_dir = Path(temp_dir) / "xwechat_files" / "wxid_v4mbduwqtzpt22"
db_storage_dir = wxid_dir / "db_storage"
db_storage_dir.mkdir(parents=True, exist_ok=True)
class _FakeResponse:
status_code = 200
@staticmethod
def json():
return {
"xorKey": "138",
"aesKey": "c3f3366e23628242",
"nickName": "demo",
}
class _FakeAsyncClient:
def __init__(self, *args, **kwargs):
pass
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def post(self, url, data=None, files=None):
self.last_url = url
self.last_data = data
self.last_files = files
return _FakeResponse()
with mock.patch.object(
key_service,
"_resolve_account_dir",
side_effect=AssertionError("should not require decrypted account dir"),
), mock.patch.object(
key_service,
"get_wechat_internal_global_config",
side_effect=[b"global-config", b"crc-bytes"],
), mock.patch.object(
key_service.httpx,
"AsyncClient",
_FakeAsyncClient,
), mock.patch.object(
key_service,
"upsert_account_keys_in_store",
) as upsert_mock:
result = asyncio.run(
key_service.fetch_and_save_remote_keys(
"wxid_v4mbduwqtzpt22",
db_storage_path=str(db_storage_dir),
)
)
self.assertEqual(result["wxid"], "wxid_v4mbduwqtzpt22")
self.assertEqual(result["xor_key"], "0x8A")
self.assertEqual(result["aes_key"], "c3f3366e23628242")
upsert_mock.assert_called_once_with(
account="wxid_v4mbduwqtzpt22",
image_xor_key="0x8A",
image_aes_key="c3f3366e23628242",
)
if __name__ == "__main__":
unittest.main()