From 172f282e9efaed80163742ae6418ddeb7de0052f Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Wed, 17 Sep 2025 19:46:57 +0800 Subject: [PATCH] feat: add graceful shutdown for streaming response handling - Introduced `streamDone` channel to signal the completion of streaming goroutines. - Updated `processStreamingChunks` to incorporate proper cleanup and defer close operations. - Ensured `streamWriter` and associated resources are released when streaming completes. --- internal/api/middleware/response_writer.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/internal/api/middleware/response_writer.go b/internal/api/middleware/response_writer.go index 58b7794d..761ac9e0 100644 --- a/internal/api/middleware/response_writer.go +++ b/internal/api/middleware/response_writer.go @@ -28,6 +28,7 @@ type ResponseWriterWrapper struct { isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream). streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries. chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger. + streamDone chan struct{} // streamDone signals when the streaming goroutine completes. logger logging.RequestLogger // logger is the instance of the request logger service. requestInfo *RequestInfo // requestInfo holds the details of the original request. statusCode int // statusCode stores the HTTP status code of the response. @@ -108,9 +109,11 @@ func (w *ResponseWriterWrapper) WriteHeader(statusCode int) { if err == nil { w.streamWriter = streamWriter w.chunkChannel = make(chan []byte, 100) // Buffered channel for async writes + doneChan := make(chan struct{}) + w.streamDone = doneChan // Start async chunk processor - go w.processStreamingChunks() + go w.processStreamingChunks(doneChan) // Write status immediately _ = streamWriter.WriteStatus(statusCode, w.headers) @@ -168,7 +171,13 @@ func (w *ResponseWriterWrapper) detectStreaming(contentType string) bool { // processStreamingChunks runs in a separate goroutine to process response chunks from the chunkChannel. // It asynchronously writes each chunk to the streaming log writer. -func (w *ResponseWriterWrapper) processStreamingChunks() { +func (w *ResponseWriterWrapper) processStreamingChunks(done chan struct{}) { + if done == nil { + return + } + + defer close(done) + if w.streamWriter == nil || w.chunkChannel == nil { return } @@ -194,8 +203,15 @@ func (w *ResponseWriterWrapper) Finalize(c *gin.Context) error { w.chunkChannel = nil } + if w.streamDone != nil { + <-w.streamDone + w.streamDone = nil + } + if w.streamWriter != nil { - return w.streamWriter.Close() + err := w.streamWriter.Close() + w.streamWriter = nil + return err } } else { // Capture final status code and headers if not already captured