Skip to content

Commit

Permalink
naming/comments; make response controller private
Browse files Browse the repository at this point in the history
  • Loading branch information
kasey committed Oct 23, 2024
1 parent 7c9613f commit 23e54a8
Showing 1 changed file with 39 additions and 25 deletions.
64 changes: 39 additions & 25 deletions beacon-chain/rpc/eth/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
}

api.SetSSEHeaders(w)
sw := NewStreamingResponseController(w, timeout)
sw := newStreamingResponseController(w, timeout)
ctx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
Expand All @@ -178,22 +178,22 @@ func (s *Server) StreamEvents(w http.ResponseWriter, r *http.Request) {
log.WithError(err).Debug("Shutting down StreamEvents handler.")
}
cleanupStart := time.Now()
es.waitForCleanup()
es.waitForExit()
log.WithField("cleanup_wait", time.Since(cleanupStart)).Debug("streamEvents shutdown complete")
}

func newEventStreamer(buffSize int, ka time.Duration) *eventStreamer {
return &eventStreamer{
outbox: make(chan lazyReader, buffSize),
keepAlive: ka,
cleanedUp: make(chan struct{}),
outbox: make(chan lazyReader, buffSize),
keepAlive: ka,
openUntilExit: make(chan struct{}),
}
}

type eventStreamer struct {
outbox chan lazyReader
keepAlive time.Duration
cleanedUp chan struct{}
outbox chan lazyReader
keepAlive time.Duration
openUntilExit chan struct{}
}

func (es *eventStreamer) recvEventLoop(ctx context.Context, cancel context.CancelFunc, req *topicRequest, s *Server) error {
Expand Down Expand Up @@ -265,13 +265,13 @@ func newlineReader() io.Reader {

// outboxWriteLoop runs in a separate goroutine. Its job is to write the values in the outbox to
// the client as fast as the client can read them.
func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w *StreamingResponseWriterController) {
func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.CancelFunc, w *streamingResponseWriterController) {
var err error
defer func() {
if err != nil {
log.WithError(err).Debug("Event streamer shutting down due to error.")
}
es.cleanup()
es.exit()
}()
defer func() {
cancel()
Expand Down Expand Up @@ -313,20 +313,23 @@ func (es *eventStreamer) outboxWriteLoop(ctx context.Context, cancel context.Can
}
}

func (es *eventStreamer) cleanup() {
func (es *eventStreamer) exit() {
drained := 0
for range es.outbox {
drained += 1
}
log.WithField("undelivered_events", drained).Debug("Event stream outbox drained.")
close(es.cleanedUp)
close(es.openUntilExit)
}

func (es *eventStreamer) waitForCleanup() {
<-es.cleanedUp
// waitForExit blocks until the outboxWriteLoop has exited.
// While this function blocks, it is not yet safe to exit the http handler,
// because the outboxWriteLoop may still be writing to the http ResponseWriter.
func (es *eventStreamer) waitForExit() {
<-es.openUntilExit
}

func writeLazyReaderWithRecover(w *StreamingResponseWriterController, lr lazyReader) (err error) {
func writeLazyReaderWithRecover(w *streamingResponseWriterController, lr lazyReader) (err error) {
defer func() {
if r := recover(); r != nil {
log.WithField("panic", r).Error("Recovered from panic while writing event to client.")
Expand All @@ -342,7 +345,11 @@ func writeLazyReaderWithRecover(w *StreamingResponseWriterController, lr lazyRea
return err
}

func (es *eventStreamer) writeOutbox(ctx context.Context, w *StreamingResponseWriterController, first lazyReader) error {
func (es *eventStreamer) writeOutbox(ctx context.Context, w *streamingResponseWriterController, first lazyReader) error {
// The outboxWriteLoop is responsible for managing the keep-alive timer and toggling between reading from the outbox
// when it is ready, only allowing the keep-alive to fire when there hasn't been a write in the keep-alive interval.
// Since outboxWriteLoop will get either the first event or the keep-alive, we let it pass in the first event to write,
// either the event's lazyReader, or nil for a keep-alive.
needKeepAlive := true
if first != nil {
if err := writeLazyReaderWithRecover(w, first); err != nil {
Expand All @@ -358,6 +365,11 @@ func (es *eventStreamer) writeOutbox(ctx context.Context, w *StreamingResponseWr
case <-ctx.Done():
return ctx.Err()
case rf := <-es.outbox:
// We don't want to call Flush until we've exhausted all the writes - it's always preferrable to
// just keep draining the outbox and rely on the underlying Write code to flush+block when it
// needs to based on buffering. Whenever we fill the buffer with a string of writes, the underlying
// code will flush on its own, so it's better to explicitly flush only once, after we've totally
// drained the outbox, to catch any dangling bytes stuck in a buffer.
if err := writeLazyReaderWithRecover(w, rf); err != nil {
return err
}
Expand Down Expand Up @@ -659,22 +671,26 @@ func (s *Server) currentPayloadAttributes(ctx context.Context) (lazyReader, erro
}, nil
}

func NewStreamingResponseController(rw http.ResponseWriter, timeout time.Duration) *StreamingResponseWriterController {
func newStreamingResponseController(rw http.ResponseWriter, timeout time.Duration) *streamingResponseWriterController {
rc := http.NewResponseController(rw)
return &StreamingResponseWriterController{
return &streamingResponseWriterController{
timeout: timeout,
rw: rw,
rc: rc,
}
}

type StreamingResponseWriterController struct {
// streamingResponseWriterController provides an interface similar to an http.ResponseWriter,
// wrapping an http.ResponseWriter and an http.ResponseController, using the ResponseController
// to set and clear deadlines for Write and Flush methods, and delegating to the underlying
// types to Write and Flush.
type streamingResponseWriterController struct {
timeout time.Duration
rw http.ResponseWriter
rc *http.ResponseController
}

func (c *StreamingResponseWriterController) Write(b []byte) (int, error) {
func (c *streamingResponseWriterController) Write(b []byte) (int, error) {
if err := c.setDeadline(); err != nil {
return 0, err
}
Expand All @@ -685,15 +701,15 @@ func (c *StreamingResponseWriterController) Write(b []byte) (int, error) {
return out, c.clearDeadline()
}

func (c *StreamingResponseWriterController) setDeadline() error {
func (c *streamingResponseWriterController) setDeadline() error {
return c.rc.SetWriteDeadline(time.Now().Add(c.timeout))
}

func (c *StreamingResponseWriterController) clearDeadline() error {
func (c *streamingResponseWriterController) clearDeadline() error {
return c.rc.SetWriteDeadline(time.Time{})
}

func (c *StreamingResponseWriterController) Flush() error {
func (c *streamingResponseWriterController) Flush() error {
if err := c.setDeadline(); err != nil {
return err
}
Expand All @@ -702,5 +718,3 @@ func (c *StreamingResponseWriterController) Flush() error {
}
return c.clearDeadline()
}

var _ io.Writer = &StreamingResponseWriterController{}

0 comments on commit 23e54a8

Please sign in to comment.