"""微信解密工具的FastAPI Web服务器""" import os from pathlib import Path from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from starlette.requests import Request from starlette.exceptions import HTTPException as StarletteHTTPException from starlette.responses import FileResponse from starlette.staticfiles import StaticFiles from .logging_config import setup_logging, get_logger # 初始化日志系统 setup_logging() logger = get_logger(__name__) request_logger = get_logger("wechat_decrypt_tool.request") from . import __version__ as APP_VERSION from .path_fix import PathFixRoute from .chat_realtime_autosync import CHAT_REALTIME_AUTOSYNC from .routers.chat import router as _chat_router from .routers.chat_contacts import router as _chat_contacts_router from .routers.chat_export import router as _chat_export_router from .routers.chat_media import router as _chat_media_router from .routers.decrypt import router as _decrypt_router from .routers.health import router as _health_router from .routers.admin import router as _admin_router from .routers.keys import router as _keys_router from .routers.media import router as _media_router from .routers.sns import router as _sns_router from .routers.sns_export import router as _sns_export_router from .routers.wechat_detection import router as _wechat_detection_router from .routers.wrapped import router as _wrapped_router from .request_logging import log_server_errors_middleware from .sns_stage_timing import add_sns_stage_timing_headers from .wcdb_realtime import WCDB_REALTIME, shutdown as _wcdb_shutdown app = FastAPI( title="微信数据库解密工具", description="现代化的微信数据库解密工具,支持微信信息检测和数据库解密功能", version=APP_VERSION, ) # 设置自定义路由类 app.router.route_class = PathFixRoute # Enable CORS for React frontend app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], expose_headers=["X-SNS-Source", "X-SNS-Hit-Type", "X-SNS-X-Enc"], ) @app.middleware("http") async def _add_sns_stage_timing_headers(request: Request, call_next): """Expose SNS stage metadata to the frontend without extra requests. `` elements can't read response headers, but browsers can surface `Server-Timing` via `performance.getEntriesByName(...).serverTiming` when `Timing-Allow-Origin` is set. """ response = await call_next(request) try: add_sns_stage_timing_headers( response.headers, source=str(response.headers.get("X-SNS-Source") or ""), hit_type=str(response.headers.get("X-SNS-Hit-Type") or ""), x_enc=str(response.headers.get("X-SNS-X-Enc") or ""), ) except Exception: pass return response @app.middleware("http") async def _log_server_errors(request: Request, call_next): return await log_server_errors_middleware(request_logger, request, call_next) app.include_router(_health_router) app.include_router(_admin_router) app.include_router(_wechat_detection_router) app.include_router(_decrypt_router) app.include_router(_keys_router) app.include_router(_media_router) app.include_router(_chat_router) app.include_router(_chat_contacts_router) app.include_router(_chat_export_router) app.include_router(_chat_media_router) app.include_router(_sns_router) app.include_router(_sns_export_router) app.include_router(_wrapped_router) class _SPAStaticFiles(StaticFiles): """StaticFiles with a SPA fallback (Nuxt generate output).""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._fallback_200 = Path(str(self.directory)) / "200.html" self._fallback_index = Path(str(self.directory)) / "index.html" @staticmethod def _normalize_path(path: str) -> str: return str(path or "").strip().lstrip("/") @classmethod def _is_shell_path(cls, path: str) -> bool: normalized = cls._normalize_path(path) return normalized in {"", "index.html", "200.html", "_payload.json"} or normalized.startswith( "_payload.json/" ) @classmethod def _apply_cache_headers(cls, path: str, response): normalized = cls._normalize_path(path) try: if cls._is_shell_path(normalized): response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate" response.headers["Pragma"] = "no-cache" response.headers["Expires"] = "0" elif normalized.startswith("_nuxt/"): response.headers.setdefault("Cache-Control", "public, max-age=31536000, immutable") except Exception: pass return response async def get_response(self, path: str, scope): # type: ignore[override] normalized = self._normalize_path(path) try: response = await super().get_response(path, scope) return self._apply_cache_headers(normalized, response) except StarletteHTTPException as exc: if exc.status_code != 404: raise # For client-side routes (no file extension), return Nuxt's SPA fallback. name = Path(path).name if "." in name: raise if self._fallback_200.exists(): return self._apply_cache_headers("200.html", FileResponse(str(self._fallback_200))) return self._apply_cache_headers("index.html", FileResponse(str(self._fallback_index))) def _maybe_mount_frontend() -> None: """Serve the generated Nuxt static site at `/` if present. This keeps web + desktop UI identical when the desktop shell (Electron) loads http://127.0.0.1:/ from the same backend that serves `/api/*`. """ ui_dir_env = os.environ.get("WECHAT_TOOL_UI_DIR", "").strip() candidates: list[Path] = [] if ui_dir_env: candidates.append(Path(ui_dir_env)) # Repo default: `frontend/.output/public` after `npm --prefix frontend run generate`. repo_root = Path(__file__).resolve().parents[2] candidates.append(repo_root / "frontend" / ".output" / "public") ui_dir: Path | None = None for p in candidates: try: if (p / "index.html").is_file(): ui_dir = p break except Exception: continue if not ui_dir: return try: app.mount("/", _SPAStaticFiles(directory=str(ui_dir), html=True), name="ui") logger.info("Serving frontend UI from: %s", ui_dir) except Exception: logger.exception("Failed to mount frontend UI from: %s", ui_dir) _maybe_mount_frontend() @app.on_event("startup") async def _startup_background_jobs() -> None: try: CHAT_REALTIME_AUTOSYNC.start() except Exception: logger.exception("Failed to start realtime autosync service") @app.on_event("shutdown") async def _shutdown_wcdb_realtime() -> None: try: CHAT_REALTIME_AUTOSYNC.stop() except Exception: pass close_ok = False lock_timeout_s: float | None = 0.2 try: raw = str(os.environ.get("WECHAT_TOOL_WCDB_SHUTDOWN_LOCK_TIMEOUT_S", "0.2") or "").strip() lock_timeout_s = float(raw) if raw else 0.2 if lock_timeout_s <= 0: lock_timeout_s = None except Exception: lock_timeout_s = 0.2 try: close_ok = WCDB_REALTIME.close_all(lock_timeout_s=lock_timeout_s) except Exception: close_ok = False if close_ok: try: _wcdb_shutdown() except Exception: pass else: # If some conn locks were busy, other threads may still be running WCDB calls; avoid shutting down the lib. logger.warning("[wcdb] close_all not fully completed; skip wcdb_shutdown") if __name__ == "__main__": import uvicorn from .runtime_settings import read_effective_backend_port host = os.environ.get("WECHAT_TOOL_HOST", "127.0.0.1") port, _ = read_effective_backend_port(default=10392) uvicorn.run(app, host=host, port=port)