启动偏好
@@ -376,6 +474,7 @@ const emit = defineEmits(['close'])
const settingNavItems = [
{ key: 'desktop', label: '桌面行为', hint: '启动 / 关闭 / 端口' },
+ { key: 'mcp', label: 'MCP 接入', hint: '手机 / Skill / 工具' },
{ key: 'startup', label: '启动偏好', hint: '自动实时 / 默认页面' },
{ key: 'updates', label: '更新', hint: '版本信息 / 检查更新' },
{ key: 'sns', label: '朋友圈', hint: '图片缓存策略' },
@@ -384,6 +483,7 @@ const settingNavItems = [
const activeSection = ref(settingNavItems[0].key)
const contentScrollRef = ref(null)
const desktopSectionRef = ref(null)
+const mcpSectionRef = ref(null)
const startupSectionRef = ref(null)
const updatesSectionRef = ref(null)
const snsSectionRef = ref(null)
@@ -497,6 +597,65 @@ const desktopLogFileText = computed(() => {
return v || '—'
})
+const mcpLanAccessEnabled = ref(false)
+const mcpLanAccessLoading = ref(false)
+const mcpLanAccessError = ref('')
+const mcpLanAccessMessage = ref('')
+const mcpToken = ref('')
+const mcpTokenLoading = ref(false)
+const mcpTokenError = ref('')
+const mcpSkillBundleText = ref('')
+const mcpSkillBundleLoading = ref(false)
+const mcpSkillBundleError = ref('')
+const mcpCopiedKey = ref('')
+let mcpCopiedTimer = null
+
+const mcpPortText = computed(() => {
+ const n = Number(String(desktopBackendPortInput.value || '').trim())
+ if (Number.isInteger(n) && n >= 1 && n <= 65535) return String(n)
+ return '10392'
+})
+
+const mcpEndpoint = computed(() => {
+ if (!process.client || typeof window === 'undefined') return `http://127.0.0.1:${mcpPortText.value}/mcp`
+ const apiBase = useApiBase()
+ if (/^https?:\/\//i.test(apiBase)) {
+ try {
+ const u = new URL(apiBase)
+ return `${u.origin}/mcp`
+ } catch {}
+ }
+ const protocol = window.location?.protocol === 'https:' ? 'https:' : 'http:'
+ const host = String(window.location?.hostname || '127.0.0.1').trim() || '127.0.0.1'
+ return `${protocol}//${host}:${mcpPortText.value}/mcp`
+})
+
+const mcpSkillFallback = [
+ '# WeChat MCP Copilot',
+ '',
+ 'Use WeChatDataAnalysis MCP like an investigator: start broad, resolve fuzzy targets, then fetch only the context needed to answer.',
+ '',
+ 'Core rules:',
+ '1. Start with initialize and tools/list.',
+ '2. Prefer compact mobile facade tools before low-level tools.',
+ '3. Keep limits small, page results, and expand only when needed.',
+ '4. Use returned URLs for media and exports instead of inlining binary content.',
+].join('\n')
+const mcpAiPrompt = computed(() => [
+ '你现在可以通过 WeChatDataAnalysis MCP 访问本机微信数据。',
+ `MCP endpoint: ${mcpEndpoint.value}`,
+ `Authorization: Bearer ${mcpToken.value || ''}`,
+ '',
+ '接入要求:',
+ '1. 使用 JSON-RPC 2.0 POST 到 MCP endpoint,Content-Type 为 application/json,并带上 Authorization Bearer token。',
+ '2. 先调用 initialize,再用 tools/list 分页读取工具 schema。',
+ '3. 工具调用使用 tools/call,优先读取 result.structuredContent。',
+ '4. 不要一次性请求大结果;按下方 skill 的分页和上下文预算逐步扩展。',
+ '5. 媒体、导出和 SSE 进度按返回 URL 在 App 侧加载,不要让模型内联二进制内容。',
+].join('\n'))
+const mcpSkillText = computed(() => mcpSkillBundleText.value || mcpSkillFallback)
+const mcpTokenText = computed(() => mcpToken.value || '加载中...')
+
const switchTrackClass = (enabled, disabled = false) => {
if (disabled) return enabled ? 'bg-[#07b75b] opacity-50 cursor-not-allowed' : 'bg-[#d0d0d0] opacity-50 cursor-not-allowed'
return enabled ? 'bg-[#07b75b] hover:brightness-95' : 'bg-[#d0d0d0] hover:brightness-95'
@@ -535,6 +694,7 @@ const refreshDesktopOutputDirProgress = async () => {
const sectionElements = computed(() => [
{ key: 'desktop', el: desktopSectionRef.value },
+ { key: 'mcp', el: mcpSectionRef.value },
{ key: 'startup', el: startupSectionRef.value },
{ key: 'updates', el: updatesSectionRef.value },
{ key: 'sns', el: snsSectionRef.value },
@@ -591,6 +751,164 @@ const fetchAdminEndpoint = async (url, options = {}) => {
}
}
+const waitForBackendHealth = async (timeoutMs = 30_000) => {
+ if (!process.client || typeof window === 'undefined') return
+ const apiBase = useApiBase()
+ const healthUrl = `${String(apiBase || '').replace(/\/api\/?$/, '')}/api/health`
+ const startedAt = Date.now()
+ while (true) {
+ try {
+ const r = await fetch(healthUrl, { method: 'GET' })
+ if (r && r.status < 500) return
+ } catch {}
+ if (Date.now() - startedAt > timeoutMs) throw new Error(`后端启动超时:${healthUrl}`)
+ await new Promise((resolve) => setTimeout(resolve, 400))
+ }
+}
+
+const copyMcpText = async (key, text) => {
+ if (!process.client || typeof window === 'undefined') return
+ const value = String(text || '').trim()
+ if (!value) return
+ try {
+ if (navigator?.clipboard?.writeText) {
+ await navigator.clipboard.writeText(value)
+ } else {
+ const el = document.createElement('textarea')
+ el.value = value
+ el.setAttribute('readonly', '')
+ el.style.position = 'fixed'
+ el.style.left = '-9999px'
+ document.body.appendChild(el)
+ el.select()
+ document.execCommand('copy')
+ document.body.removeChild(el)
+ }
+ mcpCopiedKey.value = key
+ if (mcpCopiedTimer) clearTimeout(mcpCopiedTimer)
+ mcpCopiedTimer = setTimeout(() => {
+ if (mcpCopiedKey.value === key) mcpCopiedKey.value = ''
+ }, 1600)
+ } catch {}
+}
+
+const refreshMcpLanAccess = async () => {
+ if (!process.client || typeof window === 'undefined') return
+ mcpLanAccessLoading.value = true
+ mcpLanAccessError.value = ''
+ try {
+ if (window.wechatDesktop?.getMcpLanAccess) {
+ const resp = await window.wechatDesktop.getMcpLanAccess()
+ mcpLanAccessEnabled.value = !!resp?.enabled
+ return
+ }
+ const resp = await fetchAdminEndpoint('/admin/mcp-access')
+ mcpLanAccessEnabled.value = !!resp?.enabled
+ } catch (e) {
+ mcpLanAccessError.value = e?.message || '读取 MCP 接入状态失败'
+ } finally {
+ mcpLanAccessLoading.value = false
+ }
+}
+
+const refreshMcpToken = async () => {
+ if (!process.client || typeof window === 'undefined') return
+ mcpTokenLoading.value = true
+ mcpTokenError.value = ''
+ try {
+ const resp = await fetchAdminEndpoint('/admin/mcp-token')
+ const token = String(resp?.token || '').trim()
+ if (!token) throw new Error('MCP token is empty')
+ mcpToken.value = token
+ } catch (e) {
+ mcpTokenError.value = e?.message || '读取 MCP Token 失败'
+ } finally {
+ mcpTokenLoading.value = false
+ }
+}
+
+const resetMcpToken = async () => {
+ if (!process.client || typeof window === 'undefined') return
+ mcpTokenLoading.value = true
+ mcpTokenError.value = ''
+ try {
+ const resp = await fetchAdminEndpoint('/admin/mcp-token/reset', { method: 'POST' })
+ const token = String(resp?.token || '').trim()
+ if (!token) throw new Error('MCP token is empty')
+ mcpToken.value = token
+ mcpCopiedKey.value = 'token-reset'
+ if (mcpCopiedTimer) clearTimeout(mcpCopiedTimer)
+ mcpCopiedTimer = setTimeout(() => {
+ if (mcpCopiedKey.value === 'token-reset') mcpCopiedKey.value = ''
+ }, 1600)
+ await refreshMcpSkillBundle()
+ } catch (e) {
+ mcpTokenError.value = e?.message || '重置 MCP Token 失败'
+ } finally {
+ mcpTokenLoading.value = false
+ }
+}
+
+const refreshMcpSkillBundle = async () => {
+ if (!process.client || typeof window === 'undefined') return
+ mcpSkillBundleLoading.value = true
+ mcpSkillBundleError.value = ''
+ try {
+ if (!mcpToken.value) await refreshMcpToken()
+ const resp = await $fetch('/mcp/skill/bundle', {
+ baseURL: mcpEndpoint.value.replace(/\/mcp$/, ''),
+ headers: mcpToken.value ? { Authorization: `Bearer ${mcpToken.value}` } : {},
+ })
+ const bundleText = String(resp?.bundleText || '').trim()
+ if (!bundleText) throw new Error('Skill bundle is empty')
+ mcpSkillBundleText.value = bundleText
+ } catch (e) {
+ mcpSkillBundleError.value = e?.message || '读取 skill 失败,当前显示内置精简版'
+ if (!mcpSkillBundleText.value) mcpSkillBundleText.value = ''
+ } finally {
+ mcpSkillBundleLoading.value = false
+ }
+}
+
+const setMcpLanAccess = async (enabled) => {
+ if (!process.client || typeof window === 'undefined') return
+ mcpLanAccessLoading.value = true
+ mcpLanAccessError.value = ''
+ mcpLanAccessMessage.value = ''
+ const previous = mcpLanAccessEnabled.value
+ mcpLanAccessEnabled.value = !!enabled
+ try {
+ if (window.wechatDesktop?.setMcpLanAccess) {
+ const resp = await window.wechatDesktop.setMcpLanAccess(!!enabled)
+ mcpLanAccessEnabled.value = !!resp?.enabled
+ mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,后端已重启。' : 'MCP 局域网接入状态未变化。'
+ return
+ }
+
+ const resp = await fetchAdminEndpoint('/admin/mcp-access', {
+ method: 'POST',
+ body: { enabled: !!enabled },
+ })
+ mcpLanAccessEnabled.value = !!resp?.enabled
+ mcpLanAccessMessage.value = resp?.changed ? 'MCP 局域网接入已更新,正在等待后端重启。' : 'MCP 局域网接入状态未变化。'
+ if (resp?.changed) {
+ await waitForBackendHealth(30_000)
+ mcpLanAccessMessage.value = 'MCP 局域网接入已更新,后端已恢复。'
+ }
+ } catch (e) {
+ mcpLanAccessEnabled.value = previous
+ mcpLanAccessError.value = e?.message || '设置 MCP 接入状态失败'
+ await refreshMcpLanAccess()
+ } finally {
+ mcpLanAccessLoading.value = false
+ }
+}
+
+const toggleMcpLanAccess = async () => {
+ if (mcpLanAccessLoading.value) return
+ await setMcpLanAccess(!mcpLanAccessEnabled.value)
+}
+
const refreshDesktopAutoLaunch = async () => {
if (!process.client || typeof window === 'undefined') return
if (!window.wechatDesktop?.getAutoLaunch) return
@@ -945,6 +1263,9 @@ const onDesktopCheckUpdates = async () => {
watch(() => props.open, async (isOpen) => {
if (!isOpen) return
+ await refreshMcpLanAccess()
+ await refreshMcpToken()
+ await refreshMcpSkillBundle()
await refreshBackendLogFileInfo()
if (isDesktopEnv.value) {
await refreshDesktopOutputDir()
@@ -969,6 +1290,9 @@ onMounted(async () => {
snsUseCache.value = readLocalBoolSetting(SNS_SETTING_USE_CACHE_KEY, true)
await refreshDesktopBackendPort()
+ await refreshMcpLanAccess()
+ await refreshMcpToken()
+ await refreshMcpSkillBundle()
if (isDesktopEnv.value) {
void desktopUpdate.initListeners()
await refreshDesktopAutoLaunch()
@@ -984,6 +1308,10 @@ onMounted(async () => {
onBeforeUnmount(() => {
if (!process.client || typeof window === 'undefined') return
window.removeEventListener('keydown', onEscKeydown)
+ if (mcpCopiedTimer) {
+ clearTimeout(mcpCopiedTimer)
+ mcpCopiedTimer = null
+ }
if (typeof removeDesktopOutputDirProgressListener === 'function') {
removeDesktopOutputDirProgressListener()
removeDesktopOutputDirProgressListener = null
diff --git a/main.py b/main.py
index 924dc94..b6acd1f 100644
--- a/main.py
+++ b/main.py
@@ -11,11 +11,11 @@
import uvicorn
import os
from pathlib import Path
-from wechat_decrypt_tool.runtime_settings import read_effective_backend_port
+from wechat_decrypt_tool.runtime_settings import read_effective_backend_host, read_effective_backend_port
def main():
"""启动微信解密工具API服务"""
- host = os.environ.get("WECHAT_TOOL_HOST", "127.0.0.1")
+ host, host_source = read_effective_backend_host(default="127.0.0.1")
port, port_source = read_effective_backend_port(default=10392)
access_host = "127.0.0.1" if host in {"0.0.0.0", "::"} else host
@@ -29,6 +29,13 @@ def main():
print("端口来源: 配置文件 output/runtime_settings.json(由网页/桌面设置写入)")
else:
print("端口来源: 默认值")
+ if host_source == "env":
+ print("监听地址来源: 环境变量 WECHAT_TOOL_HOST")
+ elif host_source == "settings":
+ print("监听地址来源: 配置文件 output/runtime_settings.json(由网页/桌面设置写入)")
+ else:
+ print("监听地址来源: 默认值")
+ print(f"监听地址: {host}")
print(f"API文档: http://{access_host}:{port}/docs")
print(f"健康检查: http://{access_host}:{port}/api/health")
print("按 Ctrl+C 停止服务")
diff --git a/src/wechat_decrypt_tool/backend_entry.py b/src/wechat_decrypt_tool/backend_entry.py
index 4f0d5e3..8b39318 100644
--- a/src/wechat_decrypt_tool/backend_entry.py
+++ b/src/wechat_decrypt_tool/backend_entry.py
@@ -9,11 +9,11 @@ import os
import uvicorn
from wechat_decrypt_tool.api import app
-from wechat_decrypt_tool.runtime_settings import read_effective_backend_port
+from wechat_decrypt_tool.runtime_settings import read_effective_backend_host, read_effective_backend_port
def main() -> None:
- host = os.environ.get("WECHAT_TOOL_HOST", "127.0.0.1")
+ host, _ = read_effective_backend_host(default="127.0.0.1")
port, _ = read_effective_backend_port(default=10392)
uvicorn.run(app, host=host, port=port, log_level="info")
diff --git a/src/wechat_decrypt_tool/routers/admin.py b/src/wechat_decrypt_tool/routers/admin.py
index e947e3c..519a0c7 100644
--- a/src/wechat_decrypt_tool/routers/admin.py
+++ b/src/wechat_decrypt_tool/routers/admin.py
@@ -15,7 +15,19 @@ from starlette.requests import Request
from ..logging_config import get_log_file_path, get_logger
from ..path_fix import PathFixRoute
-from ..runtime_settings import read_effective_backend_port, write_backend_port_env_file, write_backend_port_setting
+from ..runtime_settings import (
+ LAN_BACKEND_HOST,
+ LOOPBACK_BACKEND_HOST,
+ read_effective_backend_host,
+ read_effective_mcp_token,
+ read_effective_backend_port,
+ reset_mcp_token,
+ write_backend_host_env_file,
+ write_backend_host_setting,
+ write_backend_port_env_file,
+ write_backend_port_setting,
+ write_mcp_token_env_file,
+)
router = APIRouter(route_class=PathFixRoute)
@@ -33,7 +45,8 @@ def _format_host_for_url(host: str) -> str:
def _get_backend_bind_host() -> str:
- return str(os.environ.get("WECHAT_TOOL_HOST", "127.0.0.1") or "").strip() or "127.0.0.1"
+ host, _ = read_effective_backend_host(default=LOOPBACK_BACKEND_HOST)
+ return host
def _get_backend_access_host() -> str:
@@ -116,10 +129,10 @@ async def _wait_for_backend_ready(health_url: str, timeout_s: float = 30.0) -> b
return False
-def _spawn_backend_process(next_port: int) -> subprocess.Popen:
+def _spawn_backend_process(next_port: int, next_host: str | None = None) -> subprocess.Popen:
env = os.environ.copy()
env["WECHAT_TOOL_PORT"] = str(int(next_port))
- env.setdefault("WECHAT_TOOL_HOST", _get_backend_bind_host())
+ env["WECHAT_TOOL_HOST"] = str(next_host or _get_backend_bind_host())
# Keep the same working directory so output paths remain consistent.
# (When `WECHAT_TOOL_DATA_DIR` is not set, the app uses `Path.cwd()`.)
@@ -150,6 +163,40 @@ def _spawn_backend_process(next_port: int) -> subprocess.Popen:
return subprocess.Popen(cmd, cwd=spawn_cwd, env=env)
+def _spawn_backend_process_after_delay(next_port: int, next_host: str, delay_s: float = 0.8) -> subprocess.Popen:
+ env = os.environ.copy()
+ env["WECHAT_TOOL_PORT"] = str(int(next_port))
+ env["WECHAT_TOOL_HOST"] = str(next_host or _get_backend_bind_host())
+
+ cwd = os.getcwd()
+ cwd_path = Path(cwd)
+ src_dir = cwd_path / "src"
+ try:
+ existing_pp = str(env.get("PYTHONPATH", "") or "").strip()
+ if src_dir.is_dir():
+ env["PYTHONPATH"] = str(src_dir) if not existing_pp else f"{src_dir}{os.pathsep}{existing_pp}"
+ except Exception:
+ pass
+
+ if getattr(sys, "frozen", False):
+ target = [sys.executable]
+ else:
+ main_py = cwd_path / "main.py"
+ if main_py.is_file():
+ target = [sys.executable, str(main_py)]
+ else:
+ target = [sys.executable, "-m", "wechat_decrypt_tool.backend_entry"]
+
+ # Keep the launcher independent from this process; it starts the backend after
+ # the current process has released its listening socket.
+ launcher_code = (
+ "import os,subprocess,sys,time;"
+ f"time.sleep({max(0.0, float(delay_s))!r});"
+ "subprocess.Popen(sys.argv[1:], cwd=os.getcwd(), env=os.environ)"
+ )
+ return subprocess.Popen([sys.executable, "-c", launcher_code, *target], cwd=cwd, env=env)
+
+
async def _exit_process_after(delay_s: float) -> None:
try:
await asyncio.sleep(max(0.0, float(delay_s)))
@@ -212,6 +259,58 @@ async def get_backend_port() -> dict:
return {"port": port, "source": source, "default_port": DEFAULT_BACKEND_PORT}
+@router.get("/api/admin/mcp-access", summary="获取 MCP 局域网接入状态")
+async def get_mcp_access() -> dict:
+ host, source = read_effective_backend_host(default=LOOPBACK_BACKEND_HOST)
+ port, port_source = read_effective_backend_port(default=DEFAULT_BACKEND_PORT)
+ return {
+ "enabled": host == LAN_BACKEND_HOST,
+ "host": host,
+ "source": source,
+ "port": port,
+ "port_source": port_source,
+ "default_host": LOOPBACK_BACKEND_HOST,
+ "lan_host": LAN_BACKEND_HOST,
+ "restart_required": False,
+ }
+
+
+@router.get("/api/admin/mcp-token", summary="获取 MCP token(仅允许本机访问)")
+async def get_mcp_token(request: Request) -> dict:
+ if not _is_loopback_client(request):
+ raise HTTPException(status_code=403, detail="仅允许本机访问该接口")
+
+ from ..runtime_settings import ensure_mcp_token
+
+ token, source = ensure_mcp_token()
+ env_file = write_mcp_token_env_file(token)
+ return {
+ "success": True,
+ "token": token,
+ "source": source,
+ "env_file": str(env_file) if env_file else None,
+ }
+
+
+@router.post("/api/admin/mcp-token/reset", summary="重置 MCP token(仅允许本机访问)")
+async def reset_mcp_token_endpoint(request: Request) -> dict:
+ if not _is_loopback_client(request):
+ raise HTTPException(status_code=403, detail="仅允许本机访问该接口")
+
+ previous, previous_source = read_effective_mcp_token()
+ token = reset_mcp_token()
+ os.environ["WECHAT_TOOL_MCP_TOKEN"] = token
+ env_file = write_mcp_token_env_file(token)
+ return {
+ "success": True,
+ "changed": token != previous,
+ "token": token,
+ "previous_source": previous_source,
+ "source": "reset",
+ "env_file": str(env_file) if env_file else None,
+ }
+
+
@router.post("/api/admin/port", summary="修改后端端口并重启(仅允许本机访问)")
async def set_backend_port(payload: dict, request: Request, background_tasks: BackgroundTasks) -> dict:
if not _is_loopback_client(request):
@@ -281,3 +380,54 @@ async def set_backend_port(payload: dict, request: Request, background_tasks: Ba
}
finally:
_PORT_CHANGE_IN_PROGRESS = False
+
+
+@router.post("/api/admin/mcp-access", summary="开启或关闭 MCP 局域网接入并重启后端(仅允许本机访问)")
+async def set_mcp_access(payload: dict, request: Request, background_tasks: BackgroundTasks) -> dict:
+ if not _is_loopback_client(request):
+ raise HTTPException(status_code=403, detail="仅允许本机访问该接口")
+
+ global _PORT_CHANGE_IN_PROGRESS
+ if _PORT_CHANGE_IN_PROGRESS:
+ raise HTTPException(status_code=409, detail="后端切换中,请稍后重试")
+
+ enabled = bool(payload.get("enabled")) if isinstance(payload, dict) else False
+ next_host = LAN_BACKEND_HOST if enabled else LOOPBACK_BACKEND_HOST
+ current_host = _get_backend_bind_host()
+ current_port, _ = read_effective_backend_port(default=DEFAULT_BACKEND_PORT)
+
+ if next_host == current_host:
+ write_backend_host_setting(next_host)
+ env_file = write_backend_host_env_file(next_host)
+ return {
+ "success": True,
+ "changed": False,
+ "enabled": enabled,
+ "host": next_host,
+ "port": int(current_port),
+ "ui_url": f"http://{_format_host_for_url(_get_backend_access_host())}:{int(current_port)}/",
+ "env_file": str(env_file) if env_file else None,
+ }
+
+ _PORT_CHANGE_IN_PROGRESS = True
+ try:
+ write_backend_host_setting(next_host)
+ env_file = write_backend_host_env_file(next_host)
+
+ # Host changes keep the same port. The old socket must close before the
+ # new process can bind, so start a detached launcher and then exit.
+ background_tasks.add_task(_spawn_backend_process_after_delay, int(current_port), next_host, 0.8)
+ background_tasks.add_task(_exit_process_after, 0.2)
+
+ return {
+ "success": True,
+ "changed": True,
+ "enabled": enabled,
+ "host": next_host,
+ "port": int(current_port),
+ "ui_url": f"http://{_format_host_for_url(LOOPBACK_BACKEND_HOST)}:{int(current_port)}/",
+ "env_file": str(env_file) if env_file else None,
+ "restart_scheduled": True,
+ }
+ finally:
+ _PORT_CHANGE_IN_PROGRESS = False
diff --git a/src/wechat_decrypt_tool/runtime_settings.py b/src/wechat_decrypt_tool/runtime_settings.py
index d071d53..b7196d4 100644
--- a/src/wechat_decrypt_tool/runtime_settings.py
+++ b/src/wechat_decrypt_tool/runtime_settings.py
@@ -3,14 +3,21 @@ from __future__ import annotations
import json
import os
import re
+import secrets
from pathlib import Path
RUNTIME_SETTINGS_FILENAME = "runtime_settings.json"
BACKEND_PORT_KEY = "backend_port"
+BACKEND_HOST_KEY = "backend_host"
+MCP_TOKEN_KEY = "mcp_token"
ENV_PORT_KEY = "WECHAT_TOOL_PORT"
+ENV_HOST_KEY = "WECHAT_TOOL_HOST"
+ENV_MCP_TOKEN_KEY = "WECHAT_TOOL_MCP_TOKEN"
ENV_FILE_KEY = "WECHAT_TOOL_ENV_FILE"
DEFAULT_ENV_FILENAME = ".env"
+LOOPBACK_BACKEND_HOST = "127.0.0.1"
+LAN_BACKEND_HOST = "0.0.0.0"
def _parse_port(value: object) -> int | None:
@@ -31,6 +38,66 @@ def _parse_port(value: object) -> int | None:
return port
+def _normalize_host(value: object) -> str | None:
+ try:
+ raw = str(value or "").strip()
+ except Exception:
+ return None
+ if raw in {LOOPBACK_BACKEND_HOST, "localhost", "::1"}:
+ return LOOPBACK_BACKEND_HOST
+ if raw in {LAN_BACKEND_HOST, "::"}:
+ return LAN_BACKEND_HOST
+ return None
+
+
+def _normalize_mcp_token(value: object) -> str | None:
+ try:
+ raw = str(value or "").strip()
+ except Exception:
+ return None
+ if len(raw) < 16 or len(raw) > 512:
+ return None
+ if any(ch.isspace() for ch in raw):
+ return None
+ return raw
+
+
+def generate_mcp_token() -> str:
+ return secrets.token_urlsafe(32)
+
+
+def _read_runtime_settings() -> dict:
+ path = get_runtime_settings_path()
+ try:
+ if not path.is_file():
+ return {}
+ data = json.loads(path.read_text(encoding="utf-8") or "{}")
+ return data if isinstance(data, dict) else {}
+ except Exception:
+ return {}
+
+
+def _write_runtime_settings(data: dict) -> None:
+ path = get_runtime_settings_path()
+ try:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ except Exception:
+ return
+
+ try:
+ cleaned = data if isinstance(data, dict) else {}
+ if not cleaned:
+ try:
+ path.unlink(missing_ok=True)
+ except Exception:
+ pass
+ return
+
+ path.write_text(json.dumps(cleaned, ensure_ascii=False, indent=2), encoding="utf-8")
+ except Exception:
+ return
+
+
def get_runtime_settings_path() -> Path:
from .app_paths import get_output_dir
@@ -38,50 +105,24 @@ def get_runtime_settings_path() -> Path:
def read_backend_port_setting() -> int | None:
- path = get_runtime_settings_path()
try:
- if not path.is_file():
- return None
- data = json.loads(path.read_text(encoding="utf-8") or "{}")
- if not isinstance(data, dict):
- return None
+ data = _read_runtime_settings()
return _parse_port(data.get(BACKEND_PORT_KEY))
except Exception:
return None
def write_backend_port_setting(port: int | None) -> None:
- path = get_runtime_settings_path()
safe_port = _parse_port(port)
try:
- path.parent.mkdir(parents=True, exist_ok=True)
- except Exception:
- return
-
- try:
- data: dict = {}
- if path.is_file():
- try:
- existing = json.loads(path.read_text(encoding="utf-8") or "{}")
- if isinstance(existing, dict):
- data = existing
- except Exception:
- data = {}
+ data = _read_runtime_settings()
if safe_port is None:
data.pop(BACKEND_PORT_KEY, None)
else:
data[BACKEND_PORT_KEY] = safe_port
- # Keep the file small and stable; remove if empty.
- if not data:
- try:
- path.unlink(missing_ok=True)
- except Exception:
- pass
- return
-
- path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
+ _write_runtime_settings(data)
except Exception:
return
@@ -101,6 +142,92 @@ def read_effective_backend_port(default: int) -> tuple[int, str]:
return int(default), "default"
+def read_backend_host_setting() -> str | None:
+ try:
+ data = _read_runtime_settings()
+ return _normalize_host(data.get(BACKEND_HOST_KEY))
+ except Exception:
+ return None
+
+
+def write_backend_host_setting(host: str | None) -> None:
+ safe_host = _normalize_host(host)
+ try:
+ data = _read_runtime_settings()
+ if safe_host is None:
+ data.pop(BACKEND_HOST_KEY, None)
+ else:
+ data[BACKEND_HOST_KEY] = safe_host
+ _write_runtime_settings(data)
+ except Exception:
+ return
+
+
+def read_effective_backend_host(default: str = LOOPBACK_BACKEND_HOST) -> tuple[str, str]:
+ """Return (host, source) where source is one of: env | settings | default."""
+
+ env_host = _normalize_host(os.environ.get(ENV_HOST_KEY, ""))
+ if env_host is not None:
+ return env_host, "env"
+
+ settings_host = read_backend_host_setting()
+ if settings_host is not None:
+ return settings_host, "settings"
+
+ return _normalize_host(default) or LOOPBACK_BACKEND_HOST, "default"
+
+
+def read_mcp_token_setting() -> str | None:
+ try:
+ data = _read_runtime_settings()
+ return _normalize_mcp_token(data.get(MCP_TOKEN_KEY))
+ except Exception:
+ return None
+
+
+def write_mcp_token_setting(token: str | None) -> None:
+ safe_token = _normalize_mcp_token(token)
+ try:
+ data = _read_runtime_settings()
+ if safe_token is None:
+ data.pop(MCP_TOKEN_KEY, None)
+ else:
+ data[MCP_TOKEN_KEY] = safe_token
+ _write_runtime_settings(data)
+ except Exception:
+ return
+
+
+def read_effective_mcp_token() -> tuple[str | None, str]:
+ """Return (token, source) where source is one of: env | settings | missing."""
+
+ env_token = _normalize_mcp_token(os.environ.get(ENV_MCP_TOKEN_KEY, ""))
+ if env_token is not None:
+ return env_token, "env"
+
+ settings_token = read_mcp_token_setting()
+ if settings_token is not None:
+ return settings_token, "settings"
+
+ return None, "missing"
+
+
+def ensure_mcp_token() -> tuple[str, str]:
+ token, source = read_effective_mcp_token()
+ if token:
+ return token, source
+
+ token = generate_mcp_token()
+ write_mcp_token_setting(token)
+ return token, "generated"
+
+
+def reset_mcp_token() -> str:
+ token = generate_mcp_token()
+ write_mcp_token_setting(token)
+ return token
+
+
def get_env_file_path() -> Path | None:
"""Best-effort env file path for `uv run` (defaults to repo root `.env`)."""
@@ -173,3 +300,27 @@ def write_backend_port_env_file(port: int | None) -> Path | None:
safe_port = _parse_port(port)
ok = _set_env_var_in_file(env_file, ENV_PORT_KEY, str(safe_port) if safe_port is not None else None)
return env_file if ok else None
+
+
+def write_backend_host_env_file(host: str | None) -> Path | None:
+ """Write `WECHAT_TOOL_HOST` into a `.env` file so `uv run main.py` picks it up on restart."""
+
+ env_file = get_env_file_path()
+ if not env_file:
+ return None
+
+ safe_host = _normalize_host(host)
+ ok = _set_env_var_in_file(env_file, ENV_HOST_KEY, safe_host)
+ return env_file if ok else None
+
+
+def write_mcp_token_env_file(token: str | None) -> Path | None:
+ """Write `WECHAT_TOOL_MCP_TOKEN` into a `.env` file so `uv run main.py` picks it up."""
+
+ env_file = get_env_file_path()
+ if not env_file:
+ return None
+
+ safe_token = _normalize_mcp_token(token)
+ ok = _set_env_var_in_file(env_file, ENV_MCP_TOKEN_KEY, safe_token)
+ return env_file if ok else None