feat: add server data directory handling and enhance sync commands with force option

This commit is contained in:
2026-04-30 14:57:17 +08:00
Unverified
parent ed1deae555
commit 78d64de371
6 changed files with 359 additions and 111 deletions
View File
+14 -4
View File
@@ -185,7 +185,7 @@ pub enum ServerCommands {
#[arg(long, default_value = "127.0.0.1:8765")]
bind: String,
#[arg(long)]
data: Option<PathBuf>,
data_dir: Option<PathBuf>,
},
User {
#[command(subcommand)]
@@ -200,7 +200,7 @@ pub enum ServerUserCommands {
#[arg(long)]
password: String,
#[arg(long)]
data: Option<PathBuf>,
data_dir: Option<PathBuf>,
},
}
@@ -214,8 +214,18 @@ pub enum SyncCommands {
#[arg(long)]
password: String,
},
Pull,
Push,
Pull {
#[arg(short, long)]
force: bool,
},
Push {
#[arg(short, long)]
force: bool,
},
Remote {
#[arg(long)]
json: bool,
},
Status,
}
+12
View File
@@ -161,6 +161,18 @@ pub struct SyncConfig {
pub last_pull_at: Option<i64>,
#[serde(default)]
pub last_push_at: Option<i64>,
#[serde(default)]
pub last_remote_revision: Option<u64>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AccountSyncState {
#[serde(default)]
pub revision: u64,
#[serde(default)]
pub updated_at: i64,
#[serde(default)]
pub accounts: Vec<Account>,
}
impl Store {
+6 -5
View File
@@ -102,13 +102,13 @@ async fn main() -> Result<()> {
HomeCommands::Remove { name } => account::remove_home(&name),
},
Commands::Server { command } => match command {
ServerCommands::Run { bind, data } => server::run_server(bind, data).await,
ServerCommands::Run { bind, data_dir } => server::run_server(bind, data_dir).await,
ServerCommands::User { command } => match command {
ServerUserCommands::Add {
username,
password,
data,
} => server::add_user(data, &username, &password),
data_dir,
} => server::add_user(data_dir, &username, &password),
},
},
Commands::Sync { command } => match command {
@@ -117,8 +117,9 @@ async fn main() -> Result<()> {
user,
password,
} => sync_client::login(&server, &user, &password).await,
SyncCommands::Pull => sync_client::pull().await,
SyncCommands::Push => sync_client::push().await,
SyncCommands::Pull { force } => sync_client::pull(force).await,
SyncCommands::Push { force } => sync_client::push(force).await,
SyncCommands::Remote { json } => sync_client::remote(json).await,
SyncCommands::Status => sync_client::status(),
},
Commands::Session { command } => match command {
+164 -78
View File
@@ -1,8 +1,7 @@
//! Minimal HTTP sync server for cdxs state.
//! Minimal HTTP sync server for per-user cdxs account state.
//!
//! The server stores users, bearer sessions and shared cdxs state in one TOML
//! file. It is intentionally small: clients login, then GET or PUT portable
//! state through `/v1/state`.
//! Server credentials live in `server.toml`. Each user gets an isolated
//! `states/<username>.toml` containing only synced account records.
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
@@ -21,18 +20,21 @@ use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use tokio::sync::Mutex;
use crate::config_store::{ServerSession, ServerUser, Store};
use crate::config_store::{AccountSyncState, ServerConfig, ServerSession, ServerUser};
use crate::{atomic, paths};
#[derive(Clone)]
struct AppState {
/// Path to the server-side cdxs.toml.
data_path: PathBuf,
/// Used for resolving default home entries when the TOML does not exist.
default_home: PathBuf,
/// Serialize file reads/writes so concurrent requests do not race.
data_dir: PathBuf,
lock: Arc<Mutex<()>>,
}
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct ServerStore {
#[serde(default)]
server: ServerConfig,
}
#[derive(Debug, Deserialize)]
struct LoginRequest {
username: String,
@@ -44,24 +46,27 @@ struct LoginResponse {
token: String,
}
pub fn add_user(data: Option<PathBuf>, username: &str, password: &str) -> Result<()> {
// User management is file-backed just like normal cdxs state, so the same
// --data path must be used by `server run`.
let (data_path, default_home) = resolve_data_path(data)?;
let mut store = Store::load_from_path(&data_path, &default_home)?;
let username = username.trim();
if username.is_empty() {
return Err(anyhow!("用户名不能为空"));
}
#[derive(Debug, Deserialize)]
struct PutStateRequest {
expected_revision: Option<u64>,
force: bool,
state: AccountSyncState,
}
pub fn add_user(data_dir: Option<PathBuf>, username: &str, password: &str) -> Result<()> {
let data_dir = resolve_data_dir(data_dir)?;
let mut store = load_server_store(&data_dir)?;
let username = normalize_username(username)?;
if password.is_empty() {
return Err(anyhow!("密码不能为空"));
}
let now = Utc::now().timestamp();
let salt = random_token();
let user = ServerUser {
username: username.to_string(),
username: username.clone(),
salt: salt.clone(),
password_hash: hash_secret(&salt, password),
created_at: Utc::now().timestamp(),
created_at: now,
};
if let Some(existing) = store
.server
@@ -73,21 +78,21 @@ pub fn add_user(data: Option<PathBuf>, username: &str, password: &str) -> Result
} else {
store.server.users.push(user);
}
store.save_to_path(&data_path, &default_home)?;
save_server_store(&data_dir, &store)?;
ensure_user_state(&data_dir, &username)?;
println!("已添加/更新 server 用户: {username}");
Ok(())
}
pub async fn run_server(bind: String, data: Option<PathBuf>) -> Result<()> {
// Bind defaults to localhost for safety; Docker deployments usually pass
// 0.0.0.0 through compose.
let (data_path, default_home) = resolve_data_path(data)?;
pub async fn run_server(bind: String, data_dir: Option<PathBuf>) -> Result<()> {
let data_dir = resolve_data_dir(data_dir)?;
std::fs::create_dir_all(states_dir(&data_dir))
.with_context(|| format!("创建 states 目录失败: {}", states_dir(&data_dir).display()))?;
let addr: SocketAddr = bind
.parse()
.with_context(|| format!("监听地址无效: {bind}"))?;
let state = AppState {
data_path,
default_home,
data_dir,
lock: Arc::new(Mutex::new(())),
};
let app = Router::new()
@@ -114,12 +119,13 @@ async fn login_handler(
Json(req): Json<LoginRequest>,
) -> Result<Json<LoginResponse>, ApiError> {
let _guard = state.lock.lock().await;
let mut store = Store::load_from_path(&state.data_path, &state.default_home)?;
let mut store = load_server_store(&state.data_dir)?;
let username = normalize_username(&req.username).map_err(ApiError::bad_request)?;
let user = store
.server
.users
.iter()
.find(|user| user.username == req.username)
.find(|user| user.username == username)
.ok_or_else(|| ApiError::unauthorized("用户名或密码错误"))?;
if user.password_hash != hash_secret(&user.salt, &req.password) {
return Err(ApiError::unauthorized("用户名或密码错误"));
@@ -127,45 +133,51 @@ async fn login_handler(
let token = random_token();
store.server.sessions.push(ServerSession {
username: req.username,
username: username.clone(),
token_hash: hash_token(&token),
created_at: Utc::now().timestamp(),
});
store.save_to_path(&state.data_path, &state.default_home)?;
save_server_store(&state.data_dir, &store)?;
ensure_user_state(&state.data_dir, &username)?;
Ok(Json(LoginResponse { token }))
}
async fn get_state_handler(
State(state): State<AppState>,
headers: HeaderMap,
) -> Result<Json<Store>, ApiError> {
) -> Result<Json<AccountSyncState>, ApiError> {
let _guard = state.lock.lock().await;
let store = Store::load_from_path(&state.data_path, &state.default_home)?;
authorize(&store, &headers)?;
Ok(Json(sanitized_for_client(&store)))
let store = load_server_store(&state.data_dir)?;
let username = authorize(&store, &headers)?;
Ok(Json(load_user_state(&state.data_dir, &username)?))
}
async fn put_state_handler(
State(state): State<AppState>,
headers: HeaderMap,
Json(incoming): Json<Store>,
) -> Result<Json<Store>, ApiError> {
Json(incoming): Json<PutStateRequest>,
) -> Result<Json<AccountSyncState>, ApiError> {
let _guard = state.lock.lock().await;
let mut store = Store::load_from_path(&state.data_path, &state.default_home)?;
authorize(&store, &headers)?;
let store = load_server_store(&state.data_dir)?;
let username = authorize(&store, &headers)?;
let current = load_user_state(&state.data_dir, &username)?;
if !incoming.force && incoming.expected_revision != Some(current.revision) {
return Err(ApiError::conflict(format!(
"revision mismatch: expected={:?}, current={}",
incoming.expected_revision, current.revision
)));
}
// Client sync can replace portable state, but server credentials stay server-side.
store.meta = incoming.meta;
store.accounts = incoming.accounts;
store.homes = incoming.homes;
store.sync = Default::default();
store.save_to_path(&state.data_path, &state.default_home)?;
Ok(Json(sanitized_for_client(&store)))
let next = AccountSyncState {
revision: current.revision + 1,
updated_at: Utc::now().timestamp(),
accounts: incoming.state.accounts,
};
save_user_state(&state.data_dir, &username, &next)?;
Ok(Json(next))
}
fn authorize(store: &Store, headers: &HeaderMap) -> Result<(), ApiError> {
// Only token hashes are stored server-side. The raw bearer token is returned
// once during login and then kept by the client in its local cdxs.toml.
fn authorize(store: &ServerStore, headers: &HeaderMap) -> Result<String, ApiError> {
let Some(value) = headers.get("authorization").and_then(|v| v.to_str().ok()) else {
return Err(ApiError::unauthorized("缺少 Authorization header"));
};
@@ -173,42 +185,102 @@ fn authorize(store: &Store, headers: &HeaderMap) -> Result<(), ApiError> {
return Err(ApiError::unauthorized("Authorization 格式无效"));
};
let token_hash = hash_token(token);
if store
store
.server
.sessions
.iter()
.any(|session| session.token_hash == token_hash)
.find(|session| session.token_hash == token_hash)
.map(|session| session.username.clone())
.ok_or_else(|| ApiError::unauthorized("Token 无效"))
}
fn load_server_store(data_dir: &Path) -> Result<ServerStore> {
let path = server_store_path(data_dir);
if !path.exists() {
return Ok(ServerStore::default());
}
let content = std::fs::read_to_string(&path)
.with_context(|| format!("读取 server.toml 失败: {}", path.display()))?;
if content.trim().is_empty() {
return Ok(ServerStore::default());
}
toml::from_str(&content).with_context(|| format!("解析 server.toml 失败: {}", path.display()))
}
fn save_server_store(data_dir: &Path, store: &ServerStore) -> Result<()> {
std::fs::create_dir_all(data_dir)
.with_context(|| format!("创建 server data 目录失败: {}", data_dir.display()))?;
let path = server_store_path(data_dir);
atomic::backup_if_exists(&path, data_dir, "server.toml")?;
let content = toml::to_string_pretty(store).context("序列化 server.toml 失败")?;
atomic::write_atomic(&path, &content)
}
fn ensure_user_state(data_dir: &Path, username: &str) -> Result<()> {
let path = user_state_path(data_dir, username);
if path.exists() {
return Ok(());
}
save_user_state(data_dir, username, &AccountSyncState::default())
}
fn load_user_state(data_dir: &Path, username: &str) -> Result<AccountSyncState> {
let path = user_state_path(data_dir, username);
if !path.exists() {
return Ok(AccountSyncState::default());
}
let content = std::fs::read_to_string(&path)
.with_context(|| format!("读取用户同步状态失败: {}", path.display()))?;
if content.trim().is_empty() {
return Ok(AccountSyncState::default());
}
toml::from_str(&content).with_context(|| format!("解析用户同步状态失败: {}", path.display()))
}
fn save_user_state(data_dir: &Path, username: &str, state: &AccountSyncState) -> Result<()> {
let dir = states_dir(data_dir);
std::fs::create_dir_all(&dir)
.with_context(|| format!("创建 states 目录失败: {}", dir.display()))?;
let path = user_state_path(data_dir, username);
atomic::backup_if_exists(&path, data_dir, &format!("state-{username}.toml"))?;
let content = toml::to_string_pretty(state).context("序列化用户同步状态失败")?;
atomic::write_atomic(&path, &content)
}
fn resolve_data_dir(data_dir: Option<PathBuf>) -> Result<PathBuf> {
if let Some(path) = data_dir {
return Ok(paths::expand_home(path));
}
Ok(paths::codex_home(None)?.join("cdxs-server"))
}
fn server_store_path(data_dir: &Path) -> PathBuf {
data_dir.join("server.toml")
}
fn states_dir(data_dir: &Path) -> PathBuf {
data_dir.join("states")
}
fn user_state_path(data_dir: &Path, username: &str) -> PathBuf {
states_dir(data_dir).join(format!("{username}.toml"))
}
fn normalize_username(username: &str) -> Result<String> {
let username = username.trim();
if username.is_empty() {
return Err(anyhow!("用户名不能为空"));
}
if username
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_')
{
Ok(())
Ok(username.to_string())
} else {
Err(ApiError::unauthorized("Token 无效"))
Err(anyhow!("用户名只能包含字母、数字、- 和 _"))
}
}
fn sanitized_for_client(store: &Store) -> Store {
// Never leak server users, password hashes or issued bearer sessions to
// clients during pull/login responses.
let mut copy = store.clone();
copy.server = Default::default();
copy.sync = Default::default();
copy
}
fn resolve_data_path(data: Option<PathBuf>) -> Result<(PathBuf, PathBuf)> {
// With --data, the parent directory acts as backup/default-home base. Without
// it, the active CODEX_HOME owns the server's cdxs.toml.
if let Some(path) = data {
let path = crate::paths::expand_home(path);
let default_home = path
.parent()
.map(Path::to_path_buf)
.unwrap_or_else(|| PathBuf::from("."));
return Ok((path, default_home));
}
let home = crate::paths::codex_home(None)?;
Ok((crate::paths::config_path(&home), home))
}
fn random_token() -> String {
let mut bytes = [0u8; 32];
rand::thread_rng().fill_bytes(&mut bytes);
@@ -236,12 +308,26 @@ struct ApiError {
}
impl ApiError {
fn bad_request(error: anyhow::Error) -> Self {
Self {
status: StatusCode::BAD_REQUEST,
message: error.to_string(),
}
}
fn unauthorized(message: &str) -> Self {
Self {
status: StatusCode::UNAUTHORIZED,
message: message.to_string(),
}
}
fn conflict(message: String) -> Self {
Self {
status: StatusCode::CONFLICT,
message,
}
}
}
impl From<anyhow::Error> for ApiError {
+163 -24
View File
@@ -8,7 +8,7 @@ use chrono::Utc;
use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};
use serde::{Deserialize, Serialize};
use crate::config_store::Store;
use crate::config_store::{Account, AccountSyncState, Store};
#[derive(Debug, Serialize)]
struct LoginRequest<'a> {
@@ -21,6 +21,13 @@ struct LoginResponse {
token: String,
}
#[derive(Debug, Serialize)]
struct PutStateRequest {
expected_revision: Option<u64>,
force: bool,
state: AccountSyncState,
}
pub async fn login(server: &str, user: &str, password: &str) -> Result<()> {
// Normalize the server URL once so later pull/push can simply append API
// paths without double slashes.
@@ -49,45 +56,55 @@ pub async fn login(server: &str, user: &str, password: &str) -> Result<()> {
store.sync.server_url = Some(server.to_string());
store.sync.username = Some(user.to_string());
store.sync.token = Some(login.token);
store.sync.last_remote_revision = None;
store.save(&home)?;
println!("sync login 成功: {server} ({user})");
Ok(())
}
pub async fn pull() -> Result<()> {
pub async fn pull(force: bool) -> Result<()> {
let home = crate::paths::codex_home(None)?;
let mut local = Store::load(&home)?;
let (server, token) = sync_endpoint(&local)?;
let remote = reqwest::Client::new()
.get(format!("{server}/v1/state"))
.headers(auth_headers(&token)?)
.send()
.await
.context("sync pull 请求失败")?;
let status = remote.status();
let body = remote.text().await.context("读取 sync pull 响应失败")?;
if !status.is_success() {
return Err(anyhow!("sync pull 失败: status={}, body={}", status, body));
let remote_state = fetch_remote_state(&server, &token).await?;
let before = local.accounts.len();
if force {
local.accounts = remote_state.accounts;
} else {
local.accounts = merge_accounts(local.accounts.clone(), remote_state.accounts);
}
let remote_store: Store = serde_json::from_str(&body).context("解析 sync pull 响应失败")?;
// Pull replaces local portable state, but keeps local sync endpoint/token.
local.meta = remote_store.meta;
local.accounts = remote_store.accounts;
local.homes = remote_store.homes;
local.sync.last_remote_revision = Some(remote_state.revision);
local.sync.last_pull_at = Some(Utc::now().timestamp());
local.save(&home)?;
println!("sync pull 完成");
println!(
"sync pull 完成: accounts={} -> {}, revision={}, mode={}",
before,
local.accounts.len(),
remote_state.revision,
if force { "force" } else { "merge" }
);
Ok(())
}
pub async fn push() -> Result<()> {
pub async fn push(force: bool) -> Result<()> {
let home = crate::paths::codex_home(None)?;
let mut local = Store::load(&home)?;
let (server, token) = sync_endpoint(&local)?;
let mut payload = local.clone();
// Do not upload local server users or sync token back into the shared state.
payload.server = Default::default();
payload.sync = Default::default();
if !force {
let remote_state = fetch_remote_state(&server, &token).await?;
let remote_revision = remote_state.revision;
local.accounts = merge_accounts(local.accounts.clone(), remote_state.accounts);
local.sync.last_remote_revision = Some(remote_revision);
}
let payload = PutStateRequest {
expected_revision: local.sync.last_remote_revision.or(Some(0)),
force,
state: AccountSyncState {
revision: local.sync.last_remote_revision.unwrap_or_default(),
updated_at: Utc::now().timestamp(),
accounts: local.accounts.clone(),
},
};
let response = reqwest::Client::new()
.put(format!("{server}/v1/state"))
.headers(auth_headers(&token)?)
@@ -100,9 +117,64 @@ pub async fn push() -> Result<()> {
if !status.is_success() {
return Err(anyhow!("sync push 失败: status={}, body={}", status, body));
}
let remote_state: AccountSyncState =
serde_json::from_str(&body).context("解析 sync push 响应失败")?;
local.sync.last_remote_revision = Some(remote_state.revision);
local.sync.last_push_at = Some(Utc::now().timestamp());
local.save(&home)?;
println!("sync push 完成");
println!(
"sync push 完成: accounts={}, revision={}, mode={}",
local.accounts.len(),
remote_state.revision,
if force { "force" } else { "merge" }
);
Ok(())
}
pub async fn remote(json: bool) -> Result<()> {
let home = crate::paths::codex_home(None)?;
let local = Store::load(&home)?;
let (server, token) = sync_endpoint(&local)?;
let remote_state = fetch_remote_state(&server, &token).await?;
if json {
println!("{}", serde_json::to_string_pretty(&remote_state)?);
return Ok(());
}
println!(
"remote: {} ({}) revision={}",
server,
local.sync.username.as_deref().unwrap_or("-"),
remote_state.revision
);
if remote_state.accounts.is_empty() {
println!("远端账号库为空。");
return Ok(());
}
println!(
"{:<22} {:<34} {:<10} {:<12} {}",
"ID", "Email", "Mode", "Plan", "Quota"
);
for account in &remote_state.accounts {
let quota = account
.quota
.as_ref()
.map(|quota| {
format!(
"5h={}%, weekly={}%",
quota.primary_remaining_percent, quota.secondary_remaining_percent
)
})
.unwrap_or_else(|| "-".to_string());
println!(
"{:<22} {:<34} {:<10} {:<12} {}",
shorten(&account.id, 22),
shorten(&account.email, 34),
account_auth_mode_name(account),
account.plan_type.as_deref().unwrap_or("-"),
quota
);
}
Ok(())
}
@@ -138,6 +210,14 @@ pub fn status() -> Result<()> {
.map(|v| v.to_string())
.unwrap_or_else(|| "-".to_string())
);
println!(
"last_remote_revision: {}",
store
.sync
.last_remote_revision
.map(|v| v.to_string())
.unwrap_or_else(|| "-".to_string())
);
Ok(())
}
@@ -168,3 +248,62 @@ fn auth_headers(token: &str) -> Result<HeaderMap> {
);
Ok(headers)
}
async fn fetch_remote_state(server: &str, token: &str) -> Result<AccountSyncState> {
let remote = reqwest::Client::new()
.get(format!("{server}/v1/state"))
.headers(auth_headers(token)?)
.send()
.await
.context("sync pull 请求失败")?;
let status = remote.status();
let body = remote.text().await.context("读取 sync pull 响应失败")?;
if !status.is_success() {
return Err(anyhow!("sync pull 失败: status={}, body={}", status, body));
}
serde_json::from_str(&body).context("解析 sync pull 响应失败")
}
fn merge_accounts(mut local: Vec<Account>, remote: Vec<Account>) -> Vec<Account> {
for remote_account in remote {
if let Some(local_account) = local
.iter_mut()
.find(|account| account.id == remote_account.id)
{
if remote_account.updated_at > local_account.updated_at {
*local_account = remote_account;
}
} else {
local.push(remote_account);
}
}
local.sort_by(|left, right| {
left.email
.to_ascii_lowercase()
.cmp(&right.email.to_ascii_lowercase())
.then_with(|| left.id.cmp(&right.id))
});
local
}
fn account_auth_mode_name(account: &Account) -> &'static str {
match account.auth_mode {
crate::config_store::AuthMode::Oauth => "oauth",
crate::config_store::AuthMode::ApiKey => "api_key",
}
}
fn shorten(value: &str, width: usize) -> String {
if value.chars().count() <= width {
return value.to_string();
}
if width <= 1 {
return "...".to_string();
}
let mut out = value
.chars()
.take(width.saturating_sub(3))
.collect::<String>();
out.push_str("...");
out
}