From cc6c1c9e7b060ab265ad0078b0ef1086a221b214 Mon Sep 17 00:00:00 2001 From: shayzluf Date: Fri, 10 Apr 2020 08:46:32 +0300 Subject: [PATCH 1/8] restart streams on beacon node shutdown --- slasher/beaconclient/receivers.go | 62 ++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 4b7220fb8bfd..771879c682f0 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -2,7 +2,9 @@ package beaconclient import ( "context" + "errors" "io" + "strings" "time" ptypes "github.com/gogo/protobuf/types" @@ -11,6 +13,10 @@ import ( "go.opencensus.io/trace" ) +// ReconnectPeriod represents the frequency in which we check the number of +// open connections. +var ReconnectPeriod = 5 * time.Second + // receiveBlocks starts a gRPC client stream listener to obtain // blocks from the beacon node. Upon receiving a block, the service // broadcasts it to a feed for other services in slasher to subscribe to. 
@@ -33,6 +39,13 @@ func (bs *Service) receiveBlocks(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down blocks receiver") return } + if err != nil && strings.Contains(err.Error(), "Context canceled") { + stream, err = bs.restartBlockStream(ctx) + if err != nil { + log.WithError(err).Error("Could not restart stream") + return + } + } if err != nil { log.WithError(err).Error("Could not receive block from beacon node") break @@ -70,9 +83,16 @@ func (bs *Service) receiveAttestations(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down attestations receiver") return } + if err != nil && strings.Contains(err.Error(), "Context canceled") { + stream, err = bs.restartIndexedAttestationStream(ctx) + if err != nil { + log.WithError(err).Error("Could not restart stream") + return + } + } if err != nil { log.WithError(err).Error("Could not receive attestations from beacon node") - continue + break } if res == nil { continue @@ -120,3 +140,43 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) { } } } + +func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) { + ticker := time.NewTicker(ReconnectPeriod) + for { + select { + case <-ticker.C: + log.Info("Context closed, trying to restart attestation stream") + stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{}) + if err != nil { + continue + } + log.Info("Attestation stream restarted...") + return stream, nil + case <-ctx.Done(): + log.Debug("Context closed, exiting reconnect routine") + return nil, errors.New("context closed, quit restart stream retries") + } + } + +} + +func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) { + ticker := time.NewTicker(ReconnectPeriod) + for { + select { + case <-ticker.C: + log.Info("Context closed, trying to restart block stream") + stream, err := 
bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{}) + if err != nil { + continue + } + log.Info("Block stream restarted...") + return stream, nil + case <-ctx.Done(): + log.Debug("Context closed, exiting reconnect routine") + return nil, errors.New("context closed, quit restart stream retries") + } + } + +} From 2eb955d130e66b6fdb1232d9293e066de33900ae Mon Sep 17 00:00:00 2001 From: shayzluf Date: Fri, 10 Apr 2020 09:41:09 +0300 Subject: [PATCH 2/8] fix comment --- slasher/beaconclient/receivers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 771879c682f0..8c509ac311cc 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -13,8 +13,8 @@ import ( "go.opencensus.io/trace" ) -// ReconnectPeriod represents the frequency in which we check the number of -// open connections. +// ReconnectPeriod represents the frequency in which we try to restart our +// streams when beacon chain is down. var ReconnectPeriod = 5 * time.Second // receiveBlocks starts a gRPC client stream listener to obtain From 0b42280b236024fdbd75a69a62704054f6e61a04 Mon Sep 17 00:00:00 2001 From: shayzluf Date: Fri, 10 Apr 2020 09:42:42 +0300 Subject: [PATCH 3/8] remove export --- slasher/beaconclient/receivers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 8c509ac311cc..aad2f684cd40 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -13,9 +13,9 @@ import ( "go.opencensus.io/trace" ) -// ReconnectPeriod represents the frequency in which we try to restart our +// represents the frequency in which we try to restart our // streams when beacon chain is down. -var ReconnectPeriod = 5 * time.Second +var reconnectPeriod = 5 * time.Second // receiveBlocks starts a gRPC client stream listener to obtain // blocks from the beacon node. 
Upon receiving a block, the service @@ -142,7 +142,7 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) { } func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) { - ticker := time.NewTicker(ReconnectPeriod) + ticker := time.NewTicker(reconnectPeriod) for { select { case <-ticker.C: @@ -162,7 +162,7 @@ func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.B } func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) { - ticker := time.NewTicker(ReconnectPeriod) + ticker := time.NewTicker(reconnectPeriod) for { select { case <-ticker.C: From 29a96e683058925b0af4e0a401ea6bddaab5d821 Mon Sep 17 00:00:00 2001 From: shayzluf Date: Fri, 10 Apr 2020 09:45:07 +0300 Subject: [PATCH 4/8] ivan feedback --- slasher/beaconclient/receivers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index aad2f684cd40..7104e8ccf888 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -39,7 +39,7 @@ func (bs *Service) receiveBlocks(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down blocks receiver") return } - if err != nil && strings.Contains(err.Error(), "Context canceled") { + if err != nil && strings.Contains(err.Error(), context.Canceled.Error()) { stream, err = bs.restartBlockStream(ctx) if err != nil { log.WithError(err).Error("Could not restart stream") @@ -83,7 +83,7 @@ func (bs *Service) receiveAttestations(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down attestations receiver") return } - if err != nil && strings.Contains(err.Error(), "Context canceled") { + if err != nil && strings.Contains(err.Error(), context.Canceled.Error()) { stream, err = bs.restartIndexedAttestationStream(ctx) if err != nil { 
log.WithError(err).Error("Could not restart stream") From 9c01009b643ca93d45bd3ef6d40eee7cf4e4aff4 Mon Sep 17 00:00:00 2001 From: shayzluf Date: Fri, 10 Apr 2020 09:48:22 +0300 Subject: [PATCH 5/8] ivan feedback --- slasher/beaconclient/receivers.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 7104e8ccf888..eeb852bc9fd7 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -146,7 +146,7 @@ func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.B for { select { case <-ticker.C: - log.Info("Context closed, trying to restart attestation stream") + log.Info("Context closed, attempting to restart attestation stream") stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{}) if err != nil { continue @@ -155,7 +155,7 @@ func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.B return stream, nil case <-ctx.Done(): log.Debug("Context closed, exiting reconnect routine") - return nil, errors.New("context closed, quit restart stream retries") + return nil, errors.New("context closed, no longer attempting to restart stream") } } @@ -166,7 +166,7 @@ func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_St for { select { case <-ticker.C: - log.Info("Context closed, trying to restart block stream") + log.Info("Context closed, attempting to restart block stream") stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{}) if err != nil { continue @@ -175,7 +175,7 @@ func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_St return stream, nil case <-ctx.Done(): log.Debug("Context closed, exiting reconnect routine") - return nil, errors.New("context closed, quit restart stream retries") + return nil, errors.New("context closed, no longer attempting to restart stream") } } From 2197cd84d3582a251f2f493c9eece09d1ed6d623 Mon Sep 
17 00:00:00 2001 From: shayzluf Date: Fri, 10 Apr 2020 09:52:19 +0300 Subject: [PATCH 6/8] case insensitive --- slasher/beaconclient/receivers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index eeb852bc9fd7..759c566c46bb 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -39,7 +39,7 @@ func (bs *Service) receiveBlocks(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down blocks receiver") return } - if err != nil && strings.Contains(err.Error(), context.Canceled.Error()) { + if err != nil && strings.Contains(strings.ToLower(err.Error()), strings.ToLower(context.Canceled.Error())) { stream, err = bs.restartBlockStream(ctx) if err != nil { log.WithError(err).Error("Could not restart stream") @@ -83,7 +83,7 @@ func (bs *Service) receiveAttestations(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down attestations receiver") return } - if err != nil && strings.Contains(err.Error(), context.Canceled.Error()) { + if err != nil && strings.Contains(strings.ToLower(err.Error()), strings.ToLower(context.Canceled.Error())) { stream, err = bs.restartIndexedAttestationStream(ctx) if err != nil { log.WithError(err).Error("Could not restart stream") From fdd57cb59d7b054c8207817e2a9690e781f54f6a Mon Sep 17 00:00:00 2001 From: Ivan Martinez Date: Fri, 10 Apr 2020 03:09:56 -0400 Subject: [PATCH 7/8] Update slasher/beaconclient/receivers.go --- slasher/beaconclient/receivers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 759c566c46bb..3dff86433a6f 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -13,8 +13,8 @@ import ( "go.opencensus.io/trace" ) -// represents the frequency in which we try to restart our -// streams when beacon chain is 
down. +// reconnectPeriod is the frequency that we try to restart our +// streams when the beacon chain node does not respond. var reconnectPeriod = 5 * time.Second // receiveBlocks starts a gRPC client stream listener to obtain From 051607a06a9ad3c36fd29ec8df9e346f37d80afa Mon Sep 17 00:00:00 2001 From: shayzluf Date: Fri, 10 Apr 2020 19:16:08 +0300 Subject: [PATCH 8/8] raul feedback --- slasher/beaconclient/BUILD.bazel | 2 ++ slasher/beaconclient/receivers.go | 51 ++++++++++++++++++++----------- 2 files changed, 36 insertions(+), 17 deletions(-) diff --git a/slasher/beaconclient/BUILD.bazel b/slasher/beaconclient/BUILD.bazel index 499ea9adfd3d..6759712b4d2d 100644 --- a/slasher/beaconclient/BUILD.bazel +++ b/slasher/beaconclient/BUILD.bazel @@ -30,7 +30,9 @@ go_library( "@io_opencensus_go//plugin/ocgrpc:go_default_library", "@io_opencensus_go//trace:go_default_library", "@org_golang_google_grpc//:go_default_library", + "@org_golang_google_grpc//codes:go_default_library", "@org_golang_google_grpc//credentials:go_default_library", + "@org_golang_google_grpc//status:go_default_library", ], ) diff --git a/slasher/beaconclient/receivers.go b/slasher/beaconclient/receivers.go index 759c566c46bb..b58d6453ccad 100644 --- a/slasher/beaconclient/receivers.go +++ b/slasher/beaconclient/receivers.go @@ -4,13 +4,14 @@ import ( "context" "errors" "io" - "strings" "time" ptypes "github.com/gogo/protobuf/types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/sirupsen/logrus" "go.opencensus.io/trace" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // represents the frequency in which we try to restart our @@ -39,17 +40,25 @@ func (bs *Service) receiveBlocks(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down blocks receiver") return } - if err != nil && strings.Contains(strings.ToLower(err.Error()), strings.ToLower(context.Canceled.Error())) { - stream, err = bs.restartBlockStream(ctx) - if err !=
nil { - log.WithError(err).Error("Could not restart stream") + if err != nil { + if e, ok := status.FromError(err); ok { + switch e.Code() { + case codes.Canceled: + stream, err = bs.restartBlockStream(ctx) + if err != nil { + log.WithError(err).Error("Could not restart stream") + return + } + break + default: + log.WithError(err).Errorf("Could not receive block from beacon node. rpc status: %v", e.Code()) + return + } + } else { + log.WithError(err).Error("Could not receive blocks from beacon node") return } } - if err != nil { - log.WithError(err).Error("Could not receive block from beacon node") - break - } if res == nil { continue } @@ -83,17 +92,25 @@ func (bs *Service) receiveAttestations(ctx context.Context) { log.WithError(ctx.Err()).Error("Context canceled - shutting down attestations receiver") return } - if err != nil && strings.Contains(strings.ToLower(err.Error()), strings.ToLower(context.Canceled.Error())) { - stream, err = bs.restartIndexedAttestationStream(ctx) - if err != nil { - log.WithError(err).Error("Could not restart stream") + if err != nil { + if e, ok := status.FromError(err); ok { + switch e.Code() { + case codes.Canceled: + stream, err = bs.restartIndexedAttestationStream(ctx) + if err != nil { + log.WithError(err).Error("Could not restart stream") + return + } + break + default: + log.WithError(err).Errorf("Could not receive attestations from beacon node. rpc status: %v", e.Code()) + return + } + } else { + log.WithError(err).Error("Could not receive attestations from beacon node") return } } - if err != nil { - log.WithError(err).Error("Could not receive attestations from beacon node") - break - } if res == nil { continue }