diff --git a/beacon-chain/operations/attestations/kv/BUILD.bazel b/beacon-chain/operations/attestations/kv/BUILD.bazel index 8adab3f6faaa..be98776d75de 100644 --- a/beacon-chain/operations/attestations/kv/BUILD.bazel +++ b/beacon-chain/operations/attestations/kv/BUILD.bazel @@ -17,6 +17,7 @@ go_library( "//shared/hashutil:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_ssz//:go_default_library", ], ) @@ -31,6 +32,7 @@ go_test( ], embed = [":go_default_library"], deps = [ + "//shared/bls:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", "@com_github_prysmaticlabs_go_bitfield//:go_default_library", ], diff --git a/beacon-chain/operations/attestations/kv/aggregated.go b/beacon-chain/operations/attestations/kv/aggregated.go index f9a3b5a24d26..eab810f7edbd 100644 --- a/beacon-chain/operations/attestations/kv/aggregated.go +++ b/beacon-chain/operations/attestations/kv/aggregated.go @@ -3,10 +3,68 @@ package kv import ( "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state" ) +// AggregateUnaggregatedAttestations aggregates the unaggregated attestations and save the +// newly aggregated attestations in the pool. +// It tracks the unaggregated attestations that weren't able to aggregate to prevent +// the deletion of unaggregated attestations in the pool. +func (p *AttCaches) AggregateUnaggregatedAttestations() error { + attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation) + unaggregatedAtts := p.UnaggregatedAttestations() + for _, att := range unaggregatedAtts { + attDataRoot, err := ssz.HashTreeRoot(att.Data) + if err != nil { + return err + } + attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att) + } + + // Aggregate unaggregated attestations from the pool and save them in the pool. + // Track the unaggregated attestations that aren't able to aggregate. + leftOverUnaggregatedAtt := make(map[[32]byte]bool) + for _, atts := range attsByDataRoot { + aggregatedAtts := make([]*ethpb.Attestation, 0, len(atts)) + processedAtts, err := helpers.AggregateAttestations(atts) + if err != nil { + return err + } + for _, att := range processedAtts { + if helpers.IsAggregated(att) { + aggregatedAtts = append(aggregatedAtts, att) + } else { + h, err := ssz.HashTreeRoot(att) + if err != nil { + return err + } + leftOverUnaggregatedAtt[h] = true + } + } + if err := p.SaveAggregatedAttestations(aggregatedAtts); err != nil { + return err + } + } + + // Remove the unaggregated attestations from the pool that were successfully aggregated. + for _, att := range unaggregatedAtts { + h, err := ssz.HashTreeRoot(att) + if err != nil { + return err + } + if leftOverUnaggregatedAtt[h] { + continue + } + if err := p.DeleteUnaggregatedAttestation(att); err != nil { + return err + } + } + + return nil +} + // SaveAggregatedAttestation saves an aggregated attestation in cache. func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error { if att == nil || att.Data == nil { diff --git a/beacon-chain/operations/attestations/kv/aggregated_test.go b/beacon-chain/operations/attestations/kv/aggregated_test.go index 99bbf56c8cb4..eb7034ae7903 100644 --- a/beacon-chain/operations/attestations/kv/aggregated_test.go +++ b/beacon-chain/operations/attestations/kv/aggregated_test.go @@ -3,21 +3,45 @@ package kv import ( "reflect" "sort" - "strings" "testing" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/shared/bls" ) -func TestKV_Aggregated_NotAggregated(t *testing.T) { +func TestKV_AggregateUnaggregatedAttestations(t *testing.T) { cache := NewAttCaches() + priv := bls.RandKey() + sig1 := priv.Sign([]byte{'a'}) + sig2 := priv.Sign([]byte{'b'}) + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig1.Marshal()} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1010}, Signature: sig1.Marshal()} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1100}, Signature: sig1.Marshal()} + att4 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig2.Marshal()} + att5 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig1.Marshal()} + att6 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1010}, Signature: sig1.Marshal()} + att7 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1100}, Signature: sig1.Marshal()} + att8 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig2.Marshal()} + atts := []*ethpb.Attestation{att1, att2, att3, att4, att5, att6, att7, att8} + if err := cache.SaveUnaggregatedAttestations(atts); err != nil { + t.Fatal(err) + } + if err := cache.AggregateUnaggregatedAttestations(); err != nil { + t.Fatal(err) + } - att := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b11}, Data: ðpb.AttestationData{}} - - wanted := "attestation is not aggregated" - if err := cache.SaveAggregatedAttestation(att); !strings.Contains(err.Error(), wanted) { - t.Error("Did not received wanted error") + if len(cache.AggregatedAttestationsBySlotIndex(1, 0)) != 1 { + t.Fatal("Did not aggregate correctly") + } + if len(cache.AggregatedAttestationsBySlotIndex(2, 0)) != 1 { + t.Fatal("Did not aggregate correctly") + } + if len(cache.UnAggregatedAttestationsBySlotIndex(1, 0)) != 0 { + t.Fatal("Did not clear unaggregated correctly") + } + if len(cache.UnAggregatedAttestationsBySlotIndex(2, 0)) != 0 { + t.Fatal("Did not clear unaggregated correctly") } } diff --git a/beacon-chain/operations/attestations/pool.go b/beacon-chain/operations/attestations/pool.go index 7cddc8bfb36a..8da2c572cf9d 100644 --- a/beacon-chain/operations/attestations/pool.go +++ b/beacon-chain/operations/attestations/pool.go @@ -11,6 +11,7 @@ import ( // for aggregator actor. type Pool interface { // For Aggregated attestations + AggregateUnaggregatedAttestations() error SaveAggregatedAttestation(att *ethpb.Attestation) error SaveAggregatedAttestations(atts []*ethpb.Attestation) error AggregatedAttestations() []*ethpb.Attestation diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go index 3e525c87de68..1d69732ec562 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -46,8 +46,10 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error { attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation) - atts := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...) - atts = append(atts, s.pool.BlockAttestations()...) + if err := s.pool.AggregateUnaggregatedAttestations(); err != nil { + return err + } + atts := append(s.pool.AggregatedAttestations(), s.pool.BlockAttestations()...) atts = append(atts, s.pool.ForkchoiceAttestations()...) // Consolidate attestations by aggregating them by similar data root. diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go index dee34635e555..b3e09bf58a2b 100644 --- a/beacon-chain/operations/attestations/prepare_forkchoice_test.go +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -98,21 +98,24 @@ func TestBatchAttestations_Multiple(t *testing.T) { t.Fatal(err) } - wanted, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[0], aggregatedAtts[0], blockAtts[0]}) + wanted, err := helpers.AggregateAttestations([]*ethpb.Attestation{aggregatedAtts[0], blockAtts[0]}) if err != nil { t.Fatal(err) } - aggregated, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[1], aggregatedAtts[1], blockAtts[1]}) + aggregated, err := helpers.AggregateAttestations([]*ethpb.Attestation{aggregatedAtts[1], blockAtts[1]}) if err != nil { t.Fatal(err) } wanted = append(wanted, aggregated...) - aggregated, err = helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[2], aggregatedAtts[2], blockAtts[2]}) + aggregated, err = helpers.AggregateAttestations([]*ethpb.Attestation{aggregatedAtts[2], blockAtts[2]}) if err != nil { t.Fatal(err) } wanted = append(wanted, aggregated...) + if err := s.pool.AggregateUnaggregatedAttestations(); err != nil { + return + } received := s.pool.ForkchoiceAttestations() sort.Slice(received, func(i, j int) bool { diff --git a/beacon-chain/rpc/validator/aggregator.go b/beacon-chain/rpc/validator/aggregator.go index aa4646f25ab8..6912f614f532 100644 --- a/beacon-chain/rpc/validator/aggregator.go +++ b/beacon-chain/rpc/validator/aggregator.go @@ -54,22 +54,10 @@ func (as *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb. return nil, status.Errorf(codes.InvalidArgument, "Validator is not an aggregator") } - // Retrieve the unaggregated attestation from pool. - unaggregatedAtts := as.AttPool.UnAggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex) - // In case there's left over aggregated attestations in the pool. - aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex) - aggregatedAtts, err = helpers.AggregateAttestations(append(aggregatedAtts, unaggregatedAtts...)) - if err != nil { - return nil, status.Errorf(codes.Internal, "Could not get aggregate attestations: %v", err) - } - - // Save the aggregated attestations to the pool. - if err := as.AttPool.SaveAggregatedAttestations(aggregatedAtts); err != nil { - return nil, status.Errorf(codes.Internal, "Could not save aggregated attestations: %v", err) - } - if err := as.AttPool.DeleteUnaggregatedAttestations(unaggregatedAtts); err != nil { - return nil, status.Errorf(codes.Internal, "Could not delete unaggregated attestations: %v", err) + if err := as.AttPool.AggregateUnaggregatedAttestations(); err != nil { + return nil, status.Errorf(codes.Internal, "Could not aggregate unaggregated attestations") } + aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex) // Filter out the best aggregated attestation (ie. the one with the most aggregated bits). if len(aggregatedAtts) == 0 {