Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reconnect slasher streams on beacon node shutdown #5376

Merged
merged 10 commits into from
Apr 10, 2020
62 changes: 61 additions & 1 deletion slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package beaconclient

import (
"context"
"errors"
"io"
"strings"
"time"

ptypes "github.com/gogo/protobuf/types"
Expand All @@ -11,6 +13,10 @@ import (
"go.opencensus.io/trace"
)

// ReconnectPeriod represents the frequency in which we check the number of
// open connections.
var ReconnectPeriod = 5 * time.Second
shayzluf marked this conversation as resolved.
Show resolved Hide resolved

// receiveBlocks starts a gRPC client stream listener to obtain
// blocks from the beacon node. Upon receiving a block, the service
// broadcasts it to a feed for other services in slasher to subscribe to.
Expand All @@ -33,6 +39,13 @@ func (bs *Service) receiveBlocks(ctx context.Context) {
log.WithError(ctx.Err()).Error("Context canceled - shutting down blocks receiver")
return
}
if err != nil && strings.Contains(err.Error(), "Context canceled") {
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
stream, err = bs.restartBlockStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
return
}
}
if err != nil {
log.WithError(err).Error("Could not receive block from beacon node")
break
Expand Down Expand Up @@ -70,9 +83,16 @@ func (bs *Service) receiveAttestations(ctx context.Context) {
log.WithError(ctx.Err()).Error("Context canceled - shutting down attestations receiver")
return
}
if err != nil && strings.Contains(err.Error(), "Context canceled") {
stream, err = bs.restartIndexedAttestationStream(ctx)
if err != nil {
log.WithError(err).Error("Could not restart stream")
return
}
}
if err != nil {
log.WithError(err).Error("Could not receive attestations from beacon node")
continue
break
}
if res == nil {
continue
Expand Down Expand Up @@ -120,3 +140,43 @@ func (bs *Service) collectReceivedAttestations(ctx context.Context) {
}
}
}

func (bs *Service) restartIndexedAttestationStream(ctx context.Context) (ethpb.BeaconChain_StreamIndexedAttestationsClient, error) {
ticker := time.NewTicker(ReconnectPeriod)
for {
select {
case <-ticker.C:
log.Info("Context closed, trying to restart attestation stream")
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
stream, err := bs.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
continue
}
log.Info("Attestation stream restarted...")
return stream, nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return nil, errors.New("context closed, quit restart stream retries")
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
}
}

}

func (bs *Service) restartBlockStream(ctx context.Context) (ethpb.BeaconChain_StreamBlocksClient, error) {
ticker := time.NewTicker(ReconnectPeriod)
for {
select {
case <-ticker.C:
log.Info("Context closed, trying to restart block stream")
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
stream, err := bs.beaconClient.StreamBlocks(ctx, &ptypes.Empty{})
if err != nil {
continue
}
log.Info("Block stream restarted...")
return stream, nil
case <-ctx.Done():
log.Debug("Context closed, exiting reconnect routine")
return nil, errors.New("context closed, quit restart stream retries")
shayzluf marked this conversation as resolved.
Show resolved Hide resolved
}
}

}