Skip to content

Commit

Permalink
Embed Config Pattern For Slasher, Slashing Protection (#8637)
Browse files Browse the repository at this point in the history
Co-authored-by: terence tsao <terence@prysmaticlabs.com>
  • Loading branch information
kevlu93 and terencechain authored Mar 21, 2021
1 parent 4a64d4d commit 14439d2
Show file tree
Hide file tree
Showing 19 changed files with 127 additions and 160 deletions.
8 changes: 4 additions & 4 deletions slasher/beaconclient/chain_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (s *Service) ChainHead(
) (*ethpb.ChainHead, error) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ChainHead")
defer span.End()
res, err := s.beaconClient.GetChainHead(ctx, &ptypes.Empty{})
res, err := s.cfg.BeaconClient.GetChainHead(ctx, &ptypes.Empty{})
if err != nil || res == nil {
return nil, errors.Wrap(err, "Could not retrieve chain head or got nil chain head")
}
Expand All @@ -36,7 +36,7 @@ func (s *Service) GenesisValidatorsRoot(
defer span.End()

if s.genesisValidatorRoot == nil {
res, err := s.nodeClient.GetGenesis(ctx, &ptypes.Empty{})
res, err := s.cfg.NodeClient.GetGenesis(ctx, &ptypes.Empty{})
if err != nil {
return nil, errors.Wrap(err, "could not retrieve genesis data")
}
Expand All @@ -51,7 +51,7 @@ func (s *Service) GenesisValidatorsRoot(
// Poll the beacon node every syncStatusPollingInterval until the node
// is no longer syncing.
func (s *Service) querySyncStatus(ctx context.Context) {
status, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
status, err := s.cfg.NodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
}
Expand All @@ -65,7 +65,7 @@ func (s *Service) querySyncStatus(ctx context.Context) {
for {
select {
case <-ticker.C:
status, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
status, err := s.cfg.NodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
}
Expand Down
6 changes: 3 additions & 3 deletions slasher/beaconclient/chain_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestService_ChainHead(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
cfg: &Config{BeaconClient: client},
}
wanted := &ethpb.ChainHead{
HeadSlot: 4,
Expand All @@ -38,7 +38,7 @@ func TestService_GenesisValidatorsRoot(t *testing.T) {

client := mock.NewMockNodeClient(ctrl)
bs := Service{
nodeClient: client,
cfg: &Config{NodeClient: client},
}
wanted := &ethpb.Genesis{
GenesisValidatorsRoot: []byte("I am genesis"),
Expand All @@ -60,7 +60,7 @@ func TestService_QuerySyncStatus(t *testing.T) {
client := mock.NewMockNodeClient(ctrl)

bs := Service{
nodeClient: client,
cfg: &Config{NodeClient: client},
}
syncStatusPollingInterval = time.Millisecond
client.EXPECT().GetSyncStatus(gomock.Any(), gomock.Any()).Return(&ethpb.SyncStatus{
Expand Down
2 changes: 1 addition & 1 deletion slasher/beaconclient/historical_data_retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (s *Service) RequestHistoricalAttestations(
if res == nil {
res = &ethpb.ListIndexedAttestationsResponse{}
}
res, err = s.beaconClient.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
res, err = s.cfg.BeaconClient.ListIndexedAttestations(ctx, &ethpb.ListIndexedAttestationsRequest{
QueryFilter: &ethpb.ListIndexedAttestationsRequest_Epoch{
Epoch: epoch,
},
Expand Down
6 changes: 4 additions & 2 deletions slasher/beaconclient/historical_data_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ func TestService_RequestHistoricalAttestations(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
slasherDB: db,
cfg: &Config{
BeaconClient: client,
SlasherDB: db,
},
}

numAtts := 1000
Expand Down
12 changes: 6 additions & 6 deletions slasher/beaconclient/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ var reconnectPeriod = 5 * time.Second
func (s *Service) ReceiveBlocks(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveBlocks")
defer span.End()
stream, err := s.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
stream, err := s.cfg.BeaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Failed to retrieve blocks stream")
return
Expand All @@ -53,7 +53,7 @@ func (s *Service) ReceiveBlocks(ctx context.Context) {
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = s.beaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
stream, err = s.cfg.BeaconClient.StreamBlocks(ctx, &ethpb.StreamBlocksRequest{} /* Prefers unverified block to catch slashing */)
if err != nil {
log.WithError(err).Error("Could not restart block stream")
return
Expand Down Expand Up @@ -93,7 +93,7 @@ func (s *Service) ReceiveBlocks(ctx context.Context) {
func (s *Service) ReceiveAttestations(ctx context.Context) {
ctx, span := trace.StartSpan(ctx, "beaconclient.ReceiveAttestations")
defer span.End()
stream, err := s.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
stream, err := s.cfg.BeaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Failed to retrieve attestations stream")
return
Expand Down Expand Up @@ -122,7 +122,7 @@ func (s *Service) ReceiveAttestations(ctx context.Context) {
log.WithError(err).Error("Could not restart beacon connection")
return
}
stream, err = s.beaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
stream, err = s.cfg.BeaconClient.StreamIndexedAttestations(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not restart attestation stream")
return
Expand Down Expand Up @@ -162,7 +162,7 @@ func (s *Service) collectReceivedAttestations(ctx context.Context) {
case att := <-s.receivedAttestationsBuffer:
atts = append(atts, att)
case collectedAtts := <-s.collectedAttestationsBuffer:
if err := s.slasherDB.SaveIndexedAttestations(ctx, collectedAtts); err != nil {
if err := s.cfg.SlasherDB.SaveIndexedAttestations(ctx, collectedAtts); err != nil {
log.WithError(err).Error("Could not save indexed attestation")
continue
}
Expand Down Expand Up @@ -197,7 +197,7 @@ func (s *Service) restartBeaconConnection(ctx context.Context) error {
log.Info("Beacon node is still down")
continue
}
s, err := s.nodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
s, err := s.cfg.NodeClient.GetSyncStatus(ctx, &ptypes.Empty{})
if err != nil {
log.WithError(err).Error("Could not fetch sync status")
continue
Expand Down
12 changes: 7 additions & 5 deletions slasher/beaconclient/receivers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func TestService_ReceiveBlocks(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
blockFeed: new(event.Feed),
cfg: &Config{BeaconClient: client},
blockFeed: new(event.Feed),
}
stream := mock.NewMockBeaconChain_StreamBlocksClient(ctrl)
ctx, cancel := context.WithCancel(context.Background())
Expand All @@ -46,7 +46,7 @@ func TestService_ReceiveAttestations(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
cfg: &Config{BeaconClient: client},
blockFeed: new(event.Feed),
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
Expand Down Expand Up @@ -78,9 +78,11 @@ func TestService_ReceiveAttestations_Batched(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
cfg: &Config{
BeaconClient: client,
SlasherDB: testDB.SetupSlasherDB(t, false),
},
blockFeed: new(event.Feed),
slasherDB: testDB.SetupSlasherDB(t, false),
attestationFeed: new(event.Feed),
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
Expand Down
28 changes: 8 additions & 20 deletions slasher/beaconclient/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,15 @@ type ChainFetcher interface {

// Service struct for the beaconclient service of the slasher.
type Service struct {
cfg *Config
ctx context.Context
cancel context.CancelFunc
cert string
conn *grpc.ClientConn
provider string
beaconClient ethpb.BeaconChainClient
slasherDB db.Database
nodeClient ethpb.NodeClient
clientFeed *event.Feed
blockFeed *event.Feed
attestationFeed *event.Feed
proposerSlashingsChan chan *ethpb.ProposerSlashing
attesterSlashingsChan chan *ethpb.AttesterSlashing
attesterSlashingsFeed *event.Feed
proposerSlashingsFeed *event.Feed
receivedAttestationsBuffer chan *ethpb.IndexedAttestation
collectedAttestationsBuffer chan []*ethpb.IndexedAttestation
publicKeyCache *cache.PublicKeyCache
Expand Down Expand Up @@ -82,23 +76,17 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
}

return &Service{
cert: cfg.BeaconCert,
cfg: cfg,
ctx: ctx,
cancel: cancel,
provider: cfg.BeaconProvider,
blockFeed: new(event.Feed),
clientFeed: new(event.Feed),
attestationFeed: new(event.Feed),
slasherDB: cfg.SlasherDB,
proposerSlashingsChan: make(chan *ethpb.ProposerSlashing, 1),
attesterSlashingsChan: make(chan *ethpb.AttesterSlashing, 1),
attesterSlashingsFeed: cfg.AttesterSlashingsFeed,
proposerSlashingsFeed: cfg.ProposerSlashingsFeed,
receivedAttestationsBuffer: make(chan *ethpb.IndexedAttestation, 1),
collectedAttestationsBuffer: make(chan []*ethpb.IndexedAttestation, 1),
publicKeyCache: publicKeyCache,
beaconClient: cfg.BeaconClient,
nodeClient: cfg.NodeClient,
}, nil
}

Expand Down Expand Up @@ -145,8 +133,8 @@ func (s *Service) Status() error {
// after they are detected by other services in the slasher.
func (s *Service) Start() {
var dialOpt grpc.DialOption
if s.cert != "" {
creds, err := credentials.NewClientTLSFromFile(s.cert, "")
if s.cfg.BeaconCert != "" {
creds, err := credentials.NewClientTLSFromFile(s.cfg.BeaconCert, "")
if err != nil {
log.Errorf("Could not get valid credentials: %v", err)
}
Expand All @@ -173,15 +161,15 @@ func (s *Service) Start() {
grpcutils.LogRequests,
)),
}
conn, err := grpc.DialContext(s.ctx, s.provider, beaconOpts...)
conn, err := grpc.DialContext(s.ctx, s.cfg.BeaconProvider, beaconOpts...)
if err != nil {
log.Fatalf("Could not dial endpoint: %s, %v", s.provider, err)
log.Fatalf("Could not dial endpoint: %s, %v", s.cfg.BeaconProvider, err)
}
s.beaconDialOptions = beaconOpts
log.Info("Successfully started gRPC connection")
s.conn = conn
s.beaconClient = ethpb.NewBeaconChainClient(s.conn)
s.nodeClient = ethpb.NewNodeClient(s.conn)
s.cfg.BeaconClient = ethpb.NewBeaconChainClient(s.conn)
s.cfg.NodeClient = ethpb.NewNodeClient(s.conn)

// We poll for the sync status of the beacon node until it is fully synced.
s.querySyncStatus(s.ctx)
Expand Down
8 changes: 4 additions & 4 deletions slasher/beaconclient/submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
func (s *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch chan *ethpb.ProposerSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitProposerSlashing")
defer span.End()
sub := s.proposerSlashingsFeed.Subscribe(ch)
sub := s.cfg.ProposerSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if _, err := s.beaconClient.SubmitProposerSlashing(ctx, slashing); err != nil {
if _, err := s.cfg.BeaconClient.SubmitProposerSlashing(ctx, slashing); err != nil {
log.Error(err)
}
case <-sub.Err():
Expand All @@ -43,14 +43,14 @@ func (s *Service) subscribeDetectedProposerSlashings(ctx context.Context, ch cha
func (s *Service) subscribeDetectedAttesterSlashings(ctx context.Context, ch chan *ethpb.AttesterSlashing) {
ctx, span := trace.StartSpan(ctx, "beaconclient.submitAttesterSlashing")
defer span.End()
sub := s.attesterSlashingsFeed.Subscribe(ch)
sub := s.cfg.AttesterSlashingsFeed.Subscribe(ch)
defer sub.Unsubscribe()
for {
select {
case slashing := <-ch:
if slashing != nil && slashing.Attestation_1 != nil && slashing.Attestation_2 != nil {
slashableIndices := sliceutil.IntersectionUint64(slashing.Attestation_1.AttestingIndices, slashing.Attestation_2.AttestingIndices)
_, err := s.beaconClient.SubmitAttesterSlashing(ctx, slashing)
_, err := s.cfg.BeaconClient.SubmitAttesterSlashing(ctx, slashing)
if err == nil {
log.WithFields(logrus.Fields{
"sourceEpoch": slashing.Attestation_1.Data.Source.Epoch,
Expand Down
12 changes: 8 additions & 4 deletions slasher/beaconclient/submit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ func TestService_SubscribeDetectedProposerSlashings(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
proposerSlashingsFeed: new(event.Feed),
cfg: &Config{
BeaconClient: client,
ProposerSlashingsFeed: new(event.Feed),
},
}

slashing := &ethpb.ProposerSlashing{
Expand Down Expand Up @@ -61,8 +63,10 @@ func TestService_SubscribeDetectedAttesterSlashings(t *testing.T) {
client := mock.NewMockBeaconChainClient(ctrl)

bs := Service{
beaconClient: client,
attesterSlashingsFeed: new(event.Feed),
cfg: &Config{
BeaconClient: client,
AttesterSlashingsFeed: new(event.Feed),
},
}

slashing := &ethpb.AttesterSlashing{
Expand Down
2 changes: 1 addition & 1 deletion slasher/beaconclient/validator_retrieval.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Service) FindOrGetPublicKeys(
if notFound == 0 {
return validators, nil
}
vc, err := s.beaconClient.ListValidators(ctx, &ethpb.ListValidatorsRequest{
vc, err := s.cfg.BeaconClient.ListValidators(ctx, &ethpb.ListValidatorsRequest{
Indices: validatorIndices,
})
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion slasher/beaconclient/validator_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestService_RequestValidator(t *testing.T) {
validatorCache, err := cache.NewPublicKeyCache(0, nil)
require.NoError(t, err, "Could not create new cache")
bs := Service{
beaconClient: client,
cfg: &Config{BeaconClient: client},
publicKeyCache: validatorCache,
}
wanted := &ethpb.Validators{
Expand Down
8 changes: 4 additions & 4 deletions slasher/detection/detect.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (s *Service) DetectAttesterSlashings(
}
}
if len(slashings) > 0 {
if err := s.slasherDB.SaveAttesterSlashings(ctx, status.Active, slashings); err != nil {
if err := s.cfg.SlasherDB.SaveAttesterSlashings(ctx, status.Active, slashings); err != nil {
return nil, err
}
}
Expand Down Expand Up @@ -187,7 +187,7 @@ func (s *Service) mapResultsToAtts(ctx context.Context, results []*types.Detecti
if _, ok := resultsToAtts[resultKey]; ok {
continue
}
matchingAtts, err := s.slasherDB.IndexedAttestationsWithPrefix(ctx, result.SlashableEpoch, result.SigBytes[:])
matchingAtts, err := s.cfg.SlasherDB.IndexedAttestationsWithPrefix(ctx, result.SlashableEpoch, result.SigBytes[:])
if err != nil {
return nil, err
}
Expand All @@ -214,7 +214,7 @@ func isDoubleVote(incomingAtt, prevAtt *ethpb.IndexedAttestation) bool {
// UpdateHighestAttestation updates to the db the highest source and target attestations for a each validator.
func (s *Service) UpdateHighestAttestation(ctx context.Context, att *ethpb.IndexedAttestation) error {
for _, idx := range att.AttestingIndices {
h, err := s.slasherDB.HighestAttestation(ctx, idx)
h, err := s.cfg.SlasherDB.HighestAttestation(ctx, idx)
if err != nil {
return err
}
Expand All @@ -238,7 +238,7 @@ func (s *Service) UpdateHighestAttestation(ctx context.Context, att *ethpb.Index

// If it's not a new instance of HighestAttestation, changing it will also change the cached instance.
if update {
if err := s.slasherDB.SaveHighestAttestation(ctx, h); err != nil {
if err := s.cfg.SlasherDB.SaveHighestAttestation(ctx, h); err != nil {
return err
}
}
Expand Down
Loading

0 comments on commit 14439d2

Please sign in to comment.