Compare commits

...

13 Commits

35 changed files with 2009 additions and 252 deletions
-4
View File
@@ -176,10 +176,6 @@ npm run dist
## 使用指南
### 导出
侧边栏提供“导出”入口,点击下载图标可打开统一导出弹窗。当前导出界面只保留两个选项:导出数据库、导出资源文件;用户选择导出目录后会生成一个账号归档 ZIP。当两个选项都勾选时,导出会按账号目录直接归档,使用 ZIP 存储模式打包,尽量避免二次压缩带来的耗时。
### 获取解密密钥
在使用本工具之前,您需要先获取微信数据库的解密密钥。推荐使用以下工具:
+2 -2
View File
@@ -1,12 +1,12 @@
{
"name": "wechat-data-analysis-desktop",
"version": "1.7.20",
"version": "1.8.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "wechat-data-analysis-desktop",
"version": "1.7.20",
"version": "1.8.0",
"dependencies": {
"electron-updater": "^6.7.3"
},
+1 -1
View File
@@ -1,7 +1,7 @@
{
"name": "wechat-data-analysis-desktop",
"private": true,
"version": "1.7.20",
"version": "1.8.0",
"main": "src/main.cjs",
"scripts": {
"dev": "node scripts/dev.cjs",
+44
View File
@@ -195,6 +195,33 @@
</div>
</div>
<!-- ImgHelper (Auto download large images) -->
<div
class="sidebar-rail-action w-full h-[var(--sidebar-rail-step)] flex items-center justify-center group"
:class="imgHelperBusy ? 'opacity-60 cursor-not-allowed' : 'cursor-pointer'"
:title="imgHelperTitle"
@click="toggleImgHelper"
>
<div class="sidebar-rail-plate w-[var(--sidebar-rail-btn)] h-[var(--sidebar-rail-btn)] rounded-md flex items-center justify-center transition-colors bg-transparent">
<svg
class="sidebar-rail-icon w-[var(--sidebar-rail-icon)] h-[var(--sidebar-rail-icon)]"
:class="{ 'sidebar-rail-icon-active': imgHelperEnabled }"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
stroke-width="1.8"
stroke-linecap="round"
stroke-linejoin="round"
aria-hidden="true"
>
<rect x="3" y="3" width="18" height="18" rx="2" ry="2" />
<circle cx="8.5" cy="8.5" r="1.5" />
<polyline points="21 15 16 10 5 21" />
<path d="M12 9v5m-2-2l2 2 2-2" />
</svg>
</div>
</div>
<!-- Privacy -->
<div
class="sidebar-rail-action w-full h-[var(--sidebar-rail-step)] flex items-center justify-center cursor-pointer group"
@@ -368,6 +395,7 @@
import { storeToRefs } from 'pinia'
import { useChatAccountsStore } from '~/stores/chatAccounts'
import { useChatRealtimeStore } from '~/stores/chatRealtime'
import { useImgHelperStore } from '~/stores/imgHelper'
import { usePrivacyStore } from '~/stores/privacy'
import { useThemeStore } from '~/stores/theme'
@@ -384,6 +412,10 @@ themeStore.init()
const realtimeStore = useChatRealtimeStore()
const { enabled: realtimeEnabled, available: realtimeAvailable, checking: realtimeChecking, statusError: realtimeStatusError, toggling: realtimeToggling } = storeToRefs(realtimeStore)
const imgHelperStore = useImgHelperStore()
const { enabled: imgHelperEnabled, checking: imgHelperChecking, toggling: imgHelperToggling, error: imgHelperError } = storeToRefs(imgHelperStore)
const { open: settingsDialogOpen, openDialog: openSettingsDialog } = useSettingsDialog()
const { getChatAccountInfo, deleteChatAccount } = useApi()
@@ -620,6 +652,18 @@ const toggleRealtime = async () => {
if (realtimeBusy.value) return
await realtimeStore.toggle({ silent: false })
}
const imgHelperBusy = computed(() => !!imgHelperChecking.value || !!imgHelperToggling.value)
const imgHelperTitle = computed(() => {
if (imgHelperEnabled.value) return '关闭自动下载大图'
return imgHelperError.value || '开启自动下载大图'
})
const toggleImgHelper = async () => {
if (imgHelperBusy.value) return
await imgHelperStore.toggle()
}
</script>
<style scoped>
+13
View File
@@ -636,8 +636,21 @@ export const useApi = () => {
return await request(url)
}
const getImgHelperStatus = async () => {
return await request('/system/img_helper/status')
}
const toggleImgHelper = async (enabled) => {
return await request('/system/img_helper/toggle', {
method: 'POST',
body: { enabled: !!enabled }
})
}
return {
pickSystemDirectory,
getImgHelperStatus,
toggleImgHelper,
detectWechat,
detectCurrentAccount,
decryptDatabase,
+68 -8
View File
@@ -1,4 +1,4 @@
import { createPerfTrace, getLatestResourceTiming } from '~/lib/chat/perf-logger'
import { createPerfTrace, getLatestResourceTiming, logPerfChannel, nowPerfMs } from '~/lib/chat/perf-logger'
const CHAT_LAZY_SRC_EVENT = 'chat-lazy-src:start'
const CHAT_LAZY_ROOT_MARGIN = '240px 0px 520px 0px'
@@ -17,6 +17,12 @@ const nextRenderTick = (callback) => {
})
}
const roundPerfMs = (value) => {
const numeric = Number(value)
if (!Number.isFinite(numeric)) return null
return Number(numeric.toFixed(1))
}
const readImageSrc = (element) => {
return String(
element?.currentSrc
@@ -44,7 +50,8 @@ const ensurePerfState = (element) => {
finalized: true,
onLoad: null,
onError: null,
onLazyStart: null
onLazyStart: null,
lazyPendingLoggedSrc: ''
}
}
return element.__chatMediaPerfState
@@ -63,7 +70,11 @@ const ensureLazySrcState = (element) => {
src: '',
loadedSrc: '',
observer: null,
timer: null
timer: null,
requestedAt: 0,
observerStartedAt: 0,
appliedAt: 0,
lastApplyReason: ''
}
}
return element.__chatLazySrcState
@@ -88,11 +99,22 @@ const applyLazySrc = (element, reason = '') => {
if (!element || !src) return
if (state.loadedSrc === src && readImageSrc(element) === src) return
const appliedAt = nowPerfMs()
state.loadedSrc = src
state.appliedAt = appliedAt
state.lastApplyReason = String(reason || '')
element.setAttribute('src', src)
try {
element.dispatchEvent(new CustomEvent(CHAT_LAZY_SRC_EVENT, {
detail: { src, reason }
detail: {
src,
reason,
requestedAt: state.requestedAt || 0,
observerStartedAt: state.observerStartedAt || 0,
appliedAt,
waitSinceRequestMs: state.requestedAt ? roundPerfMs(appliedAt - state.requestedAt) : null,
waitSinceObserverMs: state.observerStartedAt ? roundPerfMs(appliedAt - state.observerStartedAt) : null
}
}))
} catch {}
}
@@ -103,6 +125,10 @@ const updateLazySrc = (element, binding, reason = '') => {
cleanupLazySrcObserver(element)
state.src = nextSrc
state.requestedAt = nowPerfMs()
state.observerStartedAt = 0
state.appliedAt = 0
state.lastApplyReason = ''
if (!nextSrc) {
state.loadedSrc = ''
@@ -121,6 +147,7 @@ const updateLazySrc = (element, binding, reason = '') => {
return
}
state.observerStartedAt = nowPerfMs()
state.observer = new window.IntersectionObserver((entries) => {
const entry = entries?.[0]
if (!entry?.isIntersecting) return
@@ -150,7 +177,30 @@ const finalizeTracking = (element, status, reason = '') => {
state.finalized = true
}
const beginTracking = (element, binding, reason = '') => {
const logPendingLazy = (element, binding, reason = '') => {
const perfState = ensurePerfState(element)
const lazyState = element?.__chatLazySrcState
const src = String(lazyState?.src || '').trim()
if (!src || readImageSrc(element)) return
const logKey = `${src}:${reason}`
if (perfState.lazyPendingLoggedSrc === logKey) return
perfState.lazyPendingLoggedSrc = logKey
const { kind, meta } = normalizeBindingValue(binding?.value)
const now = nowPerfMs()
logPerfChannel('chat-media-ui', 'lazy:pending', {
kind,
src,
...meta,
reason,
hasObserver: !!lazyState?.observer,
hasTimer: !!lazyState?.timer,
waitSinceRequestMs: lazyState?.requestedAt ? roundPerfMs(now - lazyState.requestedAt) : null,
waitSinceObserverMs: lazyState?.observerStartedAt ? roundPerfMs(now - lazyState.observerStartedAt) : null
})
}
const beginTracking = (element, binding, reason = '', lazyDetail = null) => {
const state = ensurePerfState(element)
const src = readImageSrc(element)
if (!src) return
@@ -164,11 +214,16 @@ const beginTracking = (element, binding, reason = '') => {
src,
...meta
})
const lazyState = element?.__chatLazySrcState
state.trace.log('resource:start', {
reason,
complete: !!element?.complete,
loading: String(element?.getAttribute?.('loading') || '').trim(),
decoding: String(element?.getAttribute?.('decoding') || '').trim()
decoding: String(element?.getAttribute?.('decoding') || '').trim(),
lazyTriggerReason: String(lazyDetail?.reason || lazyState?.lastApplyReason || '').trim(),
waitSinceLazyRequestMs: lazyDetail?.waitSinceRequestMs ?? (lazyState?.requestedAt ? roundPerfMs(nowPerfMs() - lazyState.requestedAt) : null),
waitSinceLazyObserverMs: lazyDetail?.waitSinceObserverMs ?? (lazyState?.observerStartedAt ? roundPerfMs(nowPerfMs() - lazyState.observerStartedAt) : null),
waitSinceLazyApplyMs: lazyState?.appliedAt ? roundPerfMs(nowPerfMs() - lazyState.appliedAt) : null
})
if (element?.complete) {
@@ -182,16 +237,20 @@ export default defineNuxtPlugin((nuxtApp) => {
const state = ensurePerfState(element)
state.onLoad = () => finalizeTracking(element, 'load', 'load-event')
state.onError = () => finalizeTracking(element, 'error', 'error-event')
state.onLazyStart = () => beginTracking(element, binding, 'lazy-src')
state.onLazyStart = (event) => beginTracking(element, binding, 'lazy-src', event?.detail || null)
element.addEventListener('load', state.onLoad)
element.addEventListener('error', state.onError)
element.addEventListener(CHAT_LAZY_SRC_EVENT, state.onLazyStart)
beginTracking(element, binding, 'mounted')
logPendingLazy(element, binding, 'mounted')
},
updated(element, binding) {
const state = ensurePerfState(element)
const nextSrc = readImageSrc(element)
if (!nextSrc) return
if (!nextSrc) {
logPendingLazy(element, binding, 'updated-no-src')
return
}
if (nextSrc !== state.src) {
beginTracking(element, binding, 'updated-src')
return
@@ -199,6 +258,7 @@ export default defineNuxtPlugin((nuxtApp) => {
if (element?.complete && !state.finalized) {
nextRenderTick(() => finalizeTracking(element, 'load', 'updated-complete'))
}
logPendingLazy(element, binding, 'updated')
},
beforeUnmount(element) {
const state = element?.__chatMediaPerfState
+71
View File
@@ -0,0 +1,71 @@
import { defineStore } from 'pinia'
export const useImgHelperStore = defineStore('imgHelper', () => {
const enabled = ref(false)
const checking = ref(false)
const toggling = ref(false)
const error = ref('')
const fetchStatus = async () => {
if (!process.client) return
const api = useApi()
checking.value = true
error.value = ''
try {
const resp = await api.getImgHelperStatus()
enabled.value = !!resp?.enabled
} catch (e) {
error.value = e?.message || '获取插件状态失败'
} finally {
checking.value = false
}
}
const toggle = async () => {
if (toggling.value) return
const targetState = !enabled.value
if (targetState) {
// Show warning for first time or every time? User said "首次开启提示hook可能存在风控风险"
// We can use localStorage to track if it's the first time.
const hasWarned = localStorage.getItem('img_helper_warned')
if (!hasWarned) {
const confirmed = window.confirm('【安全提示】\n开启“自动下载大图”功能将使用 Hook 技术修改微信内存逻辑。这可能存在一定的风控风险,建议仅在需要时开启。\n\n确认开启吗?')
if (!confirmed) return
localStorage.setItem('img_helper_warned', 'true')
}
}
toggling.value = true
error.value = ''
const api = useApi()
try {
const resp = await api.toggleImgHelper(targetState)
enabled.value = !!resp?.enabled
return true
} catch (e) {
error.value = e?.message || '操作失败'
if (process.client) {
window.alert(error.value)
}
return false
} finally {
toggling.value = false
}
}
// Initialize status
if (process.client) {
fetchStatus()
}
return {
enabled,
checking,
toggling,
error,
fetchStatus,
toggle
}
})
+2 -2
View File
@@ -1,6 +1,6 @@
[project]
name = "wechat-decrypt-tool"
version = "1.7.20"
version = "1.8.0"
description = "Modern WeChat database decryption tool with React frontend"
readme = "README.md"
requires-python = ">=3.11"
@@ -20,7 +20,7 @@ dependencies = [
"pilk>=0.2.4",
"pypinyin>=0.53.0",
"jieba>=0.42.1",
"wx_key>=2.0.0",
"wx_key>=2.0.1",
"packaging",
"httpx",
]
+1 -1
View File
@@ -1,5 +1,5 @@
"""微信数据库解密工具
"""
__version__ = "1.7.20"
__version__ = "1.8.0"
__author__ = "WeChat Decrypt Tool"
+8
View File
@@ -37,6 +37,7 @@ from .routers.wechat_detection import router as _wechat_detection_router
from .routers.wrapped import router as _wrapped_router
from .request_logging import log_server_errors_middleware
from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown
from .img_helper import IMG_HELPER
from .routers.biz import router as _biz_router
from .routers.system import router as _system_router
@@ -188,6 +189,13 @@ async def _shutdown_wcdb_realtime() -> None:
CHAT_REALTIME_AUTOSYNC.stop()
except Exception:
pass
# Uninstall img_helper hook if enabled
try:
IMG_HELPER.disable()
except Exception:
pass
close_ok = False
lock_timeout_s: float | None = 0.2
try:
+114
View File
@@ -0,0 +1,114 @@
import ctypes
import os
from pathlib import Path
from typing import Optional
from .logging_config import get_logger
logger = get_logger(__name__)
class ImgHelper:
def __init__(self):
self._lib: Optional[ctypes.CDLL] = None
self._enabled = False
self._lock = __import__("threading").Lock()
@staticmethod
def _resolve_dll_path() -> Path:
# 1. Default (source code layout)
base = Path(__file__).resolve().parent
path = base / "native" / "img_helper.dll"
if path.exists():
return path
# 2. Frozen (bundled exe)
import sys
if getattr(sys, "frozen", False):
exe_dir = Path(sys.executable).resolve().parent
# Try native subfolder or same folder as exe
for p in [exe_dir / "native" / "img_helper.dll", exe_dir / "img_helper.dll"]:
if p.exists():
return p
# 3. Current working directory
for p in [Path.cwd() / "native" / "img_helper.dll", Path.cwd() / "img_helper.dll"]:
if p.exists():
return p
return path # Fallback to default for error message
def _load_lib(self):
if self._lib is not None:
return self._lib
dll_path = self._resolve_dll_path()
if not dll_path.exists():
raise FileNotFoundError(f"Missing img_helper.dll at: {dll_path}")
try:
# On Windows, ensure the DLL's directory is in the search path for dependencies
if hasattr(os, 'add_dll_directory'):
try:
os.add_dll_directory(str(dll_path.parent))
except Exception:
pass
lib = ctypes.CDLL(str(dll_path))
lib.InitImgHelper.argtypes = [ctypes.c_uint32]
lib.InitImgHelper.restype = ctypes.c_bool
lib.UninstallImgHelper.argtypes = []
lib.UninstallImgHelper.restype = None
lib.GetImgHelperError.argtypes = []
lib.GetImgHelperError.restype = ctypes.c_char_p
self._lib = lib
return lib
except Exception as e:
logger.error(f"Failed to load img_helper.dll: {e}")
raise
def enable(self, pid: int) -> tuple[bool, str]:
with self._lock:
try:
lib = self._load_lib()
if self._enabled:
# If already enabled, we uninstall first to be safe as per DLL docs suggestion
# about being designed to hook one process at a time.
lib.UninstallImgHelper()
if lib.InitImgHelper(pid):
self._enabled = True
logger.info(f"ImgHelper hook applied to PID {pid}")
return True, "Success"
else:
err_ptr = lib.GetImgHelperError()
err_msg = err_ptr.decode('utf-8', errors='ignore') if err_ptr else "Unknown error"
logger.error(f"ImgHelper hook failed: {err_msg}")
return False, err_msg
except Exception as e:
logger.error(f"ImgHelper enable exception: {e}")
return False, str(e)
def disable(self) -> bool:
with self._lock:
if not self._enabled:
return True
try:
lib = self._load_lib()
lib.UninstallImgHelper()
self._enabled = False
logger.info("ImgHelper hook uninstalled")
return True
except Exception as e:
logger.error(f"Failed to uninstall img helper: {e}")
return False
@property
def is_enabled(self) -> bool:
return self._enabled
IMG_HELPER = ImgHelper()
+34 -22
View File
@@ -51,10 +51,10 @@ def _summarize_key_payload(payload: Optional[Dict[str, Any]]) -> Dict[str, Any]:
def _resolve_wxid_dir_for_image_key(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Path:
explicit_wxid_dir = str(wxid_dir or "").strip()
if explicit_wxid_dir:
@@ -193,15 +193,26 @@ class WeChatKeyFetcher:
process = subprocess.Popen(normalized_exe_path)
time.sleep(2)
candidates = []
target_process_name = Path(normalized_exe_path).name.lower()
for proc in psutil.process_iter(['pid', 'name', 'create_time']):
proc_name = str(proc.info.get('name') or "").strip().lower()
if proc_name == target_process_name or self._is_wechat_process(proc_name):
candidates.append(proc)
for proc in psutil.process_iter(['pid', 'name', 'exe', 'cmdline']):
try:
p_name = proc.info.get('name')
if p_name and p_name.lower() in self.process_names:
cmdline_list = proc.info.get('cmdline') or []
cmdline_str = " ".join(cmdline_list).lower()
if any(target.lower() in cmdline_str for target in WECHAT_EXECUTABLE_NAMES):
candidates.append({
"pid": proc.info['pid'],
"cmd_len": len(cmdline_str)
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if candidates:
candidates.sort(key=lambda x: x.info['create_time'], reverse=True)
target_pid = candidates[0].info['pid']
# 选择命令行最短的一个作为主进程
main_proc = min(candidates, key=lambda x: x['cmd_len'])
target_pid = main_proc["pid"]
return target_pid
return process.pid
@@ -275,6 +286,7 @@ class WeChatKeyFetcher:
"db_key": found_db_key
}
def get_db_key_workflow(wechat_install_path: Optional[str] = None):
fetcher = WeChatKeyFetcher()
return fetcher.fetch_db_key(wechat_install_path=wechat_install_path)
@@ -295,13 +307,13 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
if wx_key is None or not hasattr(wx_key, 'get_image_key'):
logger.info("[image_key] 本地算法不可用:wx_key.get_image_key 缺失")
return []
try:
res_json = wx_key.get_image_key()
if not res_json:
logger.info("[image_key] 本地算法返回空结果")
return []
data = json.loads(res_json)
accounts = data.get('accounts', [])
results = []
@@ -329,10 +341,10 @@ def try_get_local_image_keys() -> List[Dict[str, Any]]:
async def get_image_key_integrated_workflow(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
"""
集成图片密钥获取流程:
@@ -341,7 +353,7 @@ async def get_image_key_integrated_workflow(
"""
# 1. 尝试本地提取
local_keys = try_get_local_image_keys()
target_account_wxid = None
if account or wxid_dir or db_storage_path:
try:
@@ -409,10 +421,10 @@ async def get_image_key_integrated_workflow(
async def fetch_and_save_remote_keys(
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
account: Optional[str] = None,
*,
wxid_dir: Optional[str] = None,
db_storage_path: Optional[str] = None,
) -> Dict[str, Any]:
wx_id_dir = _resolve_wxid_dir_for_image_key(
account,
+144 -20
View File
@@ -1184,6 +1184,131 @@ def _load_account_source_info(account_dir: Path) -> dict[str, Any]:
return {}
def _clean_weflow_account_dir_name(dir_name: str) -> str:
"""按 WeFlow 的账号目录规则清理 wxid。
WeFlow 在连接 WCDB 前会把形如 `xxx_abcd` 的账号目录清理为 `xxx`
再传给 native `wcdb_set_my_wxid`。这里保持同样规则,避免 suffix 目录名
影响实时读取。
"""
trimmed = str(dir_name or "").strip()
if not trimmed:
return trimmed
if trimmed.lower().startswith("wxid_"):
match = re.match(r"^(wxid_[^_]+)", trimmed, flags=re.IGNORECASE)
if match:
return match.group(1)
return trimmed
suffix_match = re.match(r"^(.+)_([a-zA-Z0-9]{4})$", trimmed)
return suffix_match.group(1) if suffix_match else trimmed
def _find_db_storage_recursive(dir_path: Path, max_depth: int) -> Optional[Path]:
"""有限深度递归查找 db_storage,逻辑对齐 WeFlow。"""
if max_depth <= 0:
return None
try:
entries = list(dir_path.iterdir())
except Exception:
return None
for entry in entries:
try:
if entry.is_dir() and entry.name.lower() == "db_storage":
return entry
except Exception:
continue
for entry in entries:
try:
if not entry.is_dir():
continue
except Exception:
continue
found = _find_db_storage_recursive(entry, max_depth - 1)
if found is not None:
return found
return None
def _resolve_db_storage_path_like_weflow(base_path: str | Path, account_name: str) -> Optional[Path]:
"""按 WeFlow 的 resolveDbStoragePath 规则解析 db_storage。"""
raw = str(base_path or "").strip()
if not raw:
return None
try:
normalized = Path(raw).expanduser()
except Exception:
normalized = Path(raw)
def existing_dir(candidate: Path) -> Optional[Path]:
try:
return candidate if candidate.exists() and candidate.is_dir() else None
except Exception:
return None
direct_self = existing_dir(normalized)
if direct_self is not None and direct_self.name.lower() == "db_storage":
return direct_self
direct_child = existing_dir(normalized / "db_storage")
if direct_child is not None:
return direct_child
wxid_candidates: list[str] = []
for item in (account_name, _clean_weflow_account_dir_name(account_name)):
item = str(item or "").strip()
if item and item not in wxid_candidates:
wxid_candidates.append(item)
for wxid in wxid_candidates:
via_wxid = existing_dir(normalized / wxid / "db_storage")
if via_wxid is not None:
return via_wxid
# 兼容目录名包含额外后缀(如 wxid_xxx_1234)。
try:
entries = list(normalized.iterdir())
except Exception:
entries = []
lower_wxid = wxid.lower()
for entry in entries:
try:
if not entry.is_dir():
continue
except Exception:
continue
lower_entry = entry.name.lower()
if lower_entry == lower_wxid or lower_entry.startswith(f"{lower_wxid}_"):
candidate = existing_dir(entry / "db_storage")
if candidate is not None:
return candidate
# 兜底:向上查找 db_storage(最多 2 级),处理用户选择了子目录的情况。
try:
parent = normalized
for _ in range(2):
up = parent.parent
if up == parent:
break
parent = up
candidate_up = existing_dir(parent / "db_storage")
if candidate_up is not None:
return candidate_up
for wxid in wxid_candidates:
via_wxid_up = existing_dir(parent / wxid / "db_storage")
if via_wxid_up is not None:
return via_wxid_up
except Exception:
pass
return _find_db_storage_recursive(normalized, 3)
def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]:
try:
home = Path.home()
@@ -1195,14 +1320,18 @@ def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]:
home / "Documents" / "WeChat Files",
]
candidates = [account_name, _clean_weflow_account_dir_name(account_name)]
candidates = [x for i, x in enumerate(candidates) if x and x not in candidates[:i]]
# Exact match first
for root in roots:
c = root / account_name
try:
if c.exists() and c.is_dir():
return c
except Exception:
continue
for name in candidates:
c = root / name
try:
if c.exists() and c.is_dir():
return c
except Exception:
continue
# Then try prefix match: wxid_xxx_yyyy
for root in roots:
@@ -1212,8 +1341,9 @@ def _guess_wxid_dir_from_common_paths(account_name: str) -> Optional[Path]:
for p in root.iterdir():
if not p.is_dir():
continue
if p.name.startswith(account_name + "_"):
return p
for name in candidates:
if p.name.startswith(name + "_"):
return p
except Exception:
continue
return None
@@ -1236,21 +1366,15 @@ def _resolve_account_db_storage_dir(account_dir: Path) -> Optional[Path]:
info = _load_account_source_info(account_dir)
db_storage_path = str(info.get("db_storage_path") or "").strip()
if db_storage_path:
try:
p = Path(db_storage_path)
if p.exists() and p.is_dir():
return p
except Exception:
pass
resolved = _resolve_db_storage_path_like_weflow(db_storage_path, account_dir.name)
if resolved is not None:
return resolved
wxid_dir = _resolve_account_wxid_dir(account_dir)
if wxid_dir:
c = wxid_dir / "db_storage"
try:
if c.exists() and c.is_dir():
return c
except Exception:
pass
resolved = _resolve_db_storage_path_like_weflow(wxid_dir, account_dir.name)
if resolved is not None:
return resolved
return None
Binary file not shown.
Binary file not shown.
+60 -14
View File
@@ -3246,19 +3246,20 @@ def _append_full_messages_from_rows(
create_time=create_time,
)
# Some WeChat builds store the on-disk thumbnail basename (32-hex) in packed_info_data (protobuf),
# while the message XML only carries a long cdnthumburl file_id. Prefer packed_info_data when present.
if not _is_hex_md5(video_thumb_md5):
# Match WeFlow's video strategy: packed_info_data often stores the local msg/video basename.
# Prefer this token for video lookup; keep XML CDN/file_id as fallback query parameters.
try:
packed_val = r["packed_info_data"]
except Exception:
try:
packed_val = r["packed_info_data"]
packed_val = r.get("packed_info_data") # type: ignore[attr-defined]
except Exception:
try:
packed_val = r.get("packed_info_data") # type: ignore[attr-defined]
except Exception:
packed_val = None
packed_md5 = _extract_md5_from_packed_info(packed_val)
if packed_md5:
video_thumb_md5 = packed_md5
packed_val = None
packed_video_token = _extract_md5_from_packed_info(packed_val)
if packed_video_token:
video_md5 = packed_video_token
if not _is_hex_md5(video_thumb_md5):
video_thumb_md5 = packed_video_token
content_text = "[视频]"
elif local_type == 47:
render_type = "emoji"
@@ -3823,6 +3824,7 @@ def _postprocess_full_messages(
m["videoThumbUrl"] = (
base_url
+ f"/api/chat/media/video_thumb?account={quote(account_dir.name)}&md5={quote(video_thumb_md5)}&username={quote(username)}"
+ (f"&file_id={quote(video_thumb_file_id)}" if video_thumb_file_id else "")
)
elif video_thumb_file_id:
m["videoThumbUrl"] = (
@@ -3838,6 +3840,7 @@ def _postprocess_full_messages(
m["videoUrl"] = (
base_url
+ f"/api/chat/media/video?account={quote(account_dir.name)}&md5={quote(video_md5)}&username={quote(username)}"
+ (f"&file_id={quote(video_file_id)}" if video_file_id else "")
)
elif video_file_id:
m["videoUrl"] = (
@@ -4509,9 +4512,10 @@ def _collect_chat_messages(
contact_conn = None
for db_path in db_paths:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn: Optional[sqlite3.Connection] = None
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
table_name = _resolve_msg_table_name(conn, username)
if not table_name:
continue
@@ -4830,6 +4834,20 @@ def _collect_chat_messages(
packed_md5 = _extract_md5_from_packed_info(r["packed_info_data"])
if packed_md5:
video_thumb_md5 = packed_md5
# Match WeFlow video lookup: packed_info_data may be the local msg/video basename.
# Keep XML md5/file_id as fallback, but prefer the packed token for local playback.
try:
packed_val = r["packed_info_data"]
except Exception:
try:
packed_val = r.get("packed_info_data") # type: ignore[attr-defined]
except Exception:
packed_val = None
packed_video_token = _extract_md5_from_packed_info(packed_val)
if packed_video_token:
video_md5 = packed_video_token
if not _is_hex_md5(video_thumb_md5):
video_thumb_md5 = packed_video_token
content_text = "[视频]"
elif local_type == 47:
render_type = "emoji"
@@ -5007,8 +5025,20 @@ def _collect_chat_messages(
"_rawText": raw_text if local_type in (10000, 266287972401) else "",
}
)
except sqlite3.DatabaseError as e:
# 单个解密库损坏时不要让整个聊天详情接口 500;保留诊断日志,继续尝试其他 message_*.db。
logger.warning(
"[chat.messages] malformed message db skipped account=%s username=%s db=%s error=%s diag=%s",
account_dir.name,
username,
str(db_path),
str(e),
format_sqlite_diagnostics(collect_sqlite_diagnostics(db_path, quick_check=True)),
)
continue
finally:
conn.close()
if conn is not None:
conn.close()
if contact_conn is not None:
try:
@@ -5782,6 +5812,20 @@ def list_chat_messages(
local_id=local_id,
create_time=create_time,
)
# Match WeFlow video lookup: packed_info_data may be the local msg/video basename.
# Keep XML md5/file_id as fallback, but prefer the packed token for local playback.
try:
packed_val = r["packed_info_data"]
except Exception:
try:
packed_val = r.get("packed_info_data") # type: ignore[attr-defined]
except Exception:
packed_val = None
packed_video_token = _extract_md5_from_packed_info(packed_val)
if packed_video_token:
video_md5 = packed_video_token
if not _is_hex_md5(video_thumb_md5):
video_thumb_md5 = packed_video_token
content_text = "[视频]"
elif local_type == 47:
render_type = "emoji"
@@ -6214,6 +6258,7 @@ def list_chat_messages(
m["videoThumbUrl"] = (
base_url
+ f"/api/chat/media/video_thumb?account={quote(account_dir.name)}&md5={quote(video_thumb_md5)}&username={quote(username)}"
+ (f"&file_id={quote(video_thumb_file_id)}" if video_thumb_file_id else "")
)
elif video_thumb_file_id:
m["videoThumbUrl"] = (
@@ -6229,6 +6274,7 @@ def list_chat_messages(
m["videoUrl"] = (
base_url
+ f"/api/chat/media/video?account={quote(account_dir.name)}&md5={quote(video_md5)}&username={quote(username)}"
+ (f"&file_id={quote(video_file_id)}" if video_file_id else "")
)
elif video_file_id:
m["videoUrl"] = (
+645 -47
View File
@@ -7,6 +7,8 @@ import mimetypes
import os
import sqlite3
import subprocess
import time
import re
from pathlib import Path
from typing import Any, Optional
from urllib.parse import urlparse
@@ -61,7 +63,7 @@ from ..media_helpers import (
from ..chat_helpers import _extract_md5_from_packed_info, _load_contact_rows, _pick_avatar_url
from ..path_fix import PathFixRoute
from ..perf_trace import create_perf_trace
from ..wcdb_realtime import WCDB_REALTIME, get_avatar_urls as _wcdb_get_avatar_urls
from ..wcdb_realtime import WCDB_REALTIME, exec_query as _wcdb_exec_query, get_avatar_urls as _wcdb_get_avatar_urls
logger = get_logger(__name__)
@@ -70,6 +72,244 @@ router = APIRouter(route_class=PathFixRoute)
CHAT_MEDIA_BROWSER_CACHE_SECONDS = 24 * 60 * 60
VIDEO_DIR_INDEX_TTL_SECONDS = 90.0
_VIDEO_DIR_INDEX_CACHE: dict[str, tuple[float, dict[str, dict[str, str]]]] = {}
_VIDEO_DIR_INDEX_MAX_ENTRIES = 32
def _normalize_video_lookup_key(value: str) -> str:
text = str(value or "").strip().lower()
if not text:
return ""
text = text.replace("\\", "/").split("/")[-1]
text = re.sub(r"\.(?:mp4|mov|m4v|avi|mkv|flv|jpg|jpeg|png|gif|webp|dat)$", "", text, flags=re.I)
text = re.sub(r"_thumb$", "", text, flags=re.I)
direct = re.fullmatch(r"([a-f0-9]{16,64})(?:_raw)?", text, flags=re.I)
if direct:
suffix = "_raw" if text.endswith("_raw") else ""
return f"{direct.group(1).lower()}{suffix}"
preferred32 = re.search(r"([a-f0-9]{32})(?![a-f0-9])", text, flags=re.I)
if preferred32:
return preferred32.group(1).lower()
fallback = re.search(r"([a-f0-9]{16,64})(?![a-f0-9])", text, flags=re.I)
return fallback.group(1).lower() if fallback else ""
def _is_video_month_dir_name(name: str) -> bool:
n = str(name or "")
return len(n) == 7 and n[4] == "-" and n[:4].isdigit() and n[5:7].isdigit()
def _get_or_build_video_dir_index(video_base_dir: Path) -> dict[str, dict[str, str]]:
"""Build a WeFlow-style index for msg/video/YYYY-MM files."""
try:
base = video_base_dir.resolve()
except Exception:
base = video_base_dir
cache_key = str(base)
now = time.monotonic()
cached = _VIDEO_DIR_INDEX_CACHE.get(cache_key)
if cached and (now - cached[0]) < VIDEO_DIR_INDEX_TTL_SECONDS:
return cached[1]
index: dict[str, dict[str, str]] = {}
def ensure_entry(key: str) -> dict[str, str]:
entry = index.get(key)
if entry is None:
entry = {}
index[key] = entry
return entry
try:
if not base.exists() or not base.is_dir():
return {}
month_dirs: list[Path] = []
try:
for child in base.iterdir():
try:
if child.is_dir() and _is_video_month_dir_name(child.name):
month_dirs.append(child)
except Exception:
continue
except Exception:
month_dirs = []
month_dirs.sort(key=lambda x: x.name, reverse=True)
dirs_to_scan = [*month_dirs, base]
for d in dirs_to_scan:
try:
files = list(d.iterdir())
except Exception:
continue
for file_path in files:
try:
if not file_path.is_file():
continue
except Exception:
continue
lower = file_path.name.lower()
if lower.endswith((".mp4", ".m4v", ".mov")):
stem = lower.rsplit(".", 1)[0]
key = _normalize_video_lookup_key(stem)
if not key:
continue
entry = ensure_entry(key)
entry.setdefault("video", str(file_path))
if key.endswith("_raw"):
base_key = key[:-4]
ensure_entry(base_key).setdefault("video", str(file_path))
else:
ensure_entry(f"{key}_raw").setdefault("video", str(file_path))
continue
if not lower.endswith((".jpg", ".jpeg", ".png", ".webp")):
continue
stem = lower.rsplit(".", 1)[0]
is_thumb = stem.endswith("_thumb")
if is_thumb:
stem = stem[:-6]
key = _normalize_video_lookup_key(stem)
if not key:
continue
entry = ensure_entry(key)
entry.setdefault("thumb" if is_thumb else "cover", str(file_path))
if key.endswith("_raw"):
base_key = key[:-4]
ensure_entry(base_key).setdefault("thumb" if is_thumb else "cover", str(file_path))
finally:
if len(_VIDEO_DIR_INDEX_CACHE) >= _VIDEO_DIR_INDEX_MAX_ENTRIES:
try:
oldest_key = min(_VIDEO_DIR_INDEX_CACHE.items(), key=lambda kv: kv[1][0])[0]
_VIDEO_DIR_INDEX_CACHE.pop(oldest_key, None)
except Exception:
_VIDEO_DIR_INDEX_CACHE.clear()
_VIDEO_DIR_INDEX_CACHE[cache_key] = (now, index)
return index
def _resolve_video_path_from_weflow_index(
*,
md5: str,
wxid_dir: Optional[Path],
db_storage_dir: Optional[Path],
want_thumb: bool,
) -> Optional[Path]:
lookup_key = _normalize_video_lookup_key(md5)
if not lookup_key:
return None
bases: list[Path] = []
for root in [wxid_dir, db_storage_dir]:
if not root:
continue
bases.extend([root / "msg" / "video", root / "video"])
seen: set[str] = set()
keys = [lookup_key]
if lookup_key.endswith("_raw"):
keys.append(lookup_key[:-4])
else:
keys.append(f"{lookup_key}_raw")
for base in bases:
try:
base_key = str(base.resolve())
except Exception:
base_key = str(base)
if base_key in seen:
continue
seen.add(base_key)
try:
if not base.exists() or not base.is_dir():
continue
except Exception:
continue
index = _get_or_build_video_dir_index(base)
for key in keys:
entry = index.get(key) or {}
candidates = [entry.get("thumb"), entry.get("cover")] if want_thumb else [entry.get("video")]
for candidate in candidates:
if not candidate:
continue
p = Path(candidate)
try:
if p.exists() and p.is_file():
return p
except Exception:
continue
return None
_REALTIME_VIDEO_HARDLINK_CACHE_TTL_SECONDS = 120.0
_REALTIME_VIDEO_HARDLINK_CACHE: dict[tuple[str, str], tuple[float, str]] = {}
def _sql_quote(value: str) -> str:
return "'" + str(value or "").replace("'", "''") + "'"
def _resolve_video_file_token_from_realtime_hardlink(account_dir: Path, md5: str) -> str:
"""Resolve XML video md5 to the real local msg/video basename via encrypted hardlink.db."""
md5_norm = _normalize_video_lookup_key(md5)
if not md5_norm:
return ""
cache_key = (str(account_dir.name), md5_norm)
now = time.monotonic()
cached = _REALTIME_VIDEO_HARDLINK_CACHE.get(cache_key)
if cached and (now - cached[0]) < _REALTIME_VIDEO_HARDLINK_CACHE_TTL_SECONDS:
return cached[1]
resolved = ""
try:
conn = WCDB_REALTIME.ensure_connected(account_dir, timeout=5.0)
hardlink_db_path = Path(conn.db_storage_dir) / "hardlink" / "hardlink.db"
if not hardlink_db_path.exists():
return ""
md5_lit = _sql_quote(md5_norm)
sql = (
"SELECT md5, file_name, file_size, modify_time, dir1, dir2 "
"FROM video_hardlink_info_v4 "
f"WHERE md5 = {md5_lit} OR file_name LIKE '%' || {md5_lit} || '%' "
"ORDER BY modify_time DESC, dir1 DESC, rowid DESC LIMIT 1"
)
rows = _wcdb_exec_query(conn.handle, kind="hardlink", path=str(hardlink_db_path), sql=sql) or []
if rows:
file_name = str((rows[0] or {}).get("file_name") or "").strip()
resolved = _normalize_video_lookup_key(file_name) or file_name.lower()
except Exception:
resolved = ""
_REALTIME_VIDEO_HARDLINK_CACHE[cache_key] = (now, resolved)
return resolved
def _resolve_video_path_from_realtime_hardlink(
*,
account_dir: Path,
md5: str,
wxid_dir: Optional[Path],
db_storage_dir: Optional[Path],
want_thumb: bool,
) -> tuple[Optional[Path], str]:
token = _resolve_video_file_token_from_realtime_hardlink(account_dir, md5)
if not token:
return None, ""
path = _resolve_video_path_from_weflow_index(
md5=token,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=want_thumb,
)
if path is not None:
return path, token
path = _fast_probe_video_path_by_md5(
md5=token,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=want_thumb,
)
return path, token
def _build_cached_media_response(request: Optional[Request], data: bytes, media_type: str) -> Response:
payload = bytes(data or b"")
@@ -1481,11 +1721,28 @@ async def get_chat_image(
# md5 模式:优先检查解密资源目录;如果微信目录里已经有更高质量版本,会在后面自动升级。
if md5:
cache_started_at = time.perf_counter()
decrypted_path = _try_find_decrypted_resource(account_dir, str(md5).lower())
trace(
"decrypted-cache:path-lookup",
hasPath=bool(decrypted_path),
path=str(decrypted_path or ""),
elapsedMsLocal=round((time.perf_counter() - cache_started_at) * 1000.0, 1),
)
if decrypted_path:
read_started_at = time.perf_counter()
data = decrypted_path.read_bytes()
media_type = _detect_image_media_type(data[:32])
if media_type != "application/octet-stream" and _is_probably_valid_image(data, media_type):
valid_image = bool(media_type != "application/octet-stream" and _is_probably_valid_image(data, media_type))
trace(
"decrypted-cache:read-validate",
path=str(decrypted_path),
bytes=len(data or b""),
mediaType=media_type,
validImage=valid_image,
elapsedMsLocal=round((time.perf_counter() - read_started_at) * 1000.0, 1),
)
if valid_image:
cached_path = decrypted_path
cached_data = data
cached_media_type = media_type
@@ -1512,6 +1769,7 @@ async def get_chat_image(
return _build_cached_media_response(request, cached_data, cached_media_type)
# 回退:从微信数据目录实时定位并解密
roots_started_at = time.perf_counter()
wxid_dir = _resolve_account_wxid_dir(account_dir)
hardlink_db_path = account_dir / "hardlink.db"
db_storage_dir = _resolve_account_db_storage_dir(account_dir)
@@ -1521,6 +1779,7 @@ async def get_chat_image(
hasWxidDir=bool(wxid_dir),
hasDbStorageDir=bool(db_storage_dir),
hardlinkHasImageTable=bool(hardlink_has_image_table),
elapsedMsLocal=round((time.perf_counter() - roots_started_at) * 1000.0, 1),
)
roots: list[Path] = []
@@ -1544,6 +1803,7 @@ async def get_chat_image(
allow_deep_scan = False
if md5:
hardlink_started_at = time.perf_counter()
p = await asyncio.to_thread(
_resolve_media_path_from_hardlink,
hardlink_db_path,
@@ -1553,12 +1813,42 @@ async def get_chat_image(
username=username,
extra_roots=roots[1:],
)
trace(
"source:hardlink-lookup",
found=bool(p),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - hardlink_started_at) * 1000.0, 1),
)
# Fast fallback for thumbnails not indexed by hardlink.db: scan only this chat's attach directory.
# Keep this before the file_id fallback: file_id search can be very expensive on large WeChat folders,
# while md5 + conversation-scoped attach probing usually resolves current chat images in milliseconds.
if (not p) and wxid_dir and username:
fast_probe_started_at = time.perf_counter()
hit = await asyncio.to_thread(
_fast_probe_image_path_in_chat_attach,
wxid_dir_str=str(wxid_dir),
username=str(username),
md5=str(md5),
)
if hit:
p = Path(hit)
trace(
"source:chat-attach-fast-probe",
found=bool(hit),
path=str(hit or ""),
elapsedMsLocal=round((time.perf_counter() - fast_probe_started_at) * 1000.0, 1),
)
# Some WeChat versions send both md5 + file_id; md5 may be missing from hardlink.db while file_id still works.
# Only run this broader fallback after the scoped md5 probe misses.
if (not p) and file_id:
file_id_started_at = time.perf_counter()
file_id_roots_checked = 0
for r in [wxid_dir, db_storage_dir]:
if not r:
continue
file_id_roots_checked += 1
hit = await asyncio.to_thread(
_fallback_search_media_by_file_id,
str(r),
@@ -1569,24 +1859,27 @@ async def get_chat_image(
if hit:
p = Path(hit)
break
# Fast fallback for thumbnails not indexed by hardlink.db: scan only this chat's attach directory.
if (not p) and wxid_dir and username:
hit = await asyncio.to_thread(
_fast_probe_image_path_in_chat_attach,
wxid_dir_str=str(wxid_dir),
username=str(username),
md5=str(md5),
trace(
"source:file-id-fallback-after-md5",
found=bool(p),
rootsChecked=file_id_roots_checked,
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - file_id_started_at) * 1000.0, 1),
)
if hit:
p = Path(hit)
# Deep scan is extremely expensive for misses (~seconds per md5). Only enable when:
# - user explicitly requests `deep_scan=1`, OR
# - hardlink.db doesn't have the image table (older/partial data).
allow_deep_scan = bool(deep_scan) or (not hardlink_has_image_table)
if (not p) and wxid_dir and allow_deep_scan:
deep_scan_started_at = time.perf_counter()
hit = await asyncio.to_thread(_fallback_search_media_by_md5, str(wxid_dir), str(md5), kind="image")
trace(
"source:deep-scan",
found=bool(hit),
path=str(hit or ""),
elapsedMsLocal=round((time.perf_counter() - deep_scan_started_at) * 1000.0, 1),
)
if hit:
p = Path(hit)
try:
@@ -1594,10 +1887,13 @@ async def get_chat_image(
except Exception:
pass
elif file_id:
# 一些版本图片消息无 MD5,仅提供 cdnthumburl 等“文件标识”
# Some image messages have no MD5 and only provide a cdnthumburl-like file identifier.
file_id_started_at = time.perf_counter()
file_id_roots_checked = 0
for r in [wxid_dir, db_storage_dir]:
if not r:
continue
file_id_roots_checked += 1
hit = await asyncio.to_thread(
_fallback_search_media_by_file_id,
str(r),
@@ -1608,6 +1904,13 @@ async def get_chat_image(
if hit:
p = Path(hit)
break
trace(
"source:file-id-lookup",
found=bool(p),
rootsChecked=file_id_roots_checked,
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - file_id_started_at) * 1000.0, 1),
)
if not p:
if cached_path:
@@ -1621,7 +1924,9 @@ async def get_chat_image(
)
raise HTTPException(status_code=404, detail="Image not found.")
candidates_started_at = time.perf_counter()
candidates.extend(await asyncio.to_thread(_iter_media_source_candidates, p))
candidate_count_before_order = len(candidates)
candidates = await asyncio.to_thread(_order_media_candidates, candidates)
trace(
"candidates:resolved",
@@ -1629,6 +1934,8 @@ async def get_chat_image(
candidateCount=len(candidates),
hasCachedPath=bool(cached_path),
allowDeepScan=bool(allow_deep_scan),
candidateCountBeforeOrder=candidate_count_before_order,
elapsedMsLocal=round((time.perf_counter() - candidates_started_at) * 1000.0, 1),
)
if cached_path:
@@ -1661,8 +1968,11 @@ async def get_chat_image(
chosen: Optional[Path] = None
decode_attempts = 0
trace("decode:start", candidateCount=len(candidates))
slow_decode_logged = 0
for src_path in candidates:
decode_attempts += 1
decode_one_started_at = time.perf_counter()
decode_error = ""
try:
data, media_type = await asyncio.to_thread(
_read_and_maybe_decrypt_media,
@@ -1670,10 +1980,30 @@ async def get_chat_image(
account_dir=account_dir,
weixin_root=wxid_dir,
)
except Exception:
except Exception as e:
decode_error = str(e)
data = b""
media_type = "application/octet-stream"
decode_elapsed_ms = round((time.perf_counter() - decode_one_started_at) * 1000.0, 1)
valid_image = not (media_type.startswith("image/") and (not _is_probably_valid_image(data, media_type)))
should_log_attempt = bool(decode_error) or decode_attempts <= 3 or decode_elapsed_ms >= 100 or media_type != "application/octet-stream"
if should_log_attempt and slow_decode_logged < 8:
trace(
"decode:attempt",
attempt=decode_attempts,
path=str(src_path),
mediaType=media_type,
bytes=len(data or b""),
validImage=bool(valid_image),
error=decode_error[:200],
elapsedMsLocal=decode_elapsed_ms,
)
slow_decode_logged += 1
if decode_error:
continue
if media_type.startswith("image/") and (not _is_probably_valid_image(data, media_type)):
if not valid_image:
continue
if media_type != "application/octet-stream":
@@ -1803,7 +2133,7 @@ async def get_chat_emoji(
return Response(content=data, media_type=media_type)
@router.get("/api/chat/media/video_thumb", summary="获取视频缩略图资源")
@router.get("/api/chat/media/video_thumb", summary="Get video thumbnail media")
async def get_chat_video_thumb(
md5: Optional[str] = None,
file_id: Optional[str] = None,
@@ -1814,16 +2144,47 @@ async def get_chat_video_thumb(
if (not md5) and (not file_id):
raise HTTPException(status_code=400, detail="Missing md5/file_id.")
account_dir = _resolve_account_dir(account)
md5_norm = str(md5 or "").strip().lower() if md5 else ""
file_id_norm = str(file_id or "").strip()
_trace_id, trace = create_perf_trace(
logger,
"chat.video_thumb",
account=account_dir.name,
username=str(username or ""),
md5=md5_norm,
fileId=file_id_norm,
deepScan=bool(deep_scan),
)
trace("request:start")
# 优先从解密资源目录读取(更快)
if md5:
decrypted_path = _try_find_decrypted_resource(account_dir, str(md5).lower())
# Fast path: cached decoded thumbnail resource.
if md5_norm:
cache_started_at = time.perf_counter()
decrypted_path = _try_find_decrypted_resource(account_dir, md5_norm)
trace(
"decrypted-cache:path-lookup",
hasPath=bool(decrypted_path),
path=str(decrypted_path or ""),
elapsedMsLocal=round((time.perf_counter() - cache_started_at) * 1000.0, 1),
)
if decrypted_path:
read_started_at = time.perf_counter()
data = decrypted_path.read_bytes()
media_type = _detect_image_media_type(data[:32])
trace(
"decrypted-cache:read-validate",
path=str(decrypted_path),
bytes=len(data or b""),
mediaType=media_type,
elapsedMsLocal=round((time.perf_counter() - read_started_at) * 1000.0, 1),
)
trace("response:ready", result="decrypted-cache-hit", mediaType=media_type, bytes=len(data or b""))
return Response(content=data, media_type=media_type)
else:
trace("decrypted-cache:skipped", reason="missing-md5")
# 回退到原始逻辑
# Fallback: locate and decode from WeChat data directories.
roots_started_at = time.perf_counter()
wxid_dir = _resolve_account_wxid_dir(account_dir)
hardlink_db_path = account_dir / "hardlink.db"
extra_roots: list[Path] = []
@@ -1837,53 +2198,149 @@ async def get_chat_video_thumb(
roots.append(wxid_dir)
if db_storage_dir:
roots.append(db_storage_dir)
trace(
"roots:resolved",
hasWxidDir=bool(wxid_dir),
wxidDir=str(wxid_dir or ""),
hasDbStorageDir=bool(db_storage_dir),
dbStorageDir=str(db_storage_dir or ""),
hardlinkHasVideoTable=bool(hardlink_has_video_table),
elapsedMsLocal=round((time.perf_counter() - roots_started_at) * 1000.0, 1),
)
if not roots:
trace("response:error", result="roots-not-found")
raise HTTPException(
status_code=404,
detail="wxid_dir/db_storage_path not found. Please decrypt with db_storage_path to enable media lookup.",
)
p: Optional[Path] = None
if md5:
p = _resolve_media_path_from_hardlink(
allow_deep_scan = False
if md5_norm:
hardlink_started_at = time.perf_counter()
p = await asyncio.to_thread(
_resolve_media_path_from_hardlink,
hardlink_db_path,
roots[0],
md5=str(md5),
md5=md5_norm,
kind="video_thumb",
username=username,
extra_roots=roots[1:],
)
trace(
"source:hardlink-lookup",
found=bool(p),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - hardlink_started_at) * 1000.0, 1),
)
# Many WeChat builds store video thumbnails directly as `{md5}_thumb.jpg` under msg/video/YYYY-MM.
# This fast probe avoids an expensive recursive scan on misses.
# WeFlow-style lookup: build a short-lived index of msg/video/YYYY-MM and resolve by local file token.
if (not p) and (wxid_dir or db_storage_dir):
p = _fast_probe_video_path_by_md5(
md5=str(md5),
index_started_at = time.perf_counter()
p = await asyncio.to_thread(
_resolve_video_path_from_weflow_index,
md5=md5_norm,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=True,
)
trace(
"source:weflow-video-index",
found=bool(p),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - index_started_at) * 1000.0, 1),
)
# Many WeChat builds store video thumbnails directly as `{md5}_thumb.jpg` under msg/video/YYYY-MM.
# This direct probe is retained as a cheap fallback when the index misses.
if (not p) and (wxid_dir or db_storage_dir):
fast_probe_started_at = time.perf_counter()
p = await asyncio.to_thread(
_fast_probe_video_path_by_md5,
md5=md5_norm,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=True,
)
trace(
"source:fast-probe",
found=bool(p),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - fast_probe_started_at) * 1000.0, 1),
)
if (not p) and (wxid_dir or db_storage_dir):
realtime_started_at = time.perf_counter()
p, resolved_token = await asyncio.to_thread(
_resolve_video_path_from_realtime_hardlink,
account_dir=account_dir,
md5=md5_norm,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=True,
)
trace(
"source:realtime-hardlink",
found=bool(p),
resolvedToken=str(resolved_token or ""),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - realtime_started_at) * 1000.0, 1),
)
allow_deep_scan = bool(deep_scan) or (not hardlink_has_video_table)
if (not p) and wxid_dir and allow_deep_scan:
hit = _fallback_search_media_by_md5(str(wxid_dir), str(md5), kind="video_thumb")
deep_scan_started_at = time.perf_counter()
hit = await asyncio.to_thread(_fallback_search_media_by_md5, str(wxid_dir), md5_norm, kind="video_thumb")
if hit:
p = Path(hit)
if (not p) and file_id:
trace(
"source:deep-scan",
found=bool(hit),
path=str(hit or ""),
elapsedMsLocal=round((time.perf_counter() - deep_scan_started_at) * 1000.0, 1),
)
if (not p) and file_id_norm:
file_id_started_at = time.perf_counter()
file_id_roots_checked = 0
for r in [wxid_dir, db_storage_dir]:
if not r:
continue
hit = _fallback_search_media_by_file_id(str(r), str(file_id), kind="video_thumb", username=str(username or ""))
file_id_roots_checked += 1
hit = await asyncio.to_thread(
_fallback_search_media_by_file_id,
str(r),
file_id_norm,
kind="video_thumb",
username=str(username or ""),
)
if hit:
p = Path(hit)
break
trace(
"source:file-id-lookup",
found=bool(p),
rootsChecked=file_id_roots_checked,
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - file_id_started_at) * 1000.0, 1),
)
if not p:
trace("response:error", result="source-not-found", allowDeepScan=bool(allow_deep_scan))
raise HTTPException(status_code=404, detail="Video thumbnail not found.")
data, media_type = _read_and_maybe_decrypt_media(p, account_dir=account_dir, weixin_root=wxid_dir)
read_started_at = time.perf_counter()
data, media_type = await asyncio.to_thread(_read_and_maybe_decrypt_media, p, account_dir=account_dir, weixin_root=wxid_dir)
trace(
"decode:done",
path=str(p),
mediaType=media_type,
bytes=len(data or b""),
elapsedMsLocal=round((time.perf_counter() - read_started_at) * 1000.0, 1),
)
trace("response:ready", result="decoded", mediaType=media_type, bytes=len(data or b""))
return Response(content=data, media_type=media_type)
@router.get("/api/chat/media/video", summary="获取视频资源")
@router.get("/api/chat/media/video", summary="Get video media")
async def get_chat_video(
md5: Optional[str] = None,
file_id: Optional[str] = None,
@@ -1895,14 +2352,36 @@ async def get_chat_video(
raise HTTPException(status_code=400, detail="Missing md5/file_id.")
account_dir = _resolve_account_dir(account)
md5_norm = str(md5 or "").strip().lower() if md5 else ""
file_id_norm = str(file_id or "").strip()
_trace_id, trace = create_perf_trace(
logger,
"chat.video",
account=account_dir.name,
username=str(username or ""),
md5=md5_norm,
fileId=file_id_norm,
deepScan=bool(deep_scan),
)
trace("request:start")
if md5_norm:
# 优先从解密资源目录读取(更快,且支持 Range
# Fast path Range?
cache_started_at = time.perf_counter()
decrypted_path = _try_find_decrypted_resource(account_dir, md5_norm)
trace(
"decrypted-cache:path-lookup",
hasPath=bool(decrypted_path),
path=str(decrypted_path or ""),
elapsedMsLocal=round((time.perf_counter() - cache_started_at) * 1000.0, 1),
)
if decrypted_path:
mt = _guess_media_type_by_path(decrypted_path, fallback="video/mp4")
trace("response:ready", result="decrypted-cache-hit", mediaType=mt, path=str(decrypted_path))
return FileResponse(str(decrypted_path), media_type=mt)
else:
trace("decrypted-cache:skipped", reason="missing-md5")
roots_started_at = time.perf_counter()
wxid_dir = _resolve_account_wxid_dir(account_dir)
hardlink_db_path = account_dir / "hardlink.db"
extra_roots: list[Path] = []
@@ -1916,14 +2395,28 @@ async def get_chat_video(
roots.append(wxid_dir)
if db_storage_dir:
roots.append(db_storage_dir)
trace(
"roots:resolved",
hasWxidDir=bool(wxid_dir),
wxidDir=str(wxid_dir or ""),
hasDbStorageDir=bool(db_storage_dir),
dbStorageDir=str(db_storage_dir or ""),
hardlinkHasVideoTable=bool(hardlink_has_video_table),
elapsedMsLocal=round((time.perf_counter() - roots_started_at) * 1000.0, 1),
)
if not roots:
trace("response:error", result="roots-not-found")
raise HTTPException(
status_code=404,
detail="wxid_dir/db_storage_path not found. Please decrypt with db_storage_path to enable media lookup.",
)
p: Optional[Path] = None
allow_deep_scan = False
if md5_norm:
p = _resolve_media_path_from_hardlink(
hardlink_started_at = time.perf_counter()
p = await asyncio.to_thread(
_resolve_media_path_from_hardlink,
hardlink_db_path,
roots[0],
md5=md5_norm,
@@ -1931,58 +2424,163 @@ async def get_chat_video(
username=username,
extra_roots=roots[1:],
)
trace(
"source:hardlink-lookup",
found=bool(p),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - hardlink_started_at) * 1000.0, 1),
)
if (not p) and (wxid_dir or db_storage_dir):
p = _fast_probe_video_path_by_md5(
index_started_at = time.perf_counter()
p = await asyncio.to_thread(
_resolve_video_path_from_weflow_index,
md5=md5_norm,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=False,
)
trace(
"source:weflow-video-index",
found=bool(p),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - index_started_at) * 1000.0, 1),
)
if (not p) and (wxid_dir or db_storage_dir):
fast_probe_started_at = time.perf_counter()
p = await asyncio.to_thread(
_fast_probe_video_path_by_md5,
md5=md5_norm,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=False,
)
trace(
"source:fast-probe",
found=bool(p),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - fast_probe_started_at) * 1000.0, 1),
)
if (not p) and (wxid_dir or db_storage_dir):
realtime_started_at = time.perf_counter()
p, resolved_token = await asyncio.to_thread(
_resolve_video_path_from_realtime_hardlink,
account_dir=account_dir,
md5=md5_norm,
wxid_dir=wxid_dir,
db_storage_dir=db_storage_dir,
want_thumb=False,
)
trace(
"source:realtime-hardlink",
found=bool(p),
resolvedToken=str(resolved_token or ""),
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - realtime_started_at) * 1000.0, 1),
)
allow_deep_scan = bool(deep_scan) or (not hardlink_has_video_table)
if (not p) and wxid_dir and allow_deep_scan:
hit = _fallback_search_media_by_md5(str(wxid_dir), md5_norm, kind="video")
deep_scan_started_at = time.perf_counter()
hit = await asyncio.to_thread(_fallback_search_media_by_md5, str(wxid_dir), md5_norm, kind="video")
if hit:
p = Path(hit)
if (not p) and file_id:
trace(
"source:deep-scan",
found=bool(hit),
path=str(hit or ""),
elapsedMsLocal=round((time.perf_counter() - deep_scan_started_at) * 1000.0, 1),
)
if (not p) and file_id_norm:
file_id_started_at = time.perf_counter()
file_id_roots_checked = 0
for r in [wxid_dir, db_storage_dir]:
if not r:
continue
hit = _fallback_search_media_by_file_id(str(r), str(file_id), kind="video", username=str(username or ""))
file_id_roots_checked += 1
hit = await asyncio.to_thread(
_fallback_search_media_by_file_id,
str(r),
file_id_norm,
kind="video",
username=str(username or ""),
)
if hit:
p = Path(hit)
break
trace(
"source:file-id-lookup",
found=bool(p),
rootsChecked=file_id_roots_checked,
path=str(p or ""),
elapsedMsLocal=round((time.perf_counter() - file_id_started_at) * 1000.0, 1),
)
if not p:
trace("response:error", result="source-not-found", allowDeepScan=bool(allow_deep_scan))
raise HTTPException(status_code=404, detail="Video not found.")
# 直接可播放的 MP4:直接 FileResponse(支持 Range
# Fast path MP4??? FileResponse??? Range?
probe_started_at = time.perf_counter()
try:
with open(p, "rb") as f:
head = f.read(8)
if len(head) >= 8 and head[4:8] == b"ftyp":
is_plain_mp4 = bool(len(head) >= 8 and head[4:8] == b"ftyp")
trace(
"decode:probe-plain-mp4",
path=str(p),
isPlainMp4=is_plain_mp4,
elapsedMsLocal=round((time.perf_counter() - probe_started_at) * 1000.0, 1),
)
if is_plain_mp4:
media_type = _guess_media_type_by_path(p, fallback="video/mp4")
trace("response:ready", result="plain-file", mediaType=media_type, path=str(p))
return FileResponse(str(p), media_type=media_type)
except Exception:
pass
except Exception as e:
trace(
"decode:probe-plain-mp4",
path=str(p),
error=str(e)[:200],
elapsedMsLocal=round((time.perf_counter() - probe_started_at) * 1000.0, 1),
)
# 尝试解密/去前缀并落盘(避免一次性返回大文件 bytes
# Fast path/????????????????? bytes?
if md5_norm:
materialize_started_at = time.perf_counter()
try:
materialized = _ensure_decrypted_resource_for_md5(
materialized = await asyncio.to_thread(
_ensure_decrypted_resource_for_md5,
account_dir,
md5=md5_norm,
source_path=p,
weixin_root=wxid_dir,
)
except Exception:
materialize_error = ""
except Exception as e:
materialized = None
materialize_error = str(e)
trace(
"decode:materialize",
found=bool(materialized),
path=str(materialized or ""),
error=materialize_error[:200],
elapsedMsLocal=round((time.perf_counter() - materialize_started_at) * 1000.0, 1),
)
if materialized:
media_type = _guess_media_type_by_path(materialized, fallback="video/mp4")
trace("response:ready", result="materialized", mediaType=media_type, path=str(materialized))
return FileResponse(str(materialized), media_type=media_type)
# 最后兜底:直接返回处理后的 bytes(不支持 Range
data, media_type = _read_and_maybe_decrypt_media(p, account_dir=account_dir, weixin_root=wxid_dir)
# Fast path bytes???? Range?
read_started_at = time.perf_counter()
data, media_type = await asyncio.to_thread(_read_and_maybe_decrypt_media, p, account_dir=account_dir, weixin_root=wxid_dir)
if media_type == "application/octet-stream":
media_type = _guess_media_type_by_path(p, fallback="video/mp4")
trace(
"decode:bytes-fallback",
path=str(p),
mediaType=media_type,
bytes=len(data or b""),
elapsedMsLocal=round((time.perf_counter() - read_started_at) * 1000.0, 1),
)
trace("response:ready", result="bytes-fallback", mediaType=media_type, bytes=len(data or b""))
return Response(content=data, media_type=media_type)
@@ -403,6 +403,7 @@ async def decrypt_databases_stream(
if (
(not bool(db_diagnostic.get("success", ok)))
or int(db_diagnostic.get("failed_pages") or 0) > 0
or int(db_diagnostic.get("hmac_warning_pages") or 0) > 0
or str(db_diagnostic.get("diagnostic_status") or "") != "ok"
):
account_diagnostic_warning_count += 1
@@ -434,8 +435,11 @@ async def decrypt_databases_stream(
if db_diagnostic:
payload["diagnostic_status"] = str(db_diagnostic.get("diagnostic_status") or "")
payload["page_failures"] = int(db_diagnostic.get("failed_pages") or 0)
payload["hmac_warning_pages"] = int(db_diagnostic.get("hmac_warning_pages") or 0)
if db_diagnostic.get("failed_page_samples"):
payload["failed_page_samples"] = db_diagnostic.get("failed_page_samples")
if db_diagnostic.get("hmac_warning_samples"):
payload["hmac_warning_samples"] = db_diagnostic.get("hmac_warning_samples")
if db_diagnostic.get("diagnostics"):
payload["diagnostics"] = db_diagnostic.get("diagnostics")
+35 -2
View File
@@ -1,7 +1,9 @@
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
import asyncio
from concurrent.futures import ThreadPoolExecutor
from ..img_helper import IMG_HELPER
from .wechat_detection import check_wechat_status
router = APIRouter()
@@ -32,4 +34,35 @@ async def pick_directory(title: str = "请选择目录", initial_dir: str = ""):
# 在子线程中执行 GUI 操作
folder_path = await loop.run_in_executor(pool, _open_folder_dialog, title, initial_dir)
return {"path": folder_path}
return {"path": folder_path}
@router.get("/api/system/img_helper/status", summary="获取大图下载辅助插件状态")
async def get_img_helper_status():
return {
"enabled": IMG_HELPER.is_enabled
}
class ImgHelperToggleRequest(BaseModel):
enabled: bool
@router.post("/api/system/img_helper/toggle", summary="开启/关闭大图下载辅助插件")
async def toggle_img_helper(req: ImgHelperToggleRequest):
if not req.enabled:
IMG_HELPER.disable()
return {"status": "success", "enabled": False}
# Attempt to enable
status_res = await check_wechat_status()
wx_status = status_res.get("wx_status", {})
if not wx_status.get("is_running") or not wx_status.get("pid"):
raise HTTPException(status_code=400, detail="未检测到微信正在运行,请先打开微信再尝试!")
pid = wx_status["pid"]
ok, err = IMG_HELPER.enable(pid)
if not ok:
raise HTTPException(status_code=500, detail=f"开启失败: {err}")
return {"status": "success", "enabled": True}
@@ -90,8 +90,11 @@ async def detect_current_account(data_root_path: Optional[str] = None):
@router.get("/api/wechat/status", summary="检查微信运行状态")
async def check_wechat_status():
"""
检查系统中是否有 Weixin.exe WeChat.exe 进程在运行
返回: status=0 成功, wx_status={is_running: bool, pid: int, ...}
检查系统微信主进程状态
逻辑
1. 匹配进程名 Weixin.exe WeChat.exe
2. 校验命令行必须包含 exe 名称排除崩溃后的残留/无效进程
3. 在有效进程中选择命令行最短的一个作为主进程
"""
process_name_targets = ["Weixin.exe", "WeChat.exe"]
@@ -103,21 +106,37 @@ async def check_wechat_status():
}
try:
for proc in psutil.process_iter(['pid', 'name', 'exe', 'memory_info']):
candidates = []
for proc in psutil.process_iter(['pid', 'name', 'exe', 'memory_info', 'cmdline']):
try:
if proc.info['name'] and proc.info['name'] in process_name_targets:
wx_status["is_running"] = True
wx_status["pid"] = proc.info['pid']
wx_status["exe_path"] = proc.info['exe']
p_name = proc.info.get('name')
if p_name and p_name in process_name_targets:
# 获取命令行并合并为字符串
cmdline_list = proc.info.get('cmdline') or []
cmdline_str = " ".join(cmdline_list).lower()
mem = proc.info['memory_info']
if mem:
wx_status["memory_usage_mb"] = round(mem.rss / (1024 * 1024), 2)
break
if any(target.lower() in cmdline_str for target in process_name_targets):
candidates.append({
"pid": proc.info['pid'],
"exe_path": proc.info['exe'],
"cmd_len": len(cmdline_str),
"memory_info": proc.info['memory_info']
})
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
continue
if candidates:
main_proc = min(candidates, key=lambda x: x['cmd_len'])
wx_status["is_running"] = True
wx_status["pid"] = main_proc["pid"]
wx_status["exe_path"] = main_proc["exe_path"]
mem = main_proc["memory_info"]
if mem:
wx_status["memory_usage_mb"] = round(mem.rss / (1024 * 1024), 2)
return {
"status": 0,
"errmsg": "ok",
@@ -125,9 +144,8 @@ async def check_wechat_status():
}
except Exception as e:
# 即使出错也返回 JSON,但 status 非 0
return {
"status": -1,
"errmsg": f"检查进程失败: {str(e)}",
"errmsg": f"检查微信主进程失败: {str(e)}",
"wx_status": wx_status
}
}
@@ -18,6 +18,7 @@ from .chat_helpers import (
_should_keep_session,
)
from .logging_config import get_logger
from .sqlite_diagnostics import collect_sqlite_diagnostics, format_sqlite_diagnostics
logger = get_logger(__name__)
@@ -241,11 +242,13 @@ def build_session_last_message_table(
best: dict[str, tuple[tuple[int, int, int], dict[str, Any]]] = {}
skipped_dbs = 0
for db_path in db_paths:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
conn: Optional[sqlite3.Connection] = None
try:
conn = sqlite3.connect(str(db_path))
conn.row_factory = sqlite3.Row
conn.text_factory = bytes
trows = conn.execute("SELECT name FROM sqlite_master WHERE type='table'").fetchall()
md5_to_table: dict[str, str] = {}
for tr in trows:
@@ -414,11 +417,22 @@ def build_session_last_message_table(
"table_name": str(table_name),
},
)
except sqlite3.DatabaseError as e:
skipped_dbs += 1
logger.warning(
"[session_last_message] malformed message db skipped account=%s db=%s error=%s diag=%s",
account_dir.name,
str(db_path),
str(e),
format_sqlite_diagnostics(collect_sqlite_diagnostics(db_path, quick_check=True)),
)
continue
finally:
try:
conn.close()
except Exception:
pass
if conn is not None:
try:
conn.close()
except Exception:
pass
# Fallback: always have a non-empty preview for UI.
for r in sessions:
@@ -493,7 +507,7 @@ def build_session_last_message_table(
duration = max(0.0, time.time() - started)
logger.info(
f"[session_last_message] build done account={account_dir.name} sessions={len(best)} "
f"durationSec={round(duration, 3)} table={_TABLE_NAME}"
f"durationSec={round(duration, 3)} table={_TABLE_NAME} skippedDbs={skipped_dbs}"
)
return {
"status": "success",
@@ -501,4 +515,5 @@ def build_session_last_message_table(
"built": len(best),
"table": _TABLE_NAME,
"durationSec": round(duration, 3),
"skippedDbs": int(skipped_dbs),
}
+57 -3
View File
@@ -26,6 +26,46 @@ class WCDBRealtimeError(RuntimeError):
pass
def _clean_weflow_account_dir_name(dir_name: str) -> str:
"""调用 WCDB 前使用与 WeFlow 相同的账号/wxid 清理规则。"""
trimmed = str(dir_name or "").strip()
if not trimmed:
return trimmed
if trimmed.lower().startswith("wxid_"):
match = re.match(r"^(wxid_[^_]+)", trimmed, flags=re.IGNORECASE)
if match:
return match.group(1)
return trimmed
suffix_match = re.match(r"^(.+)_([a-zA-Z0-9]{4})$", trimmed)
return suffix_match.group(1) if suffix_match else trimmed
def _derive_weflow_wcdb_wxid(account: str, db_storage_dir: Optional[Path] = None) -> str:
"""推导传给 native WCDB 的 wxid,语义对齐 WeFlow。
output 账号目录可能带随机后缀例如 `Murderers_0e5d`
WeFlow 在调用 `wcdb_set_my_wxid` 前会去掉这个后缀如果传带后缀的名字
native 会话/消息查询可能只返回很少结果
"""
candidates: list[str] = []
if db_storage_dir is not None:
try:
parent_name = Path(db_storage_dir).parent.name
if parent_name:
candidates.append(parent_name)
except Exception:
pass
candidates.append(str(account or ""))
for item in candidates:
cleaned = _clean_weflow_account_dir_name(item)
if cleaned:
return cleaned
return str(account or "").strip()
_NATIVE_DIR = Path(__file__).resolve().parent / "native"
_DEFAULT_WCDB_API_DLL = _NATIVE_DIR / "wcdb_api.dll"
_WCDB_API_DLL_SELECTED: Optional[Path] = None
@@ -1459,6 +1499,7 @@ def _resolve_session_db_path(db_storage_dir: Path) -> Path:
@dataclass(frozen=True)
class WCDBRealtimeConnection:
account: str
native_wxid: str
handle: int
db_storage_dir: Path
session_db_path: Path
@@ -1484,13 +1525,16 @@ class WCDBRealtimeManager:
db_storage_dir = None
session_db_path = None
native_wxid = ""
err = ""
try:
db_storage_dir = _resolve_account_db_storage_dir(account_dir)
if db_storage_dir is not None:
native_wxid = _derive_weflow_wcdb_wxid(account, db_storage_dir)
session_db_path = _resolve_session_db_path(db_storage_dir)
except Exception as e:
err = str(e)
native_wxid = _derive_weflow_wcdb_wxid(account, db_storage_dir)
dll_path = _resolve_wcdb_api_dll_path()
try:
@@ -1503,6 +1547,7 @@ class WCDBRealtimeManager:
"dll_present": bool(dll_ok),
"wcdb_api_dll": str(dll_path),
"key_present": bool(key_ok),
"native_wxid": native_wxid,
"db_storage_dir": str(db_storage_dir) if db_storage_dir else "",
"session_db_path": str(session_db_path) if session_db_path else "",
"connected": bool(connected),
@@ -1565,6 +1610,7 @@ class WCDBRealtimeManager:
raise WCDBRealtimeError("Cannot resolve db_storage directory for this account.")
session_db_path = _resolve_session_db_path(db_storage_dir)
native_wxid = _derive_weflow_wcdb_wxid(account, db_storage_dir)
# Run open_account in a daemon thread with a timeout to avoid
# blocking indefinitely when the native library hangs (locked DB).
@@ -1609,14 +1655,16 @@ class WCDBRealtimeManager:
raise WCDBRealtimeError("open_account returned no handle.")
handle = _handle_box[0]
# Some WCDB APIs (e.g. exec_query on non-session DBs) may require this context.
# 对齐 WeFlow:传清理后的 wxid/account 名称给 native WCDB
# 不传带 4 位随机后缀的导出目录名。
try:
set_my_wxid(handle, account)
set_my_wxid(handle, native_wxid)
except Exception:
pass
conn = WCDBRealtimeConnection(
account=account,
native_wxid=native_wxid,
handle=handle,
db_storage_dir=db_storage_dir,
session_db_path=session_db_path,
@@ -1627,7 +1675,13 @@ class WCDBRealtimeManager:
with self._mu:
self._conns[account] = conn
self._failed.pop(account, None)
logger.info("[wcdb] connected account=%s handle=%s session_db=%s", account, int(handle), session_db_path)
logger.info(
"[wcdb] connected account=%s native_wxid=%s handle=%s session_db=%s",
account,
native_wxid,
int(handle),
session_db_path,
)
return conn
finally:
with self._mu:
+536 -96
View File
@@ -13,12 +13,13 @@ import hashlib
import hmac
import os
import json
import struct
import time
from pathlib import Path
from typing import Any
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .app_paths import get_output_databases_dir
from .database_filters import should_skip_source_database
@@ -28,6 +29,270 @@ from .sqlite_diagnostics import collect_sqlite_diagnostics, sqlite_diagnostics_s
# SQLite文件头
SQLITE_HEADER = b"SQLite format 3\x00"
PAGE_SIZE = 4096
KEY_SIZE = 32
SALT_SIZE = 16
IV_SIZE = 16
HMAC_SIZE = 64
# WeChat 4.x SQLCipher/WCDB pages reserve IV + HMAC at the tail.
# When exporting to plain SQLite, do not keep encrypted IV/HMAC bytes in output pages.
RESERVE_SIZE = IV_SIZE + HMAC_SIZE
def _derive_mac_key(enc_key: bytes, salt: bytes) -> bytes:
"""Derive SQLCipher/WCDB page HMAC key."""
mac_salt = bytes(b ^ 0x3A for b in salt)
return hashlib.pbkdf2_hmac("sha512", enc_key, mac_salt, 2, dklen=KEY_SIZE)
def _derive_sqlcipher_enc_key(key_material: bytes, salt: bytes) -> bytes:
"""Derive AES enc_key from SQLCipher passphrase/base key."""
return hashlib.pbkdf2_hmac("sha512", key_material, salt, 256000, dklen=KEY_SIZE)
def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes:
offset = SALT_SIZE if page_num == 1 else 0
data_end = PAGE_SIZE - RESERVE_SIZE + IV_SIZE
mac = hmac.new(mac_key, digestmod=hashlib.sha512)
mac.update(page[offset:data_end])
mac.update(page_num.to_bytes(4, "little"))
return mac.digest()
def _compute_page_hmac_variant(
mac_key: bytes,
page: bytes,
page_num: int,
*,
endian: str = "little",
include_iv: bool = True,
) -> bytes:
"""用于诊断的 HMAC 变体计算,不参与实际解密决策。"""
offset = SALT_SIZE if page_num == 1 else 0
data_end = PAGE_SIZE - RESERVE_SIZE + (IV_SIZE if include_iv else 0)
mac = hmac.new(mac_key, digestmod=hashlib.sha512)
mac.update(page[offset:data_end])
mac.update(page_num.to_bytes(4, endian))
return mac.digest()
def _hash_prefix(data: bytes, *, length: int = 16) -> str:
"""返回 SHA256 前缀,避免日志输出明文数据。"""
try:
return hashlib.sha256(bytes(data or b"")).hexdigest()[: max(int(length), 8)]
except Exception:
return ""
def _hex_prefix(data: bytes, *, length: int = 32) -> str:
try:
return bytes(data or b"")[: max(int(length), 0)].hex()
except Exception:
return ""
def _safe_file_snapshot(path: str | Path) -> dict[str, Any]:
"""采集源/输出文件与 WAL 旁路文件信息,用于定位解密时文件是否变化。"""
p = Path(path)
out: dict[str, Any] = {"path": str(p), "exists": False}
try:
st = p.stat()
out.update(
{
"exists": True,
"size": int(st.st_size),
"mtime_ns": int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))),
}
)
except Exception as exc:
out["stat_error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}"
siblings: dict[str, Any] = {}
for suffix in ("-wal", "-shm", "-journal"):
sp = Path(str(p) + suffix)
try:
st = sp.stat()
siblings[suffix] = {
"exists": True,
"size": int(st.st_size),
"mtime_ns": int(getattr(st, "st_mtime_ns", int(st.st_mtime * 1_000_000_000))),
}
except FileNotFoundError:
siblings[suffix] = {"exists": False}
except Exception as exc:
siblings[suffix] = {
"exists": False,
"stat_error": f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}",
}
out["siblings"] = siblings
return out
def _read_plain_sqlite_header_debug(path: str | Path) -> dict[str, Any]:
"""解析明文 SQLite 头部关键字段,帮助定位输出库结构问题。"""
p = Path(path)
out: dict[str, Any] = {"path": str(p)}
try:
with p.open("rb") as f:
header = f.read(100)
out["header_len"] = len(header)
out["header_ok"] = header.startswith(SQLITE_HEADER)
out["header_hex"] = header[:32].hex()
if len(header) >= 100:
raw_page_size = struct.unpack(">H", header[16:18])[0]
out.update(
{
"page_size_header": 65536 if raw_page_size == 1 else int(raw_page_size),
"write_version": int(header[18]),
"read_version": int(header[19]),
"reserved_space": int(header[20]),
"max_payload_fraction": int(header[21]),
"min_payload_fraction": int(header[22]),
"leaf_payload_fraction": int(header[23]),
"file_change_counter": int.from_bytes(header[24:28], "big"),
"db_size_pages_header": int.from_bytes(header[28:32], "big"),
"freelist_trunk_page": int.from_bytes(header[32:36], "big"),
"freelist_pages": int.from_bytes(header[36:40], "big"),
"schema_cookie": int.from_bytes(header[40:44], "big"),
"schema_format": int.from_bytes(header[44:48], "big"),
"text_encoding": int.from_bytes(header[56:60], "big"),
}
)
except Exception as exc:
out["error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}"
return out
def _plain_page_btree_debug(page_plain: bytes, page_num: int) -> dict[str, Any]:
"""解析明文页 B-tree 页头摘要,不输出任何业务明文。"""
out: dict[str, Any] = {"page": int(page_num), "plain_sha256": _hash_prefix(page_plain, length=24)}
try:
hdr = 100 if int(page_num) == 1 else 0
if len(page_plain) >= hdr + 12:
page_type = int(page_plain[hdr])
out["btree_header_offset"] = int(hdr)
out["btree_page_type"] = page_type
out["btree_page_type_name"] = {
2: "interior_index",
5: "interior_table",
10: "leaf_index",
13: "leaf_table",
}.get(page_type, "unknown")
out["first_freeblock"] = int.from_bytes(page_plain[hdr + 1 : hdr + 3], "big")
out["cell_count"] = int.from_bytes(page_plain[hdr + 3 : hdr + 5], "big")
out["cell_content_area"] = int.from_bytes(page_plain[hdr + 5 : hdr + 7], "big")
out["fragmented_free_bytes"] = int(page_plain[hdr + 7])
if page_type in (2, 5):
out["right_most_pointer"] = int.from_bytes(page_plain[hdr + 8 : hdr + 12], "big")
except Exception as exc:
out["btree_parse_error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:160]}"
return out
def _build_page_anomaly_debug(
enc_key: bytes,
mac_key: bytes,
page: bytes,
page_num: int,
*,
stored_hmac: bytes | None = None,
expected_hmac: bytes | None = None,
reason: str = "hmac",
) -> dict[str, Any]:
"""构造异常页诊断信息,默认只记录哈希/页头摘要。"""
page = bytes(page or b"")
stored = stored_hmac if stored_hmac is not None else page[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE]
expected = expected_hmac if expected_hmac is not None else _compute_page_hmac(mac_key, page, page_num)
iv = page[PAGE_SIZE - RESERVE_SIZE : PAGE_SIZE - RESERVE_SIZE + IV_SIZE]
encrypted_payload = page[SALT_SIZE if page_num == 1 else 0 : PAGE_SIZE - RESERVE_SIZE]
out: dict[str, Any] = {
"reason": str(reason),
"page": int(page_num),
"byte_start": int((int(page_num) - 1) * PAGE_SIZE),
"byte_end_exclusive": int(int(page_num) * PAGE_SIZE),
"page_size": int(len(page)),
"page_sha256": _hash_prefix(page, length=24),
"encrypted_payload_sha256": _hash_prefix(encrypted_payload, length=24),
"iv_hex": _hex_prefix(iv, length=16),
"stored_hmac_prefix": _hex_prefix(stored, length=16),
"expected_hmac_prefix": _hex_prefix(expected, length=16),
"hmac_match_current": bool(hmac.compare_digest(stored, expected)),
}
variants: dict[str, bool] = {}
for candidate_page in (page_num - 1, page_num, page_num + 1):
if candidate_page <= 0:
continue
for endian in ("little", "big"):
for include_iv in (True, False):
key = f"page={candidate_page};endian={endian};include_iv={int(include_iv)}"
try:
variants[key] = bool(
hmac.compare_digest(
stored,
_compute_page_hmac_variant(
mac_key,
page,
int(candidate_page),
endian=endian,
include_iv=include_iv,
),
)
)
except Exception:
variants[key] = False
out["hmac_variant_matches"] = [k for k, v in variants.items() if v]
try:
plain_page = _decrypt_page(enc_key, page, int(page_num))
out["aes_decrypt_ok"] = True
out["plain"] = _plain_page_btree_debug(plain_page, int(page_num))
except Exception as exc:
out["aes_decrypt_ok"] = False
out["aes_error"] = f"{type(exc).__name__}: {' '.join(str(exc).split())[:180]}"
return out
def _resolve_page1_key_material(key_material: bytes, page1: bytes) -> tuple[bytes, bytes, str] | None:
"""Detect whether input key is raw enc_key or SQLCipher passphrase by page-1 HMAC."""
if len(page1) < PAGE_SIZE:
return None
salt = page1[:SALT_SIZE]
stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE: PAGE_SIZE]
candidates = [
("raw_enc_key", key_material, _derive_mac_key(key_material, salt)),
]
derived_key = _derive_sqlcipher_enc_key(key_material, salt)
candidates.append(("sqlcipher_passphrase", derived_key, _derive_mac_key(derived_key, salt)))
for mode, enc_key, mac_key in candidates:
if hmac.compare_digest(stored_page1_hmac, _compute_page_hmac(mac_key, page1, 1)):
return enc_key, mac_key, mode
return None
def _decrypt_page(enc_key: bytes, page: bytes, page_num: int) -> bytes:
iv = page[PAGE_SIZE - RESERVE_SIZE: PAGE_SIZE - RESERVE_SIZE + IV_SIZE]
offset = SALT_SIZE if page_num == 1 else 0
encrypted_page = page[offset: PAGE_SIZE - RESERVE_SIZE]
cipher = Cipher(
algorithms.AES(enc_key),
modes.CBC(iv),
backend=default_backend(),
)
decryptor = cipher.decryptor()
decrypted_page = decryptor.update(encrypted_page) + decryptor.finalize()
# Plain SQLite pages do not carry SQLCipher/WCDB IV/HMAC reserve bytes.
# Keep page size stable by zero-filling the reserve tail.
if page_num == 1:
return SQLITE_HEADER + decrypted_page + (b"\x00" * RESERVE_SIZE)
return decrypted_page + (b"\x00" * RESERVE_SIZE)
def _normalize_account_name(name: str) -> str:
@@ -161,8 +426,13 @@ def _resolve_db_storage_roots(storage_path: Path) -> list[Path]:
def scan_account_databases_from_path(db_storage_path: str) -> dict:
from .logging_config import get_logger
logger = get_logger(__name__)
storage_path = Path(str(db_storage_path or "").strip())
logger.info("[decrypt.scan] start db_storage_path=%s", str(storage_path))
if not storage_path.exists():
logger.warning("[decrypt.scan] path_not_exists db_storage_path=%s", str(storage_path))
return {
"status": "error",
"message": f"指定的数据库路径不存在: {db_storage_path}",
@@ -172,6 +442,10 @@ def scan_account_databases_from_path(db_storage_path: str) -> dict:
}
db_roots = _resolve_db_storage_roots(storage_path)
logger.info(
"[decrypt.scan] resolved_roots %s",
json.dumps([str(x) for x in db_roots], ensure_ascii=False),
)
if not db_roots:
return {
"status": "error",
@@ -223,6 +497,30 @@ def scan_account_databases_from_path(db_storage_path: str) -> dict:
}
)
logger.info(
"[decrypt.scan] databases_found %s",
json.dumps(
{
"account": account_name,
"db_storage_path": str(db_root),
"wxid_dir": str(db_root.parent),
"count": len(databases),
"files": [
{
"name": str(item.get("name") or ""),
"relative": str(Path(str(item.get("path") or "")).relative_to(db_root))
if str(item.get("path") or "").startswith(str(db_root))
else str(item.get("path") or ""),
}
for item in databases[:80]
],
"truncated": max(0, len(databases) - 80),
},
ensure_ascii=False,
sort_keys=True,
),
)
if not databases:
return {
"status": "error",
@@ -303,6 +601,18 @@ class WeChatDatabaseDecryptor:
"failed_pages": 0,
"failed_page_samples": [],
"failure_reasons": {},
"hmac_warning_pages": 0,
"hmac_warning_samples": [],
"hmac_debug_samples": [],
"aes_debug_samples": [],
"source_snapshot_before": {},
"source_snapshot_after": {},
"source_changed_during_read": False,
"read_ms": 0,
"key_mode": "",
"input_layout": {},
"expected_output_size": 0,
"output_header_debug": {},
"diagnostics": {},
"diagnostic_status": "not_run",
"error": "",
@@ -319,6 +629,14 @@ class WeChatDatabaseDecryptor:
item["error"] = err[:200]
result["failed_page_samples"].append(item)
def _append_hmac_warning_page(page_num: int) -> None:
# 非首页 HMAC 异常不再直接丢弃页面:部分微信 4.x 大库在 1GiB 边界会出现
# 单页 HMAC 不匹配,但页面本身仍可正常解密。丢页会导致后续页号整体错位。
result["hmac_warning_pages"] = int(result.get("hmac_warning_pages") or 0) + 1
if len(result["hmac_warning_samples"]) >= 8:
return
result["hmac_warning_samples"].append({"page": int(page_num), "reason": "hmac"})
def _finalize(success: bool, error: str = "") -> bool:
normalized_success = bool(success)
result["success"] = normalized_success
@@ -335,6 +653,7 @@ class WeChatDatabaseDecryptor:
diagnostics = collect_sqlite_diagnostics(output_file, quick_check=True)
result["diagnostics"] = diagnostics
result["diagnostic_status"] = sqlite_diagnostics_status(diagnostics)
result["output_header_debug"] = _read_plain_sqlite_header_debug(output_file)
if normalized_success:
failure_message = _build_decrypt_failure_message(result)
@@ -362,6 +681,18 @@ class WeChatDatabaseDecryptor:
"failed_pages": result["failed_pages"],
"failure_reasons": result["failure_reasons"],
"failed_page_samples": result["failed_page_samples"],
"hmac_warning_pages": result["hmac_warning_pages"],
"hmac_warning_samples": result["hmac_warning_samples"],
"hmac_debug_samples": result["hmac_debug_samples"],
"aes_debug_samples": result["aes_debug_samples"],
"source_snapshot_before": result["source_snapshot_before"],
"source_snapshot_after": result["source_snapshot_after"],
"source_changed_during_read": result["source_changed_during_read"],
"read_ms": result["read_ms"],
"key_mode": result["key_mode"],
"input_layout": result["input_layout"],
"expected_output_size": result["expected_output_size"],
"output_header_debug": result["output_header_debug"],
"diagnostic_status": result["diagnostic_status"],
"diagnostics": result["diagnostics"],
"error": result["error"],
@@ -370,6 +701,7 @@ class WeChatDatabaseDecryptor:
if (
(not result["success"])
or int(result["failed_pages"] or 0) > 0
or int(result.get("hmac_warning_pages") or 0) > 0
or str(result["diagnostic_status"] or "") != "ok"
):
log_fn = logger.warning
@@ -380,11 +712,81 @@ class WeChatDatabaseDecryptor:
logger.info(f"开始解密数据库: {db_path}")
try:
source_snapshot_before = _safe_file_snapshot(db_path)
result["source_snapshot_before"] = source_snapshot_before
logger.info(
"[decrypt.pipeline] source_snapshot_before %s",
json.dumps(
{
"db_name": result["db_name"],
"snapshot": source_snapshot_before,
},
ensure_ascii=False,
sort_keys=True,
),
)
read_t0 = time.perf_counter()
with open(db_path, 'rb') as f:
encrypted_data = f.read()
result["read_ms"] = round((time.perf_counter() - read_t0) * 1000.0, 1)
source_snapshot_after = _safe_file_snapshot(db_path)
result["source_snapshot_after"] = source_snapshot_after
before_size = int(source_snapshot_before.get("size") or 0)
after_size = int(source_snapshot_after.get("size") or 0)
before_mtime = int(source_snapshot_before.get("mtime_ns") or 0)
after_mtime = int(source_snapshot_after.get("mtime_ns") or 0)
source_changed = bool(before_size != after_size or before_mtime != after_mtime)
result["source_changed_during_read"] = source_changed
logger.info(
"[decrypt.pipeline] source_snapshot_after %s",
json.dumps(
{
"db_name": result["db_name"],
"snapshot": source_snapshot_after,
"read_ms": result["read_ms"],
"source_changed_during_read": source_changed,
},
ensure_ascii=False,
sort_keys=True,
),
)
if source_changed:
logger.warning(
"[decrypt.pipeline] source_changed_during_read db=%s before_size=%s after_size=%s before_mtime_ns=%s after_mtime_ns=%s",
result["db_name"],
before_size,
after_size,
before_mtime,
after_mtime,
)
logger.info(f"读取文件大小: {len(encrypted_data)} bytes")
result["input_size"] = int(len(encrypted_data))
result["input_layout"] = {
"page_size": PAGE_SIZE,
"reserve_size": RESERVE_SIZE,
"iv_size": IV_SIZE,
"hmac_size": HMAC_SIZE,
"input_size": int(len(encrypted_data)),
"input_size_mod_page": int(len(encrypted_data) % PAGE_SIZE),
"total_pages_floor": int(len(encrypted_data) // PAGE_SIZE),
"total_pages_ceil": int((len(encrypted_data) + PAGE_SIZE - 1) // PAGE_SIZE),
"starts_with_sqlite_header": bool(encrypted_data.startswith(SQLITE_HEADER)),
"first16_hex": encrypted_data[:16].hex(),
}
logger.info(
"[decrypt.pipeline] input_layout %s",
json.dumps(
{
"db_name": result["db_name"],
"input_layout": result["input_layout"],
},
ensure_ascii=False,
sort_keys=True,
),
)
if len(encrypted_data) < 4096:
logger.warning(f"文件太小,跳过解密: {db_path}")
@@ -398,113 +800,134 @@ class WeChatDatabaseDecryptor:
result["copied_as_sqlite"] = True
return _finalize(True)
# 提取salt (前16字节)
salt = encrypted_data[:16]
# 计算mac_salt (salt XOR 0x3a)
mac_salt = bytes(b ^ 0x3a for b in salt)
# 使用PBKDF2-SHA512派生密钥
kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=salt,
iterations=256000,
backend=default_backend()
page1 = encrypted_data[:PAGE_SIZE]
resolved_key_material = _resolve_page1_key_material(self.key_bytes, page1)
if resolved_key_material is None:
_append_failed_page(1, "hmac")
result["total_pages"] = int(len(encrypted_data) // PAGE_SIZE)
result["failed_pages"] = 1
logger.warning("Page 1 HMAC verification failed; key does not match database: %s", db_path)
return _finalize(False, "key_mismatch")
enc_key, mac_key, key_mode = resolved_key_material
result["key_mode"] = key_mode
logger.info("Page 1 HMAC verification passed: mode=%s path=%s", key_mode, db_path)
logger.info(
"[decrypt.pipeline] key_material_resolved %s",
json.dumps(
{
"db_name": result["db_name"],
"key_mode": key_mode,
"salt_sha256": _hash_prefix(page1[:SALT_SIZE], length=24),
"page1_stored_hmac_prefix": _hex_prefix(page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE], length=16),
"page1_expected_hmac_prefix": _hex_prefix(_compute_page_hmac(mac_key, page1, 1), length=16),
},
ensure_ascii=False,
sort_keys=True,
),
)
derived_key = kdf.derive(self.key_bytes)
# 派生MAC密钥
mac_kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=mac_salt,
iterations=2,
backend=default_backend()
)
mac_key = mac_kdf.derive(derived_key)
# 解密数据
decrypted_data = bytearray()
decrypted_data.extend(SQLITE_HEADER)
page_size = 4096
iv_size = 16
hmac_size = 64 # SHA512的HMAC是64字节
# 计算保留区域大小 (对齐到AES块大小)
reserve_size = iv_size + hmac_size
if reserve_size % 16 != 0:
reserve_size = ((reserve_size // 16) + 1) * 16
total_pages = len(encrypted_data) // page_size
total_pages = (len(encrypted_data) + PAGE_SIZE - 1) // PAGE_SIZE
successful_pages = 0
failed_pages = 0
result["total_pages"] = int(total_pages)
# 逐页解密
result["expected_output_size"] = int(total_pages * PAGE_SIZE)
logger.info(
"[decrypt.pipeline] page_loop_start db=%s total_pages=%s expected_output_size=%s",
result["db_name"],
int(total_pages),
int(result["expected_output_size"]),
)
for cur_page in range(total_pages):
start = cur_page * page_size
end = start + page_size
page = encrypted_data[start:end]
page_num = cur_page + 1 # 页面编号从1开始
if len(page) < page_size:
logger.warning(f"页面 {page_num} 大小不足: {len(page)} bytes")
page_num = cur_page + 1
start = cur_page * PAGE_SIZE
page = encrypted_data[start:start + PAGE_SIZE]
if not page:
break
# 确定偏移量:第一页(cur_page == 0)需要跳过salt
offset = 16 if cur_page == 0 else 0 # SALT_SIZE = 16
# 提取存储的HMAC
hmac_start = page_size - reserve_size + iv_size
hmac_end = hmac_start + hmac_size
stored_hmac = page[hmac_start:hmac_end]
# 按照wechat-dump-rs的方式验证HMAC
data_end = page_size - reserve_size + iv_size
hmac_data = page[offset:data_end]
# 分步计算HMAC:先更新数据,再更新页面编号
mac = hmac.new(mac_key, digestmod=hashlib.sha512)
mac.update(hmac_data) # 包含加密数据+IV
mac.update(page_num.to_bytes(4, 'little')) # 页面编号(小端序)
expected_hmac = mac.digest()
if stored_hmac != expected_hmac:
logger.warning(f"页面 {page_num} HMAC验证失败")
failed_pages += 1
_append_failed_page(page_num, "hmac")
continue
# 提取IV和加密数据用于AES解密
iv = page[page_size - reserve_size:page_size - reserve_size + iv_size]
encrypted_page = page[offset:page_size - reserve_size]
# AES-CBC解密
try:
cipher = Cipher(
algorithms.AES(derived_key),
modes.CBC(iv),
backend=default_backend()
if len(page) < PAGE_SIZE:
logger.warning(
"Page %s is short: %s bytes; padding to %s bytes",
page_num,
len(page),
PAGE_SIZE,
)
decryptor = cipher.decryptor()
decrypted_page = decryptor.update(encrypted_page) + decryptor.finalize()
# 按照wechat-dump-rs的方式重组页面数据
decrypted_data.extend(decrypted_page)
decrypted_data.extend(page[page_size - reserve_size:]) # 保留区域
page = page + (b"\x00" * (PAGE_SIZE - len(page)))
stored_hmac = page[PAGE_SIZE - HMAC_SIZE: PAGE_SIZE]
expected_hmac = _compute_page_hmac(mac_key, page, page_num)
if not hmac.compare_digest(stored_hmac, expected_hmac):
logger.warning("Page %s HMAC verification failed; decrypting page anyway", page_num)
_append_hmac_warning_page(page_num)
anomaly_debug = _build_page_anomaly_debug(
enc_key,
mac_key,
page,
page_num,
stored_hmac=stored_hmac,
expected_hmac=expected_hmac,
reason="hmac",
)
if len(result["hmac_debug_samples"]) < 8:
result["hmac_debug_samples"].append(anomaly_debug)
logger.warning(
"[decrypt.page_anomaly] %s",
json.dumps(
{
"db_name": result["db_name"],
"anomaly": anomaly_debug,
},
ensure_ascii=False,
sort_keys=True,
),
)
try:
decrypted_data.extend(_decrypt_page(enc_key, page, page_num))
successful_pages += 1
except Exception as e:
logger.error(f"页面 {page_num} AES解密失败: {e}")
logger.error("Page %s AES decryption failed: %s", page_num, e)
failed_pages += 1
_append_failed_page(page_num, "aes", str(e))
aes_debug = _build_page_anomaly_debug(
enc_key,
mac_key,
page,
page_num,
stored_hmac=stored_hmac,
expected_hmac=expected_hmac,
reason="aes",
)
if len(result["aes_debug_samples"]) < 8:
result["aes_debug_samples"].append(aes_debug)
logger.error(
"[decrypt.page_anomaly] %s",
json.dumps(
{
"db_name": result["db_name"],
"anomaly": aes_debug,
},
ensure_ascii=False,
sort_keys=True,
),
)
# 保留页占位,避免后续页整体错位导致 SQLite 必然损坏。
decrypted_data.extend(b"\x00" * PAGE_SIZE)
continue
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages}")
if total_pages >= 100000 and page_num % 50000 == 0:
logger.info(
"[decrypt.pipeline] page_loop_progress db=%s page=%s/%s successful_pages=%s failed_pages=%s hmac_warning_pages=%s output_bytes=%s",
result["db_name"],
int(page_num),
int(total_pages),
int(successful_pages),
int(failed_pages),
int(result.get("hmac_warning_pages") or 0),
int(len(decrypted_data)),
)
result["successful_pages"] = int(successful_pages)
result["failed_pages"] = int(failed_pages)
@@ -513,6 +936,14 @@ class WeChatDatabaseDecryptor:
f.write(decrypted_data)
logger.info(f"解密文件大小: {len(decrypted_data)} bytes")
if int(len(decrypted_data)) != int(result["expected_output_size"]):
logger.warning(
"[decrypt.pipeline] output_size_mismatch db=%s output_size=%s expected_output_size=%s delta=%s",
result["db_name"],
int(len(decrypted_data)),
int(result["expected_output_size"]),
int(len(decrypted_data)) - int(result["expected_output_size"]),
)
if failed_pages > 0:
logger.warning(
"解密输出包含页失败: db=%s total_pages=%s failed_pages=%s failure_reasons=%s samples=%s",
@@ -522,6 +953,14 @@ class WeChatDatabaseDecryptor:
json.dumps(result["failure_reasons"], ensure_ascii=False, sort_keys=True),
json.dumps(result["failed_page_samples"], ensure_ascii=False),
)
if int(result.get("hmac_warning_pages") or 0) > 0:
logger.warning(
"解密输出包含HMAC告警页但已保留页内容: db=%s total_pages=%s hmac_warning_pages=%s samples=%s",
result["db_name"],
int(total_pages),
int(result.get("hmac_warning_pages") or 0),
json.dumps(result["hmac_warning_samples"], ensure_ascii=False),
)
return _finalize(True)
except Exception as e:
@@ -710,6 +1149,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
if (
(not bool(db_diagnostic.get("success", ok)))
or int(db_diagnostic.get("failed_pages") or 0) > 0
or int(db_diagnostic.get("hmac_warning_pages") or 0) > 0
or str(db_diagnostic.get("diagnostic_status") or "") != "ok"
):
account_diagnostic_warning_count += 1
+107
View File
@@ -0,0 +1,107 @@
import hashlib
import hmac
import tempfile
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
import wechat_decrypt_tool.wechat_decrypt as wechat_decrypt
from wechat_decrypt_tool.wechat_decrypt import (
HMAC_SIZE,
PAGE_SIZE,
RESERVE_SIZE,
SALT_SIZE,
SQLITE_HEADER,
WeChatDatabaseDecryptor,
_derive_mac_key,
_derive_sqlcipher_enc_key,
)
def _build_plain_page(fill: int, *, first_page: bool) -> bytes:
body = bytes([fill]) * (PAGE_SIZE - RESERVE_SIZE)
if first_page:
body = SQLITE_HEADER + body[len(SQLITE_HEADER):]
return body + (b"\x00" * RESERVE_SIZE)
def _encrypt_page(key_material: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes, *, passphrase: bool = False) -> bytes:
enc_key = _derive_sqlcipher_enc_key(key_material, salt) if passphrase else key_material
if page_num == 1:
encrypted_input = plain_page[SALT_SIZE: PAGE_SIZE - RESERVE_SIZE]
prefix = salt
else:
encrypted_input = plain_page[: PAGE_SIZE - RESERVE_SIZE]
prefix = b""
cipher = Cipher(algorithms.AES(enc_key), modes.CBC(iv), backend=default_backend())
encryptor = cipher.encryptor()
encrypted = encryptor.update(encrypted_input) + encryptor.finalize()
page_without_hmac = prefix + encrypted + iv
mac = hmac.new(_derive_mac_key(enc_key, salt), digestmod=hashlib.sha512)
mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0:])
mac.update(page_num.to_bytes(4, "little"))
return page_without_hmac + mac.digest()
def _decrypt_sample(key_hex: str, encrypted_db: bytes, monkeypatch) -> bytes:
with tempfile.TemporaryDirectory() as tmpdir:
src = Path(tmpdir) / "source.db"
dst = Path(tmpdir) / "out.db"
src.write_bytes(encrypted_db)
monkeypatch.setattr(wechat_decrypt, "collect_sqlite_diagnostics", lambda *args, **kwargs: {"quick_check_ok": True})
monkeypatch.setattr(wechat_decrypt, "sqlite_diagnostics_status", lambda diagnostics: "ok")
decryptor = WeChatDatabaseDecryptor(key_hex)
assert decryptor.decrypt_database(str(src), str(dst))
return dst.read_bytes()
def test_decrypt_database_accepts_raw_enc_key_like_weflow(monkeypatch):
raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
salt = bytes.fromhex("50f4090ef6897e146f94109f13743e34")
page1 = _build_plain_page(0x41, first_page=True)
page2 = _build_plain_page(0x42, first_page=False)
encrypted_db = _encrypt_page(raw_key, page1, 1, salt, bytes.fromhex("0102030405060708090a0b0c0d0e0f10"))
encrypted_db += _encrypt_page(raw_key, page2, 2, salt, bytes.fromhex("1112131415161718191a1b1c1d1e1f20"))
assert _decrypt_sample(raw_key.hex(), encrypted_db, monkeypatch) == page1 + page2
def test_decrypt_database_keeps_sqlcipher_passphrase_compatibility(monkeypatch):
passphrase_key = bytes.fromhex("9f5dd0d3b6d0477ea5045c9e380ee272e53927993eb548dd98a022e842d5f7bd")
salt = bytes.fromhex("40f4090ef6897e146f94109f13743e34")
page1 = _build_plain_page(0x51, first_page=True)
page2 = _build_plain_page(0x52, first_page=False)
encrypted_db = _encrypt_page(passphrase_key, page1, 1, salt, bytes.fromhex("2122232425262728292a2b2c2d2e2f30"), passphrase=True)
encrypted_db += _encrypt_page(passphrase_key, page2, 2, salt, bytes.fromhex("3132333435363738393a3b3c3d3e3f40"), passphrase=True)
assert _decrypt_sample(passphrase_key.hex(), encrypted_db, monkeypatch) == page1 + page2
def test_decrypt_database_keeps_page_when_non_first_hmac_mismatch(monkeypatch):
raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
salt = bytes.fromhex("60f4090ef6897e146f94109f13743e34")
page1 = _build_plain_page(0x61, first_page=True)
page2 = _build_plain_page(0x62, first_page=False)
encrypted_db = _encrypt_page(raw_key, page1, 1, salt, bytes.fromhex("4142434445464748494a4b4c4d4e4f50"))
encrypted_page2 = bytearray(_encrypt_page(raw_key, page2, 2, salt, bytes.fromhex("5152535455565758595a5b5c5d5e5f60")))
encrypted_page2[-1] ^= 0x01 # 只破坏 HMAC,不破坏密文和 IV。
encrypted_db += bytes(encrypted_page2)
with tempfile.TemporaryDirectory() as tmpdir:
src = Path(tmpdir) / "source.db"
dst = Path(tmpdir) / "out.db"
src.write_bytes(encrypted_db)
monkeypatch.setattr(wechat_decrypt, "collect_sqlite_diagnostics", lambda *args, **kwargs: {"quick_check_ok": True})
monkeypatch.setattr(wechat_decrypt, "sqlite_diagnostics_status", lambda diagnostics: "ok")
decryptor = WeChatDatabaseDecryptor(raw_key.hex())
assert decryptor.decrypt_database(str(src), str(dst))
assert dst.read_bytes() == page1 + page2
assert decryptor.last_result["failed_pages"] == 0
assert decryptor.last_result["hmac_warning_pages"] == 1
Generated
+7 -7
View File
@@ -872,7 +872,7 @@ wheels = [
[[package]]
name = "wechat-decrypt-tool"
version = "1.7.20"
version = "1.8.0"
source = { editable = "." }
dependencies = [
{ name = "aiofiles" },
@@ -919,7 +919,7 @@ requires-dist = [
{ name = "requests", specifier = ">=2.32.4" },
{ name = "typing-extensions", specifier = ">=4.8.0" },
{ name = "uvicorn", extras = ["standard"], specifier = ">=0.24.0" },
{ name = "wx-key", specifier = ">=2.0.0" },
{ name = "wx-key", specifier = ">=2.0.1" },
{ name = "zstandard", specifier = ">=0.23.0" },
]
provides-extras = ["build"]
@@ -935,13 +935,13 @@ wheels = [
[[package]]
name = "wx-key"
version = "2.0.0"
version = "2.0.1"
source = { registry = "tools/key_wheels" }
wheels = [
{ path = "wx_key-2.0.0-cp311-cp311-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp312-cp312-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp313-cp313-win_amd64.whl" },
{ path = "wx_key-2.0.0-cp314-cp314-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp311-cp311-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp312-cp312-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp313-cp313-win_amd64.whl" },
{ path = "wx_key-2.0.1-cp314-cp314-win_amd64.whl" },
]
[[package]]