feat(keys): 自动保存密钥并支持前端回填

- 新增 output/account_keys.json 账号密钥存储(db_key / image_xor_key / image_aes_key)
- 新增 /api/keys 查询已保存密钥;缺失时兜底从账号目录 _media_keys.json 读取图片密钥

- 数据库解密成功后按账号写入 db_key;保存图片密钥时同步写入 store(失败静默不影响主流程)
- 前端解密页进入图片密钥步骤自动回填;进入下一步/跳过时自动保存一次
This commit is contained in:
2977094657
2026-01-01 16:28:33 +08:00
parent 67358deeef
commit c1712ba6dd
7 changed files with 233 additions and 75 deletions

View File

@@ -10,6 +10,7 @@ from .routers.chat_export import router as _chat_export_router
from .routers.chat_media import router as _chat_media_router
from .routers.decrypt import router as _decrypt_router
from .routers.health import router as _health_router
from .routers.keys import router as _keys_router
from .routers.media import router as _media_router
from .routers.wechat_detection import router as _wechat_detection_router
@@ -38,6 +39,7 @@ app.add_middleware(
app.include_router(_health_router)
app.include_router(_wechat_detection_router)
app.include_router(_decrypt_router)
app.include_router(_keys_router)
app.include_router(_media_router)
app.include_router(_chat_router)
app.include_router(_chat_export_router)

View File

@@ -0,0 +1,69 @@
import datetime
import json
from pathlib import Path
from typing import Any, Optional
_REPO_ROOT = Path(__file__).resolve().parents[2]
_KEY_STORE_PATH = _REPO_ROOT / "output" / "account_keys.json"
def _atomic_write_json(path: Path, payload: Any) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
tmp = path.with_suffix(path.suffix + ".tmp")
tmp.write_text(
json.dumps(payload, ensure_ascii=False, indent=2),
encoding="utf-8",
)
tmp.replace(path)
def load_account_keys_store() -> dict[str, Any]:
if not _KEY_STORE_PATH.exists():
return {}
try:
data = json.loads(_KEY_STORE_PATH.read_text(encoding="utf-8"))
return data if isinstance(data, dict) else {}
except Exception:
return {}
def get_account_keys_from_store(account: str) -> dict[str, Any]:
store = load_account_keys_store()
v = store.get(account, {})
return v if isinstance(v, dict) else {}
def upsert_account_keys_in_store(
account: str,
*,
db_key: Optional[str] = None,
image_xor_key: Optional[str] = None,
image_aes_key: Optional[str] = None,
) -> dict[str, Any]:
account = str(account or "").strip()
if not account:
return {}
store = load_account_keys_store()
item = store.get(account, {})
if not isinstance(item, dict):
item = {}
if db_key is not None:
item["db_key"] = str(db_key)
if image_xor_key is not None:
item["image_xor_key"] = str(image_xor_key)
if image_aes_key is not None:
item["image_aes_key"] = str(image_aes_key)
item["updated_at"] = datetime.datetime.now().isoformat(timespec="seconds")
store[account] = item
try:
_atomic_write_json(_KEY_STORE_PATH, store)
except Exception:
# 不影响主流程:写入失败时静默忽略
pass
return item

View File

@@ -3,6 +3,7 @@ from pydantic import BaseModel, Field
from ..logging_config import get_logger
from ..path_fix import PathFixRoute
from ..key_store import upsert_account_keys_in_store
from ..wechat_decrypt import decrypt_wechat_databases
logger = get_logger(__name__)
@@ -49,6 +50,13 @@ async def decrypt_databases(request: DecryptRequest):
logger.info(f"解密完成: 成功 {results['successful_count']}/{results['total_databases']} 个数据库")
# 成功解密后,按账号保存数据库密钥(用于前端自动回填)
try:
for account_name in (results.get("account_results") or {}).keys():
upsert_account_keys_in_store(str(account_name), db_key=request.key)
except Exception:
pass
return {
"status": "completed" if results["status"] == "success" else "failed",
"total_databases": results["total_databases"],

View File

@@ -0,0 +1,53 @@
from typing import Optional
from fastapi import APIRouter
from ..key_store import get_account_keys_from_store
from ..media_helpers import _load_media_keys, _resolve_account_dir
from ..path_fix import PathFixRoute
router = APIRouter(route_class=PathFixRoute)
@router.get("/api/keys", summary="获取账号已保存的密钥")
async def get_saved_keys(account: Optional[str] = None):
"""获取账号的数据库密钥与图片密钥(用于前端自动回填)"""
account_name: Optional[str] = None
account_dir = None
try:
account_dir = _resolve_account_dir(account)
account_name = account_dir.name
except Exception:
# 账号可能尚未解密;仍允许从全局 store 读取(如果传入了 account
account_name = str(account or "").strip() or None
keys: dict = {}
if account_name:
keys = get_account_keys_from_store(account_name)
# 兼容:如果 store 里没有图片密钥,尝试从账号目录的 _media_keys.json 读取
if account_dir and isinstance(keys, dict):
try:
media = _load_media_keys(account_dir)
if keys.get("image_xor_key") in (None, "") and media.get("xor") is not None:
keys["image_xor_key"] = f"0x{int(media['xor']):02X}"
if keys.get("image_aes_key") in (None, "") and str(media.get("aes") or "").strip():
keys["image_aes_key"] = str(media.get("aes") or "").strip()
except Exception:
pass
# 仅返回需要的字段
result = {
"db_key": str(keys.get("db_key") or "").strip(),
"image_xor_key": str(keys.get("image_xor_key") or "").strip(),
"image_aes_key": str(keys.get("image_aes_key") or "").strip(),
"updated_at": str(keys.get("updated_at") or "").strip(),
}
return {
"status": "success",
"account": account_name,
"keys": result,
}

View File

@@ -20,6 +20,7 @@ from ..media_helpers import (
_try_find_decrypted_resource,
)
from ..path_fix import PathFixRoute
from ..key_store import upsert_account_keys_in_store
logger = get_logger(__name__)
@@ -67,6 +68,14 @@ async def save_media_keys_api(request: MediaKeysSaveRequest):
# 保存密钥
aes_key16 = aes_str[:16].encode("ascii", errors="ignore") if aes_str else None
_save_media_keys(account_dir, xor_int, aes_key16)
try:
upsert_account_keys_in_store(
account_dir.name,
image_xor_key=f"0x{xor_int:02X}",
image_aes_key=aes_str[:16] if aes_str else "",
)
except Exception:
pass
return {
"status": "success",