Skip to content

Commit dfab0e0

Browse files
committed
refactor w.(http.Flusher).Flush() out of go func().
1 parent e160f19 commit dfab0e0

File tree

1 file changed

+34
-10
lines changed

1 file changed

+34
-10
lines changed

server/streamable_http.go

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -532,10 +532,15 @@ func (s *StreamableHTTPServer) handleSSEResponse(w http.ResponseWriter, r *http.
532532
w.(http.Flusher).Flush()
533533
}
534534

535-
// Start a goroutine to listen for notifications and forward them to the client
535+
// Create a channel to pass notifications from the goroutine to the main handler
536+
notificationCh := make(chan struct {
537+
eventID string
538+
data []byte
539+
}, 100) // Buffer size to prevent blocking
536540
notifDone := make(chan struct{})
537541
defer close(notifDone)
538542

543+
// Start a goroutine to listen for notifications and send them to the notification channel
539544
go func() {
540545
for {
541546
select {
@@ -560,21 +565,41 @@ func (s *StreamableHTTPServer) handleSSEResponse(w http.ResponseWriter, r *http.
560565
}
561566
}
562567

563-
// Write the event directly to the response writer
564-
if eventID != "" {
565-
fmt.Fprintf(w, "id: %s\ndata: %s\n\n", eventID, data)
566-
} else {
567-
fmt.Fprintf(w, "data: %s\n\n", data)
568+
// Send the notification to the main handler goroutine via channel
569+
select {
570+
case notificationCh <- struct {
571+
eventID string
572+
data []byte
573+
}{eventID: eventID, data: data}:
574+
case <-notifDone:
575+
return
568576
}
569-
w.(http.Flusher).Flush()
570577
case <-notifDone:
571578
return
572579
}
573580
}
574581
}()
575582

576-
// Wait for the request context to be done
577-
<-r.Context().Done()
583+
// Create a context with cancellation
584+
ctx, cancel := context.WithCancel(r.Context())
585+
defer cancel()
586+
587+
// Process notifications in the main handler goroutine
588+
for {
589+
select {
590+
case notification := <-notificationCh:
591+
// Write the event directly to the response writer from the main handler goroutine
592+
if notification.eventID != "" {
593+
fmt.Fprintf(w, "id: %s\ndata: %s\n\n", notification.eventID, notification.data)
594+
} else {
595+
fmt.Fprintf(w, "data: %s\n\n", notification.data)
596+
}
597+
w.(http.Flusher).Flush()
598+
case <-ctx.Done():
599+
// Request context is done, exit the loop
600+
return
601+
}
602+
}
578603
}
579604

580605
// handleGet processes GET requests to the MCP endpoint (for standalone SSE streams)
@@ -783,4 +808,3 @@ func (s *StreamableHTTPServer) validateSession(sessionID string) bool {
783808

784809
return false
785810
}
786-

0 commit comments

Comments
 (0)