From 78d64de371c7d827df38ab8ed66050ac2a21ad8c Mon Sep 17 00:00:00 2001 From: chuan Date: Thu, 30 Apr 2026 14:57:17 +0800 Subject: [PATCH] feat: add server data directory handling and enhance sync commands with force option --- docker-compose.yml => compose.yml | 0 src/cli.rs | 18 ++- src/config_store.rs | 12 ++ src/main.rs | 11 +- src/server.rs | 242 ++++++++++++++++++++---------- src/sync_client.rs | 187 ++++++++++++++++++++--- 6 files changed, 359 insertions(+), 111 deletions(-) rename docker-compose.yml => compose.yml (100%) diff --git a/docker-compose.yml b/compose.yml similarity index 100% rename from docker-compose.yml rename to compose.yml diff --git a/src/cli.rs b/src/cli.rs index 9e3005a..eadb6e0 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -185,7 +185,7 @@ pub enum ServerCommands { #[arg(long, default_value = "127.0.0.1:8765")] bind: String, #[arg(long)] - data: Option, + data_dir: Option, }, User { #[command(subcommand)] @@ -200,7 +200,7 @@ pub enum ServerUserCommands { #[arg(long)] password: String, #[arg(long)] - data: Option, + data_dir: Option, }, } @@ -214,8 +214,18 @@ pub enum SyncCommands { #[arg(long)] password: String, }, - Pull, - Push, + Pull { + #[arg(short, long)] + force: bool, + }, + Push { + #[arg(short, long)] + force: bool, + }, + Remote { + #[arg(long)] + json: bool, + }, Status, } diff --git a/src/config_store.rs b/src/config_store.rs index d9882eb..1b70de6 100644 --- a/src/config_store.rs +++ b/src/config_store.rs @@ -161,6 +161,18 @@ pub struct SyncConfig { pub last_pull_at: Option, #[serde(default)] pub last_push_at: Option, + #[serde(default)] + pub last_remote_revision: Option, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +pub struct AccountSyncState { + #[serde(default)] + pub revision: u64, + #[serde(default)] + pub updated_at: i64, + #[serde(default)] + pub accounts: Vec, } impl Store { diff --git a/src/main.rs b/src/main.rs index 673f2bd..204587a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -102,13 +102,13 @@ async fn main() -> Result<()> { HomeCommands::Remove { name } => account::remove_home(&name), }, Commands::Server { command } => match command { - ServerCommands::Run { bind, data } => server::run_server(bind, data).await, + ServerCommands::Run { bind, data_dir } => server::run_server(bind, data_dir).await, ServerCommands::User { command } => match command { ServerUserCommands::Add { username, password, - data, - } => server::add_user(data, &username, &password), + data_dir, + } => server::add_user(data_dir, &username, &password), }, }, Commands::Sync { command } => match command { @@ -117,8 +117,9 @@ async fn main() -> Result<()> { user, password, } => sync_client::login(&server, &user, &password).await, - SyncCommands::Pull => sync_client::pull().await, - SyncCommands::Push => sync_client::push().await, + SyncCommands::Pull { force } => sync_client::pull(force).await, + SyncCommands::Push { force } => sync_client::push(force).await, + SyncCommands::Remote { json } => sync_client::remote(json).await, SyncCommands::Status => sync_client::status(), }, Commands::Session { command } => match command { diff --git a/src/server.rs b/src/server.rs index 767dc31..ca7a44c 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,8 +1,7 @@ -//! Minimal HTTP sync server for cdxs state. +//! Minimal HTTP sync server for per-user cdxs account state. //! -//! The server stores users, bearer sessions and shared cdxs state in one TOML -//! file. It is intentionally small: clients login, then GET or PUT portable -//! state through `/v1/state`. +//! Server credentials live in `server.toml`. Each user gets an isolated +//! `states/.toml` containing only synced account records. use std::net::SocketAddr; use std::path::{Path, PathBuf}; @@ -21,18 +20,21 @@ use serde::{Deserialize, Serialize}; use sha2::{Digest, Sha256}; use tokio::sync::Mutex; -use crate::config_store::{ServerSession, ServerUser, Store}; +use crate::config_store::{AccountSyncState, ServerConfig, ServerSession, ServerUser}; +use crate::{atomic, paths}; #[derive(Clone)] struct AppState { - /// Path to the server-side cdxs.toml. - data_path: PathBuf, - /// Used for resolving default home entries when the TOML does not exist. - default_home: PathBuf, - /// Serialize file reads/writes so concurrent requests do not race. + data_dir: PathBuf, lock: Arc>, } +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +struct ServerStore { + #[serde(default)] + server: ServerConfig, +} + #[derive(Debug, Deserialize)] struct LoginRequest { username: String, @@ -44,24 +46,27 @@ struct LoginResponse { token: String, } -pub fn add_user(data: Option, username: &str, password: &str) -> Result<()> { - // User management is file-backed just like normal cdxs state, so the same - // --data path must be used by `server run`. - let (data_path, default_home) = resolve_data_path(data)?; - let mut store = Store::load_from_path(&data_path, &default_home)?; - let username = username.trim(); - if username.is_empty() { - return Err(anyhow!("用户名不能为空")); - } +#[derive(Debug, Deserialize)] +struct PutStateRequest { + expected_revision: Option, + force: bool, + state: AccountSyncState, +} + +pub fn add_user(data_dir: Option, username: &str, password: &str) -> Result<()> { + let data_dir = resolve_data_dir(data_dir)?; + let mut store = load_server_store(&data_dir)?; + let username = normalize_username(username)?; if password.is_empty() { return Err(anyhow!("密码不能为空")); } + let now = Utc::now().timestamp(); let salt = random_token(); let user = ServerUser { - username: username.to_string(), + username: username.clone(), salt: salt.clone(), password_hash: hash_secret(&salt, password), - created_at: Utc::now().timestamp(), + created_at: now, }; if let Some(existing) = store .server @@ -73,21 +78,21 @@ pub fn add_user(data: Option, username: &str, password: &str) -> Result } else { store.server.users.push(user); } - store.save_to_path(&data_path, &default_home)?; + save_server_store(&data_dir, &store)?; + ensure_user_state(&data_dir, &username)?; println!("已添加/更新 server 用户: {username}"); Ok(()) } -pub async fn run_server(bind: String, data: Option) -> Result<()> { - // Bind defaults to localhost for safety; Docker deployments usually pass - // 0.0.0.0 through compose. - let (data_path, default_home) = resolve_data_path(data)?; +pub async fn run_server(bind: String, data_dir: Option) -> Result<()> { + let data_dir = resolve_data_dir(data_dir)?; + std::fs::create_dir_all(states_dir(&data_dir)) + .with_context(|| format!("创建 states 目录失败: {}", states_dir(&data_dir).display()))?; let addr: SocketAddr = bind .parse() .with_context(|| format!("监听地址无效: {bind}"))?; let state = AppState { - data_path, - default_home, + data_dir, lock: Arc::new(Mutex::new(())), }; let app = Router::new() @@ -114,12 +119,13 @@ async fn login_handler( Json(req): Json, ) -> Result, ApiError> { let _guard = state.lock.lock().await; - let mut store = Store::load_from_path(&state.data_path, &state.default_home)?; + let mut store = load_server_store(&state.data_dir)?; + let username = normalize_username(&req.username).map_err(ApiError::bad_request)?; let user = store .server .users .iter() - .find(|user| user.username == req.username) + .find(|user| user.username == username) .ok_or_else(|| ApiError::unauthorized("用户名或密码错误"))?; if user.password_hash != hash_secret(&user.salt, &req.password) { return Err(ApiError::unauthorized("用户名或密码错误")); @@ -127,45 +133,51 @@ async fn login_handler( let token = random_token(); store.server.sessions.push(ServerSession { - username: req.username, + username: username.clone(), token_hash: hash_token(&token), created_at: Utc::now().timestamp(), }); - store.save_to_path(&state.data_path, &state.default_home)?; + save_server_store(&state.data_dir, &store)?; + ensure_user_state(&state.data_dir, &username)?; Ok(Json(LoginResponse { token })) } async fn get_state_handler( State(state): State, headers: HeaderMap, -) -> Result, ApiError> { +) -> Result, ApiError> { let _guard = state.lock.lock().await; - let store = Store::load_from_path(&state.data_path, &state.default_home)?; - authorize(&store, &headers)?; - Ok(Json(sanitized_for_client(&store))) + let store = load_server_store(&state.data_dir)?; + let username = authorize(&store, &headers)?; + Ok(Json(load_user_state(&state.data_dir, &username)?)) } async fn put_state_handler( State(state): State, headers: HeaderMap, - Json(incoming): Json, -) -> Result, ApiError> { + Json(incoming): Json, +) -> Result, ApiError> { let _guard = state.lock.lock().await; - let mut store = Store::load_from_path(&state.data_path, &state.default_home)?; - authorize(&store, &headers)?; + let store = load_server_store(&state.data_dir)?; + let username = authorize(&store, &headers)?; + let current = load_user_state(&state.data_dir, &username)?; + if !incoming.force && incoming.expected_revision != Some(current.revision) { + return Err(ApiError::conflict(format!( + "revision mismatch: expected={:?}, current={}", + incoming.expected_revision, current.revision + ))); + } - // Client sync can replace portable state, but server credentials stay server-side. - store.meta = incoming.meta; - store.accounts = incoming.accounts; - store.homes = incoming.homes; - store.sync = Default::default(); - store.save_to_path(&state.data_path, &state.default_home)?; - Ok(Json(sanitized_for_client(&store))) + let next = AccountSyncState { + revision: current.revision + 1, + updated_at: Utc::now().timestamp(), + accounts: incoming.state.accounts, + }; + save_user_state(&state.data_dir, &username, &next)?; + Ok(Json(next)) } -fn authorize(store: &Store, headers: &HeaderMap) -> Result<(), ApiError> { - // Only token hashes are stored server-side. The raw bearer token is returned - // once during login and then kept by the client in its local cdxs.toml. +fn authorize(store: &ServerStore, headers: &HeaderMap) -> Result { let Some(value) = headers.get("authorization").and_then(|v| v.to_str().ok()) else { return Err(ApiError::unauthorized("缺少 Authorization header")); }; @@ -173,42 +185,102 @@ fn authorize(store: &Store, headers: &HeaderMap) -> Result<(), ApiError> { return Err(ApiError::unauthorized("Authorization 格式无效")); }; let token_hash = hash_token(token); - if store + store .server .sessions .iter() - .any(|session| session.token_hash == token_hash) + .find(|session| session.token_hash == token_hash) + .map(|session| session.username.clone()) + .ok_or_else(|| ApiError::unauthorized("Token 无效")) +} + +fn load_server_store(data_dir: &Path) -> Result { + let path = server_store_path(data_dir); + if !path.exists() { + return Ok(ServerStore::default()); + } + let content = std::fs::read_to_string(&path) + .with_context(|| format!("读取 server.toml 失败: {}", path.display()))?; + if content.trim().is_empty() { + return Ok(ServerStore::default()); + } + toml::from_str(&content).with_context(|| format!("解析 server.toml 失败: {}", path.display())) +} + +fn save_server_store(data_dir: &Path, store: &ServerStore) -> Result<()> { + std::fs::create_dir_all(data_dir) + .with_context(|| format!("创建 server data 目录失败: {}", data_dir.display()))?; + let path = server_store_path(data_dir); + atomic::backup_if_exists(&path, data_dir, "server.toml")?; + let content = toml::to_string_pretty(store).context("序列化 server.toml 失败")?; + atomic::write_atomic(&path, &content) +} + +fn ensure_user_state(data_dir: &Path, username: &str) -> Result<()> { + let path = user_state_path(data_dir, username); + if path.exists() { + return Ok(()); + } + save_user_state(data_dir, username, &AccountSyncState::default()) +} + +fn load_user_state(data_dir: &Path, username: &str) -> Result { + let path = user_state_path(data_dir, username); + if !path.exists() { + return Ok(AccountSyncState::default()); + } + let content = std::fs::read_to_string(&path) + .with_context(|| format!("读取用户同步状态失败: {}", path.display()))?; + if content.trim().is_empty() { + return Ok(AccountSyncState::default()); + } + toml::from_str(&content).with_context(|| format!("解析用户同步状态失败: {}", path.display())) +} + +fn save_user_state(data_dir: &Path, username: &str, state: &AccountSyncState) -> Result<()> { + let dir = states_dir(data_dir); + std::fs::create_dir_all(&dir) + .with_context(|| format!("创建 states 目录失败: {}", dir.display()))?; + let path = user_state_path(data_dir, username); + atomic::backup_if_exists(&path, data_dir, &format!("state-{username}.toml"))?; + let content = toml::to_string_pretty(state).context("序列化用户同步状态失败")?; + atomic::write_atomic(&path, &content) +} + +fn resolve_data_dir(data_dir: Option) -> Result { + if let Some(path) = data_dir { + return Ok(paths::expand_home(path)); + } + Ok(paths::codex_home(None)?.join("cdxs-server")) +} + +fn server_store_path(data_dir: &Path) -> PathBuf { + data_dir.join("server.toml") +} + +fn states_dir(data_dir: &Path) -> PathBuf { + data_dir.join("states") +} + +fn user_state_path(data_dir: &Path, username: &str) -> PathBuf { + states_dir(data_dir).join(format!("{username}.toml")) +} + +fn normalize_username(username: &str) -> Result { + let username = username.trim(); + if username.is_empty() { + return Err(anyhow!("用户名不能为空")); + } + if username + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_') { - Ok(()) + Ok(username.to_string()) } else { - Err(ApiError::unauthorized("Token 无效")) + Err(anyhow!("用户名只能包含字母、数字、- 和 _")) } } -fn sanitized_for_client(store: &Store) -> Store { - // Never leak server users, password hashes or issued bearer sessions to - // clients during pull/login responses. - let mut copy = store.clone(); - copy.server = Default::default(); - copy.sync = Default::default(); - copy -} - -fn resolve_data_path(data: Option) -> Result<(PathBuf, PathBuf)> { - // With --data, the parent directory acts as backup/default-home base. Without - // it, the active CODEX_HOME owns the server's cdxs.toml. - if let Some(path) = data { - let path = crate::paths::expand_home(path); - let default_home = path - .parent() - .map(Path::to_path_buf) - .unwrap_or_else(|| PathBuf::from(".")); - return Ok((path, default_home)); - } - let home = crate::paths::codex_home(None)?; - Ok((crate::paths::config_path(&home), home)) -} - fn random_token() -> String { let mut bytes = [0u8; 32]; rand::thread_rng().fill_bytes(&mut bytes); @@ -236,12 +308,26 @@ struct ApiError { } impl ApiError { + fn bad_request(error: anyhow::Error) -> Self { + Self { + status: StatusCode::BAD_REQUEST, + message: error.to_string(), + } + } + fn unauthorized(message: &str) -> Self { Self { status: StatusCode::UNAUTHORIZED, message: message.to_string(), } } + + fn conflict(message: String) -> Self { + Self { + status: StatusCode::CONFLICT, + message, + } + } } impl From for ApiError { diff --git a/src/sync_client.rs b/src/sync_client.rs index d4b7d5d..44e0d1e 100644 --- a/src/sync_client.rs +++ b/src/sync_client.rs @@ -8,7 +8,7 @@ use chrono::Utc; use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION}; use serde::{Deserialize, Serialize}; -use crate::config_store::Store; +use crate::config_store::{Account, AccountSyncState, Store}; #[derive(Debug, Serialize)] struct LoginRequest<'a> { @@ -21,6 +21,13 @@ struct LoginResponse { token: String, } +#[derive(Debug, Serialize)] +struct PutStateRequest { + expected_revision: Option, + force: bool, + state: AccountSyncState, +} + pub async fn login(server: &str, user: &str, password: &str) -> Result<()> { // Normalize the server URL once so later pull/push can simply append API // paths without double slashes. @@ -49,45 +56,55 @@ pub async fn login(server: &str, user: &str, password: &str) -> Result<()> { store.sync.server_url = Some(server.to_string()); store.sync.username = Some(user.to_string()); store.sync.token = Some(login.token); + store.sync.last_remote_revision = None; store.save(&home)?; println!("sync login 成功: {server} ({user})"); Ok(()) } -pub async fn pull() -> Result<()> { +pub async fn pull(force: bool) -> Result<()> { let home = crate::paths::codex_home(None)?; let mut local = Store::load(&home)?; let (server, token) = sync_endpoint(&local)?; - let remote = reqwest::Client::new() - .get(format!("{server}/v1/state")) - .headers(auth_headers(&token)?) - .send() - .await - .context("sync pull 请求失败")?; - let status = remote.status(); - let body = remote.text().await.context("读取 sync pull 响应失败")?; - if !status.is_success() { - return Err(anyhow!("sync pull 失败: status={}, body={}", status, body)); + let remote_state = fetch_remote_state(&server, &token).await?; + let before = local.accounts.len(); + if force { + local.accounts = remote_state.accounts; + } else { + local.accounts = merge_accounts(local.accounts.clone(), remote_state.accounts); } - let remote_store: Store = serde_json::from_str(&body).context("解析 sync pull 响应失败")?; - // Pull replaces local portable state, but keeps local sync endpoint/token. - local.meta = remote_store.meta; - local.accounts = remote_store.accounts; - local.homes = remote_store.homes; + local.sync.last_remote_revision = Some(remote_state.revision); local.sync.last_pull_at = Some(Utc::now().timestamp()); local.save(&home)?; - println!("sync pull 完成"); + println!( + "sync pull 完成: accounts={} -> {}, revision={}, mode={}", + before, + local.accounts.len(), + remote_state.revision, + if force { "force" } else { "merge" } + ); Ok(()) } -pub async fn push() -> Result<()> { +pub async fn push(force: bool) -> Result<()> { let home = crate::paths::codex_home(None)?; let mut local = Store::load(&home)?; let (server, token) = sync_endpoint(&local)?; - let mut payload = local.clone(); - // Do not upload local server users or sync token back into the shared state. - payload.server = Default::default(); - payload.sync = Default::default(); + if !force { + let remote_state = fetch_remote_state(&server, &token).await?; + let remote_revision = remote_state.revision; + local.accounts = merge_accounts(local.accounts.clone(), remote_state.accounts); + local.sync.last_remote_revision = Some(remote_revision); + } + let payload = PutStateRequest { + expected_revision: local.sync.last_remote_revision.or(Some(0)), + force, + state: AccountSyncState { + revision: local.sync.last_remote_revision.unwrap_or_default(), + updated_at: Utc::now().timestamp(), + accounts: local.accounts.clone(), + }, + }; let response = reqwest::Client::new() .put(format!("{server}/v1/state")) .headers(auth_headers(&token)?) @@ -100,9 +117,64 @@ pub async fn push() -> Result<()> { if !status.is_success() { return Err(anyhow!("sync push 失败: status={}, body={}", status, body)); } + let remote_state: AccountSyncState = + serde_json::from_str(&body).context("解析 sync push 响应失败")?; + local.sync.last_remote_revision = Some(remote_state.revision); local.sync.last_push_at = Some(Utc::now().timestamp()); local.save(&home)?; - println!("sync push 完成"); + println!( + "sync push 完成: accounts={}, revision={}, mode={}", + local.accounts.len(), + remote_state.revision, + if force { "force" } else { "merge" } + ); + Ok(()) +} + +pub async fn remote(json: bool) -> Result<()> { + let home = crate::paths::codex_home(None)?; + let local = Store::load(&home)?; + let (server, token) = sync_endpoint(&local)?; + let remote_state = fetch_remote_state(&server, &token).await?; + if json { + println!("{}", serde_json::to_string_pretty(&remote_state)?); + return Ok(()); + } + + println!( + "remote: {} ({}) revision={}", + server, + local.sync.username.as_deref().unwrap_or("-"), + remote_state.revision + ); + if remote_state.accounts.is_empty() { + println!("远端账号库为空。"); + return Ok(()); + } + println!( + "{:<22} {:<34} {:<10} {:<12} {}", + "ID", "Email", "Mode", "Plan", "Quota" + ); + for account in &remote_state.accounts { + let quota = account + .quota + .as_ref() + .map(|quota| { + format!( + "5h={}%, weekly={}%", + quota.primary_remaining_percent, quota.secondary_remaining_percent + ) + }) + .unwrap_or_else(|| "-".to_string()); + println!( + "{:<22} {:<34} {:<10} {:<12} {}", + shorten(&account.id, 22), + shorten(&account.email, 34), + account_auth_mode_name(account), + account.plan_type.as_deref().unwrap_or("-"), + quota + ); + } Ok(()) } @@ -138,6 +210,14 @@ pub fn status() -> Result<()> { .map(|v| v.to_string()) .unwrap_or_else(|| "-".to_string()) ); + println!( + "last_remote_revision: {}", + store + .sync + .last_remote_revision + .map(|v| v.to_string()) + .unwrap_or_else(|| "-".to_string()) + ); Ok(()) } @@ -168,3 +248,62 @@ fn auth_headers(token: &str) -> Result { ); Ok(headers) } + +async fn fetch_remote_state(server: &str, token: &str) -> Result { + let remote = reqwest::Client::new() + .get(format!("{server}/v1/state")) + .headers(auth_headers(token)?) + .send() + .await + .context("sync pull 请求失败")?; + let status = remote.status(); + let body = remote.text().await.context("读取 sync pull 响应失败")?; + if !status.is_success() { + return Err(anyhow!("sync pull 失败: status={}, body={}", status, body)); + } + serde_json::from_str(&body).context("解析 sync pull 响应失败") +} + +fn merge_accounts(mut local: Vec, remote: Vec) -> Vec { + for remote_account in remote { + if let Some(local_account) = local + .iter_mut() + .find(|account| account.id == remote_account.id) + { + if remote_account.updated_at > local_account.updated_at { + *local_account = remote_account; + } + } else { + local.push(remote_account); + } + } + local.sort_by(|left, right| { + left.email + .to_ascii_lowercase() + .cmp(&right.email.to_ascii_lowercase()) + .then_with(|| left.id.cmp(&right.id)) + }); + local +} + +fn account_auth_mode_name(account: &Account) -> &'static str { + match account.auth_mode { + crate::config_store::AuthMode::Oauth => "oauth", + crate::config_store::AuthMode::ApiKey => "api_key", + } +} + +fn shorten(value: &str, width: usize) -> String { + if value.chars().count() <= width { + return value.to_string(); + } + if width <= 1 { + return "...".to_string(); + } + let mut out = value + .chars() + .take(width.saturating_sub(3)) + .collect::(); + out.push_str("..."); + out +}