Skip to content

Commit

Permalink
Better Handling of Subscriber Errors in Logs Streams (#8505)
Browse files Browse the repository at this point in the history
* handle subscriber error and increase buffer sizes

* operation order in unsub and close
  • Loading branch information
rauljordan authored Feb 23, 2021
1 parent e40fba7 commit 0c59952
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
8 changes: 6 additions & 2 deletions beacon-chain/rpc/node/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,11 @@ func (ns *Server) ListPeers(ctx context.Context, _ *ptypes.Empty) (*ethpb.Peers,
// StreamBeaconLogs from the beacon node via a gRPC server-side stream.
func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaconLogsServer) error {
ch := make(chan []byte, ns.StreamLogsBufferSize)
defer close(ch)
sub := ns.LogsStreamer.LogsFeed().Subscribe(ch)
defer sub.Unsubscribe()
defer func() {
sub.Unsubscribe()
close(ch)
}()

recentLogs := ns.LogsStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
Expand All @@ -247,6 +249,8 @@ func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaco
if err := stream.Send(resp); err != nil {
return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err)
}
case err := <-sub.Err():
return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err)
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
}
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/rpc/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (s *Service) Start() {
}
nodeServer := &node.Server{
LogsStreamer: logutil.NewStreamServer(),
StreamLogsBufferSize: 100, // Enough to handle bursts of beacon node logs for gRPC streaming.
StreamLogsBufferSize: 1000, // Enough to handle bursts of beacon node logs for gRPC streaming.
BeaconDB: s.beaconDB,
Server: s.grpcServer,
SyncChecker: s.syncService,
Expand Down
8 changes: 6 additions & 2 deletions validator/rpc/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,11 @@ func (s *Server) StreamBeaconLogs(req *ptypes.Empty, stream pb.Health_StreamBeac
// StreamValidatorLogs from the validator client via a gRPC server-side stream.
func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamValidatorLogsServer) error {
ch := make(chan []byte, s.streamLogsBufferSize)
defer close(ch)
sub := s.logsStreamer.LogsFeed().Subscribe(ch)
defer sub.Unsubscribe()
defer func() {
sub.Unsubscribe()
defer close(ch)
}()

recentLogs := s.logsStreamer.GetLastFewLogs()
logStrings := make([]string, len(recentLogs))
Expand All @@ -107,6 +109,8 @@ func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamVal
}
case <-s.ctx.Done():
return status.Error(codes.Canceled, "Context canceled")
case err := <-sub.Err():
return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err)
case <-stream.Context().Done():
return status.Error(codes.Canceled, "Context canceled")
}
Expand Down

0 comments on commit 0c59952

Please sign in to comment.