Compare commits

...

5 Commits

10 changed files with 1016 additions and 137 deletions
+8 -2
View File
@@ -575,8 +575,14 @@ export const useApi = () => {
}
// 获取图片密钥
const getImageKey = async () => {
return await request('/get_image_key')
const getImageKey = async (params = {}) => {
const query = new URLSearchParams()
if (params && params.account) query.set('account', params.account)
if (params && params.db_storage_path) query.set('db_storage_path', params.db_storage_path)
if (params && params.wxid_dir) query.set('wxid_dir', params.wxid_dir)
const url = '/get_image_key' + (query.toString() ? `?${query.toString()}` : '')
return await request(url)
}
// 枚举服务号信息
+247 -60
View File
@@ -272,7 +272,7 @@
<div class="w-full bg-gray-200 rounded-full h-2.5 overflow-hidden">
<div
class="h-2.5 rounded-full transition-all duration-300 ease-out"
:class="decryptProgress.status === 'complete' ? 'bg-[#07C160]' : 'bg-[#91D300]'"
:class="decryptProgress.status === 'complete' ? 'bg-[#07C160]' : decryptProgress.status === 'cancelled' ? 'bg-[#FAAD14]' : 'bg-[#91D300]'"
:style="{ width: progressPercent + '%' }"
></div>
</div>
@@ -366,6 +366,13 @@
</svg>
{{ mediaDecrypting ? '解密中...' : (mediaDecryptResult ? '重新解密' : '开始解密图片') }}
</button>
<button
v-if="mediaDecrypting"
@click="cancelMediaDecrypt"
class="inline-flex items-center px-6 py-3 bg-[#FA5151] text-white rounded-lg font-medium hover:bg-[#E54D4D] transition-all duration-200"
>
停止解密
</button>
<button
@click="skipToChat"
:disabled="mediaDecrypting"
@@ -441,6 +448,7 @@ const error = ref('')
const warning = ref('') // 警告,用于密钥提示
const currentStep = ref(0)
const mediaAccount = ref('')
const activeKeyAccount = ref('')
const isGettingDbKey = ref(false)
// 步骤定义
@@ -478,6 +486,46 @@ const manualKeyErrors = reactive({
aes_key: ''
})
const normalizeAccountId = (value) => String(value || '').trim()
const summarizeAesForLog = (value) => {
const raw = String(value || '').trim()
if (!raw) return ''
if (raw.length <= 8) return raw
return `${raw.slice(0, 4)}...${raw.slice(-4)}(len=${raw.length})`
}
const summarizeKeyStateForLog = (xorKey, aesKey) => ({
xor_key: String(xorKey || '').trim(),
aes_key: summarizeAesForLog(aesKey),
has_xor: !!String(xorKey || '').trim(),
has_aes: !!String(aesKey || '').trim()
})
const formatLogError = (error) => {
if (!error) return ''
if (error instanceof Error) {
return {
name: String(error.name || 'Error'),
message: String(error.message || ''),
stack: String(error.stack || '')
}
}
if (typeof error === 'object') {
try {
return JSON.parse(JSON.stringify(error))
} catch {}
}
return String(error)
}
const logDecryptDebug = (phase, details = {}) => {
if (process.client && typeof window !== 'undefined') {
try {
window.wechatDesktop?.logDebug?.('decrypt-page', phase, details)
} catch {}
}
try {
console.info(`[decrypt-page] ${phase}`, details)
} catch {}
}
const normalizeXorKey = (value) => {
const raw = String(value || '').trim()
if (!raw) return { ok: false, value: '', message: '请输入 XOR 密钥' }
@@ -496,8 +544,9 @@ const normalizeAesKey = (value) => {
}
const prefillKeysForAccount = async (account) => {
const acc = String(account || '').trim()
const acc = normalizeAccountId(account)
if (!acc) return
logDecryptDebug('prefill:start', { account: acc })
try {
const resp = await getSavedKeys({ account: acc })
if (!resp || resp.status !== 'success') return
@@ -517,11 +566,88 @@ const prefillKeysForAccount = async (account) => {
if (aesKey && !String(manualKeys.aes_key || '').trim()) {
manualKeys.aes_key = aesKey
}
logDecryptDebug('prefill:done', {
request_account: acc,
response_account: String(resp.account || '').trim(),
db_key_present: !!dbKey,
...summarizeKeyStateForLog(
String(keys.image_xor_key || '').trim(),
String(keys.image_aes_key || '').trim()
),
applied: summarizeKeyStateForLog(manualKeys.xor_key, manualKeys.aes_key)
})
} catch (e) {
// ignore
logDecryptDebug('prefill:error', { account: acc, error: formatLogError(e) })
}
}
const tryAutoFetchImageKeys = async (account) => {
const acc = normalizeAccountId(account)
if (!acc) return
if (String(manualKeys.xor_key || '').trim() || String(manualKeys.aes_key || '').trim()) {
logDecryptDebug('auto-fetch:skip-existing', {
account: acc,
keys: summarizeKeyStateForLog(manualKeys.xor_key, manualKeys.aes_key)
})
return
}
warning.value = '正在通过云端/本地算法自动获取图片密钥,请稍候...'
logDecryptDebug('auto-fetch:start', { account: acc })
try {
const imgRes = await getImageKey({
account: acc,
db_storage_path: String(formData.db_storage_path || '').trim()
})
logDecryptDebug('auto-fetch:response', {
account: acc,
status: imgRes?.status,
errmsg: String(imgRes?.errmsg || ''),
data_account: String(imgRes?.data?.account || '').trim(),
keys: summarizeKeyStateForLog(imgRes?.data?.xor_key, imgRes?.data?.aes_key)
})
if (imgRes && imgRes.status === 0) {
if (imgRes.data?.xor_key) manualKeys.xor_key = imgRes.data.xor_key
if (imgRes.data?.aes_key) manualKeys.aes_key = imgRes.data.aes_key
warning.value = '已通过云端成功获取图片密钥!'
setTimeout(() => { if (warning.value.includes('成功获取')) warning.value = '' }, 3000)
} else {
warning.value = '云端获取图片密钥失败,您可以尝试手动填写。'
}
} catch (e) {
warning.value = '网络请求失败,请手动填写图片密钥。'
logDecryptDebug('auto-fetch:error', { account: acc, error: formatLogError(e) })
}
}
const ensureKeysForAccount = async (account) => {
const acc = normalizeAccountId(account)
if (!acc) return
logDecryptDebug('ensure-keys:start', {
account: acc,
previous_account: activeKeyAccount.value,
current_manual: summarizeKeyStateForLog(manualKeys.xor_key, manualKeys.aes_key)
})
if (activeKeyAccount.value && activeKeyAccount.value !== acc) {
logDecryptDebug('ensure-keys:switch-account', {
from: activeKeyAccount.value,
to: acc,
cleared_keys: summarizeKeyStateForLog(manualKeys.xor_key, manualKeys.aes_key)
})
clearManualKeys()
}
activeKeyAccount.value = acc
await prefillKeysForAccount(acc)
await tryAutoFetchImageKeys(acc)
logDecryptDebug('ensure-keys:done', {
account: acc,
manual: summarizeKeyStateForLog(manualKeys.xor_key, manualKeys.aes_key)
})
}
const handleGetDbKey = async () => {
if (isGettingDbKey.value) return
isGettingDbKey.value = true
@@ -535,7 +661,7 @@ const handleGetDbKey = async () => {
const wxStatus = statusRes?.wx_status
if (wxStatus?.is_running) {
warning.value = '检测到微信正在运行,5秒后将终止进程并重启以获取全套密钥!'
warning.value = '检测到微信正在运行,5秒后将终止进程并重启以获取数据库密钥!'
await new Promise(resolve => setTimeout(resolve, 5000))
}
@@ -547,8 +673,7 @@ const handleGetDbKey = async () => {
if (res.data?.db_key) {
formData.key = res.data.db_key
}
warning.value = '🎉 数据库解密密钥已获取成功!'
// 3秒后清除成功提示,保持 UI 干净
warning.value = '数据库解密密钥已获取成功!'
setTimeout(() => { if(warning.value.includes('获取成功')) warning.value = '' }, 3000)
} else {
error.value = '获取失败: ' + (res?.errmsg || '未知错误')
@@ -563,7 +688,6 @@ const handleGetDbKey = async () => {
}
}
const applyManualKeys = () => {
manualKeyErrors.xor_key = ''
manualKeyErrors.aes_key = ''
@@ -594,12 +718,18 @@ const applyManualKeys = () => {
}
const clearManualKeys = () => {
logDecryptDebug('keys:clear', {
active_account: activeKeyAccount.value,
manual: summarizeKeyStateForLog(manualKeys.xor_key, manualKeys.aes_key),
applied: summarizeKeyStateForLog(mediaKeys.xor_key, mediaKeys.aes_key)
})
manualKeys.xor_key = ''
manualKeys.aes_key = ''
manualKeyErrors.xor_key = ''
manualKeyErrors.aes_key = ''
mediaKeys.xor_key = ''
mediaKeys.aes_key = ''
activeKeyAccount.value = ''
}
// 图片解密相关
@@ -671,6 +801,18 @@ const validateForm = () => {
}
let dbDecryptEventSource = null
let mediaDecryptEventSource = null
const closeMediaDecryptEventSource = () => {
try {
if (mediaDecryptEventSource) mediaDecryptEventSource.close()
} catch (e) {
// ignore
} finally {
mediaDecryptEventSource = null
}
}
onBeforeUnmount(() => {
try {
if (dbDecryptEventSource) dbDecryptEventSource.close()
@@ -679,6 +821,8 @@ onBeforeUnmount(() => {
} finally {
dbDecryptEventSource = null
}
closeMediaDecryptEventSource()
})
const resetDbDecryptProgress = () => {
@@ -691,12 +835,27 @@ const resetDbDecryptProgress = () => {
dbDecryptProgress.message = ''
}
const resetMediaDecryptProgress = () => {
decryptProgress.current = 0
decryptProgress.total = 0
decryptProgress.success_count = 0
decryptProgress.skip_count = 0
decryptProgress.fail_count = 0
decryptProgress.current_file = ''
decryptProgress.fileStatus = ''
decryptProgress.status = ''
}
// 处理解密
const handleDecrypt = async () => {
if (!validateForm()) {
return
}
logDecryptDebug('decrypt:start', {
db_storage_path: String(formData.db_storage_path || '').trim(),
db_key_length: String(formData.key || '').trim().length
})
loading.value = true
error.value = ''
warning.value = ''
@@ -720,28 +879,20 @@ const handleDecrypt = async () => {
}
try {
const accounts = Object.keys(result.account_results || {})
if (accounts.length > 0) mediaAccount.value = accounts[0]
if (accounts.length > 0) {
mediaAccount.value = accounts[0]
} else {
const match = formData.db_storage_path.match(/(wxid_[a-zA-Z0-9]+)/)
if (match) mediaAccount.value = match[1]
}
} catch (e) {}
logDecryptDebug('decrypt:completed-fallback', {
media_account: mediaAccount.value,
accounts: Object.keys(result.account_results || {})
})
currentStep.value = 1
await prefillKeysForAccount(mediaAccount.value)
if (!manualKeys.xor_key && !manualKeys.aes_key) {
warning.value = '正在通过云端备选方案自动获取图片密钥,请稍候...'
try {
const imgRes = await getImageKey({ account: mediaAccount.value })
if (imgRes && imgRes.status === 0) {
if (imgRes.data?.xor_key) manualKeys.xor_key = imgRes.data.xor_key
if (imgRes.data?.aes_key) manualKeys.aes_key = imgRes.data.aes_key
warning.value = '已通过云端成功获取图片密钥!'
setTimeout(() => { if(warning.value.includes('成功获取')) warning.value = '' }, 3000)
} else {
warning.value = '云端获取图片密钥失败,您可以尝试手动填写。'
}
} catch (e) {
warning.value = '网络请求失败,请手动填写图片密钥。'
}
}
await ensureKeysForAccount(mediaAccount.value)
} else if (result.status === 'failed') {
if (result.failure_count > 0 && result.success_count === 0) {
@@ -810,8 +961,17 @@ const handleDecrypt = async () => {
try {
const accounts = Object.keys(data.account_results || {})
if (accounts.length > 0) mediaAccount.value = accounts[0]
if (accounts.length > 0) {
mediaAccount.value = accounts[0]
} else {
const match = formData.db_storage_path.match(/(wxid_[a-zA-Z0-9]+)/)
if (match) mediaAccount.value = match[1]
}
} catch (e) {}
logDecryptDebug('decrypt:completed-sse', {
media_account: mediaAccount.value,
accounts: Object.keys(data.account_results || {})
})
try {
eventSource.close()
@@ -821,25 +981,7 @@ const handleDecrypt = async () => {
if (data.status === 'completed') {
currentStep.value = 1
await prefillKeysForAccount(mediaAccount.value)
// 【重点】如果刚才没有通过双 Hook 拿到图片密钥,触发云端 API 备用方案自动获取
if (!manualKeys.xor_key && !manualKeys.aes_key) {
warning.value = '正在通过云端备选方案自动获取图片密钥,请稍候...'
try {
const imgRes = await getImageKey({ account: mediaAccount.value })
if (imgRes && imgRes.status === 0) {
if (imgRes.data?.xor_key) manualKeys.xor_key = imgRes.data.xor_key
if (imgRes.data?.aes_key) manualKeys.aes_key = imgRes.data.aes_key
warning.value = '已通过云端成功获取图片密钥!'
setTimeout(() => { if(warning.value.includes('成功获取')) warning.value = '' }, 3000)
} else {
warning.value = '云端获取图片密钥失败,您可以尝试手动填写。'
}
} catch (e) {
warning.value = '网络请求失败,请手动填写图片密钥。'
}
}
await ensureKeysForAccount(mediaAccount.value)
} else if (data.status === 'failed') {
error.value = data.message || '所有文件解密失败'
} else {
@@ -877,20 +1019,18 @@ const handleDecrypt = async () => {
// 批量解密所有图片(使用SSE实时进度)
const decryptAllImages = async () => {
closeMediaDecryptEventSource()
mediaDecrypting.value = true
mediaDecryptResult.value = null
error.value = ''
warning.value = ''
logDecryptDebug('media-decrypt:start', {
account: mediaAccount.value,
keys: summarizeKeyStateForLog(mediaKeys.xor_key, mediaKeys.aes_key)
})
// 重置进度
decryptProgress.current = 0
decryptProgress.total = 0
decryptProgress.success_count = 0
decryptProgress.skip_count = 0
decryptProgress.fail_count = 0
decryptProgress.current_file = ''
decryptProgress.fileStatus = ''
decryptProgress.status = ''
resetMediaDecryptProgress()
try {
// 构建SSE URL
@@ -903,8 +1043,11 @@ const decryptAllImages = async () => {
// 使用EventSource接收SSE
const eventSource = new EventSource(url)
mediaDecryptEventSource = eventSource
eventSource.onmessage = (event) => {
if (mediaDecryptEventSource !== eventSource) return
try {
const data = JSON.parse(event.data)
@@ -928,12 +1071,23 @@ const decryptAllImages = async () => {
decryptProgress.skip_count = data.skip_count
decryptProgress.fail_count = data.fail_count
mediaDecryptResult.value = data
eventSource.close()
mediaDecrypting.value = false
logDecryptDebug('media-decrypt:complete', {
account: mediaAccount.value,
total: data.total,
success_count: data.success_count,
skip_count: data.skip_count,
fail_count: data.fail_count
})
closeMediaDecryptEventSource()
} else if (data.type === 'error') {
error.value = data.message
eventSource.close()
logDecryptDebug('media-decrypt:error-event', {
account: mediaAccount.value,
message: data.message
})
mediaDecrypting.value = false
closeMediaDecryptEventSource()
}
} catch (e) {
console.error('解析SSE消息失败:', e)
@@ -941,8 +1095,10 @@ const decryptAllImages = async () => {
}
eventSource.onerror = (e) => {
if (mediaDecryptEventSource !== eventSource) return
console.error('SSE连接错误:', e)
eventSource.close()
closeMediaDecryptEventSource()
if (mediaDecrypting.value) {
error.value = 'SSE连接中断,请重试'
mediaDecrypting.value = false
@@ -951,28 +1107,49 @@ const decryptAllImages = async () => {
} catch (err) {
error.value = err.message || '图片解密过程中发生错误'
mediaDecrypting.value = false
closeMediaDecryptEventSource()
}
}
const cancelMediaDecrypt = () => {
if (!mediaDecrypting.value) return
decryptProgress.status = 'cancelled'
mediaDecrypting.value = false
warning.value = '已停止图片解密,已完成的图片会保留。'
closeMediaDecryptEventSource()
}
// 从密钥步骤进入图片解密步骤
const goToMediaDecryptStep = async () => {
error.value = ''
warning.value = ''
// 校验并应用(未填写则允许直接进入,后端会使用已保存密钥或报错提示)
const ok = applyManualKeys()
logDecryptDebug('media-step:apply-manual', {
account: mediaAccount.value,
ok,
manual: summarizeKeyStateForLog(manualKeys.xor_key, manualKeys.aes_key),
applied: summarizeKeyStateForLog(mediaKeys.xor_key, mediaKeys.aes_key),
errors: { ...manualKeyErrors }
})
if (!ok || manualKeyErrors.xor_key || manualKeyErrors.aes_key) return
// 用户已输入 XOR 时,自动保存一次,避免下次重复输入(失败不影响继续)
if (mediaKeys.xor_key) {
try {
const aesVal = String(mediaKeys.aes_key || '').trim()
logDecryptDebug('media-step:save-keys', {
account: mediaAccount.value,
keys: summarizeKeyStateForLog(mediaKeys.xor_key, aesVal)
})
await saveMediaKeys({
account: mediaAccount.value || null,
xor_key: mediaKeys.xor_key,
aes_key: aesVal ? aesVal : null
})
} catch (e) {
// ignore
logDecryptDebug('media-step:save-keys-error', { account: mediaAccount.value, error: formatLogError(e) })
}
}
currentStep.value = 2
@@ -984,6 +1161,10 @@ const skipToChat = async () => {
const ok = applyManualKeys()
if (ok && mediaKeys.xor_key) {
const aesVal = String(mediaKeys.aes_key || '').trim()
logDecryptDebug('skip-chat:save-keys', {
account: mediaAccount.value,
keys: summarizeKeyStateForLog(mediaKeys.xor_key, aesVal)
})
await saveMediaKeys({
account: mediaAccount.value || null,
xor_key: mediaKeys.xor_key,
@@ -991,7 +1172,7 @@ const skipToChat = async () => {
})
}
} catch (e) {
// ignore
logDecryptDebug('skip-chat:save-keys-error', { account: mediaAccount.value, error: formatLogError(e) })
}
navigateTo('/chat')
}
@@ -1000,6 +1181,7 @@ const skipToChat = async () => {
onMounted(async () => {
if (process.client && typeof window !== 'undefined') {
const selectedAccount = sessionStorage.getItem('selectedAccount')
logDecryptDebug('mounted:selected-account-raw', { raw: selectedAccount || '' })
if (selectedAccount) {
try {
const account = JSON.parse(selectedAccount)
@@ -1012,9 +1194,14 @@ onMounted(async () => {
}
// 清除sessionStorage
sessionStorage.removeItem('selectedAccount')
await prefillKeysForAccount(mediaAccount.value)
logDecryptDebug('mounted:selected-account-parsed', {
account_name: String(account.account_name || '').trim(),
data_dir: String(account.data_dir || '').trim()
})
await ensureKeysForAccount(mediaAccount.value)
} catch (e) {
console.error('解析账户信息失败:', e)
logDecryptDebug('mounted:selected-account-error', { error: formatLogError(e) })
}
}
}
+27 -7
View File
@@ -41,9 +41,19 @@
<svg v-if="!loading" class="w-4 h-4 mr-2" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M3 7v10a2 2 0 002 2h14a2 2 0 002-2V9a2 2 0 00-2-2h-6l-2-2H5a2 2 0 00-2 2z"/>
</svg>
<svg v-else class="animate-spin w-4 h-4 mr-2" fill="none" viewBox="0 0 24 24">
<circle class="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4"></circle>
<path class="opacity-75" fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z"></path>
<svg v-else class="w-4 h-4 mr-2 animate-spin" fill="none" viewBox="0 0 48 48" aria-hidden="true">
<circle class="opacity-20" cx="24" cy="24" r="18" stroke="currentColor" stroke-width="6"></circle>
<circle
cx="24"
cy="24"
r="18"
stroke="currentColor"
stroke-width="6"
stroke-linecap="round"
stroke-dasharray="28 72"
pathLength="100"
transform="rotate(-90 24 24)"
></circle>
</svg>
{{ loading ? '检测中...' : '手动选择目录检测' }}
</button>
@@ -53,9 +63,19 @@
<div>
<!-- 检测中状态 -->
<div v-if="loading" class="absolute inset-0 bg-white/80 backdrop-blur-sm z-20 rounded-2xl flex flex-col items-center justify-center border border-[#EDEDED]">
<svg class="w-16 h-16 animate-spin text-[#07C160]" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<circle class="opacity-25" cx="12" cy="12" r="10" stroke="currentColor" stroke-width="4"></circle>
<path class="opacity-75" fill="currentColor" d="M4 12a8 8 0 018-8V0C5.373 0 0 5.373 0 12h4zm2 5.291A7.962 7.962 0 014 12H0c0 3.042 1.135 5.824 3 7.938l3-2.647z"></path>
<svg class="w-16 h-16 animate-spin text-[#07C160]" fill="none" viewBox="0 0 48 48" aria-hidden="true">
<circle class="opacity-20" cx="24" cy="24" r="18" stroke="currentColor" stroke-width="6"></circle>
<circle
cx="24"
cy="24"
r="18"
stroke="currentColor"
stroke-width="6"
stroke-linecap="round"
stroke-dasharray="28 72"
pathLength="100"
transform="rotate(-90 24 24)"
></circle>
</svg>
<p class="mt-4 text-lg text-[#7F7F7F]">正在检测微信数据...</p>
</div>
@@ -395,4 +415,4 @@ onMounted(() => {
}
startDetection()
})
</script>
</script>
+173 -18
View File
@@ -30,6 +30,85 @@ from .media_helpers import _resolve_account_dir, _resolve_account_wxid_dir
logger = logging.getLogger(__name__)
def _summarize_aes_key(value: Any) -> str:
raw = str(value or "").strip()
if not raw:
return ""
if len(raw) <= 8:
return raw
return f"{raw[:4]}...{raw[-4:]}(len={len(raw)})"
def _summarize_key_payload(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
payload = payload or {}
return {
"wxid": str(payload.get("wxid") or "").strip(),
"xor_key": str(payload.get("xor_key") or "").strip(),
"aes_key": _summarize_aes_key(payload.get("aes_key")),
}
def _resolve_wxid_dir_for_image_key(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Path:
explicit_wxid_dir = str(wxid_dir or "").strip()
if explicit_wxid_dir:
candidate = Path(explicit_wxid_dir).expanduser()
if candidate.exists() and candidate.is_dir():
logger.info("[image_key] 使用显式 wxid_dir: %s", str(candidate))
return candidate
raise FileNotFoundError(f"指定的 wxid_dir 不存在或不是目录: {candidate}")
explicit_db_storage_path = str(db_storage_path or "").strip()
if explicit_db_storage_path:
db_storage_dir = Path(explicit_db_storage_path).expanduser()
if db_storage_dir.exists() and db_storage_dir.is_dir():
if db_storage_dir.name.lower() == "db_storage":
candidate = db_storage_dir.parent
if candidate.exists() and candidate.is_dir():
logger.info(
"[image_key] 通过 db_storage_path 反推出 wxid_dir: db_storage_path=%s wxid_dir=%s",
str(db_storage_dir),
str(candidate),
)
return candidate
nested_db_storage = db_storage_dir / "db_storage"
if nested_db_storage.exists() and nested_db_storage.is_dir():
logger.info(
"[image_key] db_storage_path 指向 wxid_dir,自动使用其子目录: wxid_dir=%s",
str(db_storage_dir),
)
return db_storage_dir
logger.info(
"[image_key] 提供的 db_storage_path 无法解析 wxid_dir: %s",
explicit_db_storage_path,
)
if account:
try:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
if wx_id_dir:
logger.info(
"[image_key] 通过已解密账号目录解析 wxid_dir: account=%s account_dir=%s wxid_dir=%s",
str(account).strip(),
str(account_dir),
str(wx_id_dir),
)
return wx_id_dir
except Exception as e:
logger.info(
"[image_key] 无法通过已解密账号目录解析 wxid_dir: account=%s error=%s",
str(account).strip(),
str(e),
)
raise FileNotFoundError("无法定位该账号的 wxid_dir,请传入有效的 db_storage_path 或先完成数据库解密")
# ====================== 以下是hook逻辑 ======================================
class WeChatKeyFetcher:
@@ -143,11 +222,13 @@ def get_wechat_internal_global_config(wx_dir: Path, file_name1) -> bytes:
def try_get_local_image_keys() -> List[Dict[str, Any]]:
"""尝试通过本地算法提取图片密钥 (无需 Hook)"""
if wx_key is None or not hasattr(wx_key, 'get_image_key'):
logger.info("[image_key] 本地算法不可用:wx_key.get_image_key 缺失")
return []
try:
res_json = wx_key.get_image_key()
if not res_json:
logger.info("[image_key] 本地算法返回空结果")
return []
data = json.loads(res_json)
@@ -165,13 +246,23 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
"xor_key": f"0x{int(xor_key):02X}",
"aes_key": aes_key
})
logger.info(
"[image_key] 本地算法完成:accounts=%s results=%s",
len(accounts),
[_summarize_key_payload(item) for item in results],
)
return results
except Exception as e:
logger.error(f"本地提取图片密钥失败: {e}")
return []
async def get_image_key_integrated_workflow(account: Optional[str] = None) -> Dict[str, Any]:
async def get_image_key_integrated_workflow(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
集成图片密钥获取流程:
1. 优先尝试本地算法提取
@@ -181,30 +272,55 @@ async def get_image_key_integrated_workflow(account: Optional[str] = None) -> Di
local_keys = try_get_local_image_keys()
target_account_wxid = None
if account:
if account or wxid_dir or db_storage_path:
try:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
target_account_wxid = wx_id_dir.name
except:
resolved_wxid_dir = _resolve_wxid_dir_for_image_key(
account,
wxid_dir=wxid_dir,
db_storage_path=db_storage_path,
)
target_account_wxid = resolved_wxid_dir.name
except Exception:
target_account_wxid = account
target_account_wxid = str(target_account_wxid or "").strip().lower()
logger.info(
"[image_key] 开始集成流程:request_account=%s target_wxid=%s local_key_count=%s db_storage_path=%s wxid_dir=%s",
str(account or "").strip(),
target_account_wxid,
len(local_keys),
str(db_storage_path or "").strip(),
str(wxid_dir or "").strip(),
)
if local_keys:
# 如果指定了账号,尝试在本地结果中找匹配的
if target_account_wxid:
for k in local_keys:
if k['wxid'] == target_account_wxid:
logger.info(f"成功通过本地算法匹配到账号 {target_account_wxid} 的图片密钥")
local_wxid = str(k.get("wxid") or "").strip().lower()
if local_wxid and local_wxid == target_account_wxid:
logger.info(
"[image_key] 本地算法精确匹配成功:target_wxid=%s payload=%s",
target_account_wxid,
_summarize_key_payload(k),
)
upsert_account_keys_in_store(
account=k['wxid'],
account=str(k.get("wxid") or "").strip(),
image_xor_key=k['xor_key'],
image_aes_key=k['aes_key']
)
return k
logger.info(
"[image_key] 本地算法未匹配到目标账号:target_wxid=%s local_wxids=%s",
target_account_wxid,
[str(item.get("wxid") or "").strip() for item in local_keys],
)
else:
# 如果没指定账号,返回第一个发现的并存入 store (如果有的话)
k = local_keys[0]
logger.info(f"本地算法提取成功 (未指定账号,返回首个): {k['wxid']}")
logger.info(
"[image_key] 未指定账号,返回本地首个结果:payload=%s",
_summarize_key_payload(k),
)
upsert_account_keys_in_store(
account=k['wxid'],
image_xor_key=k['xor_key'],
@@ -213,25 +329,49 @@ async def get_image_key_integrated_workflow(account: Optional[str] = None) -> Di
return k
# 2. 本地提取失败或不匹配,尝试远程解析
logger.info("本地算法提取未命中,尝试远程 API 解析...")
return await fetch_and_save_remote_keys(account)
logger.info("[image_key] 本地算法未命中,尝试远程 API 解析")
return await fetch_and_save_remote_keys(
account,
wxid_dir=wxid_dir,
db_storage_path=db_storage_path,
)
async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str, Any]:
account_dir = _resolve_account_dir(account)
wx_id_dir = _resolve_account_wxid_dir(account_dir)
async def fetch_and_save_remote_keys(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
wx_id_dir = _resolve_wxid_dir_for_image_key(
account,
wxid_dir=wxid_dir,
db_storage_path=db_storage_path,
)
wxid = wx_id_dir.name
url = "https://view.free.c3o.re/api/key"
data = {"weixinIDFolder": wxid}
logger.info(f"正在为账号 {wxid} 获取云端备选图片密钥...")
logger.info(
"[image_key] 准备请求远程密钥:request_account=%s resolved_account=%s wxid_dir=%s db_storage_path=%s",
str(account or "").strip(),
wxid,
str(wx_id_dir),
str(db_storage_path or "").strip(),
)
try:
blob1_bytes = get_wechat_internal_global_config(wx_id_dir, file_name1="global_config")
blob2_bytes = get_wechat_internal_global_config(wx_id_dir, file_name1="global_config.crc")
except Exception as e:
raise RuntimeError(f"读取微信内部文件失败: {e}")
logger.info(
"[image_key] 远程请求输入文件已读取:wxid=%s global_config_bytes=%s crc_bytes=%s",
wxid,
len(blob1_bytes),
len(blob2_bytes),
)
files = {
'fileBytes': ('file', blob1_bytes, 'application/octet-stream'),
@@ -239,7 +379,7 @@ async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str,
}
async with httpx.AsyncClient(timeout=30) as client:
logger.info("向云端 API 发送请求...")
logger.info("[image_key] 向云端 API 发送请求:url=%s wxid=%s", url, wxid)
response = await client.post(url, data=data, files=files)
if response.status_code != 200:
@@ -248,6 +388,15 @@ async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str,
config = response.json()
if not config:
raise RuntimeError("云端解析失败: 返回数据为空")
logger.info(
"[image_key] 收到远程响应:status_code=%s keys=%s nick_name=%s",
response.status_code,
{
"xor_key": str(config.get("xorKey", config.get("xor_key", ""))),
"aes_key": _summarize_aes_key(config.get("aesKey", config.get("aes_key", ""))),
},
str(config.get("nickName", config.get("nick_name", ""))),
)
# 新 API 的字段兼容处理
xor_raw = str(config.get("xorKey", config.get("xor_key", "")))
@@ -267,10 +416,16 @@ async def fetch_and_save_remote_keys(account: Optional[str] = None) -> Dict[str,
image_xor_key=xor_hex_str,
image_aes_key=aes_val
)
logger.info(
"[image_key] 远程密钥已保存:account=%s xor_key=%s aes_key=%s",
wxid,
xor_hex_str,
_summarize_aes_key(aes_val),
)
return {
"wxid": wxid,
"xor_key": xor_hex_str,
"aes_key": aes_val,
"nick_name": config.get("nickName", config.get("nick_name", ""))
}
}
+61 -2
View File
@@ -2,12 +2,23 @@ from typing import Optional
from fastapi import APIRouter
from ..logging_config import get_logger
from ..key_store import get_account_keys_from_store
from ..key_service import get_db_key_workflow, get_image_key_integrated_workflow
from ..media_helpers import _load_media_keys, _resolve_account_dir
from ..path_fix import PathFixRoute
router = APIRouter(route_class=PathFixRoute)
logger = get_logger(__name__)
def _summarize_aes_key(value: str) -> str:
raw = str(value or "").strip()
if not raw:
return ""
if len(raw) <= 8:
return raw
return f"{raw[:4]}...{raw[-4:]}(len={len(raw)})"
@router.get("/api/keys", summary="获取账号已保存的密钥")
@@ -23,6 +34,13 @@ async def get_saved_keys(account: Optional[str] = None):
# 账号可能尚未解密;仍允许从全局 store 读取(如果传入了 account
account_name = str(account or "").strip() or None
logger.info(
"[keys] get_saved_keys start: request_account=%s resolved_account=%s account_dir=%s",
str(account or "").strip(),
str(account_name or ""),
str(account_dir) if account_dir else "",
)
keys: dict = {}
if account_name:
keys = get_account_keys_from_store(account_name)
@@ -45,6 +63,14 @@ async def get_saved_keys(account: Optional[str] = None):
"image_aes_key": str(keys.get("image_aes_key") or "").strip(),
"updated_at": str(keys.get("updated_at") or "").strip(),
}
logger.info(
"[keys] get_saved_keys done: account=%s db_key_present=%s xor_key=%s aes_key=%s updated_at=%s",
str(account_name or ""),
bool(result["db_key"]),
result["image_xor_key"],
_summarize_aes_key(result["image_aes_key"]),
result["updated_at"],
)
return {
"status": "success",
@@ -87,7 +113,11 @@ async def get_wechat_db_key():
@router.get("/api/get_image_key", summary="获取并保存微信图片密钥")
async def get_image_key(account: Optional[str] = None):
async def get_image_key(
account: Optional[str] = None,
db_storage_path: Optional[str] = None,
wxid_dir: Optional[str] = None,
):
"""
通过模拟 Next.js Server Action 协议,利用本地微信配置文件换取 AES/XOR 密钥。
@@ -97,7 +127,24 @@ async def get_image_key(account: Optional[str] = None):
4. 解析返回流,自动存入本地数据库
"""
try:
result = await get_image_key_integrated_workflow(account)
logger.info(
"[keys] get_image_key start: request_account=%s db_storage_path=%s wxid_dir=%s",
str(account or "").strip(),
str(db_storage_path or "").strip(),
str(wxid_dir or "").strip(),
)
result = await get_image_key_integrated_workflow(
account,
db_storage_path=db_storage_path,
wxid_dir=wxid_dir,
)
logger.info(
"[keys] get_image_key done: request_account=%s response_account=%s xor_key=%s aes_key=%s",
str(account or "").strip(),
str(result.get("wxid") or "").strip(),
str(result.get("xor_key") or "").strip(),
_summarize_aes_key(str(result.get("aes_key") or "").strip()),
)
return {
"status": 0,
@@ -110,6 +157,12 @@ async def get_image_key(account: Optional[str] = None):
}
}
except FileNotFoundError as e:
logger.exception(
"[keys] get_image_key file missing: request_account=%s db_storage_path=%s wxid_dir=%s",
str(account or "").strip(),
str(db_storage_path or "").strip(),
str(wxid_dir or "").strip(),
)
return {
"status": -1,
"errmsg": f"文件缺失: {str(e)}",
@@ -118,6 +171,12 @@ async def get_image_key(account: Optional[str] = None):
except Exception as e:
import traceback
traceback.print_exc()
logger.exception(
"[keys] get_image_key failed: request_account=%s db_storage_path=%s wxid_dir=%s",
str(account or "").strip(),
str(db_storage_path or "").strip(),
str(wxid_dir or "").strip(),
)
return {
"status": -1,
"errmsg": f"获取失败: {str(e)}",
+95 -1
View File
@@ -2,7 +2,7 @@ import asyncio
import json
from typing import Optional
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import Response, StreamingResponse
from pydantic import BaseModel, Field
@@ -27,6 +27,26 @@ logger = get_logger(__name__)
router = APIRouter(route_class=PathFixRoute)
def _summarize_aes_key(value: Optional[str]) -> str:
raw = str(value or "").strip()
if not raw:
return ""
if len(raw) <= 8:
return raw
return f"{raw[:4]}...{raw[-4:]}(len={len(raw)})"
def _summarize_media_keys(*, xor_key: Optional[str] = None, aes_key: Optional[str] = None) -> dict:
xor_str = str(xor_key or "").strip()
aes_str = str(aes_key or "").strip()
return {
"xor_key": xor_str,
"aes_key": _summarize_aes_key(aes_str),
"has_xor": bool(xor_str),
"has_aes": bool(aes_str),
}
class MediaKeysSaveRequest(BaseModel):
"""媒体密钥保存请求模型(用户手动提供)"""
@@ -52,6 +72,12 @@ async def save_media_keys_api(request: MediaKeysSaveRequest):
- aes_key: AES密钥(可选,至少16个字符;V4-V2需要)
"""
account_dir = _resolve_account_dir(request.account)
logger.info(
"[media] save_media_keys start: request_account=%s resolved_account=%s keys=%s",
str(request.account or "").strip(),
account_dir.name,
_summarize_media_keys(xor_key=request.xor_key, aes_key=request.aes_key),
)
# 解析XOR密钥
try:
@@ -76,6 +102,11 @@ async def save_media_keys_api(request: MediaKeysSaveRequest):
)
except Exception:
pass
logger.info(
"[media] save_media_keys done: account=%s keys=%s",
account_dir.name,
_summarize_media_keys(xor_key=f"0x{xor_int:02X}", aes_key=aes_str[:16] if aes_str else ""),
)
return {
"status": "success",
@@ -99,6 +130,12 @@ async def decrypt_all_media(request: MediaDecryptRequest):
"""
account_dir = _resolve_account_dir(request.account)
wxid_dir = _resolve_account_wxid_dir(account_dir)
logger.info(
"[media] decrypt_all start: request_account=%s resolved_account=%s provided_keys=%s",
str(request.account or "").strip(),
account_dir.name,
_summarize_media_keys(xor_key=request.xor_key, aes_key=request.aes_key),
)
if not wxid_dir:
raise HTTPException(
@@ -125,12 +162,28 @@ async def decrypt_all_media(request: MediaDecryptRequest):
# 如果未提供密钥,尝试从缓存加载
if xor_key_int is None or aes_key16 is None:
cached = _load_media_keys(account_dir)
logger.info(
"[media] decrypt_all cache lookup: account=%s cached_keys=%s",
account_dir.name,
_summarize_media_keys(
xor_key=f"0x{int(cached.get('xor')):02X}" if cached.get("xor") is not None else "",
aes_key=str(cached.get("aes") or "").strip(),
),
)
if xor_key_int is None:
xor_key_int = cached.get("xor")
if aes_key16 is None:
aes_str = str(cached.get("aes") or "").strip()
if len(aes_str) >= 16:
aes_key16 = aes_str[:16].encode("ascii", errors="ignore")
logger.info(
"[media] decrypt_all effective_keys: account=%s keys=%s",
account_dir.name,
_summarize_media_keys(
xor_key=f"0x{int(xor_key_int):02X}" if xor_key_int is not None else "",
aes_key=(aes_key16 or b"").decode("ascii", errors="ignore") if aes_key16 else "",
),
)
if xor_key_int is None:
raise HTTPException(
@@ -226,6 +279,7 @@ async def get_decrypted_resource(md5: str, account: Optional[str] = None):
@router.get("/api/media/decrypt_all_stream", summary="批量解密所有图片资源(SSE实时进度)")
async def decrypt_all_media_stream(
request: Request,
account: Optional[str] = None,
xor_key: Optional[str] = None,
aes_key: Optional[str] = None,
@@ -252,10 +306,26 @@ async def decrypt_all_media_stream(
- 解密后非有效图片格式
"""
async def is_client_disconnected() -> bool:
try:
return await request.is_disconnected()
except Exception:
return False
async def generate_progress():
try:
if await is_client_disconnected():
logger.info("[SSE] 客户端已断开,取消图片解密任务")
return
account_dir = _resolve_account_dir(account)
wxid_dir = _resolve_account_wxid_dir(account_dir)
logger.info(
"[media] decrypt_all_stream start: request_account=%s resolved_account=%s provided_keys=%s",
str(account or "").strip(),
account_dir.name,
_summarize_media_keys(xor_key=xor_key, aes_key=aes_key),
)
if not wxid_dir:
yield f"data: {json.dumps({'type': 'error', 'message': '未找到微信数据目录'})}\n\n"
@@ -281,12 +351,28 @@ async def decrypt_all_media_stream(
# 如果未提供密钥,尝试从缓存加载
if xor_key_int is None or aes_key16 is None:
cached = _load_media_keys(account_dir)
logger.info(
"[media] decrypt_all_stream cache lookup: account=%s cached_keys=%s",
account_dir.name,
_summarize_media_keys(
xor_key=f"0x{int(cached.get('xor')):02X}" if cached.get("xor") is not None else "",
aes_key=str(cached.get("aes") or "").strip(),
),
)
if xor_key_int is None:
xor_key_int = cached.get("xor")
if aes_key16 is None:
aes_str = str(cached.get("aes") or "").strip()
if len(aes_str) >= 16:
aes_key16 = aes_str[:16].encode("ascii", errors="ignore")
logger.info(
"[media] decrypt_all_stream effective_keys: account=%s keys=%s",
account_dir.name,
_summarize_media_keys(
xor_key=f"0x{int(xor_key_int):02X}" if xor_key_int is not None else "",
aes_key=(aes_key16 or b"").decode("ascii", errors="ignore") if aes_key16 else "",
),
)
if xor_key_int is None:
yield f"data: {json.dumps({'type': 'error', 'message': '未找到XOR密钥,请先使用 wx_key 获取并保存/填写'})}\n\n"
@@ -301,6 +387,10 @@ async def decrypt_all_media_stream(
total_files = len(dat_files)
logger.info(f"[SSE] 共发现 {total_files} 个.dat文件(仅图片)")
if await is_client_disconnected():
logger.info("[SSE] 扫描完成后客户端已断开,停止图片解密任务")
return
if total_files == 0:
yield f"data: {json.dumps({'type': 'complete', 'message': '未发现需要解密的图片文件', 'total': 0, 'success_count': 0, 'skip_count': 0, 'fail_count': 0})}\n\n"
return
@@ -319,6 +409,10 @@ async def decrypt_all_media_stream(
resource_dir.mkdir(parents=True, exist_ok=True)
for i, (dat_path, md5) in enumerate(dat_files):
if await is_client_disconnected():
logger.info("[SSE] 客户端已断开,停止图片解密任务")
return
current = i + 1
file_name = dat_path.name
+133 -47
View File
@@ -16,6 +16,37 @@ from datetime import datetime
from .database_filters import should_skip_source_database
COMMON_WECHAT_PATTERNS = [
"WeChat Files",
"Weixin Files",
"wechat_files",
"xwechat_files",
"wechatMSG",
"WeChat",
"微信",
"Weixin",
"wechat",
]
SYSTEM_SCAN_SKIP_NAMES = {
"$recycle.bin",
"$winreagent",
"config.msi",
"documents and settings",
"intel",
"onedrivetemp",
"perflogs",
"program files",
"program files (x86)",
"programdata",
"recovery",
"system volume information",
"windows",
"windows.old",
"windows.old(1)",
}
def get_wx_db(msg_dir: str = None,
db_types: Union[List[str], str] = None,
wxids: Union[List[str], str] = None) -> List[dict]:
@@ -285,6 +316,87 @@ def get_process_list():
return process_list
def _is_wechat_dir_candidate_name(name: str) -> bool:
normalized = str(name or "").strip().lower()
if not normalized:
return False
return any(pattern.lower() in normalized for pattern in COMMON_WECHAT_PATTERNS)
def _safe_iter_subdirs(directory: str) -> List[tuple[str, str]]:
items: List[tuple[str, str]] = []
try:
with os.scandir(directory) as entries:
for entry in entries:
try:
if entry.is_dir():
items.append((entry.name, entry.path))
except OSError:
continue
except (PermissionError, OSError):
return []
return items
def _append_detected_dir(detected_dirs: List[str], candidate: str) -> None:
if not candidate:
return
normalized = os.path.normpath(candidate)
if normalized not in detected_dirs:
detected_dirs.append(normalized)
def _build_auto_detect_scan_paths() -> List[str]:
scan_paths: List[str] = []
seen_paths = set()
def add(path_value: str | None) -> None:
raw = str(path_value or "").strip()
if not raw:
return
normalized = os.path.normpath(raw)
key = normalized.lower()
if key in seen_paths:
return
seen_paths.add(key)
scan_paths.append(normalized)
home_dir = str(Path.home())
add(home_dir)
add(os.path.join(home_dir, "Documents"))
add(os.path.join(home_dir, "Desktop"))
add(os.path.join(home_dir, "Downloads"))
user_profile = str(os.environ.get("USERPROFILE") or "").strip()
if user_profile:
add(user_profile)
add(os.path.join(user_profile, "Documents"))
add(os.path.join(user_profile, "Desktop"))
add(os.path.join(user_profile, "Downloads"))
for drive in ("C:", "D:", "E:", "F:"):
drive_root = drive + os.sep
if not os.path.exists(drive_root):
continue
add(drive_root)
for child_name, child_path in _safe_iter_subdirs(drive_root):
if child_name.strip().lower() in SYSTEM_SCAN_SKIP_NAMES:
continue
add(child_path)
users_dir = os.path.join(drive_root, "Users")
add(users_dir)
for _user_name, user_dir in _safe_iter_subdirs(users_dir):
add(user_dir)
add(os.path.join(user_dir, "Documents"))
add(os.path.join(user_dir, "Desktop"))
add(os.path.join(user_dir, "Downloads"))
return scan_paths
def auto_detect_wechat_data_dirs():
"""
自动检测微信数据目录 - 多策略组合检测
@@ -292,52 +404,27 @@ def auto_detect_wechat_data_dirs():
"""
detected_dirs = []
# 策略1注册表检测已移除
# 策略2和策略3:注册表相关检测已移除
# 策略1:常见驱动器扫描微信相关目录
common_wechat_patterns = [
"WeChat Files", "wechat_files", "xwechat_files", "wechatMSG",
"WeChat", "微信", "Weixin", "wechat"
]
# 扫描常见驱动器
drives = ['C:', 'D:', 'E:', 'F:']
for drive in drives:
if not os.path.exists(drive):
# 策略1常见驱动器 / 用户目录 / 自定义目录的浅层扫描。
# 这里既检查扫描根目录本身,也检查其直接子目录,兼容:
# - C:\Users\<user>\Documents\WeChat Files
# - D:\wechatMSG\xwechat_files
# - D:\abc\wechatMSG\xwechat_files
for scan_path in _build_auto_detect_scan_paths():
if not os.path.exists(scan_path):
continue
try:
# 扫描驱动器根目录和常见目录
scan_paths = [
drive + os.sep,
os.path.join(drive + os.sep, "Users"),
]
scan_name = os.path.basename(os.path.normpath(scan_path))
if _is_wechat_dir_candidate_name(scan_name) and has_wxid_directories(scan_path):
_append_detected_dir(detected_dirs, scan_path)
print(f"[DEBUG] 目录扫描检测成功: {scan_path}")
for scan_path in scan_paths:
if not os.path.exists(scan_path):
continue
try:
for item in os.listdir(scan_path):
item_path = os.path.join(scan_path, item)
if not os.path.isdir(item_path):
continue
# 检查是否匹配微信目录模式
for pattern in common_wechat_patterns:
if pattern.lower() in item.lower():
# 检查是否包含wxid目录
if has_wxid_directories(item_path):
if item_path not in detected_dirs:
detected_dirs.append(item_path)
print(f"[DEBUG] 目录扫描检测成功: {item_path}")
break
except (PermissionError, OSError):
continue
except (PermissionError, OSError):
continue
for item_name, item_path in _safe_iter_subdirs(scan_path):
if not _is_wechat_dir_candidate_name(item_name):
continue
if not has_wxid_directories(item_path):
continue
_append_detected_dir(detected_dirs, item_path)
print(f"[DEBUG] 目录扫描检测成功: {item_path}")
# 策略2:进程内存分析(简化版)
try:
@@ -361,12 +448,11 @@ def auto_detect_wechat_data_dirs():
break
for parent_dir in parent_dirs:
for pattern in common_wechat_patterns:
for pattern in COMMON_WECHAT_PATTERNS:
potential_dir = os.path.join(parent_dir, pattern)
if os.path.exists(potential_dir) and has_wxid_directories(potential_dir):
if potential_dir not in detected_dirs:
detected_dirs.append(potential_dir)
print(f"[DEBUG] 进程分析检测成功: {potential_dir}")
_append_detected_dir(detected_dirs, potential_dir)
print(f"[DEBUG] 进程分析检测成功: {potential_dir}")
except:
pass
except:
@@ -0,0 +1,154 @@
import asyncio
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
import wechat_decrypt_tool.key_service as key_service
class TestKeyServiceImageKeyAccountMatch(unittest.TestCase):
def test_local_image_keys_do_not_match_by_substring(self) -> None:
remote_result = {
"wxid": "wxid_demo_extra",
"xor_key": "0x8A",
"aes_key": "BBBBBBBBBBBBBBBB",
}
with mock.patch.object(
key_service,
"try_get_local_image_keys",
return_value=[
{"wxid": "wxid_demo", "xor_key": "0x01", "aes_key": "AAAAAAAAAAAAAAAA"},
],
), mock.patch.object(
key_service,
"_resolve_account_dir",
return_value=Path("D:/tmp/output/databases/wxid_demo_extra"),
), mock.patch.object(
key_service,
"_resolve_account_wxid_dir",
return_value=Path("D:/tmp/xwechat_files/wxid_demo_extra"),
), mock.patch.object(
key_service,
"upsert_account_keys_in_store",
) as upsert_mock, mock.patch.object(
key_service,
"fetch_and_save_remote_keys",
new=mock.AsyncMock(return_value=remote_result),
) as remote_mock:
result = asyncio.run(key_service.get_image_key_integrated_workflow("wxid_demo_extra"))
self.assertEqual(result, remote_result)
remote_mock.assert_awaited_once_with("wxid_demo_extra", wxid_dir=None, db_storage_path=None)
upsert_mock.assert_not_called()
def test_local_image_keys_require_exact_account_match(self) -> None:
with mock.patch.object(
key_service,
"try_get_local_image_keys",
return_value=[
{"wxid": "wxid_demo", "xor_key": "0x01", "aes_key": "AAAAAAAAAAAAAAAA"},
{"wxid": "wxid_demo_extra", "xor_key": "0x8A", "aes_key": "BBBBBBBBBBBBBBBB"},
],
), mock.patch.object(
key_service,
"_resolve_account_dir",
return_value=Path("D:/tmp/output/databases/wxid_demo_extra"),
), mock.patch.object(
key_service,
"_resolve_account_wxid_dir",
return_value=Path("D:/tmp/xwechat_files/wxid_demo_extra"),
), mock.patch.object(
key_service,
"upsert_account_keys_in_store",
) as upsert_mock, mock.patch.object(
key_service,
"fetch_and_save_remote_keys",
new=mock.AsyncMock(side_effect=AssertionError("remote should not be called")),
):
result = asyncio.run(key_service.get_image_key_integrated_workflow("wxid_demo_extra"))
self.assertEqual(result["wxid"], "wxid_demo_extra")
self.assertEqual(result["xor_key"], "0x8A")
self.assertEqual(result["aes_key"], "BBBBBBBBBBBBBBBB")
upsert_mock.assert_called_once_with(
account="wxid_demo_extra",
image_xor_key="0x8A",
image_aes_key="BBBBBBBBBBBBBBBB",
)
def test_fetch_remote_keys_can_use_db_storage_path_without_decrypted_output(self) -> None:
with TemporaryDirectory() as temp_dir:
wxid_dir = Path(temp_dir) / "xwechat_files" / "wxid_v4mbduwqtzpt22"
db_storage_dir = wxid_dir / "db_storage"
db_storage_dir.mkdir(parents=True, exist_ok=True)
class _FakeResponse:
status_code = 200
@staticmethod
def json():
return {
"xorKey": "138",
"aesKey": "c3f3366e23628242",
"nickName": "demo",
}
class _FakeAsyncClient:
def __init__(self, *args, **kwargs):
pass
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc, tb):
return False
async def post(self, url, data=None, files=None):
self.last_url = url
self.last_data = data
self.last_files = files
return _FakeResponse()
with mock.patch.object(
key_service,
"_resolve_account_dir",
side_effect=AssertionError("should not require decrypted account dir"),
), mock.patch.object(
key_service,
"get_wechat_internal_global_config",
side_effect=[b"global-config", b"crc-bytes"],
), mock.patch.object(
key_service.httpx,
"AsyncClient",
_FakeAsyncClient,
), mock.patch.object(
key_service,
"upsert_account_keys_in_store",
) as upsert_mock:
result = asyncio.run(
key_service.fetch_and_save_remote_keys(
"wxid_v4mbduwqtzpt22",
db_storage_path=str(db_storage_dir),
)
)
self.assertEqual(result["wxid"], "wxid_v4mbduwqtzpt22")
self.assertEqual(result["xor_key"], "0x8A")
self.assertEqual(result["aes_key"], "c3f3366e23628242")
upsert_mock.assert_called_once_with(
account="wxid_v4mbduwqtzpt22",
image_xor_key="0x8A",
image_aes_key="c3f3366e23628242",
)
if __name__ == "__main__":
unittest.main()
+74
View File
@@ -0,0 +1,74 @@
import asyncio
import json
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest import mock
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.routers import media as media_router # noqa: E402 pylint: disable=wrong-import-position
class _FakeDisconnectingRequest:
def __init__(self, disconnect_after: int):
self._disconnect_after = disconnect_after
self._calls = 0
async def is_disconnected(self):
self._calls += 1
return self._calls >= self._disconnect_after
class TestMediaDecryptStreamCancel(unittest.TestCase):
def test_stream_stops_processing_when_client_disconnects(self):
with TemporaryDirectory() as td:
root = Path(td)
account_dir = root / "account"
wxid_dir = root / "wxid"
dat_path = wxid_dir / "image.dat"
resource_dir = account_dir / "resource"
wxid_dir.mkdir(parents=True, exist_ok=True)
dat_path.write_bytes(b"encrypted")
request = _FakeDisconnectingRequest(disconnect_after=3)
decrypt_mock = mock.Mock(return_value=(True, "ok"))
with mock.patch.object(media_router, "_resolve_account_dir", return_value=account_dir):
with mock.patch.object(media_router, "_resolve_account_wxid_dir", return_value=wxid_dir):
with mock.patch.object(media_router, "_load_media_keys", return_value={"xor": 0xA5, "aes": ""}):
with mock.patch.object(media_router, "_collect_all_dat_files", return_value=[(dat_path, "abc123")]):
with mock.patch.object(media_router, "_get_resource_dir", return_value=resource_dir):
with mock.patch.object(media_router, "_try_find_decrypted_resource", return_value=None):
with mock.patch.object(media_router, "_decrypt_and_save_resource", decrypt_mock):
response = asyncio.run(
media_router.decrypt_all_media_stream(
request=request,
account="wxid_demo",
)
)
async def _read_chunks():
chunks = []
async for chunk in response.body_iterator:
chunks.append(chunk.decode("utf-8") if isinstance(chunk, bytes) else str(chunk))
return chunks
chunks = asyncio.run(_read_chunks())
events = []
for chunk in chunks:
for line in chunk.splitlines():
if line.startswith("data: "):
events.append(json.loads(line[len("data: ") :]))
self.assertEqual([event.get("type") for event in events], ["scanning", "start"])
decrypt_mock.assert_not_called()
if __name__ == "__main__":
unittest.main()
@@ -0,0 +1,44 @@
import sys
import unittest
from pathlib import Path
from tempfile import TemporaryDirectory
from unittest.mock import patch
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
class TestWechatDetectionAutoDetect(unittest.TestCase):
def test_detect_wechat_installation_finds_nested_custom_data_root(self):
from wechat_decrypt_tool import wechat_detection as wd
with TemporaryDirectory() as td:
nested_scan_root = Path(td) / "abc"
wechat_parent = nested_scan_root / "wechatMSG"
xwechat_root = wechat_parent / "xwechat_files"
login_dir = xwechat_root / "all_users" / "login" / "wxid_demo"
login_dir.mkdir(parents=True, exist_ok=True)
(login_dir / "key_info.db").write_bytes(b"demo")
account_dir = xwechat_root / "wxid_demo_nested"
account_dir.mkdir(parents=True, exist_ok=True)
(account_dir / "contact.db").write_bytes(b"demo")
with (
patch.object(wd, "_build_auto_detect_scan_paths", return_value=[str(nested_scan_root)]),
patch.object(wd, "get_process_list", return_value=[]),
):
detected_dirs = wd.auto_detect_wechat_data_dirs()
result = wd.detect_wechat_installation()
self.assertEqual(detected_dirs, [str(wechat_parent)])
self.assertEqual(result["total_accounts"], 1)
self.assertEqual(result["accounts"][0]["account_name"], "wxid_demo")
self.assertEqual(result["accounts"][0]["data_dir"], str(account_dir))
self.assertEqual(result["total_databases"], 1)
if __name__ == "__main__":
unittest.main()