improvement(media): 优化媒体密钥提取与解密体验

- 微信进程 AES 密钥提取:支持多 PID/更多进程名,尝试启用 SeDebugPrivilege,并扩展扫描范围与可读页过滤提升成功率
- AES key 内存匹配支持 16/32 位候选,校验覆盖 JPEG/PNG/GIF/WEBP/MP4 等常见资源头
- 解密页新增 XOR/AES 密钥一键复制与反馈提示,并补充管理员运行提示
- README 更新徽章与界面预览表格,新增/更新截图资源
This commit is contained in:
2977094657
2025-12-22 17:05:32 +08:00
parent 6c04aee6ea
commit a4d652230f
9 changed files with 323 additions and 108 deletions

View File

@@ -1,44 +1,56 @@
<p align="center">
<img src="frontend/public/logo.png" alt="微信数据库解密工具" width="200" />
</p>
<div align="center"> <div align="center">
<img src="frontend/public/logo.png" alt="微信数据库解密工具" width="200"> <h1>WeChatDataAnalysis - 微信数据库解密与分析工具</h1>
<p>一个专门用于微信4.x版本数据库解密的工具</p>
<img src="https://img.shields.io/github/v/tag/LifeArchiveProject/WeChatDataAnalysis" alt="Version" />
<img src="https://img.shields.io/github/stars/LifeArchiveProject/WeChatDataAnalysis" alt="Stars" />
<img src="https://img.shields.io/github/forks/LifeArchiveProject/WeChatDataAnalysis" alt="Forks" />
<img src="https://img.shields.io/github/license/LifeArchiveProject/WeChatDataAnalysis" alt="License" />
<img src="https://img.shields.io/badge/Python-3776AB?logo=Python&logoColor=white" alt="Python" />
<img src="https://img.shields.io/badge/FastAPI-009688?logo=FastAPI&logoColor=white" alt="FastAPI" />
<img src="https://img.shields.io/badge/Vue.js-4FC08D?logo=Vue.js&logoColor=white" alt="Vue.js" />
<img src="https://img.shields.io/badge/SQLite-003B57?logo=SQLite&logoColor=white" alt="SQLite" />
</div> </div>
# 微信数据库解密工具
一个专门用于微信4.x版本数据库解密的工具
## 界面预览 ## 界面预览
### 检测页面 <table>
<tr>
<td align="center"><b>首页</b></td>
<td align="center"><b>检测页面</b></td>
</tr>
<tr>
<td><img src="frontend/public/home.png" alt="首页" width="400"/></td>
<td><img src="frontend/public/detection.png" alt="微信检测页面" width="400"/></td>
</tr>
<tr>
<td align="center"><b>解密页面</b></td>
<td align="center"><b>图片密钥页面</b></td>
</tr>
<tr>
<td><img src="frontend/public/decrypt.png" alt="数据库解密页面" width="400"/></td>
<td><img src="frontend/public/imageAES.png" alt="图片密钥页面" width="400"/></td>
</tr>
<tr>
<td align="center"><b>图片解密页面</b></td>
<td align="center"><b>解密成功页面</b></td>
</tr>
<tr>
<td><img src="frontend/public/imageSucces.png" alt="图片解密页面" width="400"/></td>
<td><img src="frontend/public/success.png" alt="解密成功页面" width="400"/></td>
</tr>
<tr>
<td align="center" colspan="2"><b>聊天记录页面</b></td>
</tr>
<tr>
<td colspan="2" align="center"><img src="frontend/public/message.png" alt="聊天记录页面" width="800"/></td>
</tr>
</table>
<div align="center"> > **Note**: 聊天记录页面目前仅完成了基础展示功能,更多功能(搜索、导出、高级筛选等)尚在开发中。
<img src="frontend/public/detection.png" alt="微信检测页面" width="800">
</div>
自动检测微信安装路径和数据库文件位置,支持多账户识别。
### 解密页面
<div align="center">
<img src="frontend/public/decrypt.png" alt="数据库解密页面" width="800">
</div>
输入解密密钥,选择数据库文件进行批量解密操作。
### 解密成功页面
<div align="center">
<img src="frontend/public/success.png" alt="解密成功页面" width="800">
</div>
解密完成后显示统计信息,可直接跳转查看聊天记录。
### 聊天记录页面
<div align="center">
<img src="frontend/public/message.png" alt="聊天记录页面" width="800">
</div>
> **注意**: 聊天记录页面目前仅完成了基础展示功能,包括消息列表、文本/图片/语音等基本消息类型的显示。更多功能(如搜索、导出、高级筛选等)尚在开发中,当前界面不代表最终成品。
## 功能特性 ## 功能特性
@@ -164,7 +176,7 @@ uv run analyze_wechat_databases.py
#### 1. 获取图片解密密钥 #### 1. 获取图片解密密钥
```bash ```bash
# GET请求获取密钥需要微信正在运行提取AES密钥 # GET请求获取密钥需要微信正在运行;部分版本需以管理员身份运行后端才能提取AES密钥
curl http://localhost:8000/api/media/keys curl http://localhost:8000/api/media/keys
# 强制重新提取密钥 # 强制重新提取密钥

View File

@@ -133,15 +133,27 @@
<div class="bg-gray-50 rounded-lg p-4"> <div class="bg-gray-50 rounded-lg p-4">
<div class="flex justify-between items-center mb-2"> <div class="flex justify-between items-center mb-2">
<span class="text-sm font-medium text-[#000000e6]">XOR 密钥</span> <span class="text-sm font-medium text-[#000000e6]">XOR 密钥</span>
<span class="font-mono text-sm px-3 py-1 bg-white rounded border border-[#EDEDED]"> <button
type="button"
class="font-mono text-sm px-3 py-1 bg-white rounded border border-[#EDEDED] transition-colors"
:class="mediaKeys.xor_key ? 'cursor-pointer hover:bg-gray-50' : 'cursor-not-allowed opacity-60'"
:title="mediaKeys.xor_key ? '点击复制' : ''"
@click="copyKey('XOR 密钥', mediaKeys.xor_key)"
>
{{ mediaKeys.xor_key || '未获取' }} {{ mediaKeys.xor_key || '未获取' }}
</span> </button>
</div> </div>
<div class="flex justify-between items-center"> <div class="flex justify-between items-center">
<span class="text-sm font-medium text-[#000000e6]">AES 密钥</span> <span class="text-sm font-medium text-[#000000e6]">AES 密钥</span>
<span class="font-mono text-sm px-3 py-1 bg-white rounded border border-[#EDEDED]"> <button
{{ mediaKeys.aes_key ? mediaKeys.aes_key.substring(0, 8) + '...' : '未获取' }} type="button"
</span> class="font-mono text-sm px-3 py-1 bg-white rounded border border-[#EDEDED] transition-colors"
:class="mediaKeys.aes_key ? 'cursor-pointer hover:bg-gray-50' : 'cursor-not-allowed opacity-60'"
:title="mediaKeys.aes_key ? '点击复制' : ''"
@click="copyKey('AES 密钥', mediaKeys.aes_key)"
>
{{ mediaKeys.aes_key || '未获取' }}
</button>
</div> </div>
</div> </div>
@@ -151,6 +163,13 @@
</svg> </svg>
{{ mediaKeys.message }} {{ mediaKeys.message }}
</div> </div>
<div v-if="copyMessage" class="text-sm text-[#07C160] flex items-start">
<svg class="w-4 h-4 mr-2 mt-0.5 flex-shrink-0" fill="none" stroke="currentColor" viewBox="0 0 24 24">
<path stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M5 13l4 4L19 7"/>
</svg>
{{ copyMessage }}
</div>
</div> </div>
<!-- 操作按钮 --> <!-- 操作按钮 -->
@@ -300,7 +319,7 @@
<p class="mb-2">可能的失败原因</p> <p class="mb-2">可能的失败原因</p>
<ul class="list-disc list-inside space-y-1"> <ul class="list-disc list-inside space-y-1">
<li><strong>解密后非有效图片</strong>文件不是图片格式(如视频缩略图损坏)</li> <li><strong>解密后非有效图片</strong>文件不是图片格式(如视频缩略图损坏)</li>
<li><strong>V4-V2版本需要AES密钥</strong>需要微信运行时才能提取AES密钥</li> <li><strong>V4-V2版本需要AES密钥</strong>需要微信运行且部分环境需以管理员身份运行后端才能提取</li>
<li><strong>未知加密版本</strong>新版微信使用了不支持的加密方式</li> <li><strong>未知加密版本</strong>新版微信使用了不支持的加密方式</li>
<li><strong>文件为空</strong>原始文件损坏或为空文件</li> <li><strong>文件为空</strong>原始文件损坏或为空文件</li>
</ul> </ul>
@@ -411,6 +430,8 @@ const mediaKeys = reactive({
message: '' message: ''
}) })
const mediaLoading = ref(false) const mediaLoading = ref(false)
const copyMessage = ref('')
let copyMessageTimer = null
// 图片解密相关 // 图片解密相关
const mediaDecryptResult = ref(null) const mediaDecryptResult = ref(null)
@@ -527,6 +548,53 @@ const fetchMediaKeys = async (forceExtract = false) => {
} }
} }
const _copyToClipboard = async (text) => {
if (!process.client || typeof window === 'undefined') return false
if (!text) return false
try {
if (navigator?.clipboard?.writeText) {
await navigator.clipboard.writeText(text)
return true
}
} catch (e) {
// Ignore and fallback below
}
try {
const textarea = document.createElement('textarea')
textarea.value = text
textarea.setAttribute('readonly', '')
textarea.style.position = 'fixed'
textarea.style.opacity = '0'
textarea.style.left = '-9999px'
textarea.style.top = '0'
document.body.appendChild(textarea)
textarea.select()
textarea.setSelectionRange(0, textarea.value.length)
const ok = document.execCommand('copy')
document.body.removeChild(textarea)
return ok
} catch (e) {
return false
}
}
const _setCopyMessage = (message) => {
copyMessage.value = message
if (copyMessageTimer) clearTimeout(copyMessageTimer)
copyMessageTimer = setTimeout(() => {
copyMessage.value = ''
copyMessageTimer = null
}, 2000)
}
const copyKey = async (label, value) => {
if (!value) return
const ok = await _copyToClipboard(value)
_setCopyMessage(ok ? `${label}已复制` : `${label}复制失败,请手动复制`)
}
// 批量解密所有图片使用SSE实时进度 // 批量解密所有图片使用SSE实时进度
const decryptAllImages = async () => { const decryptAllImages = async () => {
mediaDecrypting.value = true mediaDecrypting.value = true

BIN
frontend/public/home.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 576 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 198 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 196 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 389 KiB

After

Width:  |  Height:  |  Size: 404 KiB

View File

@@ -13,6 +13,7 @@ from concurrent.futures import ThreadPoolExecutor
from functools import lru_cache from functools import lru_cache
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Optional
from ctypes import wintypes
from fastapi import HTTPException from fastapi import HTTPException
@@ -999,6 +1000,14 @@ def _verify_wechat_aes_key(ciphertext: bytes, key16: bytes) -> bool:
return True return True
if plain.startswith(b"\x89PNG\r\n\x1a\n"): if plain.startswith(b"\x89PNG\r\n\x1a\n"):
return True return True
if plain.startswith(b"GIF87a") or plain.startswith(b"GIF89a"):
return True
if plain.startswith(b"wxgf"):
return True
if len(plain) >= 12 and plain.startswith(b"RIFF") and plain[8:12] == b"WEBP":
return True
if len(plain) >= 8 and plain[4:8] == b"ftyp":
return True
return False return False
except Exception: except Exception:
return False return False
@@ -1016,28 +1025,112 @@ class _MEMORY_BASIC_INFORMATION(ctypes.Structure):
] ]
def _find_weixin_pid() -> Optional[int]: def _find_weixin_pids() -> list[int]:
if psutil is None: if psutil is None:
return None return []
for p in psutil.process_iter(["name"]):
preferred = ["weixin.exe", "wechat.exe", "wechatappex.exe", "wechatapp.exe"]
preferred_set = set(preferred)
pids_by_name: dict[str, list[int]] = {n: [] for n in preferred}
extra: list[int] = []
for p in psutil.process_iter(["pid", "name"]):
try: try:
name = (p.info.get("name") or "").lower() name = (p.info.get("name") or "").lower()
if name in {"weixin.exe", "wechat.exe"}: pid = int(p.info.get("pid") or 0)
return int(p.pid)
except Exception: except Exception:
continue continue
return None if pid <= 0:
continue
if name in preferred_set:
pids_by_name[name].append(pid)
continue
if name.startswith("wechat") or name.startswith("weixin"):
extra.append(pid)
ordered: list[int] = []
for n in preferred:
ordered.extend(pids_by_name.get(n, []))
ordered.extend(extra)
seen: set[int] = set()
out: list[int] = []
for pid in ordered:
if pid in seen:
continue
seen.add(pid)
out.append(pid)
return out
def _try_enable_windows_debug_privilege() -> None:
if os.name != "nt":
return
try:
advapi32 = ctypes.windll.advapi32
kernel32 = ctypes.windll.kernel32
TOKEN_ADJUST_PRIVILEGES = 0x0020
TOKEN_QUERY = 0x0008
SE_PRIVILEGE_ENABLED = 0x0002
class _LUID(ctypes.Structure):
_fields_ = [("LowPart", wintypes.DWORD), ("HighPart", wintypes.LONG)]
class _LUID_AND_ATTRIBUTES(ctypes.Structure):
_fields_ = [("Luid", _LUID), ("Attributes", wintypes.DWORD)]
class _TOKEN_PRIVILEGES(ctypes.Structure):
_fields_ = [("PrivilegeCount", wintypes.DWORD), ("Privileges", _LUID_AND_ATTRIBUTES * 1)]
token = wintypes.HANDLE()
if not advapi32.OpenProcessToken(
kernel32.GetCurrentProcess(),
TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY,
ctypes.byref(token),
):
return
try:
luid = _LUID()
if not advapi32.LookupPrivilegeValueW(None, "SeDebugPrivilege", ctypes.byref(luid)):
return
tp = _TOKEN_PRIVILEGES()
tp.PrivilegeCount = 1
tp.Privileges[0].Luid = luid
tp.Privileges[0].Attributes = SE_PRIVILEGE_ENABLED
advapi32.AdjustTokenPrivileges(token, False, ctypes.byref(tp), 0, None, None)
finally:
kernel32.CloseHandle(token)
except Exception:
return
def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]: def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
pid = _find_weixin_pid() _try_enable_windows_debug_privilege()
if not pid: pids = _find_weixin_pids()
if not pids:
return None return None
PROCESS_VM_READ = 0x0010 PROCESS_VM_READ = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400 PROCESS_QUERY_INFORMATION = 0x0400
MEM_COMMIT = 0x1000 MEM_COMMIT = 0x1000
MEM_PRIVATE = 0x20000 MEM_PRIVATE = 0x20000
MEM_MAPPED = 0x40000
MEM_IMAGE = 0x1000000
PAGE_NOACCESS = 0x01
PAGE_READONLY = 0x02
PAGE_READWRITE = 0x04
PAGE_WRITECOPY = 0x08
PAGE_EXECUTE_READ = 0x20
PAGE_EXECUTE_READWRITE = 0x40
PAGE_EXECUTE_WRITECOPY = 0x80
PAGE_GUARD = 0x100
kernel32 = ctypes.windll.kernel32 kernel32 = ctypes.windll.kernel32
@@ -1063,68 +1156,98 @@ def _extract_wechat_aes_key_from_process(ciphertext: bytes) -> Optional[bytes]:
CloseHandle.argtypes = [ctypes.c_void_p] CloseHandle.argtypes = [ctypes.c_void_p]
CloseHandle.restype = ctypes.c_bool CloseHandle.restype = ctypes.c_bool
handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid) readable_mask = (
if not handle: PAGE_READONLY
return None | PAGE_READWRITE
| PAGE_WRITECOPY
| PAGE_EXECUTE_READ
| PAGE_EXECUTE_READWRITE
| PAGE_EXECUTE_WRITECOPY
)
stop = threading.Event() def is_readable(protect: int) -> bool:
result: list[Optional[bytes]] = [None] if protect & PAGE_GUARD:
pattern = re.compile(rb"[^a-z0-9]([a-z0-9]{32})[^a-z0-9]", flags=re.IGNORECASE) return False
if protect & PAGE_NOACCESS:
return False
return bool(protect & readable_mask)
def read_mem(addr: int, size: int) -> Optional[bytes]: pattern = re.compile(rb"(?i)(?<![0-9a-z])([0-9a-z]{16}|[0-9a-z]{32})(?![0-9a-z])")
buf = ctypes.create_string_buffer(size)
read = ctypes.c_size_t(0) def scan_pid(pid: int) -> Optional[bytes]:
ok = ReadProcessMemory(handle, ctypes.c_void_p(addr), buf, size, ctypes.byref(read)) handle = OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, pid)
if not ok or read.value <= 0: if not handle:
return None return None
return buf.raw[: read.value]
def scan_region(base: int, region_size: int) -> Optional[bytes]: stop = threading.Event()
chunk = 4 * 1024 * 1024 result: list[Optional[bytes]] = [None]
offset = 0
tail = b"" def read_mem(addr: int, size: int) -> Optional[bytes]:
while offset < region_size and not stop.is_set(): buf = ctypes.create_string_buffer(size)
to_read = min(chunk, region_size - offset) read = ctypes.c_size_t(0)
b = read_mem(base + offset, int(to_read)) ok = ReadProcessMemory(handle, ctypes.c_void_p(addr), buf, size, ctypes.byref(read))
if not b: if not ok or read.value <= 0:
return None return None
data = tail + b return buf.raw[: read.value]
for m in pattern.finditer(data):
cand32 = m.group(1)
cand16 = cand32[:16]
if _verify_wechat_aes_key(ciphertext, cand16):
return cand16
tail = data[-64:] if len(data) > 64 else data
offset += to_read
return None
regions: list[tuple[int, int]] = [] def scan_region(base: int, region_size: int) -> Optional[bytes]:
mbi = _MEMORY_BASIC_INFORMATION() chunk = 4 * 1024 * 1024
addr = 0 offset = 0
try: tail = b""
while VirtualQueryEx(handle, ctypes.c_void_p(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)): while offset < region_size and not stop.is_set():
try: to_read = min(chunk, region_size - offset)
if int(mbi.State) == MEM_COMMIT and int(mbi.Type) == MEM_PRIVATE: b = read_mem(base + offset, int(to_read))
base = int(mbi.BaseAddress) if not b:
size = int(mbi.RegionSize) return None
if size > 0: data = tail + b
regions.append((base, size)) for m in pattern.finditer(data):
addr = int(mbi.BaseAddress) + int(mbi.RegionSize) cand = m.group(1)
except Exception: if len(cand) == 16:
addr += 0x1000 candidates = [cand]
if addr <= 0: else:
break candidates = [cand[:16], cand[16:]]
for cand16 in candidates:
if _verify_wechat_aes_key(ciphertext, cand16):
return cand16
tail = data[-64:] if len(data) > 64 else data
offset += to_read
return None
with ThreadPoolExecutor(max_workers=min(32, max(1, len(regions)))) as ex: regions: list[tuple[int, int]] = []
for found in ex.map(lambda r: scan_region(r[0], r[1]), regions): mbi = _MEMORY_BASIC_INFORMATION()
if found: addr = 0
result[0] = found try:
stop.set() while VirtualQueryEx(handle, ctypes.c_void_p(addr), ctypes.byref(mbi), ctypes.sizeof(mbi)):
try:
if int(mbi.State) == MEM_COMMIT and int(mbi.Type) in {MEM_PRIVATE, MEM_MAPPED, MEM_IMAGE}:
protect = int(mbi.Protect)
if is_readable(protect):
base = int(mbi.BaseAddress)
size = int(mbi.RegionSize)
if size > 0:
regions.append((base, size))
addr = int(mbi.BaseAddress) + int(mbi.RegionSize)
except Exception:
addr += 0x1000
if addr <= 0:
break break
finally:
CloseHandle(handle)
return result[0] with ThreadPoolExecutor(max_workers=min(32, max(1, len(regions)))) as ex:
for found in ex.map(lambda r: scan_region(r[0], r[1]), regions):
if found:
result[0] = found
stop.set()
break
finally:
CloseHandle(handle)
return result[0]
for pid in pids:
found = scan_pid(pid)
if found:
return found
return None
def _save_media_keys(account_dir: Path, xor_key: int, aes_key16: bytes) -> None: def _save_media_keys(account_dir: Path, xor_key: int, aes_key16: bytes) -> None:

View File

@@ -73,7 +73,19 @@ def _verify(encrypted: bytes, key: bytes) -> bool:
aes_key = key[:16] aes_key = key[:16]
cipher = AES.new(aes_key, AES.MODE_ECB) cipher = AES.new(aes_key, AES.MODE_ECB)
text = cipher.decrypt(encrypted) text = cipher.decrypt(encrypted)
return bool(text.startswith(b"\xff\xd8\xff")) if text.startswith(b"\xff\xd8\xff"):
return True
if text.startswith(b"\x89PNG\r\n\x1a\n"):
return True
if text.startswith(b"GIF87a") or text.startswith(b"GIF89a"):
return True
if text.startswith(b"wxgf"):
return True
if len(text) >= 12 and text.startswith(b"RIFF") and text[8:12] == b"WEBP":
return True
if len(text) >= 8 and text[4:8] == b"ftyp":
return True
return False
def _search_memory_chunk(process_handle, base_address: int, region_size: int, encrypted: bytes, rules): def _search_memory_chunk(process_handle, base_address: int, region_size: int, encrypted: bytes, rules):
@@ -101,7 +113,7 @@ def _get_aes_key(encrypted: bytes, pid: int) -> Any:
rules_key = r""" rules_key = r"""
rule AesKey { rule AesKey {
strings: strings:
$pattern = /[^a-z0-9][a-z0-9]{32}[^a-z0-9]/ $pattern = /[^0-9a-z]([0-9a-z]{16}|[0-9a-z]{32})[^0-9a-z]/ nocase
condition: condition:
$pattern $pattern
} }

View File

@@ -98,7 +98,7 @@ async def get_media_keys(account: Optional[str] = None, force_extract: bool = Fa
# 保存密钥到缓存 # 保存密钥到缓存
_save_media_keys(account_dir, xor_key, aes_key16) _save_media_keys(account_dir, xor_key, aes_key16)
else: else:
aes_message = "无法从微信进程提取AES密钥微信是否正在运行?" aes_message = "无法从微信进程提取AES密钥请确认微信正在运行,并尝试以管理员身份运行后端;部分新版微信可能暂不兼容"
else: else:
aes_message = "未找到V2加密模板文件" aes_message = "未找到V2加密模板文件"
else: else: