diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index 2b7a5c5e770c..4abc1eb8d6ad 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ - "aggregate.go", "log.go", "metrics.go", "pool.go", @@ -34,7 +33,6 @@ go_library( go_test( name = "go_default_test", srcs = [ - "aggregate_test.go", "pool_test.go", "prepare_forkchoice_test.go", "prune_expired_test.go", @@ -50,6 +48,5 @@ go_test( "@com_github_gogo_protobuf//proto:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", - "@in_gopkg_d4l3k_messagediff_v1//:go_default_library", ], ) diff --git a/beacon-chain/operations/attestations/aggregate.go b/beacon-chain/operations/attestations/aggregate.go deleted file mode 100644 index 5f37d59216ae..000000000000 --- a/beacon-chain/operations/attestations/aggregate.go +++ /dev/null @@ -1,79 +0,0 @@ -package attestations - -import ( - "context" - "time" - - ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/go-ssz" - "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" - "github.com/prysmaticlabs/prysm/shared/params" - "go.opencensus.io/trace" -) - -// Define time to aggregate the unaggregated attestations at 2 times per slot, this gives -// enough confidence all the unaggregated attestations will be aggregated as aggregator requests. -var timeToAggregate = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second - -// This kicks off a routine to aggregate the unaggregated attestations from pool. -func (s *Service) aggregateRoutine() { - ticker := time.NewTicker(timeToAggregate) - ctx := context.TODO() - for { - select { - case <-s.ctx.Done(): - return - case <-ticker.C: - attsToBeAggregated := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...) - if err := s.aggregateAttestations(ctx, attsToBeAggregated); err != nil { - log.WithError(err).Error("Could not aggregate attestation") - } - - // Update metrics for aggregated and unaggregated attestations count. - s.updateMetrics() - } - } -} - -// This aggregates the input attestations via AggregateAttestations helper -// function. -func (s *Service) aggregateAttestations(ctx context.Context, attsToBeAggregated []*ethpb.Attestation) error { - ctx, span := trace.StartSpan(ctx, "Operations.attestations.aggregateAttestations") - defer span.End() - - attsByRoot := make(map[[32]byte][]*ethpb.Attestation) - - for _, att := range attsToBeAggregated { - attDataRoot, err := ssz.HashTreeRoot(att.Data) - if err != nil { - return err - } - attsByRoot[attDataRoot] = append(attsByRoot[attDataRoot], att) - } - - for _, atts := range attsByRoot { - for _, att := range atts { - if !helpers.IsAggregated(att) && len(atts) > 1 { - if err := s.pool.DeleteUnaggregatedAttestation(att); err != nil { - return err - } - } - } - } - - for _, atts := range attsByRoot { - aggregatedAtts, err := helpers.AggregateAttestations(atts) - if err != nil { - return err - } - for _, att := range aggregatedAtts { - if helpers.IsAggregated(att) { - if err := s.pool.SaveAggregatedAttestation(att); err != nil { - return err - } - } - } - } - - return nil -} diff --git a/beacon-chain/operations/attestations/aggregate_test.go b/beacon-chain/operations/attestations/aggregate_test.go deleted file mode 100644 index b7e193a9280d..000000000000 --- a/beacon-chain/operations/attestations/aggregate_test.go +++ /dev/null @@ -1,127 +0,0 @@ -package attestations - -import ( - "context" - "reflect" - "sort" - "testing" - - "github.com/gogo/protobuf/proto" - ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" - "github.com/prysmaticlabs/go-bitfield" - "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" - "github.com/prysmaticlabs/prysm/shared/bls" - "gopkg.in/d4l3k/messagediff.v1" -) - -func TestAggregateAttestations_SingleAttestation(t *testing.T) { - s, err := NewService(context.Background(), &Config{Pool: NewPool()}) - if err != nil { - t.Fatal(err) - } - - sk := bls.RandKey() - sig := sk.Sign([]byte("dummy_test_data")) - - unaggregatedAtts := []*ethpb.Attestation{ - {Data: ðpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, - } - - if err := s.aggregateAttestations(context.Background(), unaggregatedAtts); err != nil { - t.Fatal(err) - } - - if len(s.pool.AggregatedAttestations()) != 0 { - t.Error("Nothing should be aggregated") - } - - if len(s.pool.UnaggregatedAttestations()) != 0 { - t.Error("Unaggregated pool should be empty") - } -} - -func TestAggregateAttestations_MultipleAttestationsSameRoot(t *testing.T) { - s, err := NewService(context.Background(), &Config{Pool: NewPool()}) - if err != nil { - t.Fatal(err) - } - - sk := bls.RandKey() - sig := sk.Sign([]byte("dummy_test_data")) - - data := ðpb.AttestationData{ - Source: ðpb.Checkpoint{}, - Target: ðpb.Checkpoint{}, - } - attsToBeAggregated := []*ethpb.Attestation{ - {Data: data, AggregationBits: bitfield.Bitlist{0b110001}, Signature: sig.Marshal()}, - {Data: data, AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()}, - {Data: data, AggregationBits: bitfield.Bitlist{0b101100}, Signature: sig.Marshal()}, - } - - if err := s.aggregateAttestations(context.Background(), attsToBeAggregated); err != nil { - t.Fatal(err) - } - - if len(s.pool.UnaggregatedAttestations()) != 0 { - t.Error("Nothing should be unaggregated") - } - - wanted, err := helpers.AggregateAttestations(attsToBeAggregated) - if err != nil { - t.Fatal(err) - } - got := s.pool.AggregatedAttestations() - if !reflect.DeepEqual(wanted, got) { - diff, _ := messagediff.PrettyDiff(got[0], wanted[0]) - t.Log(diff) - t.Error("Did not aggregate attestations") - } -} - -func TestAggregateAttestations_MultipleAttestationsDifferentRoots(t *testing.T) { - s, err := NewService(context.Background(), &Config{Pool: NewPool()}) - if err != nil { - t.Fatal(err) - } - mockRoot := [32]byte{} - d := ðpb.AttestationData{ - BeaconBlockRoot: mockRoot[:], - Source: ðpb.Checkpoint{Root: mockRoot[:]}, - Target: ðpb.Checkpoint{Root: mockRoot[:]}, - } - d1 := proto.Clone(d).(*ethpb.AttestationData) - d1.Slot = 1 - d2 := proto.Clone(d).(*ethpb.AttestationData) - d2.Slot = 2 - - sk := bls.RandKey() - sig := sk.Sign([]byte("dummy_test_data")) - - atts := []*ethpb.Attestation{ - {Data: d, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, - {Data: d, AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()}, - {Data: d1, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, - {Data: d1, AggregationBits: bitfield.Bitlist{0b100110}, Signature: sig.Marshal()}, - {Data: d2, AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, - } - - if err := s.aggregateAttestations(context.Background(), atts); err != nil { - t.Fatal(err) - } - - if len(s.pool.UnaggregatedAttestations()) != 0 { - t.Error("Unaggregated att pool did not clean up") - } - - received := s.pool.AggregatedAttestations() - sort.Slice(received, func(i, j int) bool { - return received[i].Data.Slot < received[j].Data.Slot - }) - att1, _ := helpers.AggregateAttestations([]*ethpb.Attestation{atts[0], atts[1]}) - att2, _ := helpers.AggregateAttestations([]*ethpb.Attestation{atts[2], atts[3]}) - wanted := append(att1, att2...) - if !reflect.DeepEqual(wanted, received) { - t.Error("Did not aggregate attestations") - } -} diff --git a/beacon-chain/operations/attestations/kv/unaggregated.go b/beacon-chain/operations/attestations/kv/unaggregated.go index bf8759f20d3b..f0eb52b28110 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated.go +++ b/beacon-chain/operations/attestations/kv/unaggregated.go @@ -52,6 +52,22 @@ func (p *AttCaches) UnaggregatedAttestations() []*ethpb.Attestation { return atts } +// UnAggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache, +// filtered by committee index and slot. +func (p *AttCaches) UnAggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation { + atts := make([]*ethpb.Attestation, 0) + + p.unAggregateAttLock.RLock() + defer p.unAggregateAttLock.RUnlock() + for _, a := range p.unAggregatedAtt { + if slot == a.Data.Slot && committeeIndex == a.Data.CommitteeIndex { + atts = append(atts, a) + } + } + + return atts +} + // DeleteUnaggregatedAttestation deletes the unaggregated attestations in cache. func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error { if att == nil { @@ -73,6 +89,29 @@ func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error return nil } +// DeleteUnaggregatedAttestations deletes the unaggregated attestations in cache. +func (p *AttCaches) DeleteUnaggregatedAttestations(atts []*ethpb.Attestation) error { + p.unAggregateAttLock.Lock() + defer p.unAggregateAttLock.Unlock() + for _, att := range atts { + if att == nil { + continue + } + if helpers.IsAggregated(att) { + return errors.New("attestation is aggregated") + } + + r, err := hashFn(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + delete(p.unAggregatedAtt, r) + } + + return nil +} + // UnaggregatedAttestationCount returns the number of unaggregated attestations key in the pool. func (p *AttCaches) UnaggregatedAttestationCount() int { p.unAggregateAttLock.RLock() diff --git a/beacon-chain/operations/attestations/kv/unaggregated_test.go b/beacon-chain/operations/attestations/kv/unaggregated_test.go index 6bdec2a713ac..c32208419e20 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated_test.go +++ b/beacon-chain/operations/attestations/kv/unaggregated_test.go @@ -51,3 +51,51 @@ func TestKV_Unaggregated_CanDelete(t *testing.T) { t.Error("Did not receive correct aggregated atts") } } + +func TestKV_Unaggregated_FilterBySlotsAndCommitteeIDs(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}} + att4 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b110}} + + atts := []*ethpb.Attestation{att1, att2, att3, att4} + + for _, att := range atts { + if err := cache.SaveUnaggregatedAttestation(att); err != nil { + t.Fatal(err) + } + } + + returned := cache.UnAggregatedAttestationsBySlotIndex(1, 1) + + if !reflect.DeepEqual([]*ethpb.Attestation{att1, att3}, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_Unaggregated_BatchDelete(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}} + att4 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b110}} + + atts := []*ethpb.Attestation{att1, att2, att3, att4} + + for _, att := range atts { + if err := cache.SaveUnaggregatedAttestation(att); err != nil { + t.Fatal(err) + } + } + if err := cache.DeleteUnaggregatedAttestations(atts); err != nil { + t.Fatal(err) + } + + returned := cache.UnaggregatedAttestations() + if !reflect.DeepEqual([]*ethpb.Attestation{}, returned) { + t.Error("Did not receive correct aggregated atts") + } +} diff --git a/beacon-chain/operations/attestations/pool.go b/beacon-chain/operations/attestations/pool.go index b03b98969158..7cddc8bfb36a 100644 --- a/beacon-chain/operations/attestations/pool.go +++ b/beacon-chain/operations/attestations/pool.go @@ -22,7 +22,9 @@ type Pool interface { SaveUnaggregatedAttestation(att *ethpb.Attestation) error SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error UnaggregatedAttestations() []*ethpb.Attestation + UnAggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation DeleteUnaggregatedAttestation(att *ethpb.Attestation) error + DeleteUnaggregatedAttestations(atts []*ethpb.Attestation) error UnaggregatedAttestationCount() int // For attestations that were included in the block. SaveBlockAttestation(att *ethpb.Attestation) error diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go index c17f8aa73081..44b9afa7ef4d 100644 --- a/beacon-chain/operations/attestations/service.go +++ b/beacon-chain/operations/attestations/service.go @@ -43,7 +43,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { // Start an attestation pool service's main event loop. func (s *Service) Start() { go s.prepareForkChoiceAtts() - go s.aggregateRoutine() go s.pruneAttsPool() } diff --git a/beacon-chain/rpc/validator/BUILD.bazel b/beacon-chain/rpc/validator/BUILD.bazel index cb017e91e57e..6f80649e3c09 100644 --- a/beacon-chain/rpc/validator/BUILD.bazel +++ b/beacon-chain/rpc/validator/BUILD.bazel @@ -41,8 +41,6 @@ go_library( "//shared/featureconfig:go_default_library", "//shared/hashutil:go_default_library", "//shared/params:go_default_library", - "//shared/roughtime:go_default_library", - "//shared/slotutil:go_default_library", "//shared/traceutil:go_default_library", "//shared/trieutil:go_default_library", "@com_github_gogo_protobuf//types:go_default_library", diff --git a/beacon-chain/rpc/validator/aggregator.go b/beacon-chain/rpc/validator/aggregator.go index ecdfcd8047b2..aa4646f25ab8 100644 --- a/beacon-chain/rpc/validator/aggregator.go +++ b/beacon-chain/rpc/validator/aggregator.go @@ -55,7 +55,21 @@ func (as *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb. } // Retrieve the unaggregated attestation from pool. + unaggregatedAtts := as.AttPool.UnAggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex) + // In case there's left over aggregated attestations in the pool. aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex) + aggregatedAtts, err = helpers.AggregateAttestations(append(aggregatedAtts, unaggregatedAtts...)) + if err != nil { + return nil, status.Errorf(codes.Internal, "Could not get aggregate attestations: %v", err) + } + + // Save the aggregated attestations to the pool. + if err := as.AttPool.SaveAggregatedAttestations(aggregatedAtts); err != nil { + return nil, status.Errorf(codes.Internal, "Could not save aggregated attestations: %v", err) + } + if err := as.AttPool.DeleteUnaggregatedAttestations(unaggregatedAtts); err != nil { + return nil, status.Errorf(codes.Internal, "Could not delete unaggregated attestations: %v", err) + } // Filter out the best aggregated attestation (ie. the one with the most aggregated bits). if len(aggregatedAtts) == 0 { diff --git a/beacon-chain/rpc/validator/attester.go b/beacon-chain/rpc/validator/attester.go index 184143fa757f..025a04150661 100644 --- a/beacon-chain/rpc/validator/attester.go +++ b/beacon-chain/rpc/validator/attester.go @@ -2,7 +2,6 @@ package validator import ( "context" - "time" ptypes "github.com/gogo/protobuf/types" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -10,7 +9,6 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/cache" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed" "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/operation" - statefeed "github.com/prysmaticlabs/prysm/beacon-chain/core/feed/state" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/beacon-chain/core/state" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" @@ -18,8 +16,6 @@ import ( "github.com/prysmaticlabs/prysm/shared/bytesutil" "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/params" - "github.com/prysmaticlabs/prysm/shared/roughtime" - "github.com/prysmaticlabs/prysm/shared/slotutil" "go.opencensus.io/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -46,10 +42,6 @@ func (vs *Server) GetAttestationData(ctx context.Context, req *ethpb.Attestation return nil, status.Error(codes.InvalidArgument, msgInvalidAttestationRequest) } - // Attester will either wait until there's a valid block from the expected block proposer of for the assigned input slot - // or one third of the slot has transpired. Whichever comes first. - vs.waitToOneThird(ctx, req.Slot) - res, err := vs.AttestationCache.Get(ctx, req) if err != nil { return nil, status.Errorf(codes.Internal, "Could not retrieve data from attestation cache: %v", err) @@ -187,44 +179,6 @@ func (vs *Server) ProposeAttestation(ctx context.Context, att *ethpb.Attestation }, nil } -// waitToOneThird waits until one-third of the way through the slot -// or the head slot equals to the input slot. -func (vs *Server) waitToOneThird(ctx context.Context, slot uint64) { - _, span := trace.StartSpan(ctx, "validator.waitToOneThird") - defer span.End() - - // Don't need to wait if current slot is greater than requested slot. - if slot < vs.GenesisTimeFetcher.CurrentSlot() { - return - } - - // Set time out to be at start slot time + one-third of slot duration. - slotStartTime := slotutil.SlotStartTime(uint64(vs.GenesisTimeFetcher.GenesisTime().Unix()), slot) - slotOneThirdTime := slotStartTime.Unix() + int64(params.BeaconConfig().SecondsPerSlot/3) - waitDuration := slotOneThirdTime - roughtime.Now().Unix() - timeOut := time.After(time.Duration(waitDuration) * time.Second) - - stateChannel := make(chan *feed.Event, 1) - stateSub := vs.StateNotifier.StateFeed().Subscribe(stateChannel) - defer stateSub.Unsubscribe() - - for { - select { - case event := <-stateChannel: - // Node processed a block, check if the processed block is the same as input slot. - if event.Type == statefeed.BlockProcessed { - d := event.Data.(*statefeed.BlockProcessedData) - if slot == d.Slot { - return - } - } - - case <-timeOut: - return - } - } -} - // SubscribeCommitteeSubnet subscribes to the committee ID subnet given subscribe request. func (vs *Server) SubscribeCommitteeSubnet(ctx context.Context, req *ethpb.CommitteeSubnetSubscribeRequest) (*ptypes.Empty, error) { cache.CommitteeIDs.AddAttesterCommiteeID(req.Slot, req.CommitteeId) diff --git a/beacon-chain/rpc/validator/attester_test.go b/beacon-chain/rpc/validator/attester_test.go index 6bba35d8db4d..bd999a80f5a7 100644 --- a/beacon-chain/rpc/validator/attester_test.go +++ b/beacon-chain/rpc/validator/attester_test.go @@ -380,53 +380,6 @@ func TestAttestationDataSlot_handlesInProgressRequest(t *testing.T) { wg.Wait() } -func TestWaitForSlotOneThird_WaitedCorrectly(t *testing.T) { - currentTime := uint64(roughtime.Now().Unix()) - numOfSlots := uint64(4) - genesisTime := currentTime - (numOfSlots * params.BeaconConfig().SecondsPerSlot) - - chainService := &mock.ChainService{ - Genesis: time.Now(), - } - server := &Server{ - AttestationCache: cache.NewAttestationCache(), - HeadFetcher: &mock.ChainService{}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Unix(int64(genesisTime), 0)}, - StateNotifier: chainService.StateNotifier(), - } - - timeToSleep := params.BeaconConfig().SecondsPerSlot / 3 - oneThird := currentTime + timeToSleep - server.waitToOneThird(context.Background(), numOfSlots) - - currentTime = uint64(roughtime.Now().Unix()) - if currentTime != oneThird { - t.Errorf("Wanted %d time for slot one third but got %d", oneThird, currentTime) - } -} - -func TestWaitForSlotOneThird_HeadIsHereNoWait(t *testing.T) { - currentTime := uint64(roughtime.Now().Unix()) - numOfSlots := uint64(4) - genesisTime := currentTime - (numOfSlots * params.BeaconConfig().SecondsPerSlot) - - s := &pbp2p.BeaconState{Slot: 2} - state, _ := beaconstate.InitializeFromProto(s) - server := &Server{ - AttestationCache: cache.NewAttestationCache(), - HeadFetcher: &mock.ChainService{State: state}, - SyncChecker: &mockSync.Sync{IsSyncing: false}, - GenesisTimeFetcher: &mock.ChainService{Genesis: time.Unix(int64(genesisTime), 0)}, - } - - server.waitToOneThird(context.Background(), s.Slot) - - if currentTime != uint64(time.Now().Unix()) { - t.Errorf("Wanted %d time for slot one third but got %d", uint64(time.Now().Unix()), currentTime) - } -} - func TestServer_GetAttestationData_InvalidRequestSlot(t *testing.T) { ctx := context.Background() diff --git a/endtoend/minimal_antiflake_e2e_1_test.go b/endtoend/minimal_antiflake_e2e_1_test.go index 5b8914f49d74..8cac16c60503 100644 --- a/endtoend/minimal_antiflake_e2e_1_test.go +++ b/endtoend/minimal_antiflake_e2e_1_test.go @@ -11,6 +11,7 @@ import ( ) func TestEndToEnd_AntiFlake_MinimalConfig_1(t *testing.T) { + t.Skip("Temp skip for #5127, need proper network implementations") testutil.ResetCache() params.UseMinimalConfig() diff --git a/endtoend/minimal_antiflake_e2e_2_test.go b/endtoend/minimal_antiflake_e2e_2_test.go index 547ea4876955..d5ba4b93ddc4 100644 --- a/endtoend/minimal_antiflake_e2e_2_test.go +++ b/endtoend/minimal_antiflake_e2e_2_test.go @@ -11,6 +11,7 @@ import ( ) func TestEndToEnd_AntiFlake_MinimalConfig_2(t *testing.T) { + t.Skip("Temp skip for #5127, need proper network implementations") testutil.ResetCache() params.UseMinimalConfig() diff --git a/validator/client/validator_attest.go b/validator/client/validator_attest.go index 66281cdb077d..7e05e35f31bd 100644 --- a/validator/client/validator_attest.go +++ b/validator/client/validator_attest.go @@ -5,6 +5,7 @@ import ( "context" "errors" "fmt" + "time" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -17,6 +18,8 @@ import ( "github.com/prysmaticlabs/prysm/shared/featureconfig" "github.com/prysmaticlabs/prysm/shared/hashutil" "github.com/prysmaticlabs/prysm/shared/params" + "github.com/prysmaticlabs/prysm/shared/roughtime" + "github.com/prysmaticlabs/prysm/shared/slotutil" "github.com/prysmaticlabs/prysm/validator/keymanager" "github.com/sirupsen/logrus" "go.opencensus.io/trace" @@ -65,6 +68,8 @@ func (v *validator) SubmitAttestation(ctx context.Context, slot uint64, pubKey [ return } + v.waitToSlotOneThird(ctx, slot) + req := ðpb.AttestationDataRequest{ Slot: slot, CommitteeIndex: duty.CommitteeIndex, @@ -307,3 +312,17 @@ func safeTargetToSource(history *slashpb.AttestationHistory, targetEpoch uint64) } return history.TargetToSource[targetEpoch%wsPeriod] } + +// waitToSlotOneThird waits until one third through the current slot period +// such that head block for beacon node can get updated. +func (v *validator) waitToSlotOneThird(ctx context.Context, slot uint64) { + _, span := trace.StartSpan(ctx, "validator.waitToSlotOneThird") + defer span.End() + + twoThird := params.BeaconConfig().SecondsPerSlot * 1 / 3 + delay := time.Duration(twoThird) * time.Second + + startTime := slotutil.SlotStartTime(v.genesisTime, slot) + finalTime := startTime.Add(delay) + time.Sleep(roughtime.Until(finalTime)) +}