feat: add graceful shutdown for streaming response handling

- Introduced `streamDone` channel to signal the completion of streaming goroutines.
- Updated `processStreamingChunks` to incorporate proper cleanup and defer close operations.
- Ensured `streamWriter` and associated resources are released when streaming completes.
This commit is contained in:
Luis Pater
2025-09-17 19:46:57 +08:00
parent 7f0c9b1942
commit 172f282e9e

View File

@@ -28,6 +28,7 @@ type ResponseWriterWrapper struct {
isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream). isStreaming bool // isStreaming indicates whether the response is a streaming type (e.g., text/event-stream).
streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries. streamWriter logging.StreamingLogWriter // streamWriter is a writer for handling streaming log entries.
chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger. chunkChannel chan []byte // chunkChannel is a channel for asynchronously passing response chunks to the logger.
streamDone chan struct{} // streamDone signals when the streaming goroutine completes.
logger logging.RequestLogger // logger is the instance of the request logger service. logger logging.RequestLogger // logger is the instance of the request logger service.
requestInfo *RequestInfo // requestInfo holds the details of the original request. requestInfo *RequestInfo // requestInfo holds the details of the original request.
statusCode int // statusCode stores the HTTP status code of the response. statusCode int // statusCode stores the HTTP status code of the response.
@@ -108,9 +109,11 @@ func (w *ResponseWriterWrapper) WriteHeader(statusCode int) {
if err == nil { if err == nil {
w.streamWriter = streamWriter w.streamWriter = streamWriter
w.chunkChannel = make(chan []byte, 100) // Buffered channel for async writes w.chunkChannel = make(chan []byte, 100) // Buffered channel for async writes
doneChan := make(chan struct{})
w.streamDone = doneChan
// Start async chunk processor // Start async chunk processor
go w.processStreamingChunks() go w.processStreamingChunks(doneChan)
// Write status immediately // Write status immediately
_ = streamWriter.WriteStatus(statusCode, w.headers) _ = streamWriter.WriteStatus(statusCode, w.headers)
@@ -168,7 +171,13 @@ func (w *ResponseWriterWrapper) detectStreaming(contentType string) bool {
// processStreamingChunks runs in a separate goroutine to process response chunks from the chunkChannel. // processStreamingChunks runs in a separate goroutine to process response chunks from the chunkChannel.
// It asynchronously writes each chunk to the streaming log writer. // It asynchronously writes each chunk to the streaming log writer.
func (w *ResponseWriterWrapper) processStreamingChunks() { func (w *ResponseWriterWrapper) processStreamingChunks(done chan struct{}) {
if done == nil {
return
}
defer close(done)
if w.streamWriter == nil || w.chunkChannel == nil { if w.streamWriter == nil || w.chunkChannel == nil {
return return
} }
@@ -194,8 +203,15 @@ func (w *ResponseWriterWrapper) Finalize(c *gin.Context) error {
w.chunkChannel = nil w.chunkChannel = nil
} }
if w.streamDone != nil {
<-w.streamDone
w.streamDone = nil
}
if w.streamWriter != nil { if w.streamWriter != nil {
return w.streamWriter.Close() err := w.streamWriter.Close()
w.streamWriter = nil
return err
} }
} else { } else {
// Capture final status code and headers if not already captured // Capture final status code and headers if not already captured