mirror of
https://github.com/farion1231/cc-switch.git
synced 2026-06-16 13:34:04 +08:00
perf(proxy): trim per-request hot-path work and db wait
- Guard debug body serialization with `log::log_enabled!`; previously
serialized the filtered body to a throwaway String on every forward,
even with debug logging off.
- Skip SSE parse + UTF-8 buffer loop when no usage collector and debug
is off; the per-chunk `serde_json::from_str::<Value>` ran even in
pure passthrough mode.
- Add cheap per-app SSE event pre-filter (string `contains`) so usage
collectors only parse events that could contain usage (e.g. Claude
`message_start` / `message_delta`).
- Skip non-streaming response body JSON parse when usage logging is
disabled.
- Move `ProviderRouter::record_result` off the success response path
via `tokio::spawn` for non-HalfOpen state; that call internally does
`get_proxy_config_for_app` + `update_provider_health`, two SQLite
ops that previously blocked TTFB.
Also: dedupe `usage_logging_enabled` (was duplicated in handlers.rs)
and merge `SseUsageCollector::{new, new_filtered}` into a single
constructor that takes `Option<StreamUsageEventFilter>`.
This commit is contained in:
@@ -109,6 +109,40 @@ impl RequestForwarder {
|
||||
}
|
||||
}
|
||||
|
||||
async fn record_success_result(
|
||||
&self,
|
||||
provider_id: &str,
|
||||
app_type: &str,
|
||||
used_half_open_permit: bool,
|
||||
) {
|
||||
if used_half_open_permit {
|
||||
if let Err(e) = self
|
||||
.router
|
||||
.record_result(provider_id, app_type, true, true, None)
|
||||
.await
|
||||
{
|
||||
log::warn!(
|
||||
"[{app_type}] 记录 Provider 成功结果失败: provider_id={provider_id}, error={e}"
|
||||
);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
let router = self.router.clone();
|
||||
let provider_id = provider_id.to_string();
|
||||
let app_type = app_type.to_string();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = router
|
||||
.record_result(&provider_id, &app_type, false, true, None)
|
||||
.await
|
||||
{
|
||||
log::warn!(
|
||||
"[{app_type}] 异步记录 Provider 成功结果失败: provider_id={provider_id}, error={e}"
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// 转发请求(带故障转移)
|
||||
///
|
||||
/// # Arguments
|
||||
@@ -207,16 +241,9 @@ impl RequestForwarder {
|
||||
.await
|
||||
{
|
||||
Ok((response, claude_api_format)) => {
|
||||
// 成功:记录成功并更新熔断器
|
||||
let _ = self
|
||||
.router
|
||||
.record_result(
|
||||
&provider.id,
|
||||
app_type_str,
|
||||
used_half_open_permit,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
// 成功:普通闭合熔断状态异步记录,避免阻塞流式首包返回;
|
||||
// HalfOpen 探测仍同步等待,保证 permit 与熔断状态及时释放。
|
||||
self.record_success_result(&provider.id, app_type_str, used_half_open_permit)
|
||||
.await;
|
||||
|
||||
// 更新当前应用类型使用的 provider
|
||||
@@ -339,17 +366,12 @@ impl RequestForwarder {
|
||||
{
|
||||
Ok((response, claude_api_format)) => {
|
||||
log::info!("[{app_type_str}] [RECT-002] 整流重试成功");
|
||||
// 记录成功
|
||||
let _ = self
|
||||
.router
|
||||
.record_result(
|
||||
&provider.id,
|
||||
app_type_str,
|
||||
used_half_open_permit,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
self.record_success_result(
|
||||
&provider.id,
|
||||
app_type_str,
|
||||
used_half_open_permit,
|
||||
)
|
||||
.await;
|
||||
|
||||
// 更新当前应用类型使用的 provider
|
||||
{
|
||||
@@ -539,16 +561,12 @@ impl RequestForwarder {
|
||||
{
|
||||
Ok((response, claude_api_format)) => {
|
||||
log::info!("[{app_type_str}] [RECT-011] budget 整流重试成功");
|
||||
let _ = self
|
||||
.router
|
||||
.record_result(
|
||||
&provider.id,
|
||||
app_type_str,
|
||||
used_half_open_permit,
|
||||
true,
|
||||
None,
|
||||
)
|
||||
.await;
|
||||
self.record_success_result(
|
||||
&provider.id,
|
||||
app_type_str,
|
||||
used_half_open_permit,
|
||||
)
|
||||
.await;
|
||||
|
||||
{
|
||||
let mut current_providers =
|
||||
@@ -1415,12 +1433,14 @@ impl RequestForwarder {
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("<none>");
|
||||
log::info!("[{tag}] >>> 请求 URL: {url} (model={request_model})");
|
||||
if let Ok(body_str) = serde_json::to_string(&filtered_body) {
|
||||
log::debug!(
|
||||
"[{tag}] >>> 请求体内容 ({}字节): {}",
|
||||
body_str.len(),
|
||||
body_str
|
||||
);
|
||||
if log::log_enabled!(log::Level::Debug) {
|
||||
if let Ok(body_str) = serde_json::to_string(&filtered_body) {
|
||||
log::debug!(
|
||||
"[{tag}] >>> 请求体内容 ({}字节): {}",
|
||||
body_str.len(),
|
||||
body_str
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
// 确定超时
|
||||
|
||||
@@ -14,6 +14,12 @@ pub type ResponseUsageParser = fn(&Value) -> Option<TokenUsage>;
|
||||
/// 参数: (流式事件列表, 请求中的模型名称) -> 最终使用的模型名称
|
||||
pub type StreamModelExtractor = fn(&[Value], &str) -> String;
|
||||
|
||||
/// Type alias for the pre-filter applied to streaming usage events.
|
||||
///
|
||||
/// The argument is the raw SSE `data:` string. Returning `false` skips the JSON parse, so
|
||||
/// events unrelated to usage are not parsed on the high-frequency token/chunk path.
|
||||
pub type StreamUsageEventFilter = fn(&str) -> bool;
|
||||
|
||||
/// 各 API 的使用量解析配置
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct UsageParserConfig {
|
||||
@@ -23,10 +29,32 @@ pub struct UsageParserConfig {
|
||||
pub response_parser: ResponseUsageParser,
|
||||
/// 流式响应中的模型提取器
|
||||
pub model_extractor: StreamModelExtractor,
|
||||
/// 流式 usage 事件预过滤器
|
||||
pub stream_event_filter: Option<StreamUsageEventFilter>,
|
||||
/// 应用类型字符串(用于日志记录)
|
||||
pub app_type_str: &'static str,
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// 流式 usage 事件预过滤
|
||||
// ============================================================================
|
||||
|
||||
/// Cheap pre-filter for Claude SSE `data:` payloads.
///
/// Returns `true` when the raw payload text contains the quoted token `"message_start"` or
/// `"message_delta"`. This is a plain substring test; the payload is not parsed as JSON.
pub fn claude_stream_usage_event_filter(data: &str) -> bool {
    // Quoted event names, matched verbatim against the raw payload text.
    const MARKERS: [&str; 2] = ["\"message_start\"", "\"message_delta\""];

    MARKERS.iter().any(|marker| data.contains(*marker))
}
|
||||
|
||||
/// Cheap pre-filter for OpenAI-style SSE `data:` payloads.
///
/// Returns `true` when the raw payload text contains the quoted key `"usage"`. This is a plain
/// substring test; the payload is not parsed as JSON.
fn openai_stream_usage_event_filter(data: &str) -> bool {
    const USAGE_KEY: &str = "\"usage\"";

    data.contains(USAGE_KEY)
}
|
||||
|
||||
/// Cheap pre-filter for Codex SSE `data:` payloads.
///
/// Returns `true` when the raw payload text contains the quoted token `"response.completed"`
/// or the quoted key `"usage"`. This is a plain substring test; the payload is not parsed as
/// JSON.
fn codex_stream_usage_event_filter(data: &str) -> bool {
    // Quoted tokens, matched verbatim against the raw payload text.
    const MARKERS: [&str; 2] = ["\"response.completed\"", "\"usage\""];

    MARKERS.iter().any(|marker| data.contains(*marker))
}
|
||||
|
||||
/// Cheap pre-filter for Gemini SSE `data:` payloads.
///
/// Returns `true` when the raw payload text contains the quoted key `"usageMetadata"`. This is
/// a plain substring test; the payload is not parsed as JSON.
fn gemini_stream_usage_event_filter(data: &str) -> bool {
    const USAGE_METADATA_KEY: &str = "\"usageMetadata\"";

    data.contains(USAGE_METADATA_KEY)
}
|
||||
|
||||
// ============================================================================
|
||||
// 模型提取器实现
|
||||
// ============================================================================
|
||||
@@ -104,6 +132,7 @@ pub const CLAUDE_PARSER_CONFIG: UsageParserConfig = UsageParserConfig {
|
||||
stream_parser: TokenUsage::from_claude_stream_events,
|
||||
response_parser: TokenUsage::from_claude_response,
|
||||
model_extractor: claude_model_extractor,
|
||||
stream_event_filter: Some(claude_stream_usage_event_filter),
|
||||
app_type_str: "claude",
|
||||
};
|
||||
|
||||
@@ -112,6 +141,7 @@ pub const OPENAI_PARSER_CONFIG: UsageParserConfig = UsageParserConfig {
|
||||
stream_parser: TokenUsage::from_openai_stream_events,
|
||||
response_parser: TokenUsage::from_openai_response,
|
||||
model_extractor: openai_model_extractor,
|
||||
stream_event_filter: Some(openai_stream_usage_event_filter),
|
||||
app_type_str: "codex",
|
||||
};
|
||||
|
||||
@@ -120,6 +150,7 @@ pub const CODEX_PARSER_CONFIG: UsageParserConfig = UsageParserConfig {
|
||||
stream_parser: TokenUsage::from_codex_stream_events_auto,
|
||||
response_parser: TokenUsage::from_codex_response_auto,
|
||||
model_extractor: codex_auto_model_extractor,
|
||||
stream_event_filter: Some(codex_stream_usage_event_filter),
|
||||
app_type_str: "codex",
|
||||
};
|
||||
|
||||
@@ -128,6 +159,7 @@ pub const GEMINI_PARSER_CONFIG: UsageParserConfig = UsageParserConfig {
|
||||
stream_parser: TokenUsage::from_gemini_stream_chunks,
|
||||
response_parser: TokenUsage::from_gemini_response,
|
||||
model_extractor: gemini_model_extractor,
|
||||
stream_event_filter: Some(gemini_stream_usage_event_filter),
|
||||
app_type_str: "gemini",
|
||||
};
|
||||
|
||||
|
||||
@@ -10,7 +10,8 @@
|
||||
use super::{
|
||||
error_mapper::{get_error_message, map_proxy_error_to_status},
|
||||
handler_config::{
|
||||
CLAUDE_PARSER_CONFIG, CODEX_PARSER_CONFIG, GEMINI_PARSER_CONFIG, OPENAI_PARSER_CONFIG,
|
||||
claude_stream_usage_event_filter, CLAUDE_PARSER_CONFIG, CODEX_PARSER_CONFIG,
|
||||
GEMINI_PARSER_CONFIG, OPENAI_PARSER_CONFIG,
|
||||
},
|
||||
handler_context::RequestContext,
|
||||
providers::{
|
||||
@@ -22,7 +23,7 @@ use super::{
|
||||
response_processor::{
|
||||
create_logged_passthrough_stream, process_response, read_decoded_body,
|
||||
strip_entity_headers_for_rebuilt_body, strip_hop_by_hop_response_headers,
|
||||
SseUsageCollector,
|
||||
usage_logging_enabled, SseUsageCollector,
|
||||
},
|
||||
server::ProxyState,
|
||||
sse::{strip_sse_field, take_sse_block},
|
||||
@@ -270,8 +271,8 @@ async fn handle_claude_transform(
|
||||
Box::new(Box::pin(create_anthropic_sse_stream(stream)))
|
||||
};
|
||||
|
||||
// 创建使用量收集器
|
||||
let usage_collector = {
|
||||
// 创建使用量收集器;关闭 usage logging 时不要再解析转换后的 SSE。
|
||||
let usage_collector = if usage_logging_enabled(state) {
|
||||
let state = state.clone();
|
||||
let provider_id = ctx.provider.id.clone();
|
||||
let model = ctx.request_model.clone();
|
||||
@@ -279,34 +280,40 @@ async fn handle_claude_transform(
|
||||
let start_time = ctx.start_time;
|
||||
let session_id = ctx.session_id.clone();
|
||||
|
||||
SseUsageCollector::new(start_time, move |events, first_token_ms| {
|
||||
if let Some(usage) = TokenUsage::from_claude_stream_events(&events) {
|
||||
let latency_ms = start_time.elapsed().as_millis() as u64;
|
||||
let state = state.clone();
|
||||
let provider_id = provider_id.clone();
|
||||
let model = model.clone();
|
||||
let session_id = session_id.clone();
|
||||
Some(SseUsageCollector::new(
|
||||
start_time,
|
||||
Some(claude_stream_usage_event_filter),
|
||||
move |events, first_token_ms| {
|
||||
if let Some(usage) = TokenUsage::from_claude_stream_events(&events) {
|
||||
let latency_ms = start_time.elapsed().as_millis() as u64;
|
||||
let state = state.clone();
|
||||
let provider_id = provider_id.clone();
|
||||
let model = model.clone();
|
||||
let session_id = session_id.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
log_usage(
|
||||
&state,
|
||||
&provider_id,
|
||||
"claude",
|
||||
&model,
|
||||
&model,
|
||||
usage,
|
||||
latency_ms,
|
||||
first_token_ms,
|
||||
true,
|
||||
status_code,
|
||||
Some(session_id),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
} else {
|
||||
log::debug!("[Claude] OpenRouter 流式响应缺少 usage 统计,跳过消费记录");
|
||||
}
|
||||
})
|
||||
tokio::spawn(async move {
|
||||
log_usage(
|
||||
&state,
|
||||
&provider_id,
|
||||
"claude",
|
||||
&model,
|
||||
&model,
|
||||
usage,
|
||||
latency_ms,
|
||||
first_token_ms,
|
||||
true,
|
||||
status_code,
|
||||
Some(session_id),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
} else {
|
||||
log::debug!("[Claude] OpenRouter 流式响应缺少 usage 统计,跳过消费记录");
|
||||
}
|
||||
},
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// 获取流式超时配置
|
||||
@@ -315,7 +322,7 @@ async fn handle_claude_transform(
|
||||
let logged_stream = create_logged_passthrough_stream(
|
||||
sse_stream,
|
||||
"Claude/OpenRouter",
|
||||
Some(usage_collector),
|
||||
usage_collector,
|
||||
timeout_config,
|
||||
);
|
||||
|
||||
@@ -798,6 +805,10 @@ async fn log_usage(
|
||||
) {
|
||||
use super::usage::logger::UsageLogger;
|
||||
|
||||
if !usage_logging_enabled(state) {
|
||||
return;
|
||||
}
|
||||
|
||||
let logger = UsageLogger::new(&state.db);
|
||||
|
||||
let (multiplier, pricing_model_source) =
|
||||
|
||||
@@ -3,7 +3,7 @@
|
||||
//! 统一处理流式和非流式 API 响应
|
||||
|
||||
use super::{
|
||||
handler_config::UsageParserConfig,
|
||||
handler_config::{StreamUsageEventFilter, UsageParserConfig},
|
||||
handler_context::{RequestContext, StreamingTimeoutConfig},
|
||||
hyper_client::ProxyResponse,
|
||||
server::ProxyState,
|
||||
@@ -211,7 +211,7 @@ pub async fn handle_streaming(
|
||||
// 创建字节流
|
||||
let stream = response.bytes_stream();
|
||||
|
||||
// 创建使用量收集器
|
||||
// 创建使用量收集器;关闭 usage logging 时不要在流式热路径上解析每个 SSE event。
|
||||
let usage_collector = create_usage_collector(ctx, state, status.as_u16(), parser_config);
|
||||
|
||||
// 获取流式超时配置
|
||||
@@ -219,7 +219,7 @@ pub async fn handle_streaming(
|
||||
|
||||
// 创建带日志和超时的透传流
|
||||
let logged_stream =
|
||||
create_logged_passthrough_stream(stream, ctx.tag, Some(usage_collector), timeout_config);
|
||||
create_logged_passthrough_stream(stream, ctx.tag, usage_collector, timeout_config);
|
||||
|
||||
let body = axum::body::Body::from_stream(logged_stream);
|
||||
match builder.body(body) {
|
||||
@@ -255,63 +255,67 @@ pub async fn handle_non_streaming(
|
||||
String::from_utf8_lossy(&body_bytes)
|
||||
);
|
||||
|
||||
// 解析并记录使用量
|
||||
if let Ok(json_value) = serde_json::from_slice::<Value>(&body_bytes) {
|
||||
// 解析使用量
|
||||
if let Some(usage) = (parser_config.response_parser)(&json_value) {
|
||||
// 优先使用 usage 中解析出的模型名称,其次使用响应中的 model 字段,最后回退到请求模型
|
||||
let model = if let Some(ref m) = usage.model {
|
||||
m.clone()
|
||||
} else if let Some(m) = json_value.get("model").and_then(|m| m.as_str()) {
|
||||
m.to_string()
|
||||
} else {
|
||||
ctx.request_model.clone()
|
||||
};
|
||||
// 解析并记录使用量。关闭 usage logging 时直接跳过,避免非流式响应整包 JSON parse。
|
||||
if usage_logging_enabled(state) {
|
||||
if let Ok(json_value) = serde_json::from_slice::<Value>(&body_bytes) {
|
||||
// 解析使用量
|
||||
if let Some(usage) = (parser_config.response_parser)(&json_value) {
|
||||
// 优先使用 usage 中解析出的模型名称,其次使用响应中的 model 字段,最后回退到请求模型
|
||||
let model = if let Some(ref m) = usage.model {
|
||||
m.clone()
|
||||
} else if let Some(m) = json_value.get("model").and_then(|m| m.as_str()) {
|
||||
m.to_string()
|
||||
} else {
|
||||
ctx.request_model.clone()
|
||||
};
|
||||
|
||||
spawn_log_usage(
|
||||
state,
|
||||
ctx,
|
||||
usage,
|
||||
&model,
|
||||
&ctx.request_model,
|
||||
status.as_u16(),
|
||||
false,
|
||||
);
|
||||
spawn_log_usage(
|
||||
state,
|
||||
ctx,
|
||||
usage,
|
||||
&model,
|
||||
&ctx.request_model,
|
||||
status.as_u16(),
|
||||
false,
|
||||
);
|
||||
} else {
|
||||
let model = json_value
|
||||
.get("model")
|
||||
.and_then(|m| m.as_str())
|
||||
.unwrap_or(&ctx.request_model)
|
||||
.to_string();
|
||||
spawn_log_usage(
|
||||
state,
|
||||
ctx,
|
||||
TokenUsage::default(),
|
||||
&model,
|
||||
&ctx.request_model,
|
||||
status.as_u16(),
|
||||
false,
|
||||
);
|
||||
log::debug!(
|
||||
"[{}] 未能解析 usage 信息,跳过记录",
|
||||
parser_config.app_type_str
|
||||
);
|
||||
}
|
||||
} else {
|
||||
let model = json_value
|
||||
.get("model")
|
||||
.and_then(|m| m.as_str())
|
||||
.unwrap_or(&ctx.request_model)
|
||||
.to_string();
|
||||
log::debug!(
|
||||
"[{}] <<< 响应 (非 JSON): {} bytes",
|
||||
ctx.tag,
|
||||
body_bytes.len()
|
||||
);
|
||||
spawn_log_usage(
|
||||
state,
|
||||
ctx,
|
||||
TokenUsage::default(),
|
||||
&model,
|
||||
&ctx.request_model,
|
||||
&ctx.request_model,
|
||||
status.as_u16(),
|
||||
false,
|
||||
);
|
||||
log::debug!(
|
||||
"[{}] 未能解析 usage 信息,跳过记录",
|
||||
parser_config.app_type_str
|
||||
);
|
||||
}
|
||||
} else {
|
||||
log::debug!(
|
||||
"[{}] <<< 响应 (非 JSON): {} bytes",
|
||||
ctx.tag,
|
||||
body_bytes.len()
|
||||
);
|
||||
spawn_log_usage(
|
||||
state,
|
||||
ctx,
|
||||
TokenUsage::default(),
|
||||
&ctx.request_model,
|
||||
&ctx.request_model,
|
||||
status.as_u16(),
|
||||
false,
|
||||
);
|
||||
log::debug!("[{}] usage logging 已关闭,跳过非流式 usage 解析", ctx.tag);
|
||||
}
|
||||
|
||||
// 构建响应
|
||||
@@ -360,13 +364,15 @@ struct SseUsageCollectorInner {
|
||||
first_event_time: Mutex<Option<std::time::Instant>>,
|
||||
start_time: std::time::Instant,
|
||||
on_complete: UsageCallbackWithTiming,
|
||||
should_collect: Option<StreamUsageEventFilter>,
|
||||
finished: AtomicBool,
|
||||
}
|
||||
|
||||
impl SseUsageCollector {
|
||||
/// 创建新的使用量收集器
|
||||
/// 创建使用量收集器;`should_collect` 用来在 hot path 跳过与 usage 无关的事件。
|
||||
pub fn new(
|
||||
start_time: std::time::Instant,
|
||||
should_collect: Option<StreamUsageEventFilter>,
|
||||
callback: impl Fn(Vec<Value>, Option<u64>) + Send + Sync + 'static,
|
||||
) -> Self {
|
||||
let on_complete: UsageCallbackWithTiming = Arc::new(callback);
|
||||
@@ -376,11 +382,19 @@ impl SseUsageCollector {
|
||||
first_event_time: Mutex::new(None),
|
||||
start_time,
|
||||
on_complete,
|
||||
should_collect,
|
||||
finished: AtomicBool::new(false),
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn should_collect(&self, data: &str) -> bool {
|
||||
self.inner
|
||||
.should_collect
|
||||
.map(|filter| filter(data))
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
/// 推送 SSE 事件
|
||||
pub async fn push(&self, event: Value) {
|
||||
// 记录首个事件时间
|
||||
@@ -424,12 +438,16 @@ fn create_usage_collector(
|
||||
state: &ProxyState,
|
||||
status_code: u16,
|
||||
parser_config: &UsageParserConfig,
|
||||
) -> SseUsageCollector {
|
||||
) -> Option<SseUsageCollector> {
|
||||
let logging_enabled = state
|
||||
.config
|
||||
.try_read()
|
||||
.map(|c| c.enable_logging)
|
||||
.unwrap_or(true);
|
||||
if !logging_enabled {
|
||||
return None;
|
||||
}
|
||||
|
||||
let state = state.clone();
|
||||
let provider_id = ctx.provider.id.clone();
|
||||
let request_model = ctx.request_model.clone();
|
||||
@@ -440,62 +458,63 @@ fn create_usage_collector(
|
||||
let model_extractor = parser_config.model_extractor;
|
||||
let session_id = ctx.session_id.clone();
|
||||
|
||||
SseUsageCollector::new(start_time, move |events, first_token_ms| {
|
||||
if !logging_enabled {
|
||||
return;
|
||||
}
|
||||
if let Some(usage) = stream_parser(&events) {
|
||||
let model = model_extractor(&events, &request_model);
|
||||
let latency_ms = start_time.elapsed().as_millis() as u64;
|
||||
Some(SseUsageCollector::new(
|
||||
start_time,
|
||||
parser_config.stream_event_filter,
|
||||
move |events, first_token_ms| {
|
||||
if let Some(usage) = stream_parser(&events) {
|
||||
let model = model_extractor(&events, &request_model);
|
||||
let latency_ms = start_time.elapsed().as_millis() as u64;
|
||||
|
||||
let state = state.clone();
|
||||
let provider_id = provider_id.clone();
|
||||
let session_id = session_id.clone();
|
||||
let request_model = request_model.clone();
|
||||
let state = state.clone();
|
||||
let provider_id = provider_id.clone();
|
||||
let session_id = session_id.clone();
|
||||
let request_model = request_model.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
log_usage_internal(
|
||||
&state,
|
||||
&provider_id,
|
||||
app_type_str,
|
||||
&model,
|
||||
&request_model,
|
||||
usage,
|
||||
latency_ms,
|
||||
first_token_ms,
|
||||
true, // is_streaming
|
||||
status_code,
|
||||
Some(session_id),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
} else {
|
||||
let model = model_extractor(&events, &request_model);
|
||||
let latency_ms = start_time.elapsed().as_millis() as u64;
|
||||
let state = state.clone();
|
||||
let provider_id = provider_id.clone();
|
||||
let session_id = session_id.clone();
|
||||
let request_model = request_model.clone();
|
||||
tokio::spawn(async move {
|
||||
log_usage_internal(
|
||||
&state,
|
||||
&provider_id,
|
||||
app_type_str,
|
||||
&model,
|
||||
&request_model,
|
||||
usage,
|
||||
latency_ms,
|
||||
first_token_ms,
|
||||
true, // is_streaming
|
||||
status_code,
|
||||
Some(session_id),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
} else {
|
||||
let model = model_extractor(&events, &request_model);
|
||||
let latency_ms = start_time.elapsed().as_millis() as u64;
|
||||
let state = state.clone();
|
||||
let provider_id = provider_id.clone();
|
||||
let session_id = session_id.clone();
|
||||
let request_model = request_model.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
log_usage_internal(
|
||||
&state,
|
||||
&provider_id,
|
||||
app_type_str,
|
||||
&model,
|
||||
&request_model,
|
||||
TokenUsage::default(),
|
||||
latency_ms,
|
||||
first_token_ms,
|
||||
true, // is_streaming
|
||||
status_code,
|
||||
Some(session_id),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
log::debug!("[{tag}] 流式响应缺少 usage 统计,跳过消费记录");
|
||||
}
|
||||
})
|
||||
tokio::spawn(async move {
|
||||
log_usage_internal(
|
||||
&state,
|
||||
&provider_id,
|
||||
app_type_str,
|
||||
&model,
|
||||
&request_model,
|
||||
TokenUsage::default(),
|
||||
latency_ms,
|
||||
first_token_ms,
|
||||
true, // is_streaming
|
||||
status_code,
|
||||
Some(session_id),
|
||||
)
|
||||
.await;
|
||||
});
|
||||
log::debug!("[{tag}] 流式响应缺少 usage 统计,跳过消费记录");
|
||||
}
|
||||
},
|
||||
))
|
||||
}
|
||||
|
||||
/// 异步记录使用量
|
||||
@@ -541,6 +560,14 @@ fn spawn_log_usage(
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn usage_logging_enabled(state: &ProxyState) -> bool {
|
||||
state
|
||||
.config
|
||||
.try_read()
|
||||
.map(|config| config.enable_logging)
|
||||
.unwrap_or(true)
|
||||
}
|
||||
|
||||
/// 内部使用量记录函数
|
||||
#[allow(clippy::too_many_arguments)]
|
||||
async fn log_usage_internal(
|
||||
@@ -609,6 +636,8 @@ pub fn create_logged_passthrough_stream(
|
||||
let mut buffer = String::new();
|
||||
let mut utf8_remainder: Vec<u8> = Vec::new();
|
||||
let mut collector = usage_collector;
|
||||
let inspect_sse_events =
|
||||
collector.is_some() || log::log_enabled!(log::Level::Debug);
|
||||
let mut is_first_chunk = true;
|
||||
|
||||
// 超时配置
|
||||
@@ -659,25 +688,36 @@ pub fn create_logged_passthrough_stream(
|
||||
);
|
||||
}
|
||||
is_first_chunk = false;
|
||||
crate::proxy::sse::append_utf8_safe(&mut buffer, &mut utf8_remainder, &bytes);
|
||||
if inspect_sse_events {
|
||||
crate::proxy::sse::append_utf8_safe(&mut buffer, &mut utf8_remainder, &bytes);
|
||||
|
||||
// 尝试解析并记录完整的 SSE 事件
|
||||
while let Some(event_text) = take_sse_block(&mut buffer) {
|
||||
if !event_text.trim().is_empty() {
|
||||
// 提取 data 部分并尝试解析为 JSON
|
||||
for line in event_text.lines() {
|
||||
if let Some(data) = strip_sse_field(line, "data") {
|
||||
if data.trim() != "[DONE]" {
|
||||
if let Ok(json_value) = serde_json::from_str::<Value>(data) {
|
||||
if let Some(c) = &collector {
|
||||
c.push(json_value.clone()).await;
|
||||
// 尝试解析并记录完整的 SSE 事件
|
||||
while let Some(event_text) = take_sse_block(&mut buffer) {
|
||||
if !event_text.trim().is_empty() {
|
||||
// 提取 data 部分;只有 usage collector 存在时才解析 JSON。
|
||||
for line in event_text.lines() {
|
||||
if let Some(data) = strip_sse_field(line, "data") {
|
||||
if data.trim() != "[DONE]" {
|
||||
let collected = match &collector {
|
||||
Some(c) if c.should_collect(data) => {
|
||||
match serde_json::from_str::<Value>(data) {
|
||||
Ok(json_value) => {
|
||||
c.push(json_value).await;
|
||||
true
|
||||
}
|
||||
Err(_) => false,
|
||||
}
|
||||
}
|
||||
_ => false,
|
||||
};
|
||||
if collected {
|
||||
log::debug!("[{tag}] <<< SSE 事件: {data}");
|
||||
} else {
|
||||
log::debug!("[{tag}] <<< SSE 数据: {data}");
|
||||
}
|
||||
log::debug!("[{tag}] <<< SSE 事件: {data}");
|
||||
} else {
|
||||
log::debug!("[{tag}] <<< SSE 数据: {data}");
|
||||
log::debug!("[{tag}] <<< SSE: [DONE]");
|
||||
}
|
||||
} else {
|
||||
log::debug!("[{tag}] <<< SSE: [DONE]");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user