mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-02-02 20:40:52 +08:00
fix(logging): capture streaming TTFB on first chunk and make timestamps required
- Add firstChunkTimestamp field to ResponseWriterWrapper for sync capture - Capture TTFB in Write() and WriteString() before async channel send - Add SetFirstChunkTimestamp() to StreamingLogWriter interface - Make requestTimestamp/apiResponseTimestamp required in LogRequest() - Remove timestamp capture from WriteAPIResponse() (now via setter) - Fix Gemini handler to set API_RESPONSE_TIMESTAMP before writing response This ensures accurate TTFB measurement for all streaming API formats (OpenAI, Gemini, Claude) by capturing timestamp synchronously when the first response chunk arrives, not when the stream finalizes.
This commit is contained in:
@@ -28,16 +28,17 @@ type RequestInfo struct {
|
|||||||
// It is designed to handle both standard and streaming responses, ensuring that logging operations do not block the client response.
|
// It is designed to handle both standard and streaming responses, ensuring that logging operations do not block the client response.
|
||||||
type ResponseWriterWrapper struct {
|
type ResponseWriterWrapper struct {
|
||||||
gin.ResponseWriter
|
gin.ResponseWriter
|
||||||
body *bytes.Buffer // body is a buffer to store the response body for non-streaming responses.
|
body *bytes.Buffer // body is a buffer to store the response body for non-streaming responses.
|
||||||
isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream).
|
isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream).
|
||||||
streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries.
|
streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries.
|
||||||
chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger.
|
chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger.
|
||||||
streamDone chan struct{} // streamDone signals when the streaming goroutine completes.
|
streamDone chan struct{} // streamDone signals when the streaming goroutine completes.
|
||||||
logger logging.RequestLogger // logger is the instance of the request logger service.
|
logger logging.RequestLogger // logger is the instance of the request logger service.
|
||||||
requestInfo *RequestInfo // requestInfo holds the details of the original request.
|
requestInfo *RequestInfo // requestInfo holds the details of the original request.
|
||||||
statusCode int // statusCode stores the HTTP status code of the response.
|
statusCode int // statusCode stores the HTTP status code of the response.
|
||||||
headers map[string][]string // headers stores the response headers.
|
headers map[string][]string // headers stores the response headers.
|
||||||
logOnErrorOnly bool // logOnErrorOnly enables logging only when an error response is detected.
|
logOnErrorOnly bool // logOnErrorOnly enables logging only when an error response is detected.
|
||||||
|
firstChunkTimestamp time.Time // firstChunkTimestamp captures TTFB for streaming responses.
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewResponseWriterWrapper creates and initializes a new ResponseWriterWrapper.
|
// NewResponseWriterWrapper creates and initializes a new ResponseWriterWrapper.
|
||||||
@@ -75,6 +76,10 @@ func (w *ResponseWriterWrapper) Write(data []byte) (int, error) {
|
|||||||
|
|
||||||
// THEN: Handle logging based on response type
|
// THEN: Handle logging based on response type
|
||||||
if w.isStreaming && w.chunkChannel != nil {
|
if w.isStreaming && w.chunkChannel != nil {
|
||||||
|
// Capture TTFB on first chunk (synchronous, before async channel send)
|
||||||
|
if w.firstChunkTimestamp.IsZero() {
|
||||||
|
w.firstChunkTimestamp = time.Now()
|
||||||
|
}
|
||||||
// For streaming responses: Send to async logging channel (non-blocking)
|
// For streaming responses: Send to async logging channel (non-blocking)
|
||||||
select {
|
select {
|
||||||
case w.chunkChannel <- append([]byte(nil), data...): // Non-blocking send with copy
|
case w.chunkChannel <- append([]byte(nil), data...): // Non-blocking send with copy
|
||||||
@@ -119,6 +124,10 @@ func (w *ResponseWriterWrapper) WriteString(data string) (int, error) {
|
|||||||
|
|
||||||
// THEN: Capture for logging
|
// THEN: Capture for logging
|
||||||
if w.isStreaming && w.chunkChannel != nil {
|
if w.isStreaming && w.chunkChannel != nil {
|
||||||
|
// Capture TTFB on first chunk (synchronous, before async channel send)
|
||||||
|
if w.firstChunkTimestamp.IsZero() {
|
||||||
|
w.firstChunkTimestamp = time.Now()
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case w.chunkChannel <- []byte(data):
|
case w.chunkChannel <- []byte(data):
|
||||||
default:
|
default:
|
||||||
@@ -282,6 +291,8 @@ func (w *ResponseWriterWrapper) Finalize(c *gin.Context) error {
|
|||||||
w.streamDone = nil
|
w.streamDone = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
w.streamWriter.SetFirstChunkTimestamp(w.firstChunkTimestamp)
|
||||||
|
|
||||||
// Write API Request and Response to the streaming log before closing
|
// Write API Request and Response to the streaming log before closing
|
||||||
apiRequest := w.extractAPIRequest(c)
|
apiRequest := w.extractAPIRequest(c)
|
||||||
if len(apiRequest) > 0 {
|
if len(apiRequest) > 0 {
|
||||||
@@ -393,5 +404,7 @@ func (w *ResponseWriterWrapper) logRequest(statusCode int, headers map[string][]
|
|||||||
apiResponseBody,
|
apiResponseBody,
|
||||||
apiResponseErrors,
|
apiResponseErrors,
|
||||||
w.requestInfo.RequestID,
|
w.requestInfo.RequestID,
|
||||||
|
w.requestInfo.Timestamp,
|
||||||
|
apiResponseTimestamp,
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -44,10 +44,12 @@ type RequestLogger interface {
|
|||||||
// - apiRequest: The API request data
|
// - apiRequest: The API request data
|
||||||
// - apiResponse: The API response data
|
// - apiResponse: The API response data
|
||||||
// - requestID: Optional request ID for log file naming
|
// - requestID: Optional request ID for log file naming
|
||||||
|
// - requestTimestamp: When the request was received
|
||||||
|
// - apiResponseTimestamp: When the API response was received
|
||||||
//
|
//
|
||||||
// Returns:
|
// Returns:
|
||||||
// - error: An error if logging fails, nil otherwise
|
// - error: An error if logging fails, nil otherwise
|
||||||
LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string) error
|
LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error
|
||||||
|
|
||||||
// LogStreamingRequest initiates logging for a streaming request and returns a writer for chunks.
|
// LogStreamingRequest initiates logging for a streaming request and returns a writer for chunks.
|
||||||
//
|
//
|
||||||
@@ -109,6 +111,12 @@ type StreamingLogWriter interface {
|
|||||||
// - error: An error if writing fails, nil otherwise
|
// - error: An error if writing fails, nil otherwise
|
||||||
WriteAPIResponse(apiResponse []byte) error
|
WriteAPIResponse(apiResponse []byte) error
|
||||||
|
|
||||||
|
// SetFirstChunkTimestamp sets the TTFB timestamp captured when first chunk was received.
|
||||||
|
//
|
||||||
|
// Parameters:
|
||||||
|
// - timestamp: The time when first response chunk was received
|
||||||
|
SetFirstChunkTimestamp(timestamp time.Time)
|
||||||
|
|
||||||
// Close finalizes the log file and cleans up resources.
|
// Close finalizes the log file and cleans up resources.
|
||||||
//
|
//
|
||||||
// Returns:
|
// Returns:
|
||||||
@@ -180,11 +188,13 @@ func (l *FileRequestLogger) SetEnabled(enabled bool) {
|
|||||||
// - apiRequest: The API request data
|
// - apiRequest: The API request data
|
||||||
// - apiResponse: The API response data
|
// - apiResponse: The API response data
|
||||||
// - requestID: Optional request ID for log file naming
|
// - requestID: Optional request ID for log file naming
|
||||||
|
// - requestTimestamp: When the request was received
|
||||||
|
// - apiResponseTimestamp: When the API response was received
|
||||||
//
|
//
|
||||||
// Returns:
|
// Returns:
|
||||||
// - error: An error if logging fails, nil otherwise
|
// - error: An error if logging fails, nil otherwise
|
||||||
func (l *FileRequestLogger) LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string) error {
|
func (l *FileRequestLogger) LogRequest(url, method string, requestHeaders map[string][]string, body []byte, statusCode int, responseHeaders map[string][]string, response, apiRequest, apiResponse []byte, apiResponseErrors []*interfaces.ErrorMessage, requestID string, requestTimestamp, apiResponseTimestamp time.Time) error {
|
||||||
return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, false, requestID, time.Time{}, time.Time{})
|
return l.logRequest(url, method, requestHeaders, body, statusCode, responseHeaders, response, apiRequest, apiResponse, apiResponseErrors, false, requestID, requestTimestamp, apiResponseTimestamp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// LogRequestWithOptions logs a request with optional forced logging behavior.
|
// LogRequestWithOptions logs a request with optional forced logging behavior.
|
||||||
@@ -1065,10 +1075,15 @@ func (w *FileStreamingLogWriter) WriteAPIResponse(apiResponse []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
w.apiResponse = bytes.Clone(apiResponse)
|
w.apiResponse = bytes.Clone(apiResponse)
|
||||||
w.apiResponseTimestamp = time.Now()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *FileStreamingLogWriter) SetFirstChunkTimestamp(timestamp time.Time) {
|
||||||
|
if !timestamp.IsZero() {
|
||||||
|
w.apiResponseTimestamp = timestamp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Close finalizes the log file and cleans up resources.
|
// Close finalizes the log file and cleans up resources.
|
||||||
// It writes all buffered data to the file in the correct order:
|
// It writes all buffered data to the file in the correct order:
|
||||||
// API REQUEST -> API RESPONSE -> RESPONSE (status, headers, body chunks)
|
// API REQUEST -> API RESPONSE -> RESPONSE (status, headers, body chunks)
|
||||||
@@ -1236,6 +1251,8 @@ func (w *NoOpStreamingLogWriter) WriteAPIResponse(_ []byte) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (w *NoOpStreamingLogWriter) SetFirstChunkTimestamp(_ time.Time) {}
|
||||||
|
|
||||||
// Close is a no-op implementation that does nothing and always returns nil.
|
// Close is a no-op implementation that does nothing and always returns nil.
|
||||||
//
|
//
|
||||||
// Returns:
|
// Returns:
|
||||||
|
|||||||
@@ -124,8 +124,8 @@ func (h *GeminiCLIAPIHandler) CLIHandler(c *gin.Context) {
|
|||||||
log.Errorf("Failed to read response body: %v", err)
|
log.Errorf("Failed to read response body: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_, _ = c.Writer.Write(output)
|
|
||||||
c.Set("API_RESPONSE_TIMESTAMP", time.Now())
|
c.Set("API_RESPONSE_TIMESTAMP", time.Now())
|
||||||
|
_, _ = c.Writer.Write(output)
|
||||||
c.Set("API_RESPONSE", output)
|
c.Set("API_RESPONSE", output)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user