From f6d625114c98de2010a4841b5f9a69b70a0ea704 Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Sun, 21 Dec 2025 16:17:48 +0800 Subject: [PATCH] feat(logging): revamp request logger to support streaming and temporary file spooling This update enhances the `FileRequestLogger` by introducing support for spooling large request and response bodies to temporary files, reducing memory consumption. It adds atomic requestLogID generation for sequential log naming and new methods for non-streaming/streaming log assembly. Also includes better error handling during logging and temp file cleanups. --- internal/logging/request_logger.go | 494 +++++++++++++++++++++++------ 1 file changed, 403 insertions(+), 91 deletions(-) diff --git a/internal/logging/request_logger.go b/internal/logging/request_logger.go index f8c068c5..391f2869 100644 --- a/internal/logging/request_logger.go +++ b/internal/logging/request_logger.go @@ -14,6 +14,7 @@ import ( "regexp" "sort" "strings" + "sync/atomic" "time" "github.com/andybalholm/brotli" @@ -25,6 +26,8 @@ import ( "github.com/router-for-me/CLIProxyAPI/v6/internal/util" ) +var requestLogID atomic.Uint64 + // RequestLogger defines the interface for logging HTTP requests and responses. // It provides methods for logging both regular and streaming HTTP request/response cycles. type RequestLogger interface { @@ -204,19 +207,52 @@ func (l *FileRequestLogger) logRequest(url, method string, requestHeaders map[st } filePath := filepath.Join(l.logsDir, filename) - // Decompress response if needed - decompressedResponse, err := l.decompressResponse(responseHeaders, response) - if err != nil { - // If decompression fails, log the error but continue with original response - decompressedResponse = append(response, []byte(fmt.Sprintf("\n[DECOMPRESSION ERROR: %v]", err))...) + requestBodyPath, errTemp := l.writeRequestBodyTempFile(body) + if errTemp != nil { + log.WithError(errTemp).Warn("failed to create request body temp file, falling back to direct write") + } + if requestBodyPath != "" { + defer func() { + if errRemove := os.Remove(requestBodyPath); errRemove != nil { + log.WithError(errRemove).Warn("failed to remove request body temp file") + } + }() } - // Create log content - content := l.formatLogContent(url, method, requestHeaders, body, apiRequest, apiResponse, decompressedResponse, statusCode, responseHeaders, apiResponseErrors) + responseToWrite, decompressErr := l.decompressResponse(responseHeaders, response) + if decompressErr != nil { + // If decompression fails, continue with original response and annotate the log output. + responseToWrite = response + } - // Write to file - if err = os.WriteFile(filePath, []byte(content), 0644); err != nil { - return fmt.Errorf("failed to write log file: %w", err) + logFile, errOpen := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if errOpen != nil { + return fmt.Errorf("failed to create log file: %w", errOpen) + } + + writeErr := l.writeNonStreamingLog( + logFile, + url, + method, + requestHeaders, + body, + requestBodyPath, + apiRequest, + apiResponse, + apiResponseErrors, + statusCode, + responseHeaders, + responseToWrite, + decompressErr, + ) + if errClose := logFile.Close(); errClose != nil { + log.WithError(errClose).Warn("failed to close request log file") + if writeErr == nil { + return errClose + } + } + if writeErr != nil { + return fmt.Errorf("failed to write log file: %w", writeErr) } if force && !l.enabled { @@ -253,26 +289,38 @@ func (l *FileRequestLogger) LogStreamingRequest(url, method string, headers map[ filename := l.generateFilename(url) filePath := filepath.Join(l.logsDir, filename) - // Create and open file - file, err := os.Create(filePath) - if err != nil { - return nil, fmt.Errorf("failed to create log file: %w", err) + requestHeaders := make(map[string][]string, len(headers)) + for key, values := range headers { + headerValues := make([]string, len(values)) + copy(headerValues, values) + requestHeaders[key] = headerValues } - // Write initial request information - requestInfo := l.formatRequestInfo(url, method, headers, body) - if _, err = file.WriteString(requestInfo); err != nil { - _ = file.Close() - return nil, fmt.Errorf("failed to write request info: %w", err) + requestBodyPath, errTemp := l.writeRequestBodyTempFile(body) + if errTemp != nil { + return nil, fmt.Errorf("failed to create request body temp file: %w", errTemp) } + responseBodyFile, errCreate := os.CreateTemp(l.logsDir, "response-body-*.tmp") + if errCreate != nil { + _ = os.Remove(requestBodyPath) + return nil, fmt.Errorf("failed to create response body temp file: %w", errCreate) + } + responseBodyPath := responseBodyFile.Name() + // Create streaming writer writer := &FileStreamingLogWriter{ - file: file, - chunkChan: make(chan []byte, 100), // Buffered channel for async writes - closeChan: make(chan struct{}), - errorChan: make(chan error, 1), - bufferedChunks: &bytes.Buffer{}, + logFilePath: filePath, + url: url, + method: method, + timestamp: time.Now(), + requestHeaders: requestHeaders, + requestBodyPath: requestBodyPath, + responseBodyPath: responseBodyPath, + responseBodyFile: responseBodyFile, + chunkChan: make(chan []byte, 100), // Buffered channel for async writes + closeChan: make(chan struct{}), + errorChan: make(chan error, 1), } // Start async writer goroutine @@ -323,7 +371,9 @@ func (l *FileRequestLogger) generateFilename(url string) string { timestamp := time.Now().Format("2006-01-02T150405-.000000000") timestamp = strings.Replace(timestamp, ".", "", -1) - return fmt.Sprintf("%s-%s.log", sanitized, timestamp) + id := requestLogID.Add(1) + + return fmt.Sprintf("%s-%s-%d.log", sanitized, timestamp, id) } // sanitizeForFilename replaces characters that are not safe for filenames. @@ -405,6 +455,220 @@ func (l *FileRequestLogger) cleanupOldErrorLogs() error { return nil } +func (l *FileRequestLogger) writeRequestBodyTempFile(body []byte) (string, error) { + tmpFile, errCreate := os.CreateTemp(l.logsDir, "request-body-*.tmp") + if errCreate != nil { + return "", errCreate + } + tmpPath := tmpFile.Name() + + if _, errCopy := io.Copy(tmpFile, bytes.NewReader(body)); errCopy != nil { + _ = tmpFile.Close() + _ = os.Remove(tmpPath) + return "", errCopy + } + if errClose := tmpFile.Close(); errClose != nil { + _ = os.Remove(tmpPath) + return "", errClose + } + return tmpPath, nil +} + +func (l *FileRequestLogger) writeNonStreamingLog( + w io.Writer, + url, method string, + requestHeaders map[string][]string, + requestBody []byte, + requestBodyPath string, + apiRequest []byte, + apiResponse []byte, + apiResponseErrors []*interfaces.ErrorMessage, + statusCode int, + responseHeaders map[string][]string, + response []byte, + decompressErr error, +) error { + if errWrite := writeRequestInfoWithBody(w, url, method, requestHeaders, requestBody, requestBodyPath, time.Now()); errWrite != nil { + return errWrite + } + if errWrite := writeAPISection(w, "=== API REQUEST ===\n", "=== API REQUEST", apiRequest); errWrite != nil { + return errWrite + } + if errWrite := writeAPIErrorResponses(w, apiResponseErrors); errWrite != nil { + return errWrite + } + if errWrite := writeAPISection(w, "=== API RESPONSE ===\n", "=== API RESPONSE", apiResponse); errWrite != nil { + return errWrite + } + return writeResponseSection(w, statusCode, true, responseHeaders, bytes.NewReader(response), decompressErr, true) +} + +func writeRequestInfoWithBody( + w io.Writer, + url, method string, + headers map[string][]string, + body []byte, + bodyPath string, + timestamp time.Time, +) error { + if _, errWrite := io.WriteString(w, "=== REQUEST INFO ===\n"); errWrite != nil { + return errWrite + } + if _, errWrite := io.WriteString(w, fmt.Sprintf("Version: %s\n", buildinfo.Version)); errWrite != nil { + return errWrite + } + if _, errWrite := io.WriteString(w, fmt.Sprintf("URL: %s\n", url)); errWrite != nil { + return errWrite + } + if _, errWrite := io.WriteString(w, fmt.Sprintf("Method: %s\n", method)); errWrite != nil { + return errWrite + } + if _, errWrite := io.WriteString(w, fmt.Sprintf("Timestamp: %s\n", timestamp.Format(time.RFC3339Nano))); errWrite != nil { + return errWrite + } + if _, errWrite := io.WriteString(w, "\n"); errWrite != nil { + return errWrite + } + + if _, errWrite := io.WriteString(w, "=== HEADERS ===\n"); errWrite != nil { + return errWrite + } + for key, values := range headers { + for _, value := range values { + masked := util.MaskSensitiveHeaderValue(key, value) + if _, errWrite := io.WriteString(w, fmt.Sprintf("%s: %s\n", key, masked)); errWrite != nil { + return errWrite + } + } + } + if _, errWrite := io.WriteString(w, "\n"); errWrite != nil { + return errWrite + } + + if _, errWrite := io.WriteString(w, "=== REQUEST BODY ===\n"); errWrite != nil { + return errWrite + } + + if bodyPath != "" { + bodyFile, errOpen := os.Open(bodyPath) + if errOpen != nil { + return errOpen + } + if _, errCopy := io.Copy(w, bodyFile); errCopy != nil { + _ = bodyFile.Close() + return errCopy + } + if errClose := bodyFile.Close(); errClose != nil { + log.WithError(errClose).Warn("failed to close request body temp file") + } + } else if _, errWrite := w.Write(body); errWrite != nil { + return errWrite + } + + if _, errWrite := io.WriteString(w, "\n\n"); errWrite != nil { + return errWrite + } + return nil +} + +func writeAPISection(w io.Writer, sectionHeader string, sectionPrefix string, payload []byte) error { + if len(payload) == 0 { + return nil + } + + if bytes.HasPrefix(payload, []byte(sectionPrefix)) { + if _, errWrite := w.Write(payload); errWrite != nil { + return errWrite + } + if !bytes.HasSuffix(payload, []byte("\n")) { + if _, errWrite := io.WriteString(w, "\n"); errWrite != nil { + return errWrite + } + } + } else { + if _, errWrite := io.WriteString(w, sectionHeader); errWrite != nil { + return errWrite + } + if _, errWrite := w.Write(payload); errWrite != nil { + return errWrite + } + if _, errWrite := io.WriteString(w, "\n"); errWrite != nil { + return errWrite + } + } + + if _, errWrite := io.WriteString(w, "\n"); errWrite != nil { + return errWrite + } + return nil +} + +func writeAPIErrorResponses(w io.Writer, apiResponseErrors []*interfaces.ErrorMessage) error { + for i := 0; i < len(apiResponseErrors); i++ { + if apiResponseErrors[i] == nil { + continue + } + if _, errWrite := io.WriteString(w, "=== API ERROR RESPONSE ===\n"); errWrite != nil { + return errWrite + } + if _, errWrite := io.WriteString(w, fmt.Sprintf("HTTP Status: %d\n", apiResponseErrors[i].StatusCode)); errWrite != nil { + return errWrite + } + if apiResponseErrors[i].Error != nil { + if _, errWrite := io.WriteString(w, apiResponseErrors[i].Error.Error()); errWrite != nil { + return errWrite + } + } + if _, errWrite := io.WriteString(w, "\n\n"); errWrite != nil { + return errWrite + } + } + return nil +} + +func writeResponseSection(w io.Writer, statusCode int, statusWritten bool, responseHeaders map[string][]string, responseReader io.Reader, decompressErr error, trailingNewline bool) error { + if _, errWrite := io.WriteString(w, "=== RESPONSE ===\n"); errWrite != nil { + return errWrite + } + if statusWritten { + if _, errWrite := io.WriteString(w, fmt.Sprintf("Status: %d\n", statusCode)); errWrite != nil { + return errWrite + } + } + + if responseHeaders != nil { + for key, values := range responseHeaders { + for _, value := range values { + if _, errWrite := io.WriteString(w, fmt.Sprintf("%s: %s\n", key, value)); errWrite != nil { + return errWrite + } + } + } + } + + if _, errWrite := io.WriteString(w, "\n"); errWrite != nil { + return errWrite + } + + if responseReader != nil { + if _, errCopy := io.Copy(w, responseReader); errCopy != nil { + return errCopy + } + } + if decompressErr != nil { + if _, errWrite := io.WriteString(w, fmt.Sprintf("\n[DECOMPRESSION ERROR: %v]", decompressErr)); errWrite != nil { + return errWrite + } + } + + if trailingNewline { + if _, errWrite := io.WriteString(w, "\n"); errWrite != nil { + return errWrite + } + } + return nil +} + // formatLogContent creates the complete log content for non-streaming requests. // // Parameters: @@ -648,13 +912,34 @@ func (l *FileRequestLogger) formatRequestInfo(url, method string, headers map[st } // FileStreamingLogWriter implements StreamingLogWriter for file-based streaming logs. -// It handles asynchronous writing of streaming response chunks to a file. -// All data is buffered and written in the correct order when Close is called. +// It spools streaming response chunks to a temporary file to avoid retaining large responses in memory. +// The final log file is assembled when Close is called. type FileStreamingLogWriter struct { - // file is the file where log data is written. - file *os.File + // logFilePath is the final log file path. + logFilePath string - // chunkChan is a channel for receiving response chunks to buffer. + // url is the request URL (masked upstream in middleware). + url string + + // method is the HTTP method. + method string + + // timestamp is captured when the streaming log is initialized. + timestamp time.Time + + // requestHeaders stores the request headers. + requestHeaders map[string][]string + + // requestBodyPath is a temporary file path holding the request body. + requestBodyPath string + + // responseBodyPath is a temporary file path holding the streaming response body. + responseBodyPath string + + // responseBodyFile is the temp file where chunks are appended by the async writer. + responseBodyFile *os.File + + // chunkChan is a channel for receiving response chunks to spool. chunkChan chan []byte // closeChan is a channel for signaling when the writer is closed. @@ -663,9 +948,6 @@ type FileStreamingLogWriter struct { // errorChan is a channel for reporting errors during writing. errorChan chan error - // bufferedChunks stores the response chunks in order. - bufferedChunks *bytes.Buffer - // responseStatus stores the HTTP status code. responseStatus int @@ -770,85 +1052,115 @@ func (w *FileStreamingLogWriter) Close() error { close(w.chunkChan) } - // Wait for async writer to finish buffering chunks + // Wait for async writer to finish spooling chunks if w.closeChan != nil { <-w.closeChan w.chunkChan = nil } - if w.file == nil { + select { + case errWrite := <-w.errorChan: + w.cleanupTempFiles() + return errWrite + default: + } + + if w.logFilePath == "" { + w.cleanupTempFiles() return nil } - // Write all content in the correct order - var content strings.Builder - - // 1. Write API REQUEST section - if len(w.apiRequest) > 0 { - if bytes.HasPrefix(w.apiRequest, []byte("=== API REQUEST")) { - content.Write(w.apiRequest) - if !bytes.HasSuffix(w.apiRequest, []byte("\n")) { - content.WriteString("\n") - } - } else { - content.WriteString("=== API REQUEST ===\n") - content.Write(w.apiRequest) - content.WriteString("\n") - } - content.WriteString("\n") + logFile, errOpen := os.OpenFile(w.logFilePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0644) + if errOpen != nil { + w.cleanupTempFiles() + return fmt.Errorf("failed to create log file: %w", errOpen) } - // 2. Write API RESPONSE section - if len(w.apiResponse) > 0 { - if bytes.HasPrefix(w.apiResponse, []byte("=== API RESPONSE")) { - content.Write(w.apiResponse) - if !bytes.HasSuffix(w.apiResponse, []byte("\n")) { - content.WriteString("\n") - } - } else { - content.WriteString("=== API RESPONSE ===\n") - content.Write(w.apiResponse) - content.WriteString("\n") - } - content.WriteString("\n") - } - - // 3. Write RESPONSE section (status, headers, buffered chunks) - content.WriteString("=== RESPONSE ===\n") - if w.statusWritten { - content.WriteString(fmt.Sprintf("Status: %d\n", w.responseStatus)) - } - - for key, values := range w.responseHeaders { - for _, value := range values { - content.WriteString(fmt.Sprintf("%s: %s\n", key, value)) + writeErr := w.writeFinalLog(logFile) + if errClose := logFile.Close(); errClose != nil { + log.WithError(errClose).Warn("failed to close request log file") + if writeErr == nil { + writeErr = errClose } } - content.WriteString("\n") - // Write buffered response body chunks - if w.bufferedChunks != nil && w.bufferedChunks.Len() > 0 { - content.Write(w.bufferedChunks.Bytes()) - } - - // Write the complete content to file - if _, err := w.file.WriteString(content.String()); err != nil { - _ = w.file.Close() - return err - } - - return w.file.Close() + w.cleanupTempFiles() + return writeErr } // asyncWriter runs in a goroutine to buffer chunks from the channel. -// It continuously reads chunks from the channel and buffers them for later writing. +// It continuously reads chunks from the channel and appends them to a temp file for later assembly. func (w *FileStreamingLogWriter) asyncWriter() { defer close(w.closeChan) for chunk := range w.chunkChan { - if w.bufferedChunks != nil { - w.bufferedChunks.Write(chunk) + if w.responseBodyFile == nil { + continue } + if _, errWrite := w.responseBodyFile.Write(chunk); errWrite != nil { + select { + case w.errorChan <- errWrite: + default: + } + if errClose := w.responseBodyFile.Close(); errClose != nil { + select { + case w.errorChan <- errClose: + default: + } + } + w.responseBodyFile = nil + } + } + + if w.responseBodyFile == nil { + return + } + if errClose := w.responseBodyFile.Close(); errClose != nil { + select { + case w.errorChan <- errClose: + default: + } + } + w.responseBodyFile = nil +} + +func (w *FileStreamingLogWriter) writeFinalLog(logFile *os.File) error { + if errWrite := writeRequestInfoWithBody(logFile, w.url, w.method, w.requestHeaders, nil, w.requestBodyPath, w.timestamp); errWrite != nil { + return errWrite + } + if errWrite := writeAPISection(logFile, "=== API REQUEST ===\n", "=== API REQUEST", w.apiRequest); errWrite != nil { + return errWrite + } + if errWrite := writeAPISection(logFile, "=== API RESPONSE ===\n", "=== API RESPONSE", w.apiResponse); errWrite != nil { + return errWrite + } + + responseBodyFile, errOpen := os.Open(w.responseBodyPath) + if errOpen != nil { + return errOpen + } + defer func() { + if errClose := responseBodyFile.Close(); errClose != nil { + log.WithError(errClose).Warn("failed to close response body temp file") + } + }() + + return writeResponseSection(logFile, w.responseStatus, w.statusWritten, w.responseHeaders, responseBodyFile, nil, false) +} + +func (w *FileStreamingLogWriter) cleanupTempFiles() { + if w.requestBodyPath != "" { + if errRemove := os.Remove(w.requestBodyPath); errRemove != nil { + log.WithError(errRemove).Warn("failed to remove request body temp file") + } + w.requestBodyPath = "" + } + + if w.responseBodyPath != "" { + if errRemove := os.Remove(w.responseBodyPath); errRemove != nil { + log.WithError(errRemove).Warn("failed to remove response body temp file") + } + w.responseBodyPath = "" } }