diff --git a/beacon-chain/blockchain/BUILD.bazel b/beacon-chain/blockchain/BUILD.bazel index 72a6476d652a..dc437765c3cd 100644 --- a/beacon-chain/blockchain/BUILD.bazel +++ b/beacon-chain/blockchain/BUILD.bazel @@ -23,6 +23,7 @@ go_library( "//beacon-chain/core/state:go_default_library", "//beacon-chain/db:go_default_library", "//beacon-chain/operations:go_default_library", + "//beacon-chain/operations/attestations:go_default_library", "//beacon-chain/p2p:go_default_library", "//beacon-chain/powchain:go_default_library", "//proto/beacon/p2p/v1:go_default_library", diff --git a/beacon-chain/blockchain/receive_attestation.go b/beacon-chain/blockchain/receive_attestation.go index 2b9ae564850b..0682a8d9201e 100644 --- a/beacon-chain/blockchain/receive_attestation.go +++ b/beacon-chain/blockchain/receive_attestation.go @@ -59,13 +59,13 @@ func (s *Service) processAttestation() { period := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second ctx := context.Background() runutil.RunEvery(s.ctx, period, func() { - atts, err := s.opsPoolService.AttestationPoolForForkchoice(ctx) - if err != nil { - log.WithError(err).Error("Could not retrieve attestation from pool") - return - } + atts := s.attPool.ForkchoiceAttestations() for _, a := range atts { + if err := s.attPool.DeleteForkchoiceAttestation(a); err != nil { + log.WithError(err).Error("Could not delete fork choice attestation in pool") + } + if err := s.ReceiveAttestationNoPubsub(ctx, a); err != nil { log.WithFields(logrus.Fields{ "targetRoot": fmt.Sprintf("%#x", a.Data.Target.Root), diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 75d556ea468e..0d3b33f6f327 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -109,6 +109,12 @@ func (s *Service) ReceiveBlockNoPubsub(ctx context.Context, block *ethpb.BeaconB }, }) + // Add attestations from the block to the pool for fork choice. + if err := s.attPool.SaveBlockAttestations(block.Body.Attestations); err != nil { + log.Errorf("Could not save attestation for fork choice: %v", err) + return nil + } + // Reports on block and fork choice metrics. s.reportSlotMetrics(blockCopy.Slot) diff --git a/beacon-chain/blockchain/service.go b/beacon-chain/blockchain/service.go index 849d80073be2..c99e547cb877 100644 --- a/beacon-chain/blockchain/service.go +++ b/beacon-chain/blockchain/service.go @@ -22,6 +22,7 @@ import ( "github.com/prysmaticlabs/prysm/beacon-chain/core/state" "github.com/prysmaticlabs/prysm/beacon-chain/db" "github.com/prysmaticlabs/prysm/beacon-chain/operations" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/beacon-chain/powchain" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -40,6 +41,7 @@ type Service struct { depositCache *depositcache.DepositCache chainStartFetcher powchain.ChainStartFetcher opsPoolService operations.OperationFeeds + attPool attestations.Pool forkChoiceStore forkchoice.ForkChoicer genesisTime time.Time p2p p2p.Broadcaster @@ -59,6 +61,7 @@ type Config struct { BeaconDB db.Database DepositCache *depositcache.DepositCache OpsPoolService operations.OperationFeeds + AttPool attestations.Pool P2p p2p.Broadcaster MaxRoutines int64 StateNotifier statefeed.Notifier @@ -76,6 +79,7 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) { depositCache: cfg.DepositCache, chainStartFetcher: cfg.ChainStartFetcher, opsPoolService: cfg.OpsPoolService, + attPool: cfg.AttPool, forkChoiceStore: store, p2p: cfg.P2p, canonicalRoots: make(map[uint64][]byte), diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index 260436f56f32..79aa104bcd0c 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "encoding/hex" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" "io/ioutil" "reflect" "testing" @@ -121,6 +122,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { OpsPoolService: &ops.Operations{}, P2p: &mockBroadcaster{}, StateNotifier: &mockBeaconNode{}, + AttPool: attestations.NewPool(), } if err != nil { t.Fatalf("could not register blockchain service: %v", err) diff --git a/beacon-chain/node/node.go b/beacon-chain/node/node.go index 1d6da208f33b..5228f977b6e8 100644 --- a/beacon-chain/node/node.go +++ b/beacon-chain/node/node.go @@ -80,15 +80,6 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { flags.ConfigureGlobalFlags(ctx) registry := shared.NewServiceRegistry() - beacon := &BeaconNode{ - ctx: ctx, - services: registry, - stop: make(chan struct{}), - stateFeed: new(event.Feed), - opFeed: new(event.Feed), - attestationPool: attestations.NewPool(), - } - // Use custom config values if the --no-custom-config flag is not set. if !ctx.GlobalBool(flags.NoCustomConfigFlag.Name) { if featureconfig.Get().MinimalConfig { @@ -104,6 +95,15 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { } } + beacon := &BeaconNode{ + ctx: ctx, + services: registry, + stop: make(chan struct{}), + stateFeed: new(event.Feed), + opFeed: new(event.Feed), + attestationPool: attestations.NewPool(), + } + if err := beacon.startDB(ctx); err != nil { return nil, err } @@ -120,6 +120,10 @@ func NewBeaconNode(ctx *cli.Context) (*BeaconNode, error) { return nil, err } + if err := beacon.registerAttestationPool(ctx); err != nil { + return nil, err + } + if err := beacon.registerInteropServices(ctx); err != nil { return nil, err } @@ -294,6 +298,7 @@ func (b *BeaconNode) registerBlockchainService(ctx *cli.Context) error { DepositCache: b.depositCache, ChainStartFetcher: web3Service, OpsPoolService: opsService, + AttPool: b.attestationPool, P2p: b.fetchP2P(ctx), MaxRoutines: maxRoutines, StateNotifier: b, @@ -312,6 +317,16 @@ func (b *BeaconNode) registerOperationService(ctx *cli.Context) error { return b.services.RegisterService(operationService) } +func (b *BeaconNode) registerAttestationPool(ctx *cli.Context) error { + attPoolService, err := attestations.NewService(context.Background(), &attestations.Config{ + Pool: b.attestationPool, + }) + if err != nil { + return err + } + return b.services.RegisterService(attPoolService) +} + func (b *BeaconNode) registerPOWChainService(cliCtx *cli.Context) error { if cliCtx.GlobalBool(testSkipPowFlag) { return b.services.RegisterService(&powchain.Service{}) diff --git a/beacon-chain/operations/attestations/BUILD.bazel b/beacon-chain/operations/attestations/BUILD.bazel index 413817a09719..78f28b8a71aa 100644 --- a/beacon-chain/operations/attestations/BUILD.bazel +++ b/beacon-chain/operations/attestations/BUILD.bazel @@ -2,18 +2,40 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["pool.go"], + srcs = [ + "log.go", + "pool.go", + "prepare_forkchoice.go", + "service.go", + ], importpath = "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations", visibility = ["//beacon-chain:__subpackages__"], deps = [ + "//beacon-chain/core/helpers:go_default_library", "//beacon-chain/operations/attestations/kv:go_default_library", + "//shared/hashutil:go_default_library", + "//shared/params:go_default_library", + "@com_github_dgraph_io_ristretto//:go_default_library", "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_ssz//:go_default_library", + "@com_github_sirupsen_logrus//:go_default_library", + "@io_opencensus_go//trace:go_default_library", ], ) go_test( name = "go_default_test", - srcs = ["pool_test.go"], + srcs = [ + "pool_test.go", + "prepare_forkchoice_test.go", + "service_test.go", + ], embed = [":go_default_library"], - deps = ["//beacon-chain/operations/attestations/kv:go_default_library"], + deps = [ + "//beacon-chain/core/helpers:go_default_library", + "//beacon-chain/operations/attestations/kv:go_default_library", + "//shared/bls:go_default_library", + "@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library", + "@com_github_prysmaticlabs_go_bitfield//:go_default_library", + ], ) diff --git a/beacon-chain/operations/attestations/kv/BUILD.bazel b/beacon-chain/operations/attestations/kv/BUILD.bazel index 8b5725b877c0..e167898ae41e 100644 --- a/beacon-chain/operations/attestations/kv/BUILD.bazel +++ b/beacon-chain/operations/attestations/kv/BUILD.bazel @@ -4,6 +4,8 @@ go_library( name = "go_default_library", srcs = [ "aggregated.go", + "block.go", + "forkchoice.go", "kv.go", "unaggregated.go", ], @@ -23,6 +25,8 @@ go_test( name = "go_default_test", srcs = [ "aggregated_test.go", + "block_test.go", + "forkchoice_test.go", "unaggregated_test.go", ], embed = [":go_default_library"], diff --git a/beacon-chain/operations/attestations/kv/aggregated.go b/beacon-chain/operations/attestations/kv/aggregated.go index e104090a6d59..09443f8d32fd 100644 --- a/beacon-chain/operations/attestations/kv/aggregated.go +++ b/beacon-chain/operations/attestations/kv/aggregated.go @@ -26,8 +26,18 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error { return nil } -// AggregatedAttestation returns the aggregated attestations in cache. -func (p *AttCaches) AggregatedAttestation() []*ethpb.Attestation { +// SaveAggregatedAttestations saves a list of aggregated attestations in cache. +func (p *AttCaches) SaveAggregatedAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveAggregatedAttestation(att); err != nil { + return err + } + } + return nil +} + +// AggregatedAttestations returns the aggregated attestations in cache. +func (p *AttCaches) AggregatedAttestations() []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.aggregatedAtt.ItemCount()) for s, i := range p.aggregatedAtt.Items() { // Type assertion for the worst case. This shouldn't happen. diff --git a/beacon-chain/operations/attestations/kv/aggregated_test.go b/beacon-chain/operations/attestations/kv/aggregated_test.go index 589b562ef5cd..bfa9fb91bff3 100644 --- a/beacon-chain/operations/attestations/kv/aggregated_test.go +++ b/beacon-chain/operations/attestations/kv/aggregated_test.go @@ -40,7 +40,7 @@ func TestKV_Aggregated_CanSaveRetrieve(t *testing.T) { } } - returned := cache.AggregatedAttestation() + returned := cache.AggregatedAttestations() sort.Slice(returned, func(i, j int) bool { return returned[i].Data.Slot < returned[j].Data.Slot @@ -72,7 +72,7 @@ func TestKV_Aggregated_CanDelete(t *testing.T) { t.Fatal(err) } - returned := cache.AggregatedAttestation() + returned := cache.AggregatedAttestations() wanted := []*ethpb.Attestation{att2} if !reflect.DeepEqual(wanted, returned) { diff --git a/beacon-chain/operations/attestations/kv/block.go b/beacon-chain/operations/attestations/kv/block.go new file mode 100644 index 000000000000..80d15078edcb --- /dev/null +++ b/beacon-chain/operations/attestations/kv/block.go @@ -0,0 +1,60 @@ +package kv + +import ( + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" +) + +// SaveBlockAttestation saves an block attestation in cache. +func (p *AttCaches) SaveBlockAttestation(att *ethpb.Attestation) error { + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + // DefaultExpiration is set to what was given to New(). In this case + // it's one epoch. + p.blockAtt.Set(string(r[:]), att, cache.DefaultExpiration) + + return nil +} + +// SaveBlockAttestations saves a list of block attestations in cache. +func (p *AttCaches) SaveBlockAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveBlockAttestation(att); err != nil { + return err + } + } + + return nil +} + +// BlockAttestations returns the block attestations in cache. +func (p *AttCaches) BlockAttestations() []*ethpb.Attestation { + atts := make([]*ethpb.Attestation, 0, p.blockAtt.ItemCount()) + for s, i := range p.blockAtt.Items() { + // Type assertion for the worst case. This shouldn't happen. + att, ok := i.Object.(*ethpb.Attestation) + if !ok { + p.blockAtt.Delete(s) + } + atts = append(atts, att) + } + + return atts +} + +// DeleteBlockAttestation deletes a block attestation in cache. +func (p *AttCaches) DeleteBlockAttestation(att *ethpb.Attestation) error { + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + p.blockAtt.Delete(string(r[:])) + + return nil +} diff --git a/beacon-chain/operations/attestations/kv/block_test.go b/beacon-chain/operations/attestations/kv/block_test.go new file mode 100644 index 000000000000..32cccc50a991 --- /dev/null +++ b/beacon-chain/operations/attestations/kv/block_test.go @@ -0,0 +1,96 @@ +package kv + +import ( + "math" + "reflect" + "sort" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/shared/params" +) + +func TestKV_BlockAttestation_CanSaveRetrieve(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + } + + returned := cache.BlockAttestations() + + sort.Slice(returned, func(i, j int) bool { + return returned[i].Data.Slot < returned[j].Data.Slot + }) + + if !reflect.DeepEqual(atts, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_BlockAttestation_CanDelete(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + } + + if err := cache.DeleteBlockAttestation(att1); err != nil { + t.Fatal(err) + } + if err := cache.DeleteBlockAttestation(att3); err != nil { + t.Fatal(err) + } + + returned := cache.BlockAttestations() + wanted := []*ethpb.Attestation{att2} + + if !reflect.DeepEqual(wanted, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_BlockAttestation_CheckExpTime(t *testing.T) { + cache := NewAttCaches() + + att := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b111}} + r, _ := ssz.HashTreeRoot(att) + + if err := cache.SaveBlockAttestation(att); err != nil { + t.Fatal(err) + } + + item, exp, exists := cache.blockAtt.GetWithExpiration(string(r[:])) + if !exists { + t.Error("Saved att does not exist") + } + + receivedAtt := item.(*ethpb.Attestation) + if !proto.Equal(att, receivedAtt) { + t.Error("Did not receive correct aggregated att") + } + + wanted := float64(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) + if math.RoundToEven(exp.Sub(time.Now()).Seconds()) != wanted { + t.Errorf("Did not receive correct exp time. Wanted: %f, got: %f", wanted, + math.RoundToEven(exp.Sub(time.Now()).Seconds())) + } +} diff --git a/beacon-chain/operations/attestations/kv/forkchoice.go b/beacon-chain/operations/attestations/kv/forkchoice.go new file mode 100644 index 000000000000..03ccb5acafa9 --- /dev/null +++ b/beacon-chain/operations/attestations/kv/forkchoice.go @@ -0,0 +1,61 @@ +package kv + +import ( + "github.com/patrickmn/go-cache" + "github.com/pkg/errors" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" +) + +// SaveForkchoiceAttestation saves an forkchoice attestation in cache. +func (p *AttCaches) SaveForkchoiceAttestation(att *ethpb.Attestation) error { + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + // DefaultExpiration is set to what was given to New(). In this case + // it's one epoch. + p.forkchoiceAtt.Set(string(r[:]), att, cache.DefaultExpiration) + + return nil +} + +// SaveForkchoiceAttestations saves a list of forkchoice attestations in cache. +func (p *AttCaches) SaveForkchoiceAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveForkchoiceAttestation(att); err != nil { + return err + } + } + + return nil +} + +// ForkchoiceAttestations returns the forkchoice attestations in cache. +func (p *AttCaches) ForkchoiceAttestations() []*ethpb.Attestation { + atts := make([]*ethpb.Attestation, 0, p.forkchoiceAtt.ItemCount()) + for s, i := range p.forkchoiceAtt.Items() { + // Type assertion for the worst case. This shouldn't happen. + att, ok := i.Object.(*ethpb.Attestation) + if !ok { + p.forkchoiceAtt.Delete(s) + } + atts = append(atts, att) + } + + return atts +} + +// DeleteForkchoiceAttestation deletes a forkchoice attestation in cache. +func (p *AttCaches) DeleteForkchoiceAttestation(att *ethpb.Attestation) error { + + r, err := ssz.HashTreeRoot(att) + if err != nil { + return errors.Wrap(err, "could not tree hash attestation") + } + + p.forkchoiceAtt.Delete(string(r[:])) + + return nil +} diff --git a/beacon-chain/operations/attestations/kv/forkchoice_test.go b/beacon-chain/operations/attestations/kv/forkchoice_test.go new file mode 100644 index 000000000000..caba719c4516 --- /dev/null +++ b/beacon-chain/operations/attestations/kv/forkchoice_test.go @@ -0,0 +1,96 @@ +package kv + +import ( + "math" + "reflect" + "sort" + "testing" + "time" + + "github.com/gogo/protobuf/proto" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/shared/params" +) + +func TestKV_Forkchoice_CanSaveRetrieve(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveForkchoiceAttestation(att); err != nil { + t.Fatal(err) + } + } + + returned := cache.ForkchoiceAttestations() + + sort.Slice(returned, func(i, j int) bool { + return returned[i].Data.Slot < returned[j].Data.Slot + }) + + if !reflect.DeepEqual(atts, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_Forkchoice_CanDelete(t *testing.T) { + cache := NewAttCaches() + + att1 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b1101}} + att2 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1101}} + att3 := ðpb.Attestation{Data: ðpb.AttestationData{Slot: 3}, AggregationBits: bitfield.Bitlist{0b1101}} + atts := []*ethpb.Attestation{att1, att2, att3} + + for _, att := range atts { + if err := cache.SaveForkchoiceAttestation(att); err != nil { + t.Fatal(err) + } + } + + if err := cache.DeleteForkchoiceAttestation(att1); err != nil { + t.Fatal(err) + } + if err := cache.DeleteForkchoiceAttestation(att3); err != nil { + t.Fatal(err) + } + + returned := cache.ForkchoiceAttestations() + wanted := []*ethpb.Attestation{att2} + + if !reflect.DeepEqual(wanted, returned) { + t.Error("Did not receive correct aggregated atts") + } +} + +func TestKV_Forkchoice_CheckExpTime(t *testing.T) { + cache := NewAttCaches() + + att := ðpb.Attestation{AggregationBits: bitfield.Bitlist{0b111}} + r, _ := ssz.HashTreeRoot(att) + + if err := cache.SaveForkchoiceAttestation(att); err != nil { + t.Fatal(err) + } + + item, exp, exists := cache.forkchoiceAtt.GetWithExpiration(string(r[:])) + if !exists { + t.Error("Saved att does not exist") + } + + receivedAtt := item.(*ethpb.Attestation) + if !proto.Equal(att, receivedAtt) { + t.Error("Did not receive correct aggregated att") + } + + wanted := float64(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) + if math.RoundToEven(exp.Sub(time.Now()).Seconds()) != wanted { + t.Errorf("Did not receive correct exp time. Wanted: %f, got: %f", wanted, + math.RoundToEven(exp.Sub(time.Now()).Seconds())) + } +} diff --git a/beacon-chain/operations/attestations/kv/kv.go b/beacon-chain/operations/attestations/kv/kv.go index 7cbbf5f00a45..362738c61e28 100644 --- a/beacon-chain/operations/attestations/kv/kv.go +++ b/beacon-chain/operations/attestations/kv/kv.go @@ -13,7 +13,8 @@ import ( type AttCaches struct { aggregatedAtt *cache.Cache unAggregatedAtt *cache.Cache - attInBlock *cache.Cache + forkchoiceAtt *cache.Cache + blockAtt *cache.Cache } // NewAttCaches initializes a new attestation pool consists of multiple KV store in cache for @@ -26,7 +27,8 @@ func NewAttCaches() *AttCaches { pool := &AttCaches{ unAggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), aggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), - attInBlock: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), + forkchoiceAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), + blockAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), } return pool diff --git a/beacon-chain/operations/attestations/kv/unaggregated.go b/beacon-chain/operations/attestations/kv/unaggregated.go index d585e77367e2..b796f1a0f249 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated.go +++ b/beacon-chain/operations/attestations/kv/unaggregated.go @@ -26,9 +26,20 @@ func (p *AttCaches) SaveUnaggregatedAttestation(att *ethpb.Attestation) error { return nil } -// UnaggregatedAttestations returns the aggregated attestations in cache, +// SaveUnaggregatedAttestations saves a list of unaggregated attestations in cache. +func (p *AttCaches) SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error { + for _, att := range atts { + if err := p.SaveUnaggregatedAttestation(att); err != nil { + return err + } + } + + return nil +} + +// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache, // filtered by committee index and slot. -func (p *AttCaches) UnaggregatedAttestations(slot uint64, committeeIndex uint64) []*ethpb.Attestation { +func (p *AttCaches) UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation { atts := make([]*ethpb.Attestation, 0, p.unAggregatedAtt.ItemCount()) for s, i := range p.unAggregatedAtt.Items() { @@ -46,6 +57,23 @@ func (p *AttCaches) UnaggregatedAttestations(slot uint64, committeeIndex uint64) return atts } +// UnaggregatedAttestations returns all the unaggregated attestations in cache. +func (p *AttCaches) UnaggregatedAttestations() []*ethpb.Attestation { + atts := make([]*ethpb.Attestation, 0, p.unAggregatedAtt.ItemCount()) + for s, i := range p.unAggregatedAtt.Items() { + + // Type assertion for the worst case. This shouldn't happen. + att, ok := i.Object.(*ethpb.Attestation) + if !ok { + p.unAggregatedAtt.Delete(s) + } + + atts = append(atts, att) + } + + return atts +} + // DeleteUnaggregatedAttestation deletes the unaggregated attestations in cache. func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error { if helpers.IsAggregated(att) { diff --git a/beacon-chain/operations/attestations/kv/unaggregated_test.go b/beacon-chain/operations/attestations/kv/unaggregated_test.go index 5bcc364c8e4a..471f9f24fb2f 100644 --- a/beacon-chain/operations/attestations/kv/unaggregated_test.go +++ b/beacon-chain/operations/attestations/kv/unaggregated_test.go @@ -42,10 +42,11 @@ func TestKV_Unaggregated_CanSaveRetrieve(t *testing.T) { } } - returned := cache.UnaggregatedAttestations(data.Slot, data.CommitteeIndex) + returned := cache.UnaggregatedAttestationsBySlotIndex(data.Slot, data.CommitteeIndex) sort.Slice(returned, func(i, j int) bool { return binary.BigEndian.Uint16(returned[i].Signature) < binary.BigEndian.Uint16(returned[j].Signature) }) + wanted := []*ethpb.Attestation{att2, att3} if !reflect.DeepEqual(len(wanted), len(returned)) { if len(returned) != len(atts) { @@ -73,7 +74,7 @@ func TestKV_Unaggregated_CanDelete(t *testing.T) { t.Fatal(err) } - returned := cache.UnaggregatedAttestations(2, 0) + returned := cache.UnaggregatedAttestationsBySlotIndex(2, 0) if !reflect.DeepEqual([]*ethpb.Attestation{}, returned) { t.Error("Did not receive correct aggregated atts") diff --git a/beacon-chain/operations/attestations/log.go b/beacon-chain/operations/attestations/log.go new file mode 100644 index 000000000000..9a30bf344e06 --- /dev/null +++ b/beacon-chain/operations/attestations/log.go @@ -0,0 +1,7 @@ +package attestations + +import ( + "github.com/sirupsen/logrus" +) + +var log = logrus.WithField("prefix", "pool/attestations") diff --git a/beacon-chain/operations/attestations/pool.go b/beacon-chain/operations/attestations/pool.go index 403f16ee68d8..e7118c959950 100644 --- a/beacon-chain/operations/attestations/pool.go +++ b/beacon-chain/operations/attestations/pool.go @@ -12,13 +12,25 @@ import ( type Pool interface { // For Aggregated attestations SaveAggregatedAttestation(att *ethpb.Attestation) error - AggregatedAttestation() []*ethpb.Attestation + SaveAggregatedAttestations(atts []*ethpb.Attestation) error + AggregatedAttestations() []*ethpb.Attestation DeleteAggregatedAttestation(att *ethpb.Attestation) error // For unaggregated attestations SaveUnaggregatedAttestation(att *ethpb.Attestation) error - UnaggregatedAttestations(slot uint64, committeeIndex uint64) []*ethpb.Attestation + SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error + UnaggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation + UnaggregatedAttestations() []*ethpb.Attestation DeleteUnaggregatedAttestation(att *ethpb.Attestation) error - // For forkchoice attestations + // For attestations that were included in the block + SaveBlockAttestation(att *ethpb.Attestation) error + SaveBlockAttestations(atts []*ethpb.Attestation) error + BlockAttestations() []*ethpb.Attestation + DeleteBlockAttestation(att *ethpb.Attestation) error + // For attestations to be passed to fork choice + SaveForkchoiceAttestation(att *ethpb.Attestation) error + SaveForkchoiceAttestations(atts []*ethpb.Attestation) error + ForkchoiceAttestations() []*ethpb.Attestation + DeleteForkchoiceAttestation(att *ethpb.Attestation) error } // NewPool initializes a new attestation pool. diff --git a/beacon-chain/operations/attestations/prepare_forkchoice.go b/beacon-chain/operations/attestations/prepare_forkchoice.go new file mode 100644 index 000000000000..0f7d3ac34dc7 --- /dev/null +++ b/beacon-chain/operations/attestations/prepare_forkchoice.go @@ -0,0 +1,106 @@ +package attestations + +import ( + "context" + "time" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-ssz" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/shared/hashutil" + "github.com/prysmaticlabs/prysm/shared/params" + "go.opencensus.io/trace" +) + +// prepare attestations for fork choice at every half of the slot. +var prepareForkChoiceAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/3) * time.Second + +// This prepares fork choice attestations by running batchForkChoiceAtts +// every prepareForkChoiceAttsPeriod. +func (s *Service) prepareForkChoiceAtts() { + ticker := time.NewTicker(prepareForkChoiceAttsPeriod) + for { + ctx := context.Background() + select { + case <-ticker.C: + if err := s.batchForkChoiceAtts(ctx); err != nil { + log.WithError(err).Error("Could not prepare attestations for fork choice") + } + case <-s.ctx.Done(): + log.Debug("Context closed, exiting routine") + return + } + } +} + +// This gets the attestations from the unaggregated, aggregated and block +// pool. Then finds the common data, aggregate and batch them for fork choice. +// The resulting attestations are saved in the fork choice pool. +func (s *Service) batchForkChoiceAtts(ctx context.Context) error { + _, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts") + defer span.End() + + attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation) + + atts := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...) + atts = append(atts, s.pool.BlockAttestations()...) + + for _, att := range atts { + seen, err := s.seen(att) + if err != nil { + return err + } + if seen { + continue + } + + attDataRoot, err := ssz.HashTreeRoot(att.Data) + if err != nil { + return err + } + attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att) + } + + for _, atts := range attsByDataRoot { + if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { + return err + } + } + + for _, a := range s.pool.BlockAttestations() { + if err := s.pool.DeleteBlockAttestation(a); err != nil { + return err + } + } + + return nil +} + +// This aggregates a list of attestations using the aggregation algorithm defined in AggregateAttestations +// and saves the attestations for fork choice. +func (s *Service) aggregateAndSaveForkChoiceAtts(atts []*ethpb.Attestation) error { + aggregatedAtts, err := helpers.AggregateAttestations(atts) + if err != nil { + return err + } + + if err := s.pool.SaveForkchoiceAttestations(aggregatedAtts); err != nil { + return err + } + + return nil +} + +// This checks if the attestation has previously been aggregated for fork choice +// return true if yes, false if no. +func (s *Service) seen(att *ethpb.Attestation) (bool, error) { + attRoot, err := hashutil.HashProto(att) + if err != nil { + return false, err + } + if _, ok := s.forkChoiceProcessedRoots.Get(string(attRoot[:])); ok { + return true, nil + } + s.forkChoiceProcessedRoots.Set(string(attRoot[:]), true /*value*/, 1 /*cost*/) + return false, nil +} diff --git a/beacon-chain/operations/attestations/prepare_forkchoice_test.go b/beacon-chain/operations/attestations/prepare_forkchoice_test.go new file mode 100644 index 000000000000..55a2c4996b9d --- /dev/null +++ b/beacon-chain/operations/attestations/prepare_forkchoice_test.go @@ -0,0 +1,239 @@ +package attestations + +import ( + "context" + "reflect" + "sort" + "testing" + "time" + + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" + "github.com/prysmaticlabs/prysm/shared/bls" +) + +func TestBatchAttestations_Multiple(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + unaggregatedAtts := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b101000}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()}, + } + aggregatedAtts := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b111000}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100011}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b110001}, Signature: sig.Marshal()}, + } + blockAtts := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b100001}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 0}, AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b111000}, Signature: sig.Marshal()}, // Duplicated + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b100011}, Signature: sig.Marshal()}, // Duplicated + } + if err := s.pool.SaveUnaggregatedAttestations(unaggregatedAtts); err != nil { + t.Fatal(err) + } + if err := s.pool.SaveAggregatedAttestations(aggregatedAtts); err != nil { + t.Fatal(err) + } + if err := s.pool.SaveBlockAttestations(blockAtts); err != nil { + t.Fatal(err) + } + + if err := s.batchForkChoiceAtts(context.Background()); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[0], aggregatedAtts[0], blockAtts[0]}) + if err != nil { + t.Fatal(err) + } + aggregated, err := helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[1], aggregatedAtts[1], blockAtts[1]}) + if err != nil { + t.Fatal(err) + } + wanted = append(wanted, aggregated...) + aggregated, err = helpers.AggregateAttestations([]*ethpb.Attestation{unaggregatedAtts[2], aggregatedAtts[2], blockAtts[2]}) + if err != nil { + t.Fatal(err) + } + + wanted = append(wanted, aggregated...) + received := s.pool.ForkchoiceAttestations() + + sort.Slice(received, func(i, j int) bool { + return received[i].Data.Slot < received[j].Data.Slot + }) + sort.Slice(wanted, func(i, j int) bool { + return wanted[i].Data.Slot < wanted[j].Data.Slot + }) + + if !reflect.DeepEqual(wanted, received) { + t.Error("Did not aggregation and save for batch") + } +} + +func TestBatchAttestations_Single(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + unaggregatedAtts := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b101000}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b100100}, Signature: sig.Marshal()}, + } + aggregatedAtts := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b101100}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, + } + blockAtts := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b100010}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b110010}, Signature: sig.Marshal()}, // Duplicated + } + if err := s.pool.SaveUnaggregatedAttestations(unaggregatedAtts); err != nil { + t.Fatal(err) + } + + if err := s.pool.SaveAggregatedAttestations(aggregatedAtts); err != nil { + t.Fatal(err) + } + + if err := s.pool.SaveBlockAttestations(blockAtts); err != nil { + t.Fatal(err) + } + + if err := s.batchForkChoiceAtts(context.Background()); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations(append(unaggregatedAtts, aggregatedAtts...)) + if err != nil { + t.Fatal(err) + } + + wanted, err = helpers.AggregateAttestations(append(wanted, blockAtts...)) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { + t.Error("Did not aggregation and save for batch") + } +} + +func TestAggregateAndSaveForkChoiceAtts_Single(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + atts := []*ethpb.Attestation{ + {AggregationBits: bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, + {AggregationBits: bitfield.Bitlist{0b110}, Signature: sig.Marshal()}} + if err := s.aggregateAndSaveForkChoiceAtts(atts); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations(atts) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(wanted, s.pool.ForkchoiceAttestations()) { + t.Error("Did not aggregation and save") + } +} + +func TestAggregateAndSaveForkChoiceAtts_Multiple(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) + if err != nil { + t.Fatal(err) + } + + sk := bls.RandKey() + sig := sk.Sign([]byte("dummy_test_data"), 0 /*domain*/) + + atts1 := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0b101}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0b110}, Signature: sig.Marshal()}, + } + if err := s.aggregateAndSaveForkChoiceAtts(atts1); err != nil { + t.Fatal(err) + } + atts2 := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b10110}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b11100}, Signature: sig.Marshal()}, + {Data: ðpb.AttestationData{Slot: 1}, AggregationBits: bitfield.Bitlist{0b11000}, Signature: sig.Marshal()}, + } + if err := s.aggregateAndSaveForkChoiceAtts(atts2); err != nil { + t.Fatal(err) + } + att3 := []*ethpb.Attestation{ + {Data: ðpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1100}, Signature: sig.Marshal()}, + } + if err := s.aggregateAndSaveForkChoiceAtts(att3); err != nil { + t.Fatal(err) + } + + wanted, err := helpers.AggregateAttestations(atts1) + if err != nil { + t.Fatal(err) + } + aggregated, err := helpers.AggregateAttestations(atts2) + if err != nil { + t.Fatal(err) + } + + wanted = append(wanted, aggregated...) + wanted = append(wanted, att3...) + + received := s.pool.ForkchoiceAttestations() + sort.Slice(received, func(i, j int) bool { + return received[i].Data.Slot < received[j].Data.Slot + }) + if !reflect.DeepEqual(wanted, received) { + t.Error("Did not aggregation and save") + } +} + +func TestSeenAttestations_PresentInCache(t *testing.T) { + s, err := NewService(context.Background(), &Config{Pool: NewPool()}) + if err != nil { + t.Fatal(err) + } + + att1 := ðpb.Attestation{Signature: []byte{'A'}} + got, err := s.seen(att1) + if err != nil { + t.Fatal(err) + } + if got { + t.Error("Wanted false, got true") + } + + time.Sleep(100 * time.Millisecond) + got, err = s.seen(att1) + if err != nil { + t.Fatal(err) + } + if !got { + t.Error("Wanted true, got false") + } +} diff --git a/beacon-chain/operations/attestations/service.go b/beacon-chain/operations/attestations/service.go new file mode 100644 index 000000000000..08b7015c00b0 --- /dev/null +++ b/beacon-chain/operations/attestations/service.go @@ -0,0 +1,64 @@ +package attestations + +import ( + "context" + + "github.com/dgraph-io/ristretto" +) + +var forkChoiceProcessedRootsSize = int64(1 << 16) + +// Service of attestation pool operations +type Service struct { + ctx context.Context + cancel context.CancelFunc + pool Pool + err error + forkChoiceProcessedRoots *ristretto.Cache +} + +// Config options for the service. +type Config struct { + Pool Pool +} + +// NewService instantiates a new attestation pool service instance that will +// be registered into a running beacon node. +func NewService(ctx context.Context, cfg *Config) (*Service, error) { + cache, err := ristretto.NewCache(&ristretto.Config{ + NumCounters: forkChoiceProcessedRootsSize, + MaxCost: forkChoiceProcessedRootsSize, + BufferItems: 64, + }) + if err != nil { + return nil, err + } + + ctx, cancel := context.WithCancel(ctx) + return &Service{ + ctx: ctx, + cancel: cancel, + pool: cfg.Pool, + forkChoiceProcessedRoots: cache, + }, nil +} + +// Start an attestation pool service's main event loop. +func (s *Service) Start() { + go s.prepareForkChoiceAtts() +} + +// Stop the beacon block attestation pool service's main event loop +// and associated goroutines. +func (s *Service) Stop() error { + defer s.cancel() + return nil +} + +// Status returns the current service err if there's any. +func (s *Service) Status() error { + if s.err != nil { + return s.err + } + return nil +} diff --git a/beacon-chain/operations/attestations/service_test.go b/beacon-chain/operations/attestations/service_test.go new file mode 100644 index 000000000000..e2ffb263403f --- /dev/null +++ b/beacon-chain/operations/attestations/service_test.go @@ -0,0 +1,31 @@ +package attestations + +import ( + "context" + "errors" + "testing" +) + +func TestStop_OK(t *testing.T) { + s, err := NewService(context.Background(), &Config{}) + if err != nil { + t.Fatal(err) + } + + if err := s.Stop(); err != nil { + t.Fatalf("Unable to stop attestation pool service: %v", err) + } + + if s.ctx.Err() != context.Canceled { + t.Error("context was not canceled") + } +} + +func TestStatus_Error(t *testing.T) { + err := errors.New("bad bad bad") + s := &Service{err: err} + + if err := s.Status(); err != s.err { + t.Errorf("Wanted: %v, got: %v", s.err, s.Status()) + } +} diff --git a/beacon-chain/rpc/aggregator/server.go b/beacon-chain/rpc/aggregator/server.go index 3ecbb5a9a4a0..f5cf5eb47cd9 100644 --- a/beacon-chain/rpc/aggregator/server.go +++ b/beacon-chain/rpc/aggregator/server.go @@ -77,7 +77,7 @@ func (as *Server) SubmitAggregateAndProof(ctx context.Context, req *pb.Aggregati } // Retrieve the unaggregated attestation from pool - atts := as.AttPool.UnaggregatedAttestations(req.Slot, req.CommitteeIndex) + atts := as.AttPool.UnaggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex) headState, err := as.HeadFetcher.HeadState(ctx) if err != nil { @@ -100,12 +100,16 @@ func (as *Server) SubmitAggregateAndProof(ctx context.Context, req *pb.Aggregati if err != nil { return nil, status.Errorf(codes.Internal, "Could not aggregate attestations: %v", err) } - for _, att := range aggregatedAtts { - if helpers.IsAggregated(att) { - if err := as.P2p.Broadcast(ctx, att); err != nil { + for _, aggregatedAtt := range aggregatedAtts { + if helpers.IsAggregated(aggregatedAtt) { + if err := as.P2p.Broadcast(ctx, ðpb.AggregateAttestationAndProof{ + AggregatorIndex: validatorIndex, + SelectionProof: req.SlotSignature, + Aggregate: aggregatedAtt, + }); err != nil { return nil, status.Errorf(codes.Internal, "Could not broadcast aggregated attestation: %v", err) } - if err := as.AttPool.SaveAggregatedAttestation(att); err != nil { + if err := as.AttPool.SaveAggregatedAttestation(aggregatedAtt); err != nil { return nil, status.Errorf(codes.Internal, "Could not save aggregated attestation: %v", err) } } diff --git a/beacon-chain/rpc/aggregator/server_test.go b/beacon-chain/rpc/aggregator/server_test.go index 5e0da1259b48..7a1b6b6b1a73 100644 --- a/beacon-chain/rpc/aggregator/server_test.go +++ b/beacon-chain/rpc/aggregator/server_test.go @@ -144,7 +144,7 @@ func TestSubmitAggregateAndProof_AggregateOk(t *testing.T) { t.Fatal(err) } - aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestation() + aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestations() wanted, err := helpers.AggregateAttestation(att0, att1) if err != nil { t.Fatal(err) @@ -194,7 +194,7 @@ func TestSubmitAggregateAndProof_AggregateNotOk(t *testing.T) { t.Fatal(err) } - aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestation() + aggregatedAtts := aggregatorServer.AttPool.AggregatedAttestations() if len(aggregatedAtts) != 0 { t.Errorf("Wanted aggregated attestation 0, got %d", len(aggregatedAtts)) } diff --git a/beacon-chain/rpc/beacon/attestations.go b/beacon-chain/rpc/beacon/attestations.go index f671cb404f0d..55cc4ca5fd34 100644 --- a/beacon-chain/rpc/beacon/attestations.go +++ b/beacon-chain/rpc/beacon/attestations.go @@ -133,7 +133,7 @@ func (bs *Server) StreamAttestations( func (bs *Server) AttestationPool( ctx context.Context, _ *ptypes.Empty, ) (*ethpb.AttestationPoolResponse, error) { - atts := bs.Pool.AggregatedAttestation() + atts := bs.Pool.AggregatedAttestations() return ðpb.AttestationPoolResponse{ Attestations: atts, }, nil diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go index 027c259b04b7..74d78dc4175d 100644 --- a/beacon-chain/rpc/service.go +++ b/beacon-chain/rpc/service.go @@ -218,6 +218,8 @@ func (s *Service) Start() { BeaconDB: s.beaconDB, HeadFetcher: s.headFetcher, SyncChecker: s.syncService, + AttPool: s.attestationsPool, + P2p: s.p2p, } pb.RegisterAggregatorServiceServer(s.grpcServer, aggregatorServer) ethpb.RegisterNodeServer(s.grpcServer, nodeServer) diff --git a/beacon-chain/rpc/validator/assignments_test.go b/beacon-chain/rpc/validator/assignments_test.go index 60f118a6c42e..0a64c66833ec 100644 --- a/beacon-chain/rpc/validator/assignments_test.go +++ b/beacon-chain/rpc/validator/assignments_test.go @@ -352,7 +352,7 @@ func BenchmarkCommitteeAssignment(b *testing.B) { } req := ðpb.DutiesRequest{ PublicKeys: pks, - Epoch: 0, + Epoch: 0, } b.ResetTimer() for i := 0; i < b.N; i++ { diff --git a/beacon-chain/rpc/validator/proposer.go b/beacon-chain/rpc/validator/proposer.go index e5931615ce05..0f4218b94d7f 100644 --- a/beacon-chain/rpc/validator/proposer.go +++ b/beacon-chain/rpc/validator/proposer.go @@ -55,7 +55,7 @@ func (vs *Server) GetBlock(ctx context.Context, req *ethpb.BlockRequest) (*ethpb } // Pack aggregated attestations which have not been included in the beacon chain. - atts := vs.AttPool.AggregatedAttestation() + atts := vs.AttPool.AggregatedAttestations() atts, err = vs.filterAttestationsForBlockInclusion(ctx, req.Slot, atts) if err != nil { return nil, status.Errorf(codes.Internal, "Could not filter attestations: %v", err) @@ -110,6 +110,10 @@ func (vs *Server) ProposeBlock(ctx context.Context, blk *ethpb.BeaconBlock) (*et return nil, status.Errorf(codes.Internal, "Could not process beacon block: %v", err) } + if err := vs.deleteAttsInPool(blk.Body.Attestations); err != nil { + return nil, status.Errorf(codes.Internal, "Could not delete attestations in pool: %v", err) + } + return ðpb.ProposeResponse{ BlockRoot: root[:], }, nil @@ -334,6 +338,7 @@ func (vs *Server) filterAttestationsForBlockInclusion(ctx context.Context, slot defer span.End() validAtts := make([]*ethpb.Attestation, 0, len(atts)) + inValidAtts := make([]*ethpb.Attestation, 0, len(atts)) bState, err := vs.BeaconDB.HeadState(ctx) if err != nil { @@ -353,24 +358,38 @@ func (vs *Server) filterAttestationsForBlockInclusion(ctx context.Context, slot break } - if err := blocks.VerifyAttestation(ctx, bState, att); err != nil { - if helpers.IsAggregated(att) { - if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil { - return nil, err - } - } else { - if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil { - return nil, err - } - } + if _, err := blocks.ProcessAttestation(ctx, bState, att); err != nil { + inValidAtts = append(inValidAtts, att) continue + } validAtts = append(validAtts, att) } + if err := vs.deleteAttsInPool(inValidAtts); err != nil { + return nil, err + } + return validAtts, nil } +// The input attestations are processed and seen by the node, this deletes them from pool +// so proposers don't include them in a block for the future. +func (vs *Server) deleteAttsInPool(atts []*ethpb.Attestation) error { + for _, att := range atts { + if helpers.IsAggregated(att) { + if err := vs.AttPool.DeleteAggregatedAttestation(att); err != nil { + return err + } + } else { + if err := vs.AttPool.DeleteUnaggregatedAttestation(att); err != nil { + return err + } + } + } + return nil +} + func constructMerkleProof(trie *trieutil.SparseMerkleTrie, index int, deposit *ethpb.Deposit) (*ethpb.Deposit, error) { proof, err := trie.MerkleProof(index) if err != nil { diff --git a/beacon-chain/rpc/validator/proposer_test.go b/beacon-chain/rpc/validator/proposer_test.go index ef80f395c217..5c7c4dd3bcdc 100644 --- a/beacon-chain/rpc/validator/proposer_test.go +++ b/beacon-chain/rpc/validator/proposer_test.go @@ -1047,7 +1047,8 @@ func TestFilterAttestation_OK(t *testing.T) { aggBits.SetBitAt(0, true) atts[i] = ðpb.Attestation{Data: ðpb.AttestationData{ CommitteeIndex: uint64(i), - Target: ðpb.Checkpoint{}}, + Target: ðpb.Checkpoint{}, + Source: ðpb.Checkpoint{Root: params.BeaconConfig().ZeroHash[:]}}, AggregationBits: aggBits, } committee, err := helpers.BeaconCommitteeFromState(state, atts[i].Data.Slot, atts[i].Data.CommitteeIndex) @@ -1267,3 +1268,29 @@ func TestDeposits_ReturnsEmptyList_IfLatestEth1DataEqGenesisEth1Block(t *testing ) } } + +func TestDeleteAttsInPool_Aggregated(t *testing.T) { + s := &Server{ + AttPool: attestations.NewPool(), + } + + aggregatedAtts := []*ethpb.Attestation{{AggregationBits: bitfield.Bitlist{0b01101}}, {AggregationBits: bitfield.Bitlist{0b0111}}} + unaggregatedAtts := []*ethpb.Attestation{{AggregationBits: bitfield.Bitlist{0b001}}, {AggregationBits: bitfield.Bitlist{0b0001}}} + + if err := s.AttPool.SaveAggregatedAttestations(aggregatedAtts); err != nil { + t.Fatal(err) + } + if err := s.AttPool.SaveUnaggregatedAttestations(unaggregatedAtts); err != nil { + t.Fatal(err) + } + + if err := s.deleteAttsInPool(append(aggregatedAtts, unaggregatedAtts...)); err != nil { + t.Fatal(err) + } + if len(s.AttPool.AggregatedAttestations()) != 0 { + t.Error("Did not delete aggregated attestation") + } + if len(s.AttPool.UnaggregatedAttestations()) != 0 { + t.Error("Did not delete unaggregated attestation") + } +} diff --git a/beacon-chain/rpc/validator/server.go b/beacon-chain/rpc/validator/server.go index aaa8d15b8954..a14e9cb31eff 100644 --- a/beacon-chain/rpc/validator/server.go +++ b/beacon-chain/rpc/validator/server.go @@ -41,7 +41,6 @@ type Server struct { Ctx context.Context BeaconDB db.Database AttestationCache *cache.AttestationCache - AttPool attestations.Pool HeadFetcher blockchain.HeadFetcher ForkFetcher blockchain.ForkFetcher CanonicalStateChan chan *pbp2p.BeaconState @@ -54,6 +53,7 @@ type Server struct { OperationsHandler operations.Handler P2P p2p.Broadcaster Pool operations.Pool + AttPool attestations.Pool BlockReceiver blockchain.BlockReceiver MockEth1Votes bool Eth1BlockFetcher powchain.POWBlockFetcher diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel index 77fca955dbe8..7bc692636230 100644 --- a/beacon-chain/sync/BUILD.bazel +++ b/beacon-chain/sync/BUILD.bazel @@ -84,6 +84,7 @@ go_test( "rpc_goodbye_test.go", "rpc_status_test.go", "rpc_test.go", + "subscriber_beacon_aggregate_proof_test.go", "subscriber_beacon_blocks_test.go", "subscriber_test.go", "validate_aggregate_proof_test.go", diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go index 2c19655fb5e7..7ef85c222667 100644 --- a/beacon-chain/sync/subscriber_beacon_aggregate_proof.go +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof.go @@ -2,6 +2,7 @@ package sync import ( "context" + "fmt" "github.com/gogo/protobuf/proto" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -10,5 +11,10 @@ import ( // beaconAggregateProofSubscriber forwards the incoming validated aggregated attestation and proof to the // attestation pool for processing. func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto.Message) error { - return r.attPool.SaveAggregatedAttestation(msg.(*ethpb.Attestation)) + a, ok := msg.(*ethpb.AggregateAttestationAndProof) + if !ok { + return fmt.Errorf("message was not type *eth.AggregateAttestationAndProof, type=%T", msg) + } + + return r.attPool.SaveAggregatedAttestation(a.Aggregate) } diff --git a/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go new file mode 100644 index 000000000000..2a282864fb60 --- /dev/null +++ b/beacon-chain/sync/subscriber_beacon_aggregate_proof_test.go @@ -0,0 +1,25 @@ +package sync + +import ( + "context" + ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/go-bitfield" + "github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations" + "reflect" + "testing" +) + +func TestBeaconAggregateProofSubscriber_CanSave(t *testing.T) { + r := &Service{ + attPool: attestations.NewPool(), + } + + a := ðpb.AggregateAttestationAndProof{Aggregate: ðpb.Attestation{AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100} + if err := r.beaconAggregateProofSubscriber(context.Background(), a); err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(r.attPool.AggregatedAttestations(), []*ethpb.Attestation{a.Aggregate}) { + t.Error("Did not save aggregated attestation") + } +} diff --git a/beacon-chain/sync/subscriber_beacon_blocks_test.go b/beacon-chain/sync/subscriber_beacon_blocks_test.go index 81a691222e87..661e3477c27d 100644 --- a/beacon-chain/sync/subscriber_beacon_blocks_test.go +++ b/beacon-chain/sync/subscriber_beacon_blocks_test.go @@ -35,8 +35,8 @@ func TestRegularSyncBeaconBlockSubscriber_FilterByFinalizedEpoch(t *testing.T) { } parentRoot, _ := ssz.SigningRoot(parent) r := &Service{ - db: db, - chain: &mock.ChainService{State: s}, + db: db, + chain: &mock.ChainService{State: s}, attPool: attestations.NewPool(), } @@ -81,7 +81,7 @@ func TestDeleteAttsInPool(t *testing.T) { } // Only 2 should remain - if !reflect.DeepEqual(r.attPool.AggregatedAttestation(), []*ethpb.Attestation{att2}) { + if !reflect.DeepEqual(r.attPool.AggregatedAttestations(), []*ethpb.Attestation{att2}) { t.Error("Did not get wanted attestation from pool") } }