Compare commits

...

5 Commits

15 changed files with 567 additions and 560 deletions
+1 -10
View File
@@ -6,6 +6,7 @@
<h1>WeChatDataAnalysis - 微信数据库解密与分析工具</h1>
<p>微信4.x数据解密并生成年度总结,高仿微信,支持实时更新,导出聊天记录,朋友圈等大量便捷功能</p>
<p><b>特别致谢</b><a href="https://github.com/H3CoF6">H3CoF6</a>(密钥与朋友圈等核心内容的技术支持)、<a href="https://github.com/ycccccccy/echotrace">echotrace</a>、<a href="https://github.com/hicccc77/WeFlow">WeFlow</a>(本项目大量功能参考其实现)</p>
<p>如需定制功能,请联系 QQ2977094657。</p>
<img src="https://img.shields.io/github/v/tag/LifeArchiveProject/WeChatDataAnalysis" alt="Version" />
<img src="https://img.shields.io/github/stars/LifeArchiveProject/WeChatDataAnalysis" alt="Stars" />
<img src="https://gh-down-badges.linkof.link/LifeArchiveProject/WeChatDataAnalysis" alt="Downloads" />
@@ -192,16 +193,6 @@ npm run dist
3. **数据隐私**: 解密后的数据包含个人隐私信息,请谨慎处理
4. **合法使用**: 请遵守相关法律法规,不得用于非法目的
## 修改消息
支持在聊天页对单条消息进行本地修改(如修改消息文本/字段、修复为我发送、反转本地气泡方向),并在“修改记录”页查看原始与当前对比,支持单条恢复或按会话一键恢复。
该功能只修改本机本地数据库(`db_storage` 与解密副本),不会调用远端回写接口。
<p align="center">
<img src="frontend/public/edit.gif" alt="本地消息修改" width="800" />
</p>
## 致谢
本项目的开发过程中参考了以下优秀的开源项目和资源:
+126
View File
@@ -1139,6 +1139,132 @@
flex-shrink: 0;
}
.wechat-link-card-finder {
width: 135px;
min-width: 135px;
max-width: 135px;
border: none;
box-shadow: none;
outline: none;
cursor: pointer;
text-decoration: none;
}
.wechat-link-card-finder.wechat-link-card--disabled {
cursor: default;
}
.wechat-link-finder-cover {
width: 135px;
height: 185px;
position: relative;
overflow: hidden;
border-radius: 4px;
background: var(--app-surface-muted);
}
.wechat-link-finder-cover--empty {
background: linear-gradient(180deg, #37cc6a 0%, #118f42 100%);
}
.wechat-link-finder-cover-img {
width: 100%;
height: 100%;
object-fit: cover;
object-position: center;
display: block;
}
.wechat-link-finder-cover-placeholder {
position: absolute;
inset: 0;
display: flex;
align-items: center;
justify-content: center;
color: rgba(255, 255, 255, 0.92);
}
.wechat-link-finder-cover-placeholder svg {
width: 34px;
height: 34px;
}
.wechat-link-finder-cover-shade {
position: absolute;
inset: 0;
background: linear-gradient(180deg, rgba(0, 0, 0, 0.04) 0%, rgba(0, 0, 0, 0.12) 42%, rgba(0, 0, 0, 0.68) 100%);
}
.wechat-link-finder-play {
position: absolute;
left: 50%;
top: 50%;
transform: translate(-50%, -66%);
width: 40px;
height: 40px;
border-radius: 50%;
background: rgba(0, 0, 0, 0.42);
display: flex;
align-items: center;
justify-content: center;
color: #fff;
box-shadow: 0 8px 24px rgba(0, 0, 0, 0.18);
}
.wechat-link-finder-play svg {
width: 20px;
height: 20px;
margin-left: 2px;
}
.wechat-link-finder-meta {
position: absolute;
left: 8px;
right: 8px;
bottom: 8px;
display: flex;
flex-direction: column;
gap: 0;
}
.wechat-link-finder-author {
display: flex;
align-items: center;
gap: 5px;
min-width: 0;
padding: 5px 7px;
border-radius: 999px;
background: rgba(0, 0, 0, 0.28);
backdrop-filter: blur(6px);
}
.wechat-link-finder-author-avatar {
width: 18px;
height: 18px;
flex-shrink: 0;
display: flex;
align-items: center;
justify-content: center;
}
.wechat-link-finder-author-avatar-img {
width: 100%;
height: 100%;
object-fit: contain;
display: block;
}
.wechat-link-finder-author-name {
min-width: 0;
flex: 1 1 auto;
font-size: 10px;
color: rgba(255, 255, 255, 0.96);
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
text-shadow: 0 1px 2px rgba(0, 0, 0, 0.28);
}
/* 隐私模式模糊效果 */
.privacy-blur {
filter: blur(9px);
+6
View File
@@ -1437,14 +1437,20 @@
.session-list-item-name {
color: var(--session-list-name);
font-weight: 400;
font-synthesis: none;
}
.session-list-item-time {
color: var(--session-list-meta);
font-weight: 400;
font-synthesis: none;
}
.session-list-item-preview {
color: var(--session-list-preview);
font-weight: 400;
font-synthesis: none;
}
.contact-search-wrapper {
+68 -3
View File
@@ -2,6 +2,8 @@
import { defineComponent, h, ref, watch } from 'vue'
import miniProgramIconUrl from '~/assets/images/wechat/mini-program.svg'
const finderLogoUrl = '/assets/images/wechat/channels-logo.svg'
export default defineComponent({
name: 'LinkCard',
props: {
@@ -51,7 +53,11 @@ export default defineComponent({
return text ? (Array.from(text)[0] || '') : ''
})()
const fromAvatarUrl = String(props.fromAvatar || '').trim()
const headingText = String(props.heading || href || '').trim()
let abstractText = String(props.abstract || '').trim()
if (abstractText && headingText && abstractText === headingText) abstractText = ''
const isMiniProgram = String(props.linkType || '').trim() === 'mini_program'
const isFinder = String(props.linkType || '').trim() === 'finder'
const isCoverVariant = !isMiniProgram && String(props.variant || '').trim() === 'cover'
const Tag = canNavigate ? 'a' : 'div'
@@ -140,9 +146,68 @@ export default defineComponent({
)
}
const headingText = String(props.heading || href || '').trim()
let abstractText = String(props.abstract || '').trim()
if (abstractText && headingText && abstractText === headingText) abstractText = ''
if (isFinder) {
return h(
Tag,
{
...(canNavigate ? { href, target: '_blank', rel: 'noreferrer' } : { role: 'group', 'aria-disabled': 'true' }),
class: [
'wechat-link-card-finder',
!canNavigate ? 'wechat-link-card--disabled' : '',
'wechat-special-card',
'msg-radius',
props.isSent ? 'wechat-special-sent-side' : ''
].filter(Boolean).join(' '),
style: {
width: '135px',
minWidth: '135px',
maxWidth: '135px',
display: 'flex',
flexDirection: 'column',
boxSizing: 'border-box',
flex: '0 0 auto',
border: 'none',
boxShadow: 'none',
textDecoration: 'none',
outline: 'none'
}
},
[
h('div', { class: ['wechat-link-finder-cover', !props.preview ? 'wechat-link-finder-cover--empty' : ''].filter(Boolean).join(' ') }, [
props.preview
? h('img', {
src: props.preview,
alt: props.heading || '视频号封面',
class: 'wechat-link-finder-cover-img',
referrerpolicy: 'no-referrer'
})
: h('div', { class: 'wechat-link-finder-cover-placeholder', 'aria-hidden': 'true' }, [
h('svg', { viewBox: '0 0 24 24', fill: 'currentColor' }, [
h('path', { d: 'M8 5v14l11-7z' })
])
]),
h('div', { class: 'wechat-link-finder-cover-shade', 'aria-hidden': 'true' }),
h('div', { class: 'wechat-link-finder-play', 'aria-hidden': 'true' }, [
h('svg', { viewBox: '0 0 24 24', fill: 'currentColor' }, [
h('path', { d: 'M8 5v14l11-7z' })
])
]),
h('div', { class: 'wechat-link-finder-meta' }, [
h('div', { class: 'wechat-link-finder-author' }, [
h('div', { class: 'wechat-link-finder-author-avatar', 'aria-hidden': 'true' }, [
h('img', {
src: finderLogoUrl,
alt: '',
class: 'wechat-link-finder-author-avatar-img'
})
]),
h('div', { class: 'wechat-link-finder-author-name' }, fromText || '视频号')
])
])
])
]
)
}
if (isMiniProgram) {
return h(
@@ -98,7 +98,7 @@
<!-- 联系人信息 -->
<div class="flex-1 min-w-0">
<div class="flex items-center justify-between">
<h3 class="session-list-item-name text-sm font-medium truncate" :class="{ 'privacy-blur': privacyMode }">{{ contact.name }}</h3>
<h3 class="session-list-item-name text-sm truncate" :class="{ 'privacy-blur': privacyMode }">{{ contact.name }}</h3>
<div class="flex items-center flex-shrink-0 ml-2">
<span class="session-list-item-time text-xs">{{ contact.lastMessageTime }}</span>
</div>
-3
View File
@@ -73,9 +73,6 @@
</svg>
点击按钮将自动获取数据库图片双重密钥您也可以手动输入已知的64位密钥使用<a href="https://github.com/ycccccccy/wx_key" target="_blank" class="text-[#07C160] hover:text-[#06AD56]">wx_key</a>等工具获取
</p>
<div class="mt-3 rounded-lg border border-amber-200 bg-amber-50 px-3 py-2 text-xs leading-5 text-amber-900">
提示数据库密钥跟随账号 + 设备下发同一账号在另一台电脑生成的聊天记录复制到当前设备后通常无法在当前设备重新获取原设备对应的密钥因此也无法直接解密
</div>
</div>
<!-- 数据库路径输入 -->
@@ -0,0 +1,5 @@
<?xml version="1.0" standalone="no"?>
<!DOCTYPE svg PUBLIC "-//W3C//DTD SVG 1.1//EN" "http://www.w3.org/Graphics/SVG/1.1/DTD/svg11.dtd">
<svg t="1774499781741" class="icon" viewBox="0 0 1024 1024" version="1.1" xmlns="http://www.w3.org/2000/svg" p-id="7897" xmlns:xlink="http://www.w3.org/1999/xlink" width="200" height="200">
<path d="M118.81813333 106.3936c87.27893333-26.2144 192.03413333 75.1616 358.46826667 346.9312 18.70506667 30.5152 34.7136 55.46666667 35.60106667 55.5008 0.88746667 0 17.74933333-26.4192 37.41013333-58.70933333 165.51253333-271.53066667 270.336-371.3024 359.35573333-342.08426667 56.55893333 18.56853333 80.41813333 73.18186667 80.21333334 183.63733333-0.4096 214.86933333-103.69706667 551.59466667-188.0064 612.89813334-69.4272 50.44906667-173.1584-13.07306667-269.85813334-165.30773334-9.59146667-15.1552-18.36373333-27.57973333-19.42186666-27.61386666-1.05813333 0-9.59146667 12.32213333-18.944 27.4432-50.96106667 82.39786667-113.4592 146.26133333-167.04853334 170.7008-26.04373333 11.8784-71.33866667 13.5168-90.45333333 3.24266666-52.08746667-27.98933333-110.72853333-149.504-156.16-323.72053333C7.3728 310.95466667 21.504 135.68 118.81813333 106.3936zM848.31573333 217.088c-55.26186667 42.93973333-126.49813333 138.58133333-230.8096 309.93066667l-49.2544 80.82773333 16.86186667 30.17386667c42.35946667 75.94666667 91.30666667 139.81013333 130.79893333 170.66666666 26.76053333 20.95786667 35.60106667 16.55466667 58.9824-29.4912 73.5232-144.55466667 136.192-440.7296 115.712-547.19146666-6.144-32.0512-15.80373333-35.46453333-42.2912-14.91626667zM143.73546667 207.9744c-19.72906667 19.49013333-14.60906667 145.8176 10.99093333 271.90613333 30.89066667 152.23466667 95.91466667 329.3184 124.5184 339.2512 27.81866667 9.65973333 104.31146667-77.824 164.38613333-188.0064l13.14133334-24.13226666-42.15466667-69.18826667c-112.98133333-185.344-186.64106667-284.3648-240.8448-323.72053333-16.65706667-12.0832-22.7328-13.312-30.03733333-6.10986667z" fill="#FF9908" p-id="7898"></path>
</svg>

After

Width:  |  Height:  |  Size: 2.0 KiB

+64
View File
@@ -1220,6 +1220,70 @@ def _parse_app_message(text: str) -> dict[str, Any]:
"linkStyle": link_style,
}
if app_type == 51:
# 视频号分享(Finder / Channels
# 常见特征:
# - title 是「当前版本不支持展示该内容,请升级至最新版本。」
# - 真正标题在 <finderFeed><desc> 或其它 finder 节点里
finder_feed = _extract_xml_tag_text(text, "finderFeed")
finder_desc = (
(_extract_xml_tag_text(finder_feed, "desc") if finder_feed else "")
or _extract_xml_tag_text(text, "finderdesc")
or des
)
finder_nickname = (
_extract_xml_tag_text(text, "findernickname")
or _extract_xml_tag_text(text, "finder_nickname")
or (_extract_xml_tag_text(finder_feed, "nickname") if finder_feed else "")
or (_extract_xml_tag_text(finder_feed, "findernickname") if finder_feed else "")
)
finder_username = (
_extract_xml_tag_text(text, "finderusername")
or _extract_xml_tag_text(text, "finder_username")
or (_extract_xml_tag_text(finder_feed, "username") if finder_feed else "")
or (_extract_xml_tag_text(finder_feed, "finderusername") if finder_feed else "")
)
thumb_url = _normalize_xml_url(
_extract_xml_tag_or_attr(text, "thumburl")
or _extract_xml_tag_or_attr(text, "cdnthumburl")
or _extract_xml_tag_or_attr(text, "coverurl")
or _extract_xml_tag_or_attr(text, "cover")
or (_extract_xml_tag_or_attr(finder_feed, "thumbUrl") if finder_feed else "")
or (_extract_xml_tag_or_attr(finder_feed, "thumburl") if finder_feed else "")
or (_extract_xml_tag_or_attr(finder_feed, "coverUrl") if finder_feed else "")
or (_extract_xml_tag_or_attr(finder_feed, "coverurl") if finder_feed else "")
)
finder_url = url or _normalize_xml_url(
(_extract_xml_tag_text(finder_feed, "url") if finder_feed else "")
or (_extract_xml_tag_text(text, "playurl"))
or (_extract_xml_tag_text(text, "dataurl"))
)
display_title = str(title or "").strip()
if (not display_title) or ("不支持" in display_title):
display_title = str(finder_desc or "").strip()
if not display_title:
display_title = str(des or "").strip()
display_title = display_title or "[视频号]"
summary_text = str(finder_desc or "").strip() or display_title
from_display = str(finder_nickname or source_display_name or "").strip() or "视频号"
from_u = str(finder_username or source_username or "").strip()
return {
"renderType": "link",
"content": summary_text,
"title": display_title,
"url": finder_url or "",
"thumbUrl": thumb_url or "",
"from": from_display,
"fromUsername": from_u,
"linkType": "finder",
"linkStyle": "finder",
}
if app_type in (33, 36):
# 小程序分享(WeChat v4 常见:local_type = 49 + (33<<32) / 49 + (36<<32)
# 注:部分 payload 的 <url> 为空;前端会按需渲染为不可点击卡片。
+14 -12
View File
@@ -1626,12 +1626,13 @@ def sync_chat_realtime_messages(
inserted = 0
backfilled = 0
if new_rows and (not name2id_synced):
_best_effort_upsert_output_name2id_rows(
msg_conn,
account_name=account_dir.name,
rows=new_rows,
)
if new_rows:
if not name2id_synced:
_best_effort_upsert_output_name2id_rows(
msg_conn,
account_name=account_dir.name,
rows=new_rows,
)
# Insert older -> newer to keep sqlite btree locality similar to existing data.
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
@@ -1991,12 +1992,13 @@ def _sync_chat_realtime_messages_for_table(
inserted = 0
backfilled = 0
if new_rows and (not name2id_synced):
_best_effort_upsert_output_name2id_rows(
msg_conn,
account_name=account_dir.name,
rows=new_rows,
)
if new_rows:
if not name2id_synced:
_best_effort_upsert_output_name2id_rows(
msg_conn,
account_name=account_dir.name,
rows=new_rows,
)
values = [tuple(r.get(c) for c in insert_cols) for r in reversed(new_rows)]
insert_t0 = time.perf_counter()
+3 -27
View File
@@ -14,12 +14,7 @@ from ..app_paths import get_output_databases_dir
from ..logging_config import get_logger
from ..path_fix import PathFixRoute
from ..key_store import upsert_account_keys_in_store
from ..wechat_decrypt import (
WeChatDatabaseDecryptor,
build_decrypt_result_message,
decrypt_wechat_databases,
scan_account_databases_from_path,
)
from ..wechat_decrypt import WeChatDatabaseDecryptor, decrypt_wechat_databases, scan_account_databases_from_path
logger = get_logger(__name__)
@@ -81,7 +76,6 @@ async def decrypt_databases(request: DecryptRequest):
"message": results["message"],
"processed_files": results["processed_files"],
"failed_files": results["failed_files"],
"failure_details": results.get("failure_details", []),
"account_results": results.get("account_results", {}),
}
@@ -165,7 +159,6 @@ async def decrypt_databases_stream(
fail_count = 0
processed_files: list[str] = []
failed_files: list[str] = []
failure_details: list[dict] = []
account_results: dict = {}
overall_current = 0
@@ -188,7 +181,6 @@ async def decrypt_databases_stream(
account_success = 0
account_processed: list[str] = []
account_failed: list[str] = []
account_failure_details: list[dict] = []
for db_info in dbs:
if await request.is_disconnected():
@@ -240,20 +232,11 @@ async def decrypt_databases_stream(
status = "success"
msg = "解密成功"
else:
failure_detail = {
"account": account,
"file": db_path,
"name": db_name,
"code": str(decryptor.last_error_code or "").strip(),
"reason": str(decryptor.last_error_message or "").strip() or "解密失败",
}
account_failed.append(db_path)
account_failure_details.append(failure_detail)
failed_files.append(db_path)
failure_details.append(failure_detail)
fail_count += 1
status = "fail"
msg = failure_detail["reason"]
msg = "解密失败"
yield _sse(
{
@@ -278,7 +261,6 @@ async def decrypt_databases_stream(
"output_dir": str(account_output_dir),
"processed_files": account_processed,
"failed_files": account_failed,
"failure_details": account_failure_details,
}
# Build cache table (keep behavior consistent with the POST endpoint).
@@ -325,15 +307,9 @@ async def decrypt_databases_stream(
"success_count": success_count,
"failure_count": total_databases - success_count,
"output_directory": str(base_output_dir.absolute()),
"message": build_decrypt_result_message(
total_databases=total_databases,
success_count=success_count,
failed_count=total_databases - success_count,
failure_details=failure_details,
),
"message": f"解密完成: 成功 {success_count}/{total_databases}",
"processed_files": processed_files,
"failed_files": failed_files,
"failure_details": failure_details,
"account_results": account_results,
}
+130 -232
View File
@@ -13,12 +13,12 @@ import hashlib
import hmac
import os
import json
import shutil
import tempfile
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from .app_paths import get_output_databases_dir
@@ -26,117 +26,6 @@ from .app_paths import get_output_databases_dir
# SQLite文件头
SQLITE_HEADER = b"SQLite format 3\x00"
PAGE_SIZE = 4096
KEY_SIZE = 32
SALT_SIZE = 16
IV_SIZE = 16
HMAC_SIZE = 64
RESERVE_SIZE = 80
KEY_MISMATCH_GUIDANCE = (
"请在当前设备登录该账号后重新获取密钥;"
"如果聊天记录是从另一台设备复制过来的,当前设备通常无法获取原设备对应的密钥。"
)
def _derive_mac_key(raw_key: bytes, salt: bytes) -> bytes:
mac_salt = bytes(b ^ 0x3A for b in salt)
return hashlib.pbkdf2_hmac("sha512", raw_key, mac_salt, 2, dklen=KEY_SIZE)
def _derive_sqlcipher_enc_key(key_material: bytes, salt: bytes) -> bytes:
return hashlib.pbkdf2_hmac("sha512", key_material, salt, 256000, dklen=KEY_SIZE)
def _resolve_page1_key_material(key_material: bytes, page1: bytes) -> tuple[bytes, bytes, str] | None:
salt = page1[:SALT_SIZE]
stored_page1_hmac = page1[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE]
candidates = [
("raw_enc_key", key_material, _derive_mac_key(key_material, salt)),
]
derived_key = _derive_sqlcipher_enc_key(key_material, salt)
candidates.append(("sqlcipher_passphrase", derived_key, _derive_mac_key(derived_key, salt)))
for mode, enc_key, mac_key in candidates:
expected_page1_hmac = _compute_page_hmac(mac_key, page1, 1)
if stored_page1_hmac == expected_page1_hmac:
return enc_key, mac_key, mode
return None
def _compute_page_hmac(mac_key: bytes, page: bytes, page_num: int) -> bytes:
offset = SALT_SIZE if page_num == 1 else 0
data_end = PAGE_SIZE - RESERVE_SIZE + IV_SIZE
mac = hmac.new(mac_key, digestmod=hashlib.sha512)
mac.update(page[offset:data_end])
mac.update(page_num.to_bytes(4, "little"))
return mac.digest()
def _decrypt_page(raw_key: bytes, page: bytes, page_num: int) -> bytes:
iv = page[PAGE_SIZE - RESERVE_SIZE : PAGE_SIZE - RESERVE_SIZE + IV_SIZE]
offset = SALT_SIZE if page_num == 1 else 0
encrypted = page[offset : PAGE_SIZE - RESERVE_SIZE]
cipher = Cipher(
algorithms.AES(raw_key),
modes.CBC(iv),
backend=default_backend(),
)
decryptor = cipher.decryptor()
decrypted = decryptor.update(encrypted) + decryptor.finalize()
if page_num == 1:
return SQLITE_HEADER + decrypted + (b"\x00" * RESERVE_SIZE)
return decrypted + (b"\x00" * RESERVE_SIZE)
def _failure_matches_key_mismatch(detail: dict | None) -> bool:
if not isinstance(detail, dict):
return False
code = str(detail.get("code") or "").strip().lower()
reason = str(detail.get("reason") or "").strip()
if code == "key_mismatch":
return True
return ("密钥" in reason and "不匹配" in reason) or ("当前数据库密钥不正确" in reason)
def build_decrypt_result_message(
total_databases: int,
success_count: int,
failed_count: int,
failure_details: list[dict] | None = None,
) -> str:
total = max(int(total_databases or 0), 0)
success = max(int(success_count or 0), 0)
failed = max(int(failed_count or 0), 0)
details = list(failure_details or [])
if total == 0:
return "未找到可解密的数据库文件"
if failed == 0:
return f"解密完成: 成功 {success}/{total}"
key_mismatch_count = sum(1 for item in details if _failure_matches_key_mismatch(item))
if success == 0 and failed == total:
if key_mismatch_count == failed:
return (
f"解密失败:当前数据库密钥不正确,或该密钥不属于当前账号/当前设备(0/{total} 成功)。"
+ KEY_MISMATCH_GUIDANCE
)
return f"解密失败:0/{total} 个数据库解密成功,请检查密钥、账号与数据库路径是否匹配。"
if key_mismatch_count > 0:
return (
f"解密完成:成功 {success}/{total},失败 {failed}/{total}"
"失败文件中包含密钥不匹配的数据库,请确认使用的是当前账号在当前设备上的密钥。"
)
return f"解密完成:成功 {success}/{total},失败 {failed}/{total}"
def _normalize_account_name(name: str) -> str:
@@ -332,123 +221,153 @@ class WeChatDatabaseDecryptor:
self.key_bytes = bytes.fromhex(key_hex)
except ValueError:
raise ValueError("密钥必须是有效的十六进制字符串")
self.last_error_code = ""
self.last_error_message = ""
def _set_last_error(self, code: str, message: str) -> None:
self.last_error_code = str(code or "").strip()
self.last_error_message = str(message or "").strip()
def _clear_last_error(self) -> None:
self.last_error_code = ""
self.last_error_message = ""
def decrypt_database(self, db_path: str, output_path: str) -> bool:
"""解密微信4.x版本数据库
兼容两种输入形态:
- raw enc_key(部分内存扫描/工具直接返回)
- SQLCipher 口令/基础 key(需先用数据库 salt 做一轮 PBKDF2
使用SQLCipher 4.0参数:
- PBKDF2-SHA512, 256000轮迭代
- AES-256-CBC加密
- HMAC-SHA512验证
- 页面大小4096字节
"""
from .logging_config import get_logger
logger = get_logger(__name__)
logger.info(f"开始解密数据库: {db_path}")
tmp_output_path = ""
self._clear_last_error()
try:
file_size = os.path.getsize(db_path)
logger.info(f"读取文件大小: {file_size} bytes")
with open(db_path, 'rb') as f:
encrypted_data = f.read()
logger.info(f"读取文件大小: {len(encrypted_data)} bytes")
if file_size < PAGE_SIZE:
message = f"数据库文件小,无法解密: {db_path}"
self._set_last_error("file_too_small", message)
logger.warning(message)
return False
output_dir = Path(output_path).parent
output_dir.mkdir(parents=True, exist_ok=True)
with open(db_path, "rb") as source:
page1 = source.read(PAGE_SIZE)
if len(page1) < PAGE_SIZE:
message = f"数据库首页大小不足,无法解密: {db_path}"
self._set_last_error("page_too_small", message)
logger.warning(message)
if len(encrypted_data) < 4096:
logger.warning(f"文件小,跳过解密: {db_path}")
return False
# 检查是否已经是解密的数据库
if page1.startswith(SQLITE_HEADER):
if encrypted_data.startswith(SQLITE_HEADER):
logger.info(f"文件已是SQLite格式,直接复制: {db_path}")
fd, tmp_output_path = tempfile.mkstemp(
prefix=f".{Path(output_path).name}.",
suffix=".tmp",
dir=str(output_dir),
)
os.close(fd)
with open(db_path, "rb") as src, open(tmp_output_path, "wb") as dst:
shutil.copyfileobj(src, dst, length=1024 * 1024)
os.replace(tmp_output_path, output_path)
tmp_output_path = ""
with open(output_path, 'wb') as f:
f.write(encrypted_data)
return True
resolved_key_material = _resolve_page1_key_material(self.key_bytes, page1)
if resolved_key_material is None:
message = f"当前数据库密钥不正确,或该密钥不属于当前账号/当前设备: {db_path}"
self._set_last_error("key_mismatch", message)
logger.error(f"页面 1 HMAC验证失败,密钥与数据库不匹配: {db_path}")
return False
enc_key, mac_key, key_mode = resolved_key_material
logger.info(f"页面 1 HMAC验证通过: mode={key_mode} path={db_path}")
total_pages = (file_size + PAGE_SIZE - 1) // PAGE_SIZE
successful_pages = 0
fd, tmp_output_path = tempfile.mkstemp(
prefix=f".{Path(output_path).name}.",
suffix=".tmp",
dir=str(output_dir),
# 提取salt (前16字节)
salt = encrypted_data[:16]
# 计算mac_salt (salt XOR 0x3a)
mac_salt = bytes(b ^ 0x3a for b in salt)
# 使用PBKDF2-SHA512派生密钥
kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=salt,
iterations=256000,
backend=default_backend()
)
os.close(fd)
with open(db_path, "rb") as source, open(tmp_output_path, "wb") as target:
for page_num in range(1, total_pages + 1):
page = source.read(PAGE_SIZE)
if not page:
break
if len(page) < PAGE_SIZE:
logger.warning(f"页面 {page_num} 大小不足: {len(page)} bytes,自动补齐到 {PAGE_SIZE} bytes")
page = page + (b"\x00" * (PAGE_SIZE - len(page)))
stored_hmac = page[PAGE_SIZE - HMAC_SIZE : PAGE_SIZE]
expected_hmac = _compute_page_hmac(mac_key, page, page_num)
if stored_hmac != expected_hmac:
message = f"数据库校验失败,文件可能损坏或密钥不匹配: {db_path}"
self._set_last_error("page_hmac_mismatch", message)
logger.error(f"页面 {page_num} HMAC验证失败,终止解密: {db_path}")
return False
target.write(_decrypt_page(enc_key, page, page_num))
derived_key = kdf.derive(self.key_bytes)
# 派生MAC密钥
mac_kdf = PBKDF2HMAC(
algorithm=hashes.SHA512(),
length=32,
salt=mac_salt,
iterations=2,
backend=default_backend()
)
mac_key = mac_kdf.derive(derived_key)
# 解密数据
decrypted_data = bytearray()
decrypted_data.extend(SQLITE_HEADER)
page_size = 4096
iv_size = 16
hmac_size = 64 # SHA512的HMAC是64字节
# 计算保留区域大小 (对齐到AES块大小)
reserve_size = iv_size + hmac_size
if reserve_size % 16 != 0:
reserve_size = ((reserve_size // 16) + 1) * 16
total_pages = len(encrypted_data) // page_size
successful_pages = 0
failed_pages = 0
# 逐页解密
for cur_page in range(total_pages):
start = cur_page * page_size
end = start + page_size
page = encrypted_data[start:end]
page_num = cur_page + 1 # 页面编号从1开始
if len(page) < page_size:
logger.warning(f"页面 {page_num} 大小不足: {len(page)} bytes")
break
# 确定偏移量:第一页(cur_page == 0)需要跳过salt
offset = 16 if cur_page == 0 else 0 # SALT_SIZE = 16
# 提取存储的HMAC
hmac_start = page_size - reserve_size + iv_size
hmac_end = hmac_start + hmac_size
stored_hmac = page[hmac_start:hmac_end]
# 按照wechat-dump-rs的方式验证HMAC
data_end = page_size - reserve_size + iv_size
hmac_data = page[offset:data_end]
# 分步计算HMAC:先更新数据,再更新页面编号
mac = hmac.new(mac_key, digestmod=hashlib.sha512)
mac.update(hmac_data) # 包含加密数据+IV
mac.update(page_num.to_bytes(4, 'little')) # 页面编号(小端序)
expected_hmac = mac.digest()
if stored_hmac != expected_hmac:
logger.warning(f"页面 {page_num} HMAC验证失败")
failed_pages += 1
continue
# 提取IV和加密数据用于AES解密
iv = page[page_size - reserve_size:page_size - reserve_size + iv_size]
encrypted_page = page[offset:page_size - reserve_size]
# AES-CBC解密
try:
cipher = Cipher(
algorithms.AES(derived_key),
modes.CBC(iv),
backend=default_backend()
)
decryptor = cipher.decryptor()
decrypted_page = decryptor.update(encrypted_page) + decryptor.finalize()
# 按照wechat-dump-rs的方式重组页面数据
decrypted_data.extend(decrypted_page)
decrypted_data.extend(page[page_size - reserve_size:]) # 保留区域
successful_pages += 1
except Exception as e:
logger.error(f"页面 {page_num} AES解密失败: {e}")
failed_pages += 1
continue
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 0")
os.replace(tmp_output_path, output_path)
tmp_output_path = ""
logger.info(f"解密文件大小: {os.path.getsize(output_path)} bytes")
self._clear_last_error()
logger.info(f"解密完成: 成功 {successful_pages} 页, 失败 {failed_pages}")
# 写入解密后的文件
with open(output_path, 'wb') as f:
f.write(decrypted_data)
logger.info(f"解密文件大小: {len(decrypted_data)} bytes")
return True
except Exception as e:
self._set_last_error("exception", f"解密过程中发生异常: {e}")
logger.error(f"解密失败: {db_path}, 错误: {e}")
return False
finally:
if tmp_output_path:
try:
os.remove(tmp_output_path)
except OSError:
pass
def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> dict:
"""
@@ -573,7 +492,6 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
success_count = 0
processed_files = []
failed_files = []
failure_details = []
account_results = {}
for account_name, databases in account_databases.items():
@@ -605,7 +523,6 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
account_success = 0
account_processed = []
account_failed = []
account_failure_details = []
for db_info in databases:
db_path = db_info['path']
@@ -625,16 +542,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
else:
account_failed.append(db_path)
failed_files.append(db_path)
failure_detail = {
"account": account_name,
"file": db_path,
"name": db_name,
"code": str(decryptor.last_error_code or "").strip(),
"reason": str(decryptor.last_error_message or "").strip() or "解密失败",
}
account_failure_details.append(failure_detail)
failure_details.append(failure_detail)
logger.error(f"解密失败: {account_name}/{db_name} reason={failure_detail['reason']}")
logger.error(f"解密失败: {account_name}/{db_name}")
# 记录账号解密结果
account_results[account_name] = {
@@ -643,8 +551,7 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
"failed": len(databases) - account_success,
"output_dir": str(account_output_dir),
"processed_files": account_processed,
"failed_files": account_failed,
"failure_details": account_failure_details,
"failed_files": account_failed
}
# 构建“会话最后一条消息”缓存表:把耗时挪到解密阶段,后续会话列表直接查表
@@ -668,23 +575,15 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
logger.info(f"账号 {account_name} 解密完成: 成功 {account_success}/{len(databases)}")
# 返回结果
failed_count = total_databases - success_count
message = build_decrypt_result_message(
total_databases=total_databases,
success_count=success_count,
failed_count=failed_count,
failure_details=failure_details,
)
result = {
"status": "success" if success_count > 0 else "error",
"message": message,
"message": f"解密完成: 成功 {success_count}/{total_databases}",
"total_databases": total_databases,
"successful_count": success_count,
"failed_count": failed_count,
"failed_count": total_databases - success_count,
"output_directory": str(base_output_dir.absolute()),
"processed_files": processed_files,
"failed_files": failed_files,
"failure_details": failure_details,
"account_results": account_results, # 新增:按账号的详细结果
"detected_accounts": detected_accounts,
}
@@ -692,9 +591,8 @@ def decrypt_wechat_databases(db_storage_path: str = None, key: str = None) -> di
logger.info("=" * 60)
logger.info("解密任务完成!")
logger.info(f"成功: {success_count}/{total_databases}")
logger.info(f"失败: {failed_count}/{total_databases}")
logger.info(f"失败: {total_databases - success_count}/{total_databases}")
logger.info(f"输出目录: {base_output_dir.absolute()}")
logger.info(f"结果说明: {message}")
logger.info("=" * 60)
return result
+119
View File
@@ -112,6 +112,125 @@ class TestChatRealtimeName2IdSync(unittest.TestCase):
],
)
def test_sync_still_inserts_new_messages_when_name2id_is_up_to_date(self):
with TemporaryDirectory() as td:
account_dir = Path(td) / "acc"
account_dir.mkdir(parents=True, exist_ok=True)
username = "wxid_friend"
table_name = f"Msg_{hashlib.md5(username.encode('utf-8')).hexdigest()}"
msg_db_path = account_dir / "message_0.db"
conn = sqlite3.connect(str(msg_db_path))
try:
conn.execute("CREATE TABLE Name2Id (user_name TEXT, is_session INTEGER DEFAULT 1)")
conn.execute(
"""
CREATE TABLE "{table_name}" (
local_id INTEGER PRIMARY KEY,
server_id INTEGER,
local_type INTEGER,
sort_seq INTEGER,
real_sender_id INTEGER,
create_time INTEGER,
message_content TEXT,
compress_content BLOB,
packed_info_data BLOB
)
""".format(table_name=table_name)
)
conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (1, ?, 1)", ("acc",))
conn.execute("INSERT INTO Name2Id(rowid, user_name, is_session) VALUES (2, ?, 1)", (username,))
conn.execute(
f'INSERT INTO "{table_name}" '
"(local_id, server_id, local_type, sort_seq, real_sender_id, create_time, message_content, compress_content, packed_info_data) "
"VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
(10, 10010, 1, 10, 2, 1710000010, "old", None, None),
)
conn.commit()
finally:
conn.close()
session_conn = sqlite3.connect(str(account_dir / "session.db"))
try:
session_conn.execute(
"""
CREATE TABLE SessionTable (
username TEXT PRIMARY KEY,
summary TEXT DEFAULT '',
last_timestamp INTEGER DEFAULT 0,
sort_timestamp INTEGER DEFAULT 0,
last_msg_locald_id INTEGER DEFAULT 0,
last_msg_type INTEGER DEFAULT 0,
last_msg_sub_type INTEGER DEFAULT 0,
last_msg_sender TEXT DEFAULT ''
)
"""
)
session_conn.commit()
finally:
session_conn.close()
def _fake_exec_query(_handle, *, kind, path, sql):
self.assertEqual(kind, "message")
self.assertTrue(str(path).endswith("message_0.db"))
if "COUNT(1)" in sql:
return [{"c": 2, "mx": 2}]
raise AssertionError(f"Unexpected SQL: {sql}")
live_messages = [
{
"local_id": 11,
"server_id": 10011,
"local_type": 1,
"sort_seq": 11,
"real_sender_id": 2,
"create_time": 1710000011,
"message_content": "new message",
"compress_content": None,
"sender_username": username,
}
]
with (
patch.object(
chat_router,
"_resolve_db_storage_message_paths",
return_value=(Path(td) / "live_message_0.db", Path(td) / "message_resource.db"),
),
patch.object(chat_router, "_wcdb_exec_query", side_effect=_fake_exec_query),
patch.object(chat_router, "_wcdb_get_messages", side_effect=[list(live_messages)]),
patch.object(chat_router, "_best_effort_upsert_output_name2id_rows") as mock_upsert_name2id,
):
result = chat_router._sync_chat_realtime_messages_for_table(
account_dir=account_dir,
rt_conn=_DummyConn(),
username=username,
msg_db_path=msg_db_path,
table_name=table_name,
max_scan=50,
backfill_limit=0,
)
self.assertEqual(result.get("inserted"), 1)
mock_upsert_name2id.assert_not_called()
conn = sqlite3.connect(str(msg_db_path))
try:
rows = conn.execute(
f'SELECT local_id, server_id, real_sender_id, create_time, message_content FROM "{table_name}" ORDER BY local_id ASC'
).fetchall()
finally:
conn.close()
self.assertEqual(
rows,
[
(10, 10010, 2, 1710000010, "old"),
(11, 10011, 2, 1710000011, "new message"),
],
)
if __name__ == "__main__":
unittest.main()
-100
View File
@@ -3,44 +3,14 @@ import os
import sys
import unittest
import importlib
import hashlib
import hmac
from pathlib import Path
from tempfile import TemporaryDirectory
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
def _encrypt_page(raw_key: bytes, plain_page: bytes, page_num: int, salt: bytes, iv: bytes) -> bytes:
from wechat_decrypt_tool.wechat_decrypt import PAGE_SIZE, RESERVE_SIZE, SALT_SIZE, _derive_mac_key
if page_num == 1:
encrypted_input = plain_page[SALT_SIZE : PAGE_SIZE - RESERVE_SIZE]
prefix = salt
else:
encrypted_input = plain_page[: PAGE_SIZE - RESERVE_SIZE]
prefix = b""
cipher = Cipher(
algorithms.AES(raw_key),
modes.CBC(iv),
backend=default_backend(),
)
encryptor = cipher.encryptor()
encrypted = encryptor.update(encrypted_input) + encryptor.finalize()
page_without_hmac = prefix + encrypted + iv
mac = hmac.new(_derive_mac_key(raw_key, salt), digestmod=hashlib.sha512)
mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0 :])
mac.update(page_num.to_bytes(4, "little"))
return page_without_hmac + mac.digest()
class TestDecryptStreamSSE(unittest.TestCase):
def test_decrypt_stream_reports_progress(self):
from fastapi import FastAPI
@@ -115,76 +85,6 @@ class TestDecryptStreamSSE(unittest.TestCase):
else:
os.environ["WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE"] = prev_build_cache
def test_decrypt_stream_reports_key_scope_error_for_wrong_key(self):
from fastapi import FastAPI
from fastapi.testclient import TestClient
from wechat_decrypt_tool.wechat_decrypt import PAGE_SIZE, RESERVE_SIZE, SQLITE_HEADER
good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
bad_key = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210"
salt = bytes.fromhex("11223344556677889900aabbccddeeff")
iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
plain_page = SQLITE_HEADER + (b"A" * (PAGE_SIZE - RESERVE_SIZE - len(SQLITE_HEADER))) + (b"\x00" * RESERVE_SIZE)
encrypted_db = _encrypt_page(good_key, plain_page, 1, salt, iv1)
with TemporaryDirectory() as td:
root = Path(td)
prev_data_dir = os.environ.get("WECHAT_TOOL_DATA_DIR")
prev_build_cache = os.environ.get("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
os.environ["WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE"] = "0"
import wechat_decrypt_tool.app_paths as app_paths
import wechat_decrypt_tool.routers.decrypt as decrypt_router
importlib.reload(app_paths)
importlib.reload(decrypt_router)
db_storage = root / "xwechat_files" / "wxid_wrong_key_user" / "db_storage"
db_storage.mkdir(parents=True, exist_ok=True)
(db_storage / "MSG0.db").write_bytes(encrypted_db)
app = FastAPI()
app.include_router(decrypt_router.router)
client = TestClient(app)
events: list[dict] = []
with client.stream(
"GET",
"/api/decrypt_stream",
params={"key": bad_key, "db_storage_path": str(db_storage)},
) as resp:
self.assertEqual(resp.status_code, 200)
for line in resp.iter_lines():
if not line:
continue
if isinstance(line, bytes):
line = line.decode("utf-8", errors="ignore")
line = str(line)
if line.startswith(":") or not line.startswith("data: "):
continue
payload = json.loads(line[len("data: ") :])
events.append(payload)
if payload.get("type") in {"complete", "error"}:
break
self.assertEqual(events[-1].get("type"), "complete")
self.assertEqual(events[-1].get("status"), "failed")
self.assertIn("当前数据库密钥不正确", events[-1].get("message", ""))
self.assertIn("另一台设备复制", events[-1].get("message", ""))
finally:
if prev_data_dir is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data_dir
if prev_build_cache is None:
os.environ.pop("WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE", None)
else:
os.environ["WECHAT_TOOL_BUILD_SESSION_LAST_MESSAGE"] = prev_build_cache
if __name__ == "__main__":
unittest.main()
+30
View File
@@ -118,6 +118,36 @@ class TestParseAppMessage(unittest.TestCase):
self.assertEqual(parsed.get("linkType"), "official_article")
self.assertEqual(parsed.get("linkStyle"), "cover")
def test_finder_type_51_uses_nested_desc_and_cover(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
'<title>当前版本不支持展示该内容,请升级至最新版本。</title>'
'<des></des>'
'<type>51</type>'
'<url></url>'
'<finderFeed>'
'<nickname><![CDATA[央视新闻]]></nickname>'
'<username><![CDATA[finder_cctv_news]]></username>'
'<desc><![CDATA[微信视频号全金融行业今公布发布]]></desc>'
'<mediaList><media>'
'<coverUrl><![CDATA[https://finder.video.qq.com/cover.jpg]]></coverUrl>'
'<url><![CDATA[https://channels.weixin.qq.com/web/pages/feed?feedid=abc]]></url>'
'</media></mediaList>'
'</finderFeed>'
'</appmsg></msg>'
)
parsed = _parse_app_message(raw_text)
self.assertEqual(parsed.get("renderType"), "link")
self.assertEqual(parsed.get("linkType"), "finder")
self.assertEqual(parsed.get("title"), "微信视频号全金融行业今公布发布")
self.assertEqual(parsed.get("content"), "微信视频号全金融行业今公布发布")
self.assertEqual(parsed.get("from"), "央视新闻")
self.assertEqual(parsed.get("fromUsername"), "finder_cctv_news")
self.assertEqual(parsed.get("thumbUrl"), "https://finder.video.qq.com/cover.jpg")
self.assertEqual(parsed.get("url"), "https://channels.weixin.qq.com/web/pages/feed?feedid=abc")
def test_quote_type_5_nested_xml_refermsg_uses_inner_title(self):
raw_text = (
'<msg><appmsg appid="" sdkver="0">'
-172
View File
@@ -1,172 +0,0 @@
import hashlib
import hmac
import os
import sys
import tempfile
import unittest
from pathlib import Path
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
ROOT = Path(__file__).resolve().parents[1]
sys.path.insert(0, str(ROOT / "src"))
from wechat_decrypt_tool.wechat_decrypt import (
PAGE_SIZE,
RESERVE_SIZE,
SALT_SIZE,
SQLITE_HEADER,
WeChatDatabaseDecryptor,
_derive_mac_key,
_derive_sqlcipher_enc_key,
decrypt_wechat_databases,
)
def _encrypt_page(
raw_key: bytes,
plain_page: bytes,
page_num: int,
salt: bytes,
iv: bytes,
*,
sqlcipher_passphrase: bool = False,
) -> bytes:
enc_key = _derive_sqlcipher_enc_key(raw_key, salt) if sqlcipher_passphrase else raw_key
if page_num == 1:
encrypted_input = plain_page[SALT_SIZE : PAGE_SIZE - RESERVE_SIZE]
prefix = salt
else:
encrypted_input = plain_page[: PAGE_SIZE - RESERVE_SIZE]
prefix = b""
cipher = Cipher(
algorithms.AES(enc_key),
modes.CBC(iv),
backend=default_backend(),
)
encryptor = cipher.encryptor()
encrypted = encryptor.update(encrypted_input) + encryptor.finalize()
page_without_hmac = prefix + encrypted + iv
mac = hmac.new(_derive_mac_key(enc_key, salt), digestmod=hashlib.sha512)
mac.update(page_without_hmac[SALT_SIZE if page_num == 1 else 0 :])
mac.update(page_num.to_bytes(4, "little"))
return page_without_hmac + mac.digest()
def _build_plain_page(body_byte: int, *, first_page: bool) -> bytes:
if first_page:
payload = SQLITE_HEADER + bytes([body_byte]) * (PAGE_SIZE - RESERVE_SIZE - len(SQLITE_HEADER))
else:
payload = bytes([body_byte]) * (PAGE_SIZE - RESERVE_SIZE)
return payload + (b"\x00" * RESERVE_SIZE)
class WeChatDecryptRawKeyTests(unittest.TestCase):
def test_decrypt_database_uses_raw_enc_key(self):
raw_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
salt = bytes.fromhex("11223344556677889900aabbccddeeff")
iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
iv2 = bytes.fromhex("1112131415161718191a1b1c1d1e1f20")
page1 = _build_plain_page(0x41, first_page=True)
page2 = _build_plain_page(0x42, first_page=False)
encrypted_db = _encrypt_page(raw_key, page1, 1, salt, iv1) + _encrypt_page(raw_key, page2, 2, salt, iv2)
with tempfile.TemporaryDirectory() as tmpdir:
src = Path(tmpdir) / "source.db"
dst = Path(tmpdir) / "out.db"
src.write_bytes(encrypted_db)
decryptor = WeChatDatabaseDecryptor(raw_key.hex())
self.assertTrue(decryptor.decrypt_database(str(src), str(dst)))
self.assertEqual(dst.read_bytes(), page1 + page2)
def test_decrypt_database_falls_back_to_sqlcipher_passphrase_mode(self):
passphrase_key = bytes.fromhex("9f5dd0d3b6d0477ea5045c9e380ee272e53927993eb548dd98a022e842d5f7bd")
salt = bytes.fromhex("50f4090ef6897e146f94109f13743e34")
iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
iv2 = bytes.fromhex("1112131415161718191a1b1c1d1e1f20")
page1 = _build_plain_page(0x41, first_page=True)
page2 = _build_plain_page(0x42, first_page=False)
encrypted_db = _encrypt_page(
passphrase_key,
page1,
1,
salt,
iv1,
sqlcipher_passphrase=True,
) + _encrypt_page(
passphrase_key,
page2,
2,
salt,
iv2,
sqlcipher_passphrase=True,
)
with tempfile.TemporaryDirectory() as tmpdir:
src = Path(tmpdir) / "source.db"
dst = Path(tmpdir) / "out.db"
src.write_bytes(encrypted_db)
decryptor = WeChatDatabaseDecryptor(passphrase_key.hex())
self.assertTrue(decryptor.decrypt_database(str(src), str(dst)))
self.assertEqual(dst.read_bytes(), page1 + page2)
def test_decrypt_database_keeps_existing_output_on_hmac_failure(self):
good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
bad_key_hex = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210"
salt = bytes.fromhex("11223344556677889900aabbccddeeff")
iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
page1 = _build_plain_page(0x41, first_page=True)
encrypted_db = _encrypt_page(good_key, page1, 1, salt, iv1)
with tempfile.TemporaryDirectory() as tmpdir:
src = Path(tmpdir) / "source.db"
dst = Path(tmpdir) / "out.db"
src.write_bytes(encrypted_db)
dst.write_bytes(b"keep-existing-output")
decryptor = WeChatDatabaseDecryptor(bad_key_hex)
self.assertFalse(decryptor.decrypt_database(str(src), str(dst)))
self.assertEqual(dst.read_bytes(), b"keep-existing-output")
def test_decrypt_wechat_databases_reports_key_scope_message(self):
good_key = bytes.fromhex("00112233445566778899aabbccddeefffedcba98765432100123456789abcdef")
bad_key_hex = "ffeeddccbbaa998877665544332211000123456789abcdeffedcba9876543210"
salt = bytes.fromhex("11223344556677889900aabbccddeeff")
iv1 = bytes.fromhex("0102030405060708090a0b0c0d0e0f10")
page1 = _build_plain_page(0x41, first_page=True)
encrypted_db = _encrypt_page(good_key, page1, 1, salt, iv1)
with tempfile.TemporaryDirectory() as tmpdir:
root = Path(tmpdir)
db_storage = root / "xwechat_files" / "wxid_scope_user" / "db_storage"
db_storage.mkdir(parents=True, exist_ok=True)
(db_storage / "MSG0.db").write_bytes(encrypted_db)
prev_data_dir = os.environ.get("WECHAT_TOOL_DATA_DIR")
try:
os.environ["WECHAT_TOOL_DATA_DIR"] = str(root)
result = decrypt_wechat_databases(str(db_storage), bad_key_hex)
finally:
if prev_data_dir is None:
os.environ.pop("WECHAT_TOOL_DATA_DIR", None)
else:
os.environ["WECHAT_TOOL_DATA_DIR"] = prev_data_dir
self.assertEqual(result["status"], "error")
self.assertIn("当前数据库密钥不正确", result["message"])
self.assertIn("账号/当前设备", result["message"])
self.assertIn("另一台设备复制", result["message"])
if __name__ == "__main__":
unittest.main()