feat(media): 添加图片资源解密与SSE进度

- 后端新增图片解密密钥获取/保存接口(/api/media/keys)

- 新增批量解密接口与输出结构:resource/{md5前2位}/{md5}.{ext}

- 新增资源直读接口(/api/media/resource/{md5}),自动识别媒体类型返回

- 新增SSE实时进度接口(/api/media/decrypt_all_stream),前端可实时展示进度

- 前端解密页增加图片解密步骤与进度条/统计/失败说明,并对接相关API

- README 补充图片资源解密使用说明
This commit is contained in:
2977094657
2025-12-17 16:56:54 +08:00
parent 58f3c6862d
commit 8341c3159e
5 changed files with 1283 additions and 39 deletions

View File

@@ -34,8 +34,9 @@ except Exception:
from fastapi import FastAPI, HTTPException, Request
from fastapi.routing import APIRoute
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import Response, FileResponse
from fastapi.responses import Response, FileResponse, StreamingResponse
from pydantic import BaseModel, Field
import asyncio
from .logging_config import setup_logging, get_logger
from .wechat_decrypt import decrypt_wechat_databases
@@ -494,7 +495,7 @@ def _resolve_media_path_from_hardlink(
@lru_cache(maxsize=4096)
def _fallback_search_media_by_md5(weixin_root_str: str, md5: str) -> Optional[str]:
def _fallback_search_media_by_md5(weixin_root_str: str, md5: str, kind: str = "") -> Optional[str]:
if not weixin_root_str or not md5:
return None
try:
@@ -502,26 +503,40 @@ def _fallback_search_media_by_md5(weixin_root_str: str, md5: str) -> Optional[st
except Exception:
return None
search_dirs = [
root / "msg" / "attach",
root / "msg" / "file",
root / "msg" / "video",
root / "cache",
]
# 优先顺序: _h.dat (高清) > _t.dat (缩略图) > 普通 .dat > 其他格式
# 因为基础 .dat 可能是 wxgf 容器格式,而 _h.dat/_t.dat 是真正的图片
patterns = [
f"{md5}_h.dat", # 高清图优先
f"{md5}_t.dat", # 缩略图次之
f"{md5}.dat", # 基础 dat
f"{md5}*.dat", # 其他 dat 变体
f"{md5}*.jpg",
f"{md5}*.jpeg",
f"{md5}*.png",
f"{md5}*.gif",
f"{md5}*.webp",
f"{md5}*.mp4",
]
kind_key = str(kind or "").lower().strip()
# 根据类型选择搜索目录
if kind_key == "file":
search_dirs = [root / "msg" / "file"]
else:
search_dirs = [
root / "msg" / "attach",
root / "msg" / "file",
root / "msg" / "video",
root / "cache",
]
# 根据类型选择搜索模式
if kind_key == "file":
# 文件类型搜索所有包含md5的文件
patterns = [
f"*{md5}*", # 任何包含md5的文件
]
else:
# 优先顺序: _h.dat (高清) > _t.dat (缩略图) > 普通 .dat > 其他格式
# 因为基础 .dat 可能是 wxgf 容器格式,而 _h.dat/_t.dat 是真正的图片
patterns = [
f"{md5}_h.dat", # 高清图优先
f"{md5}_t.dat", # 缩略图次之
f"{md5}.dat", # 基础 dat
f"{md5}*.dat", # 其他 dat 变体
f"{md5}*.jpg",
f"{md5}*.jpeg",
f"{md5}*.png",
f"{md5}*.gif",
f"{md5}*.webp",
f"{md5}*.mp4",
]
for d in search_dirs:
try:
@@ -882,6 +897,149 @@ def _load_media_keys(account_dir: Path) -> dict[str, Any]:
return {}
# ===================== 解密资源目录相关辅助函数 =====================
def _get_resource_dir(account_dir: Path) -> Path:
"""获取解密资源输出目录"""
return account_dir / "resource"
def _get_decrypted_resource_path(account_dir: Path, md5: str, ext: str = "") -> Path:
"""根据MD5获取解密后资源的路径"""
resource_dir = _get_resource_dir(account_dir)
# 使用MD5前2位作为子目录避免单目录文件过多
sub_dir = md5[:2] if len(md5) >= 2 else "00"
if ext:
return resource_dir / sub_dir / f"{md5}.{ext}"
return resource_dir / sub_dir / md5
def _detect_image_extension(data: bytes) -> str:
"""根据图片数据检测文件扩展名"""
if not data:
return "dat"
if data.startswith(b"\x89PNG\r\n\x1a\n"):
return "png"
if data.startswith(b"\xff\xd8\xff"):
return "jpg"
if data.startswith(b"GIF87a") or data.startswith(b"GIF89a"):
return "gif"
if data.startswith(b"RIFF") and len(data) >= 12 and data[8:12] == b"WEBP":
return "webp"
return "dat"
def _try_find_decrypted_resource(account_dir: Path, md5: str) -> Optional[Path]:
"""尝试在解密资源目录中查找已解密的资源"""
if not md5:
return None
resource_dir = _get_resource_dir(account_dir)
if not resource_dir.exists():
return None
sub_dir = md5[:2] if len(md5) >= 2 else "00"
target_dir = resource_dir / sub_dir
if not target_dir.exists():
return None
# 查找匹配MD5的文件可能有不同扩展名
for ext in ["jpg", "png", "gif", "webp", "dat"]:
p = target_dir / f"{md5}.{ext}"
if p.exists():
return p
return None
def _collect_all_dat_files(wxid_dir: Path) -> list[tuple[Path, str]]:
"""收集所有需要解密的.dat文件返回 (文件路径, md5) 列表"""
results: list[tuple[Path, str]] = []
if not wxid_dir or not wxid_dir.exists():
return results
# 搜索目录
search_dirs = [
wxid_dir / "msg" / "attach",
wxid_dir / "cache",
]
for search_dir in search_dirs:
if not search_dir.exists():
continue
try:
for dat_file in search_dir.rglob("*.dat"):
if not dat_file.is_file():
continue
# 从文件名提取MD5
stem = dat_file.stem
# 文件名格式可能是: md5.dat, md5_t.dat, md5_h.dat 等
md5 = stem.split("_")[0] if "_" in stem else stem
# 验证是否是有效的MD532位十六进制
if len(md5) == 32 and all(c in "0123456789abcdefABCDEF" for c in md5):
results.append((dat_file, md5.lower()))
except Exception as e:
logger.warning(f"扫描目录失败 {search_dir}: {e}")
return results
def _decrypt_and_save_resource(
dat_path: Path,
md5: str,
account_dir: Path,
xor_key: int,
aes_key: Optional[bytes],
) -> tuple[bool, str]:
"""解密单个资源文件并保存到resource目录
Returns:
(success, message)
"""
try:
data = dat_path.read_bytes()
if not data:
return False, "文件为空"
version = _detect_wechat_dat_version(data)
decrypted: Optional[bytes] = None
if version == 0:
# V3: 纯XOR解密
decrypted = _decrypt_wechat_dat_v3(data, xor_key)
elif version == 1:
# V4-V1: 使用固定AES密钥
decrypted = _decrypt_wechat_dat_v4(data, xor_key, b"cfcd208495d565ef")
elif version == 2:
# V4-V2: 需要动态AES密钥
if aes_key and len(aes_key) >= 16:
decrypted = _decrypt_wechat_dat_v4(data, xor_key, aes_key[:16])
else:
return False, "V4-V2版本需要AES密钥"
else:
# 尝试简单XOR解密
dec, mt = _try_xor_decrypt_by_magic(data)
if dec:
decrypted = dec
else:
return False, f"未知加密版本: {version}"
if not decrypted:
return False, "解密结果为空"
# 检测图片类型
ext = _detect_image_extension(decrypted)
mt = _detect_image_media_type(decrypted[:32])
if mt == "application/octet-stream":
# 解密可能失败,跳过
return False, "解密后非有效图片"
# 保存到resource目录
output_path = _get_decrypted_resource_path(account_dir, md5, ext)
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_bytes(decrypted)
return True, str(output_path)
except Exception as e:
return False, str(e)
def _read_and_maybe_decrypt_media(path: Path, account_dir: Optional[Path] = None, weixin_root: Optional[Path] = None) -> tuple[bytes, str]:
# Fast path: already a normal image
with open(path, "rb") as f:
@@ -1263,14 +1421,14 @@ def _infer_transfer_status_text(
if rs == "1":
return "已收款"
if rs == "2":
return "已退"
return "已退"
if rs == "3":
return "已过期"
if t == "4":
return "已退"
return "已退"
if t == "9":
return "已被退"
return "已被退"
if t == "10":
return "已过期"
@@ -1278,7 +1436,10 @@ def _infer_transfer_status_text(
if t == "8":
return "发起转账"
if t == "3":
return "接收转账"
# paysubtype=3 表示收款方已收款
# 对于发起方isSent=True显示"已收款",表示对方已收
# 对于收款方isSent=False显示"已被接收",表示自己收到转账
return "已收款" if is_sent else "已被接收"
if t == "1":
return "转账"
@@ -1451,6 +1612,13 @@ def _parse_app_message(text: str) -> dict[str, Any]:
receivertitle = _extract_xml_tag_or_attr(text, "receivertitle")
senderdes = _extract_xml_tag_or_attr(text, "senderdes")
receiverdes = _extract_xml_tag_or_attr(text, "receiverdes")
transferid = _extract_xml_tag_or_attr(text, "transferid")
invalidtime = _extract_xml_tag_or_attr(text, "invalidtime")
# 调试日志:记录转账消息的关键字段
logger.debug(
f"[转账解析] paysubtype={paysubtype}, receivestatus={receivestatus}, "
f"transferid={transferid}, feedesc={feedesc}"
)
return {
"renderType": "transfer",
"content": (pay_memo or "").strip(),
@@ -1462,6 +1630,8 @@ def _parse_app_message(text: str) -> dict[str, Any]:
"receiverTitle": receivertitle or "",
"senderDes": senderdes or "",
"receiverDes": receiverdes or "",
"transferId": str(transferid or "").strip(),
"invalidTime": str(invalidtime or "").strip(),
}
if app_type in (2001, 2003) or "<wcpayinfo" in text and ("redenvelope" in text.lower() or "sendertitle" in text.lower()):
@@ -1826,6 +1996,15 @@ async def get_chat_image(md5: str, account: Optional[str] = None, username: Opti
if not md5:
raise HTTPException(status_code=400, detail="Missing md5.")
account_dir = _resolve_account_dir(account)
# 优先从解密资源目录读取(更快)
decrypted_path = _try_find_decrypted_resource(account_dir, md5.lower())
if decrypted_path:
data = decrypted_path.read_bytes()
media_type = _detect_image_media_type(data[:32])
return Response(content=data, media_type=media_type)
# 回退到原始逻辑:从微信数据目录实时解密
wxid_dir = _resolve_account_wxid_dir(account_dir)
hardlink_db_path = account_dir / "hardlink.db"
extra_roots: list[Path] = []
@@ -1868,6 +2047,15 @@ async def get_chat_emoji(md5: str, account: Optional[str] = None, username: Opti
if not md5:
raise HTTPException(status_code=400, detail="Missing md5.")
account_dir = _resolve_account_dir(account)
# 优先从解密资源目录读取(更快)
decrypted_path = _try_find_decrypted_resource(account_dir, md5.lower())
if decrypted_path:
data = decrypted_path.read_bytes()
media_type = _detect_image_media_type(data[:32])
return Response(content=data, media_type=media_type)
# 回退到原始逻辑
wxid_dir = _resolve_account_wxid_dir(account_dir)
hardlink_db_path = account_dir / "hardlink.db"
extra_roots: list[Path] = []
@@ -1906,6 +2094,15 @@ async def get_chat_video_thumb(md5: str, account: Optional[str] = None, username
if not md5:
raise HTTPException(status_code=400, detail="Missing md5.")
account_dir = _resolve_account_dir(account)
# 优先从解密资源目录读取(更快)
decrypted_path = _try_find_decrypted_resource(account_dir, md5.lower())
if decrypted_path:
data = decrypted_path.read_bytes()
media_type = _detect_image_media_type(data[:32])
return Response(content=data, media_type=media_type)
# 回退到原始逻辑
wxid_dir = _resolve_account_wxid_dir(account_dir)
hardlink_db_path = account_dir / "hardlink.db"
extra_roots: list[Path] = []
@@ -2067,6 +2264,17 @@ def _resolve_media_path_for_kind(
) -> Optional[Path]:
if not md5:
return None
kind_key = str(kind or "").strip().lower()
# 优先查找解密后的资源目录(图片、表情、视频缩略图)
if kind_key in {"image", "emoji", "video_thumb"}:
decrypted_path = _try_find_decrypted_resource(account_dir, md5.lower())
if decrypted_path:
logger.debug(f"找到解密资源: {decrypted_path}")
return decrypted_path
# 回退到原始逻辑:从微信数据目录查找
wxid_dir = _resolve_account_wxid_dir(account_dir)
hardlink_db_path = account_dir / "hardlink.db"
db_storage_dir = _resolve_account_db_storage_dir(account_dir)
@@ -2092,7 +2300,7 @@ def _resolve_media_path_for_kind(
extra_roots=roots[1:],
)
if (not p) and wxid_dir:
hit = _fallback_search_media_by_md5(str(wxid_dir), str(md5))
hit = _fallback_search_media_by_md5(str(wxid_dir), str(md5), kind=kind_key)
if hit:
p = Path(hit)
return p
@@ -2614,6 +2822,7 @@ async def list_chat_messages(
pay_sub_type = ""
transfer_status = ""
file_md5 = ""
transfer_id = ""
if local_type == 10000:
render_type = "system"
@@ -2636,8 +2845,12 @@ async def list_chat_messages(
file_size = str(parsed.get("size") or "")
pay_sub_type = str(parsed.get("paySubType") or "")
file_md5 = str(parsed.get("fileMd5") or "")
transfer_id = str(parsed.get("transferId") or "")
if render_type == "transfer":
# 直接从原始 XML 提取 transferid可能在 wcpayinfo 内)
if not transfer_id:
transfer_id = _extract_xml_tag_or_attr(raw_text, "transferid") or ""
transfer_status = _infer_transfer_status_text(
is_sent=is_sent,
paysubtype=pay_sub_type,
@@ -2743,8 +2956,12 @@ async def list_chat_messages(
file_size = str(parsed.get("size") or file_size)
pay_sub_type = str(parsed.get("paySubType") or pay_sub_type)
file_md5 = str(parsed.get("fileMd5") or file_md5)
transfer_id = str(parsed.get("transferId") or transfer_id)
if render_type == "transfer":
# 如果 transferId 仍为空,尝试从原始 XML 提取
if not transfer_id:
transfer_id = _extract_xml_tag_or_attr(content_text, "transferid") or ""
transfer_status = _infer_transfer_status_text(
is_sent=is_sent,
paysubtype=pay_sub_type,
@@ -2795,6 +3012,7 @@ async def list_chat_messages(
"fileMd5": file_md5,
"paySubType": pay_sub_type,
"transferStatus": transfer_status,
"transferId": transfer_id,
"_rawText": raw_text if local_type == 266287972401 else "",
}
)
@@ -2807,6 +3025,76 @@ async def list_chat_messages(
except Exception:
pass
# 后处理:关联转账消息的最终状态
# 策略:优先使用 transferId 精确匹配,回退到金额+时间窗口匹配
# paysubtype 含义1=不明确 3=已收款 4=对方退回给你 8=发起转账 9=被对方退回 10=已过期
# 收集已退还和已收款的转账ID和金额
returned_transfer_ids: set[str] = set() # 退还状态的 transferId
received_transfer_ids: set[str] = set() # 已收款状态的 transferId
returned_amounts_with_time: list[tuple[str, int]] = [] # (金额, 时间戳) 用于退还回退匹配
received_amounts_with_time: list[tuple[str, int]] = [] # (金额, 时间戳) 用于收款回退匹配
for m in merged:
if m.get("renderType") == "transfer":
pst = str(m.get("paySubType") or "")
tid = str(m.get("transferId") or "").strip()
amt = str(m.get("amount") or "")
ts = int(m.get("createTime") or 0)
if pst in ("4", "9"): # 退还状态
if tid:
returned_transfer_ids.add(tid)
if amt:
returned_amounts_with_time.append((amt, ts))
elif pst == "3": # 已收款状态
if tid:
received_transfer_ids.add(tid)
if amt:
received_amounts_with_time.append((amt, ts))
# 更新原始转账消息的状态
for m in merged:
if m.get("renderType") == "transfer":
pst = str(m.get("paySubType") or "")
# 只更新未确定状态的原始转账消息paysubtype=1 或 8
if pst in ("1", "8"):
tid = str(m.get("transferId") or "").strip()
amt = str(m.get("amount") or "")
ts = int(m.get("createTime") or 0)
# 优先检查退还状态(退还优先于收款)
should_mark_returned = False
should_mark_received = False
# 策略1精确 transferId 匹配
if tid:
if tid in returned_transfer_ids:
should_mark_returned = True
elif tid in received_transfer_ids:
should_mark_received = True
# 策略2回退到金额+时间窗口匹配24小时内同金额
if not should_mark_returned and not should_mark_received and amt:
for ret_amt, ret_ts in returned_amounts_with_time:
if ret_amt == amt and abs(ret_ts - ts) <= 86400:
should_mark_returned = True
break
if not should_mark_returned:
for rec_amt, rec_ts in received_amounts_with_time:
if rec_amt == amt and abs(rec_ts - ts) <= 86400:
should_mark_received = True
break
if should_mark_returned:
m["paySubType"] = "9"
m["transferStatus"] = "已被退还"
elif should_mark_received:
m["paySubType"] = "3"
# 根据 isSent 判断:发起方显示"已收款",收款方显示"已被接收"
is_sent = m.get("isSent", False)
m["transferStatus"] = "已收款" if is_sent else "已被接收"
uniq_senders = list(dict.fromkeys([u for u in (sender_usernames + list(pat_usernames)) if u]))
sender_contact_rows = _load_contact_rows(contact_db_path, uniq_senders)
local_sender_avatars = _query_head_image_usernames(head_image_db_path, uniq_senders)
@@ -2891,6 +3179,426 @@ async def list_chat_messages(
}
# ===================== 图片密钥与资源解密相关 API =====================
class MediaKeysRequest(BaseModel):
"""媒体密钥请求模型"""
account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)")
force_extract: bool = Field(False, description="是否强制从微信进程重新提取密钥")
class MediaDecryptRequest(BaseModel):
"""媒体解密请求模型"""
account: Optional[str] = Field(None, description="账号目录名(可选,默认使用第一个)")
xor_key: Optional[str] = Field(None, description="XOR密钥十六进制如 0xA5 或 A5")
aes_key: Optional[str] = Field(None, description="AES密钥16字符ASCII字符串")
@app.get("/api/media/keys", summary="获取图片解密密钥")
async def get_media_keys(account: Optional[str] = None, force_extract: bool = False):
"""获取图片解密密钥XOR和AES
如果已缓存密钥且不强制提取,直接返回缓存的密钥。
否则尝试从微信进程中提取密钥。
注意提取AES密钥需要微信进程正在运行。
"""
account_dir = _resolve_account_dir(account)
wxid_dir = _resolve_account_wxid_dir(account_dir)
# 尝试加载已缓存的密钥
cached_keys = _load_media_keys(account_dir)
if cached_keys and not force_extract:
xor_key = cached_keys.get("xor")
aes_key = cached_keys.get("aes")
if xor_key is not None and aes_key:
return {
"status": "success",
"source": "cache",
"xor_key": f"0x{int(xor_key):02X}",
"aes_key": str(aes_key)[:16] if aes_key else "",
"message": "已从缓存加载密钥",
}
if not wxid_dir:
return {
"status": "error",
"message": "未找到微信数据目录,请确保已正确配置 db_storage_path",
}
# 尝试提取XOR密钥
xor_key = _find_wechat_xor_key(str(wxid_dir))
if xor_key is None:
return {
"status": "error",
"message": "无法提取XOR密钥请确保微信数据目录中存在 _t.dat 模板文件",
}
# 尝试提取AES密钥需要微信进程运行
aes_key16: Optional[bytes] = None
aes_message = ""
most_common = _get_wechat_template_most_common_last2(str(wxid_dir))
if most_common:
ct = _get_wechat_v2_ciphertext(wxid_dir, most_common)
if ct:
aes_key16 = _extract_wechat_aes_key_from_process(ct)
if aes_key16:
aes_message = "已从微信进程提取AES密钥"
# 保存密钥到缓存
_save_media_keys(account_dir, xor_key, aes_key16)
else:
aes_message = "无法从微信进程提取AES密钥微信是否正在运行"
else:
aes_message = "未找到V2加密模板文件"
else:
aes_message = "未找到足够的模板文件用于提取AES密钥"
return {
"status": "success",
"source": "extracted",
"xor_key": f"0x{xor_key:02X}",
"aes_key": aes_key16.decode("ascii", errors="ignore") if aes_key16 else "",
"message": f"XOR密钥提取成功。{aes_message}",
}
@app.post("/api/media/keys", summary="保存图片解密密钥")
async def save_media_keys_api(request: MediaKeysRequest, xor_key: str, aes_key: str):
"""手动保存图片解密密钥
参数:
- xor_key: XOR密钥十六进制格式如 0xA5 或 A5
- aes_key: AES密钥16字符ASCII字符串
"""
account_dir = _resolve_account_dir(request.account)
# 解析XOR密钥
try:
xor_hex = xor_key.strip().lower().replace("0x", "")
xor_int = int(xor_hex, 16)
except Exception:
raise HTTPException(status_code=400, detail="XOR密钥格式无效请使用十六进制格式如 0xA5")
# 验证AES密钥
aes_str = aes_key.strip()
if len(aes_str) < 16:
raise HTTPException(status_code=400, detail="AES密钥长度不足需要至少16个字符")
# 保存密钥
_save_media_keys(account_dir, xor_int, aes_str[:16].encode("ascii", errors="ignore"))
return {
"status": "success",
"message": "密钥已保存",
"xor_key": f"0x{xor_int:02X}",
"aes_key": aes_str[:16],
}
@app.post("/api/media/decrypt_all", summary="批量解密所有图片资源")
async def decrypt_all_media(request: MediaDecryptRequest):
"""批量解密所有图片资源到 output/databases/{账号}/resource 目录
解密后的图片按MD5哈希命名存储在 resource/{md5前2位}/{md5}.{ext} 路径下。
这样可以快速通过MD5定位资源文件。
参数:
- account: 账号目录名(可选)
- xor_key: XOR密钥可选不提供则从缓存读取
- aes_key: AES密钥可选不提供则从缓存读取
"""
account_dir = _resolve_account_dir(request.account)
wxid_dir = _resolve_account_wxid_dir(account_dir)
if not wxid_dir:
raise HTTPException(
status_code=400,
detail="未找到微信数据目录,请确保已正确配置 db_storage_path",
)
# 获取密钥
xor_key: Optional[int] = None
aes_key16: Optional[bytes] = None
if request.xor_key:
try:
xor_hex = request.xor_key.strip().lower().replace("0x", "")
xor_key = int(xor_hex, 16)
except Exception:
raise HTTPException(status_code=400, detail="XOR密钥格式无效")
if request.aes_key:
aes_str = request.aes_key.strip()
if len(aes_str) >= 16:
aes_key16 = aes_str[:16].encode("ascii", errors="ignore")
# 如果未提供密钥,尝试从缓存加载
if xor_key is None or aes_key16 is None:
cached = _load_media_keys(account_dir)
if xor_key is None:
xor_key = cached.get("xor")
if aes_key16 is None:
aes_str = str(cached.get("aes") or "").strip()
if len(aes_str) >= 16:
aes_key16 = aes_str[:16].encode("ascii", errors="ignore")
# 如果仍然没有XOR密钥尝试自动提取
if xor_key is None:
xor_key = _find_wechat_xor_key(str(wxid_dir))
if xor_key is None:
raise HTTPException(
status_code=400,
detail="未找到XOR密钥请先调用 /api/media/keys 获取密钥或手动提供",
)
# 收集所有.dat文件
logger.info(f"开始扫描 {wxid_dir} 中的.dat文件...")
dat_files = _collect_all_dat_files(wxid_dir)
total_files = len(dat_files)
logger.info(f"共发现 {total_files} 个.dat文件")
if total_files == 0:
return {
"status": "success",
"message": "未发现需要解密的.dat文件",
"total": 0,
"success_count": 0,
"skip_count": 0,
"fail_count": 0,
"output_dir": str(_get_resource_dir(account_dir)),
}
# 开始解密
success_count = 0
skip_count = 0
fail_count = 0
failed_files: list[dict] = []
resource_dir = _get_resource_dir(account_dir)
resource_dir.mkdir(parents=True, exist_ok=True)
for dat_path, md5 in dat_files:
# 检查是否已解密
existing = _try_find_decrypted_resource(account_dir, md5)
if existing:
skip_count += 1
continue
# 解密并保存
success, msg = _decrypt_and_save_resource(
dat_path, md5, account_dir, xor_key, aes_key16
)
if success:
success_count += 1
else:
fail_count += 1
if len(failed_files) < 100: # 只记录前100个失败
failed_files.append({
"file": str(dat_path),
"md5": md5,
"error": msg,
})
logger.info(
f"解密完成: 成功={success_count}, 跳过={skip_count}, 失败={fail_count}"
)
return {
"status": "success",
"message": f"解密完成: 成功 {success_count}, 跳过 {skip_count}, 失败 {fail_count}",
"total": total_files,
"success_count": success_count,
"skip_count": skip_count,
"fail_count": fail_count,
"output_dir": str(resource_dir),
"failed_files": failed_files[:20] if failed_files else [],
}
@app.get("/api/media/resource/{md5}", summary="获取已解密的资源文件")
async def get_decrypted_resource(md5: str, account: Optional[str] = None):
"""直接从解密资源目录获取图片
如果资源已解密,直接返回解密后的文件。
这比实时解密更快,适合频繁访问的场景。
"""
if not md5 or len(md5) != 32:
raise HTTPException(status_code=400, detail="无效的MD5")
account_dir = _resolve_account_dir(account)
p = _try_find_decrypted_resource(account_dir, md5.lower())
if not p:
raise HTTPException(status_code=404, detail="资源未找到,请先执行批量解密")
data = p.read_bytes()
media_type = _detect_image_media_type(data[:32])
return Response(content=data, media_type=media_type)
@app.get("/api/media/decrypt_all_stream", summary="批量解密所有图片资源SSE实时进度")
async def decrypt_all_media_stream(
account: Optional[str] = None,
xor_key: Optional[str] = None,
aes_key: Optional[str] = None,
):
"""批量解密所有图片资源通过SSE实时推送进度
返回格式为Server-Sent Events每条消息包含:
- type: progress/complete/error
- current: 当前处理数量
- total: 总文件数
- success_count: 成功数
- skip_count: 跳过数(已解密)
- fail_count: 失败数
- current_file: 当前处理的文件名
- status: 当前文件状态success/skip/fail
- message: 状态消息
跳过原因:文件已经解密过
失败原因:
- 文件为空
- V4-V2版本需要AES密钥但未提供
- 未知加密版本
- 解密结果为空
- 解密后非有效图片格式
"""
async def generate_progress():
try:
account_dir = _resolve_account_dir(account)
wxid_dir = _resolve_account_wxid_dir(account_dir)
if not wxid_dir:
yield f"data: {json.dumps({'type': 'error', 'message': '未找到微信数据目录'})}\n\n"
return
# 获取密钥
xor_key_int: Optional[int] = None
aes_key16: Optional[bytes] = None
if xor_key:
try:
xor_hex = xor_key.strip().lower().replace("0x", "")
xor_key_int = int(xor_hex, 16)
except Exception:
yield f"data: {json.dumps({'type': 'error', 'message': 'XOR密钥格式无效'})}\n\n"
return
if aes_key:
aes_str = aes_key.strip()
if len(aes_str) >= 16:
aes_key16 = aes_str[:16].encode("ascii", errors="ignore")
# 如果未提供密钥,尝试从缓存加载
if xor_key_int is None or aes_key16 is None:
cached = _load_media_keys(account_dir)
if xor_key_int is None:
xor_key_int = cached.get("xor")
if aes_key16 is None:
aes_str = str(cached.get("aes") or "").strip()
if len(aes_str) >= 16:
aes_key16 = aes_str[:16].encode("ascii", errors="ignore")
# 如果仍然没有XOR密钥尝试自动提取
if xor_key_int is None:
xor_key_int = _find_wechat_xor_key(str(wxid_dir))
if xor_key_int is None:
yield f"data: {json.dumps({'type': 'error', 'message': '未找到XOR密钥请先获取密钥'})}\n\n"
return
# 收集所有.dat文件
logger.info(f"[SSE] 开始扫描 {wxid_dir} 中的.dat文件...")
yield f"data: {json.dumps({'type': 'scanning', 'message': '正在扫描图片文件...'})}\n\n"
await asyncio.sleep(0) # 让出控制权
dat_files = _collect_all_dat_files(wxid_dir)
total_files = len(dat_files)
logger.info(f"[SSE] 共发现 {total_files} 个.dat文件仅图片")
if total_files == 0:
yield f"data: {json.dumps({'type': 'complete', 'message': '未发现需要解密的图片文件', 'total': 0, 'success_count': 0, 'skip_count': 0, 'fail_count': 0})}\n\n"
return
# 发送总数信息
yield f"data: {json.dumps({'type': 'start', 'total': total_files, 'message': f'开始解密 {total_files} 个图片文件'})}\n\n"
await asyncio.sleep(0)
# 开始解密
success_count = 0
skip_count = 0
fail_count = 0
failed_files: list[dict] = []
resource_dir = _get_resource_dir(account_dir)
resource_dir.mkdir(parents=True, exist_ok=True)
for i, (dat_path, md5) in enumerate(dat_files):
current = i + 1
file_name = dat_path.name
# 检查是否已解密
existing = _try_find_decrypted_resource(account_dir, md5)
if existing:
skip_count += 1
# 每100个跳过的文件发送一次进度减少消息量
if skip_count % 100 == 0 or current == total_files:
yield f"data: {json.dumps({'type': 'progress', 'current': current, 'total': total_files, 'success_count': success_count, 'skip_count': skip_count, 'fail_count': fail_count, 'current_file': file_name, 'status': 'skip', 'message': '已存在'})}\n\n"
await asyncio.sleep(0)
continue
# 解密并保存
success, msg = _decrypt_and_save_resource(
dat_path, md5, account_dir, xor_key_int, aes_key16
)
if success:
success_count += 1
status = "success"
status_msg = "解密成功"
logger.debug(f"[SSE] 解密成功: {file_name}")
else:
fail_count += 1
status = "fail"
status_msg = msg
logger.warning(f"[SSE] 解密失败: {file_name} - {msg}")
if len(failed_files) < 100:
failed_files.append({
"file": file_name,
"md5": md5,
"error": msg,
})
# 每处理一个文件发送进度(成功或失败都发送)
yield f"data: {json.dumps({'type': 'progress', 'current': current, 'total': total_files, 'success_count': success_count, 'skip_count': skip_count, 'fail_count': fail_count, 'current_file': file_name, 'status': status, 'message': status_msg})}\n\n"
# 每处理10个文件让出一次控制权避免阻塞
if current % 10 == 0:
await asyncio.sleep(0)
logger.info(f"[SSE] 解密完成: 成功={success_count}, 跳过={skip_count}, 失败={fail_count}")
# 发送完成消息
yield f"data: {json.dumps({'type': 'complete', 'total': total_files, 'success_count': success_count, 'skip_count': skip_count, 'fail_count': fail_count, 'output_dir': str(resource_dir), 'failed_files': failed_files[:20], 'message': f'解密完成: 成功 {success_count}, 跳过 {skip_count}, 失败 {fail_count}'})}\n\n"
except Exception as e:
logger.error(f"[SSE] 解密过程出错: {e}")
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
return StreamingResponse(
generate_progress(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no",
}
)
@app.get("/api/health", summary="健康检查端点")
async def health_check():
"""健康检查端点"""