Skip to content

Commit

Permalink
Aggregate on demand for v0.11 (#5302)
Browse files Browse the repository at this point in the history
* Add client implementation

* Update workspace

* Update server

* Update service

* Gaz

* Mocks

* Fixed validator tests

* Add round tirp tests

* Fixed subnet test

* Wait 1/3 on validator side

* Lint

* Comment

* Update committee cache

* Comment

* Update RPC

* Fixed test

* Nishant's comment

* Gaz

* Refresh ENR is for epoch

* Needs to be append

* Fixed duplication

* Tests

* Skip e2e

* Update beacon-chain/rpc/validator/aggregator.go

Co-Authored-By: shayzluf <thezluf@gmail.com>

* Apply suggestions from code review

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
Co-authored-by: shayzluf <thezluf@gmail.com>
Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
  • Loading branch information
4 people authored Apr 6, 2020
1 parent e5aef16 commit e60ea19
Show file tree
Hide file tree
Showing 14 changed files with 124 additions and 305 deletions.
3 changes: 0 additions & 3 deletions beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"aggregate.go",
"log.go",
"metrics.go",
"pool.go",
Expand Down Expand Up @@ -34,7 +33,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"aggregate_test.go",
"pool_test.go",
"prepare_forkchoice_test.go",
"prune_expired_test.go",
Expand All @@ -50,6 +48,5 @@ go_test(
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
],
)
79 changes: 0 additions & 79 deletions beacon-chain/operations/attestations/aggregate.go

This file was deleted.

127 changes: 0 additions & 127 deletions beacon-chain/operations/attestations/aggregate_test.go

This file was deleted.

39 changes: 39 additions & 0 deletions beacon-chain/operations/attestations/kv/unaggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ func (p *AttCaches) UnaggregatedAttestations() []*ethpb.Attestation {
return atts
}

// UnAggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,
// filtered by committee index and slot.
func (p *AttCaches) UnAggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation {
atts := make([]*ethpb.Attestation, 0)

p.unAggregateAttLock.RLock()
defer p.unAggregateAttLock.RUnlock()
for _, a := range p.unAggregatedAtt {
if slot == a.Data.Slot && committeeIndex == a.Data.CommitteeIndex {
atts = append(atts, a)
}
}

return atts
}

// DeleteUnaggregatedAttestation deletes the unaggregated attestations in cache.
func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error {
if att == nil {
Expand All @@ -73,6 +89,29 @@ func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
return nil
}

// DeleteUnaggregatedAttestations deletes the unaggregated attestations in cache.
func (p *AttCaches) DeleteUnaggregatedAttestations(atts []*ethpb.Attestation) error {
p.unAggregateAttLock.Lock()
defer p.unAggregateAttLock.Unlock()
for _, att := range atts {
if att == nil {
continue
}
if helpers.IsAggregated(att) {
return errors.New("attestation is aggregated")
}

r, err := hashFn(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}

delete(p.unAggregatedAtt, r)
}

return nil
}

// UnaggregatedAttestationCount returns the number of unaggregated attestations key in the pool.
func (p *AttCaches) UnaggregatedAttestationCount() int {
p.unAggregateAttLock.RLock()
Expand Down
48 changes: 48 additions & 0 deletions beacon-chain/operations/attestations/kv/unaggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,51 @@ func TestKV_Unaggregated_CanDelete(t *testing.T) {
t.Error("Did not receive correct aggregated atts")
}
}

func TestKV_Unaggregated_FilterBySlotsAndCommitteeIDs(t *testing.T) {
cache := NewAttCaches()

att1 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b101}}
att2 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att3 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att4 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b110}}

atts := []*ethpb.Attestation{att1, att2, att3, att4}

for _, att := range atts {
if err := cache.SaveUnaggregatedAttestation(att); err != nil {
t.Fatal(err)
}
}

returned := cache.UnAggregatedAttestationsBySlotIndex(1, 1)

if !reflect.DeepEqual([]*ethpb.Attestation{att1, att3}, returned) {
t.Error("Did not receive correct aggregated atts")
}
}

func TestKV_Unaggregated_BatchDelete(t *testing.T) {
cache := NewAttCaches()

att1 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b101}}
att2 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att3 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att4 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b110}}

atts := []*ethpb.Attestation{att1, att2, att3, att4}

for _, att := range atts {
if err := cache.SaveUnaggregatedAttestation(att); err != nil {
t.Fatal(err)
}
}
if err := cache.DeleteUnaggregatedAttestations(atts); err != nil {
t.Fatal(err)
}

returned := cache.UnaggregatedAttestations()
if !reflect.DeepEqual([]*ethpb.Attestation{}, returned) {
t.Error("Did not receive correct aggregated atts")
}
}
2 changes: 2 additions & 0 deletions beacon-chain/operations/attestations/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ type Pool interface {
SaveUnaggregatedAttestation(att *ethpb.Attestation) error
SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error
UnaggregatedAttestations() []*ethpb.Attestation
UnAggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation
DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
DeleteUnaggregatedAttestations(atts []*ethpb.Attestation) error
UnaggregatedAttestationCount() int
// For attestations that were included in the block.
SaveBlockAttestation(att *ethpb.Attestation) error
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/operations/attestations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
// Start an attestation pool service's main event loop.
func (s *Service) Start() {
go s.prepareForkChoiceAtts()
go s.aggregateRoutine()
go s.pruneAttsPool()
}

Expand Down
2 changes: 0 additions & 2 deletions beacon-chain/rpc/validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ go_library(
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/slotutil:go_default_library",
"//shared/traceutil:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/rpc/validator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,21 @@ func (as *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.
}

// Retrieve the unaggregated attestation from pool.
unaggregatedAtts := as.AttPool.UnAggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
// In case there's left over aggregated attestations in the pool.
aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
aggregatedAtts, err = helpers.AggregateAttestations(append(aggregatedAtts, unaggregatedAtts...))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get aggregate attestations: %v", err)
}

// Save the aggregated attestations to the pool.
if err := as.AttPool.SaveAggregatedAttestations(aggregatedAtts); err != nil {
return nil, status.Errorf(codes.Internal, "Could not save aggregated attestations: %v", err)
}
if err := as.AttPool.DeleteUnaggregatedAttestations(unaggregatedAtts); err != nil {
return nil, status.Errorf(codes.Internal, "Could not delete unaggregated attestations: %v", err)
}

// Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
if len(aggregatedAtts) == 0 {
Expand Down
Loading

0 comments on commit e60ea19

Please sign in to comment.