Files
cdxs/src/quota.rs
T
2026-06-08 15:49:00 +08:00

334 lines
11 KiB
Rust

//! Codex usage quota lookup.
//!
//! Quota data is available only for OAuth-backed ChatGPT/Codex accounts. API
//! key accounts are intentionally rejected because they do not carry the
//! ChatGPT account context used by the usage endpoint.
use anyhow::{anyhow, Context, Result};
use chrono::Utc;
use reqwest::header::{HeaderMap, HeaderValue, ACCEPT, AUTHORIZATION};
use serde::{Deserialize, Serialize};
use crate::config_store::{AuthMode, Quota, Store};
const USAGE_URL: &str = "https://chatgpt.com/backend-api/wham/usage";
#[derive(Debug, Clone, Serialize, Deserialize)]
struct WindowInfo {
used_percent: Option<i32>,
limit_window_seconds: Option<i64>,
reset_after_seconds: Option<i64>,
reset_at: Option<i64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RateLimitInfo {
primary_window: Option<WindowInfo>,
secondary_window: Option<WindowInfo>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
struct UsageResponse {
#[serde(default, deserialize_with = "deserialize_optional_plan_type")]
plan_type: Option<Option<String>>,
rate_limit: Option<RateLimitInfo>,
}
pub struct QuotaRefreshReport {
pub errors: Vec<(String, String)>,
pub changed: bool,
}
#[derive(Clone)]
struct QuotaFetchInput {
id: String,
access_token: String,
account_id: Option<String>,
}
pub async fn refresh_stale_quotas(
store: &mut Store,
ids: &[String],
max_age_seconds: i64,
concurrency: usize,
) -> QuotaRefreshReport {
refresh_quotas(store, ids, Some(max_age_seconds), concurrency).await
}
async fn refresh_quotas(
store: &mut Store,
ids: &[String],
max_age_seconds: Option<i64>,
concurrency: usize,
) -> QuotaRefreshReport {
let mut inputs = Vec::new();
let mut errors = Vec::<(String, String)>::new();
let mut changed = false;
for id in ids {
match prepare_quota_fetch(store, id, max_age_seconds).await {
Ok((Some(input), token_changed)) => {
changed |= token_changed;
inputs.push(input);
}
Ok((None, token_changed)) => changed |= token_changed,
Err(error) => {
changed = true;
errors.push((id.clone(), error.to_string()));
}
}
}
let mut retry_ids = Vec::new();
for (id, result) in fetch_quotas_concurrently(inputs, concurrency).await {
match result {
Ok(quota) => {
apply_quota_result(store, &id, quota);
changed = true;
}
Err(error) if should_retry_with_refresh(&error.to_string()) => retry_ids.push(id),
Err(error) => errors.push((id, error.to_string())),
}
}
for id in retry_ids {
match refresh_one_quota(store, &id).await {
Ok(()) => changed = true,
Err(error) => errors.push((id, error.to_string())),
}
}
QuotaRefreshReport { errors, changed }
}
async fn prepare_quota_fetch(
store: &mut Store,
account_id: &str,
max_age_seconds: Option<i64>,
) -> Result<(Option<QuotaFetchInput>, bool)> {
// Quota requests need a fresh access token and, when available, the
// ChatGPT-Account-Id header for multi-account organizations.
let account = store
.find_account(account_id)
.ok_or_else(|| anyhow!("账号不存在: {account_id}"))?;
if account.auth_mode != AuthMode::Oauth {
return if max_age_seconds.is_some() {
Ok((None, false))
} else {
Err(anyhow!("API Key 账号不支持 Codex OAuth 配额查询"))
};
}
if let Some(max_age_seconds) = max_age_seconds {
let now = Utc::now().timestamp();
let fresh = account
.quota
.as_ref()
.map(|quota| now - quota.updated_at < max_age_seconds)
.unwrap_or(false);
if fresh {
return Ok((None, false));
}
}
let token_changed = crate::token::refresh_account_if_needed(store, account_id).await?;
let account = store
.find_account(account_id)
.ok_or_else(|| anyhow!("账号不存在: {account_id}"))?
.clone();
let tokens = account
.tokens
.as_ref()
.ok_or_else(|| anyhow!("OAuth 账号缺少 tokens: {account_id}"))?;
Ok((
Some(QuotaFetchInput {
id: account_id.to_string(),
access_token: tokens.access_token.clone(),
account_id: account.account_id.clone(),
}),
token_changed,
))
}
async fn fetch_quotas_concurrently(
inputs: Vec<QuotaFetchInput>,
concurrency: usize,
) -> Vec<(String, Result<FetchQuotaResult>)> {
let concurrency = concurrency.max(1);
let mut results = Vec::new();
for chunk in inputs.chunks(concurrency) {
let mut handles = Vec::new();
for input in chunk.iter().cloned() {
handles.push(tokio::spawn(async move {
let id = input.id;
let result =
fetch_quota(input.access_token.as_str(), input.account_id.as_deref()).await;
(id, result)
}));
}
for handle in handles {
match handle.await {
Ok(result) => results.push(result),
Err(error) => results.push((
"<unknown>".to_string(),
Err(anyhow!("quota task join failed: {error}")),
)),
}
}
}
results
}
async fn refresh_one_quota(store: &mut Store, account_id: &str) -> Result<()> {
crate::token::refresh_account_if_needed(store, account_id).await?;
let account = store
.find_account(account_id)
.ok_or_else(|| anyhow!("账号不存在: {account_id}"))?
.clone();
if account.auth_mode != AuthMode::Oauth {
return Err(anyhow!("API Key 账号不支持 Codex OAuth 配额查询"));
}
let tokens = account
.tokens
.as_ref()
.ok_or_else(|| anyhow!("OAuth 账号缺少 tokens: {account_id}"))?;
let result = fetch_quota(tokens.access_token.as_str(), account.account_id.as_deref()).await;
let quota = match result {
Ok(value) => value,
Err(error) if should_retry_with_refresh(&error.to_string()) => {
crate::token::refresh_account(store, account_id).await?;
let refreshed = store
.find_account(account_id)
.ok_or_else(|| anyhow!("账号不存在: {account_id}"))?;
let tokens = refreshed
.tokens
.as_ref()
.ok_or_else(|| anyhow!("OAuth 账号缺少 tokens: {account_id}"))?;
fetch_quota(
tokens.access_token.as_str(),
refreshed.account_id.as_deref(),
)
.await?
}
Err(error) => return Err(error),
};
apply_quota_result(store, account_id, quota);
Ok(())
}
fn apply_quota_result(store: &mut Store, account_id: &str, quota: FetchQuotaResult) {
let account = store
.find_account_mut(account_id)
.expect("quota result references an existing account");
if let Some(plan) = quota.plan_type {
account.plan_type = plan;
}
account.quota = Some(quota.quota);
account.updated_at = Utc::now().timestamp();
}
struct FetchQuotaResult {
quota: Quota,
plan_type: Option<Option<String>>,
}
async fn fetch_quota(access_token: &str, account_id: Option<&str>) -> Result<FetchQuotaResult> {
let mut headers = HeaderMap::new();
headers.insert(
AUTHORIZATION,
HeaderValue::from_str(&format!("Bearer {access_token}"))
.context("构建 Authorization 头失败")?,
);
headers.insert(ACCEPT, HeaderValue::from_static("application/json"));
if let Some(account_id) = account_id.filter(|value| !value.trim().is_empty()) {
// ChatGPT may select the wrong account without this header when the
// token has access to multiple accounts.
headers.insert(
"ChatGPT-Account-Id",
HeaderValue::from_str(account_id).context("构建 ChatGPT-Account-Id 头失败")?,
);
}
let response = crate::http_client::client()?
.get(USAGE_URL)
.headers(headers)
.send()
.await
.context("配额请求失败")?;
let status = response.status();
let body = response.text().await.context("读取配额响应失败")?;
if !status.is_success() {
return Err(anyhow!(
"配额接口错误: status={}, body_len={}",
status,
body.len()
));
}
let usage: UsageResponse = serde_json::from_str(&body).context("解析配额响应失败")?;
Ok(FetchQuotaResult {
quota: parse_quota(&usage),
plan_type: usage.plan_type,
})
}
fn parse_quota(usage: &UsageResponse) -> Quota {
// The API reports used percentage. cdxs stores remaining percentage because
// that is what the account list displays.
let primary = usage
.rate_limit
.as_ref()
.and_then(|rate| rate.primary_window.as_ref());
let secondary = usage
.rate_limit
.as_ref()
.and_then(|rate| rate.secondary_window.as_ref());
Quota {
primary_remaining_percent: primary.map(remaining_percent).unwrap_or(100),
primary_reset_time: primary.and_then(reset_time),
secondary_remaining_percent: secondary.map(remaining_percent).unwrap_or(100),
secondary_reset_time: secondary.and_then(reset_time),
updated_at: Utc::now().timestamp(),
}
}
fn deserialize_optional_plan_type<'de, D>(
deserializer: D,
) -> Result<Option<Option<String>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let value = Option::<serde_json::Value>::deserialize(deserializer)?;
Ok(match value {
None => None,
Some(serde_json::Value::Null) => Some(None),
Some(serde_json::Value::String(plan)) => Some(Some(plan)),
Some(value) => {
return Err(serde::de::Error::custom(format!(
"expected plan_type to be string or null, got {value}"
)))
}
})
}
fn remaining_percent(window: &WindowInfo) -> i32 {
100 - window.used_percent.unwrap_or(0).clamp(0, 100)
}
fn reset_time(window: &WindowInfo) -> Option<i64> {
window.reset_at.or_else(|| {
window
.reset_after_seconds
.filter(|seconds| *seconds >= 0)
.map(|seconds| Utc::now().timestamp() + seconds)
})
}
fn should_retry_with_refresh(message: &str) -> bool {
// Some invalid-token responses arrive as body text rather than structured
// errors, so this helper intentionally matches conservatively on text.
let lower = message.to_ascii_lowercase();
lower.contains("401")
|| lower.contains("token_invalidated")
|| lower.contains("authentication token has been invalidated")
}