diff --git a/beacon-chain/rpc/eth/events/events.go b/beacon-chain/rpc/eth/events/events.go index 3835d7f2234a..f2745649cb9f 100644 --- a/beacon-chain/rpc/eth/events/events.go +++ b/beacon-chain/rpc/eth/events/events.go @@ -166,7 +166,7 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { } api.SetSSEHeaders(w) - sw := NewStreamingResponseController(w, timeout) + sw := newStreamingResponseController(w, timeout) ctx, cancel := context.WithCancel(ctx) defer func() { cancel() @@ -178,22 +178,22 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) { log.WithError(err).Debug("Shutting down StreamEvents handler.") } cleanupStart := time.Now() - es.waitForCleanup() + es.waitForExit() log.WithField("cleanup_wait", time.Since(cleanupStart)).Debug("streamEvents shutdown complete") } func newEventStreamer(buffSize int, ka time.Duration) *eventStreamer { return &eventStreamer{ - outbox: make(chan lazyReader, buffSize), - keepAlive: ka, - cleanedUp: make(chan struct{}), + outbox: make(chan lazyReader, buffSize), + keepAlive: ka, + openUntilExit: make(chan struct{}), } } type eventStreamer struct { - outbox chan lazyReader - keepAlive time.Duration - cleanedUp chan struct{} + outbox chan lazyReader + keepAlive time.Duration + openUntilExit chan struct{} } func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.CancelFunc, req *topicRequest, s *Server) error { @@ -265,13 +265,13 @@ func newlineReader() io.Reader { // outboxWriteLoop runs in a separate goroutine. Its job is to write the values in the outbox to // the client as fast as the client can read them. -func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w *StreamingResponseWriterController) { +func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w *streamingResponseWriterController) { var err error defer func() { if err != nil { log.WithError(err).Debug("Event streamer shutting down due to error.") } - es.cleanup() + es.exit() }() defer func() { cancel() @@ -313,20 +313,23 @@ func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.Can } } -func (es *eventStreamer) cleanup() { +func (es *eventStreamer) exit() { drained := 0 for range es.outbox { drained += 1 } log.WithField("undelivered_events", drained).Debug("Event stream outbox drained.") - close(es.cleanedUp) + close(es.openUntilExit) } -func (es *eventStreamer) waitForCleanup() { - <-es.cleanedUp +// waitForExit blocks until the outboxWriteLoop has exited. +// While this function blocks, it is not yet safe to exit the http handler, +// because the outboxWriteLoop may still be writing to the http ResponseWriter. +func (es *eventStreamer) waitForExit() { + <-es.openUntilExit } -func writeLazyReaderWithRecover(w *StreamingResponseWriterController, lr lazyReader) (err error) { +func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyReader) (err error) { defer func() { if r := recover(); r != nil { log.WithField("panic", r).Error("Recovered from panic while writing event to client.") @@ -342,7 +345,11 @@ func writeLazyReaderWithRecover(w *StreamingResponseWriterController, lr lazyRea return err } -func (es *eventStreamer) writeOutbox(ctx context.Context, w *StreamingResponseWriterController, first lazyReader) error { +func (es *eventStreamer) writeOutbox(ctx context.Context, w *streamingResponseWriterController, first lazyReader) error { + // The outboxWriteLoop is responsible for managing the keep-alive timer and toggling between reading from the outbox + // when it is ready, only allowing the keep-alive to fire when there hasn't been a write in the keep-alive interval. + // Since outboxWriteLoop will get either the first event or the keep-alive, we let it pass in the first event to write, + // either the event's lazyReader, or nil for a keep-alive. needKeepAlive := true if first != nil { if err := writeLazyReaderWithRecover(w, first); err != nil { @@ -358,6 +365,11 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w *StreamingResponseWr case <-ctx.Done(): return ctx.Err() case rf := <-es.outbox: + // We don't want to call Flush until we've exhausted all the writes - it's always preferrable to + // just keep draining the outbox and rely on the underlying Write code to flush+block when it + // needs to based on buffering. Whenever we fill the buffer with a string of writes, the underlying + // code will flush on its own, so it's better to explicitly flush only once, after we've totally + // drained the outbox, to catch any dangling bytes stuck in a buffer. if err := writeLazyReaderWithRecover(w, rf); err != nil { return err } @@ -659,22 +671,26 @@ func (s *Server) currentPayloadAttributes(ctx context.Context) (lazyReader, erro }, nil } -func NewStreamingResponseController(rw http.ResponseWriter, timeout time.Duration) *StreamingResponseWriterController { +func newStreamingResponseController(rw http.ResponseWriter, timeout time.Duration) *streamingResponseWriterController { rc := http.NewResponseController(rw) - return &StreamingResponseWriterController{ + return &streamingResponseWriterController{ timeout: timeout, rw: rw, rc: rc, } } -type StreamingResponseWriterController struct { +// streamingResponseWriterController provides an interface similar to an http.ResponseWriter, +// wrapping an http.ResponseWriter and an http.ResponseController, using the ResponseController +// to set and clear deadlines for Write and Flush methods, and delegating to the underlying +// types to Write and Flush. +type streamingResponseWriterController struct { timeout time.Duration rw http.ResponseWriter rc *http.ResponseController } -func (c *StreamingResponseWriterController) Write(b []byte) (int, error) { +func (c *streamingResponseWriterController) Write(b []byte) (int, error) { if err := c.setDeadline(); err != nil { return 0, err } @@ -685,15 +701,15 @@ func (c *StreamingResponseWriterController) Write(b []byte) (int, error) { return out, c.clearDeadline() } -func (c *StreamingResponseWriterController) setDeadline() error { +func (c *streamingResponseWriterController) setDeadline() error { return c.rc.SetWriteDeadline(time.Now().Add(c.timeout)) } -func (c *StreamingResponseWriterController) clearDeadline() error { +func (c *streamingResponseWriterController) clearDeadline() error { return c.rc.SetWriteDeadline(time.Time{}) } -func (c *StreamingResponseWriterController) Flush() error { +func (c *streamingResponseWriterController) Flush() error { if err := c.setDeadline(); err != nil { return err } @@ -702,5 +718,3 @@ func (c *StreamingResponseWriterController) Flush() error { } return c.clearDeadline() } - -var _ io.Writer = &StreamingResponseWriterController{}