diff --git a/beacon-chain/rpc/node/server.go b/beacon-chain/rpc/node/server.go index 2c8c13227abf..86544978af37 100644 --- a/beacon-chain/rpc/node/server.go +++ b/beacon-chain/rpc/node/server.go @@ -224,9 +224,11 @@ func (ns *Server) ListPeers(ctx context.Context, _ *ptypes.Empty) (*ethpb.Peers, // StreamBeaconLogs from the beacon node via a gRPC server-side stream. func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaconLogsServer) error { ch := make(chan []byte, ns.StreamLogsBufferSize) - defer close(ch) sub := ns.LogsStreamer.LogsFeed().Subscribe(ch) - defer sub.Unsubscribe() + defer func() { + sub.Unsubscribe() + close(ch) + }() recentLogs := ns.LogsStreamer.GetLastFewLogs() logStrings := make([]string, len(recentLogs)) @@ -247,6 +249,8 @@ func (ns *Server) StreamBeaconLogs(_ *ptypes.Empty, stream pb.Health_StreamBeaco if err := stream.Send(resp); err != nil { return status.Errorf(codes.Unavailable, "Could not send over stream: %v", err) } + case err := <-sub.Err(): + return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err) case <-stream.Context().Done(): return status.Error(codes.Canceled, "Context canceled") } diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index a9ae0ba97714..6043d4893cef 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -259,7 +259,7 @@ func (s *Service) Start() { } nodeServer := &node.Server{ LogsStreamer: logutil.NewStreamServer(), - StreamLogsBufferSize: 100, // Enough to handle bursts of beacon node logs for gRPC streaming. + StreamLogsBufferSize: 1000, // Enough to handle bursts of beacon node logs for gRPC streaming. BeaconDB: s.beaconDB, Server: s.grpcServer, SyncChecker: s.syncService, diff --git a/validator/rpc/health.go b/validator/rpc/health.go index 8c9d02854a6b..6d0e0725bfdb 100644 --- a/validator/rpc/health.go +++ b/validator/rpc/health.go @@ -82,9 +82,11 @@ func (s *Server) StreamBeaconLogs(req *ptypes.Empty, stream pb.Health_StreamBeac // StreamValidatorLogs from the validator client via a gRPC server-side stream. func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamValidatorLogsServer) error { ch := make(chan []byte, s.streamLogsBufferSize) - defer close(ch) sub := s.logsStreamer.LogsFeed().Subscribe(ch) - defer sub.Unsubscribe() + defer func() { + sub.Unsubscribe() + defer close(ch) + }() recentLogs := s.logsStreamer.GetLastFewLogs() logStrings := make([]string, len(recentLogs)) @@ -107,6 +109,8 @@ func (s *Server) StreamValidatorLogs(_ *ptypes.Empty, stream pb.Health_StreamVal } case <-s.ctx.Done(): return status.Error(codes.Canceled, "Context canceled") + case err := <-sub.Err(): + return status.Errorf(codes.Canceled, "Subscriber error, closing: %v", err) case <-stream.Context().Done(): return status.Error(codes.Canceled, "Context canceled") }