feat(logging): add upstream API request/response capture to streaming logs

This commit is contained in:
hkfires
2025-12-08 17:21:58 +08:00
parent de77903915
commit 92f13fc316
2 changed files with 182 additions and 30 deletions

View File

@@ -232,7 +232,16 @@ func (w *ResponseWriterWrapper) Finalize(c *gin.Context) error {
w.streamDone = nil
}
// Write API Request and Response to the streaming log before closing
if w.streamWriter != nil {
apiRequest := w.extractAPIRequest(c)
if len(apiRequest) > 0 {
_ = w.streamWriter.WriteAPIRequest(apiRequest)
}
apiResponse := w.extractAPIResponse(c)
if len(apiResponse) > 0 {
_ = w.streamWriter.WriteAPIResponse(apiResponse)
}
if err := w.streamWriter.Close(); err != nil {
w.streamWriter = nil
return err

View File

@@ -84,6 +84,26 @@ type StreamingLogWriter interface {
// - error: An error if writing fails, nil otherwise
WriteStatus(status int, headers map[string][]string) error
// WriteAPIRequest writes the upstream API request details to the log.
// This should be called before WriteStatus to maintain proper log ordering.
//
// Parameters:
// - apiRequest: The API request data (typically includes URL, headers, body sent upstream)
//
// Returns:
// - error: An error if writing fails, nil otherwise
WriteAPIRequest(apiRequest []byte) error
// WriteAPIResponse writes the upstream API response details to the log.
// This should be called after the streaming response is complete.
//
// Parameters:
// - apiResponse: The API response data
//
// Returns:
// - error: An error if writing fails, nil otherwise
WriteAPIResponse(apiResponse []byte) error
// Close finalizes the log file and cleans up resources.
//
// Returns:
@@ -248,10 +268,11 @@ func (l *FileRequestLogger) LogStreamingRequest(url, method string, headers map[
// Create streaming writer
writer := &FileStreamingLogWriter{
file: file,
chunkChan: make(chan []byte, 100), // Buffered channel for async writes
closeChan: make(chan struct{}),
errorChan: make(chan error, 1),
file: file,
chunkChan: make(chan []byte, 100), // Buffered channel for async writes
closeChan: make(chan struct{}),
errorChan: make(chan error, 1),
bufferedChunks: &bytes.Buffer{},
}
// Start async writer goroutine
@@ -628,11 +649,12 @@ func (l *FileRequestLogger) formatRequestInfo(url, method string, headers map[st
// FileStreamingLogWriter implements StreamingLogWriter for file-based streaming logs.
// It handles asynchronous writing of streaming response chunks to a file.
// All data is buffered and written in the correct order when Close is called.
type FileStreamingLogWriter struct {
// file is the file where log data is written.
file *os.File
// chunkChan is a channel for receiving response chunks to write.
// chunkChan is a channel for receiving response chunks to buffer.
chunkChan chan []byte
// closeChan is a channel for signaling when the writer is closed.
@@ -641,8 +663,23 @@ type FileStreamingLogWriter struct {
// errorChan is a channel for reporting errors during writing.
errorChan chan error
// statusWritten indicates whether the response status has been written.
// bufferedChunks stores the response chunks in order.
bufferedChunks *bytes.Buffer
// responseStatus stores the HTTP status code.
responseStatus int
// statusWritten indicates whether a non-zero status was recorded.
statusWritten bool
// responseHeaders stores the response headers.
responseHeaders map[string][]string
// apiRequest stores the upstream API request data.
apiRequest []byte
// apiResponse stores the upstream API response data.
apiResponse []byte
}
// WriteChunkAsync writes a response chunk asynchronously (non-blocking).
@@ -666,39 +703,65 @@ func (w *FileStreamingLogWriter) WriteChunkAsync(chunk []byte) {
}
}
// WriteStatus writes the response status and headers to the log.
// WriteStatus buffers the response status and headers for later writing.
//
// Parameters:
// - status: The response status code
// - headers: The response headers
//
// Returns:
// - error: An error if writing fails, nil otherwise
// - error: Always returns nil (buffering cannot fail)
func (w *FileStreamingLogWriter) WriteStatus(status int, headers map[string][]string) error {
if w.file == nil || w.statusWritten {
if status == 0 {
return nil
}
var content strings.Builder
content.WriteString("========================================\n")
content.WriteString("=== RESPONSE ===\n")
content.WriteString(fmt.Sprintf("Status: %d\n", status))
for key, values := range headers {
for _, value := range values {
content.WriteString(fmt.Sprintf("%s: %s\n", key, value))
w.responseStatus = status
if headers != nil {
w.responseHeaders = make(map[string][]string, len(headers))
for key, values := range headers {
headerValues := make([]string, len(values))
copy(headerValues, values)
w.responseHeaders[key] = headerValues
}
}
content.WriteString("\n")
w.statusWritten = true
return nil
}
_, err := w.file.WriteString(content.String())
if err == nil {
w.statusWritten = true
// WriteAPIRequest buffers the upstream API request details for later writing.
//
// Parameters:
// - apiRequest: The API request data (typically includes URL, headers, body sent upstream)
//
// Returns:
// - error: Always returns nil (buffering cannot fail)
func (w *FileStreamingLogWriter) WriteAPIRequest(apiRequest []byte) error {
if len(apiRequest) == 0 {
return nil
}
return err
w.apiRequest = bytes.Clone(apiRequest)
return nil
}
// WriteAPIResponse buffers the upstream API response details for later writing.
//
// Parameters:
// - apiResponse: The API response data
//
// Returns:
// - error: Always returns nil (buffering cannot fail)
func (w *FileStreamingLogWriter) WriteAPIResponse(apiResponse []byte) error {
if len(apiResponse) == 0 {
return nil
}
w.apiResponse = bytes.Clone(apiResponse)
return nil
}
// Close finalizes the log file and cleans up resources.
// It writes all buffered data to the file in the correct order:
// API REQUEST -> API RESPONSE -> RESPONSE (status, headers, body chunks)
//
// Returns:
// - error: An error if closing fails, nil otherwise
@@ -707,27 +770,85 @@ func (w *FileStreamingLogWriter) Close() error {
close(w.chunkChan)
}
// Wait for async writer to finish
// Wait for async writer to finish buffering chunks
if w.closeChan != nil {
<-w.closeChan
w.chunkChan = nil
}
if w.file != nil {
return w.file.Close()
if w.file == nil {
return nil
}
return nil
// Write all content in the correct order
var content strings.Builder
// 1. Write API REQUEST section
if len(w.apiRequest) > 0 {
if bytes.HasPrefix(w.apiRequest, []byte("=== API REQUEST")) {
content.Write(w.apiRequest)
if !bytes.HasSuffix(w.apiRequest, []byte("\n")) {
content.WriteString("\n")
}
} else {
content.WriteString("=== API REQUEST ===\n")
content.Write(w.apiRequest)
content.WriteString("\n")
}
content.WriteString("\n")
}
// 2. Write API RESPONSE section
if len(w.apiResponse) > 0 {
if bytes.HasPrefix(w.apiResponse, []byte("=== API RESPONSE")) {
content.Write(w.apiResponse)
if !bytes.HasSuffix(w.apiResponse, []byte("\n")) {
content.WriteString("\n")
}
} else {
content.WriteString("=== API RESPONSE ===\n")
content.Write(w.apiResponse)
content.WriteString("\n")
}
content.WriteString("\n")
}
// 3. Write RESPONSE section (status, headers, buffered chunks)
content.WriteString("========================================\n")
content.WriteString("=== RESPONSE ===\n")
if w.statusWritten {
content.WriteString(fmt.Sprintf("Status: %d\n", w.responseStatus))
}
for key, values := range w.responseHeaders {
for _, value := range values {
content.WriteString(fmt.Sprintf("%s: %s\n", key, value))
}
}
content.WriteString("\n")
// Write buffered response body chunks
if w.bufferedChunks != nil && w.bufferedChunks.Len() > 0 {
content.Write(w.bufferedChunks.Bytes())
}
// Write the complete content to file
if _, err := w.file.WriteString(content.String()); err != nil {
_ = w.file.Close()
return err
}
return w.file.Close()
}
// asyncWriter runs in a goroutine to handle async chunk writing.
// It continuously reads chunks from the channel and writes them to the file.
// asyncWriter runs in a goroutine to buffer chunks from the channel.
// It continuously reads chunks from the channel and buffers them for later writing.
func (w *FileStreamingLogWriter) asyncWriter() {
defer close(w.closeChan)
for chunk := range w.chunkChan {
if w.file != nil {
_, _ = w.file.Write(chunk)
if w.bufferedChunks != nil {
w.bufferedChunks.Write(chunk)
}
}
}
@@ -754,6 +875,28 @@ func (w *NoOpStreamingLogWriter) WriteStatus(_ int, _ map[string][]string) error
return nil
}
// WriteAPIRequest is a no-op implementation that does nothing and always returns nil.
//
// Parameters:
// - apiRequest: The API request data (ignored)
//
// Returns:
// - error: Always returns nil
func (w *NoOpStreamingLogWriter) WriteAPIRequest(_ []byte) error {
return nil
}
// WriteAPIResponse is a no-op implementation that does nothing and always returns nil.
//
// Parameters:
// - apiResponse: The API response data (ignored)
//
// Returns:
// - error: Always returns nil
func (w *NoOpStreamingLogWriter) WriteAPIResponse(_ []byte) error {
return nil
}
// Close is a no-op implementation that does nothing and always returns nil.
//
// Returns: