Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate on demand for v0.11 #5302

Merged
merged 34 commits into from
Apr 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
79e0674
Add client implementation
terencechain Apr 3, 2020
17b771b
Update workspace
terencechain Apr 3, 2020
d7cf690
Update server
terencechain Apr 3, 2020
d7615ea
Update service
terencechain Apr 3, 2020
10ca5c8
Gaz
terencechain Apr 3, 2020
5950ab6
Mocks
terencechain Apr 3, 2020
cd7a236
Fixed validator tests
terencechain Apr 3, 2020
d8f7690
Add round tirp tests
terencechain Apr 3, 2020
956e8a5
Merge branch 'v0.11' of github.com:prysmaticlabs/prysm into rpc-subsc…
terencechain Apr 3, 2020
5a996db
Fixed subnet test
terencechain Apr 3, 2020
5c94393
Wait 1/3 on validator side
terencechain Apr 3, 2020
2552ec7
Lint
terencechain Apr 3, 2020
e9d1ee1
Comment
terencechain Apr 3, 2020
de6b1ce
Update committee cache
terencechain Apr 4, 2020
38f9b68
Comment
terencechain Apr 4, 2020
cb1a4c6
Update RPC
terencechain Apr 4, 2020
3190aa3
Fixed test
terencechain Apr 4, 2020
81da6d9
Nishant's comment
terencechain Apr 4, 2020
a8e0784
Gaz
terencechain Apr 4, 2020
1202cea
Merge refs/heads/v0.11 into rpc-subscribe-subnet-1
prylabs-bulldozer[bot] Apr 4, 2020
62f9817
Refresh ENR is for epoch
terencechain Apr 4, 2020
9155e78
Needs to be append
terencechain Apr 4, 2020
1a8c7b4
Merge branch 'rpc-subscribe-subnet' into rpc-subscribe-subnet-1
terencechain Apr 4, 2020
76f951f
Merge branch 'v0.11' of github.com:prysmaticlabs/prysm into rpc-subsc…
terencechain Apr 4, 2020
4b5004c
Fixed duplication
terencechain Apr 4, 2020
5107058
Merge branch 'rpc-subscribe-subnet-1' of github.com:prysmaticlabs/pry…
terencechain Apr 4, 2020
c3e1db3
Tests
terencechain Apr 4, 2020
e55b598
Skip e2e
terencechain Apr 4, 2020
4b2511f
Merge refs/heads/v0.11 into rpc-subscribe-subnet-1
prylabs-bulldozer[bot] Apr 4, 2020
5479aec
Update beacon-chain/rpc/validator/aggregator.go
terencechain Apr 5, 2020
8b17c5c
Merge refs/heads/v0.11 into rpc-subscribe-subnet-1
prylabs-bulldozer[bot] Apr 6, 2020
403efa8
Apply suggestions from code review
rauljordan Apr 6, 2020
48faf54
Merge refs/heads/v0.11 into rpc-subscribe-subnet-1
prylabs-bulldozer[bot] Apr 6, 2020
42c7e3a
Merge refs/heads/v0.11 into rpc-subscribe-subnet-1
prylabs-bulldozer[bot] Apr 6, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions beacon-chain/operations/attestations/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"aggregate.go",
"log.go",
"metrics.go",
"pool.go",
Expand Down Expand Up @@ -34,7 +33,6 @@ go_library(
go_test(
name = "go_default_test",
srcs = [
"aggregate_test.go",
"pool_test.go",
"prepare_forkchoice_test.go",
"prune_expired_test.go",
Expand All @@ -50,6 +48,5 @@ go_test(
"@com_github_gogo_protobuf//proto:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@in_gopkg_d4l3k_messagediff_v1//:go_default_library",
],
)
79 changes: 0 additions & 79 deletions beacon-chain/operations/attestations/aggregate.go

This file was deleted.

127 changes: 0 additions & 127 deletions beacon-chain/operations/attestations/aggregate_test.go

This file was deleted.

39 changes: 39 additions & 0 deletions beacon-chain/operations/attestations/kv/unaggregated.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ func (p *AttCaches) UnaggregatedAttestations() []*ethpb.Attestation {
return atts
}

// UnAggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,
// filtered by committee index and slot.
func (p *AttCaches) UnAggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation {
atts := make([]*ethpb.Attestation, 0)

p.unAggregateAttLock.RLock()
defer p.unAggregateAttLock.RUnlock()
for _, a := range p.unAggregatedAtt {
if slot == a.Data.Slot && committeeIndex == a.Data.CommitteeIndex {
atts = append(atts, a)
}
}

return atts
}

// DeleteUnaggregatedAttestation deletes the unaggregated attestations in cache.
func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error {
if att == nil {
Expand All @@ -73,6 +89,29 @@ func (p *AttCaches) DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
return nil
}

// DeleteUnaggregatedAttestations deletes the unaggregated attestations in cache.
func (p *AttCaches) DeleteUnaggregatedAttestations(atts []*ethpb.Attestation) error {
p.unAggregateAttLock.Lock()
defer p.unAggregateAttLock.Unlock()
for _, att := range atts {
if att == nil {
continue
}
if helpers.IsAggregated(att) {
return errors.New("attestation is aggregated")
}

r, err := hashFn(att)
if err != nil {
return errors.Wrap(err, "could not tree hash attestation")
}

delete(p.unAggregatedAtt, r)
}

return nil
}

// UnaggregatedAttestationCount returns the number of unaggregated attestations key in the pool.
func (p *AttCaches) UnaggregatedAttestationCount() int {
p.unAggregateAttLock.RLock()
Expand Down
48 changes: 48 additions & 0 deletions beacon-chain/operations/attestations/kv/unaggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,51 @@ func TestKV_Unaggregated_CanDelete(t *testing.T) {
t.Error("Did not receive correct aggregated atts")
}
}

func TestKV_Unaggregated_FilterBySlotsAndCommitteeIDs(t *testing.T) {
cache := NewAttCaches()

att1 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b101}}
att2 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att3 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att4 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b110}}

atts := []*ethpb.Attestation{att1, att2, att3, att4}

for _, att := range atts {
if err := cache.SaveUnaggregatedAttestation(att); err != nil {
t.Fatal(err)
}
}

returned := cache.UnAggregatedAttestationsBySlotIndex(1, 1)

if !reflect.DeepEqual([]*ethpb.Attestation{att1, att3}, returned) {
t.Error("Did not receive correct aggregated atts")
}
}

func TestKV_Unaggregated_BatchDelete(t *testing.T) {
cache := NewAttCaches()

att1 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b101}}
att2 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att3 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 1}, AggregationBits: bitfield.Bitlist{0b110}}
att4 := &ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 1, CommitteeIndex: 2}, AggregationBits: bitfield.Bitlist{0b110}}

atts := []*ethpb.Attestation{att1, att2, att3, att4}

for _, att := range atts {
if err := cache.SaveUnaggregatedAttestation(att); err != nil {
t.Fatal(err)
}
}
if err := cache.DeleteUnaggregatedAttestations(atts); err != nil {
t.Fatal(err)
}

returned := cache.UnaggregatedAttestations()
if !reflect.DeepEqual([]*ethpb.Attestation{}, returned) {
t.Error("Did not receive correct aggregated atts")
}
}
2 changes: 2 additions & 0 deletions beacon-chain/operations/attestations/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@ type Pool interface {
SaveUnaggregatedAttestation(att *ethpb.Attestation) error
SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error
UnaggregatedAttestations() []*ethpb.Attestation
UnAggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation
DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
DeleteUnaggregatedAttestations(atts []*ethpb.Attestation) error
UnaggregatedAttestationCount() int
// For attestations that were included in the block.
SaveBlockAttestation(att *ethpb.Attestation) error
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/operations/attestations/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func NewService(ctx context.Context, cfg *Config) (*Service, error) {
// Start an attestation pool service's main event loop.
func (s *Service) Start() {
go s.prepareForkChoiceAtts()
go s.aggregateRoutine()
go s.pruneAttsPool()
}

Expand Down
2 changes: 0 additions & 2 deletions beacon-chain/rpc/validator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@ go_library(
"//shared/featureconfig:go_default_library",
"//shared/hashutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/slotutil:go_default_library",
"//shared/traceutil:go_default_library",
"//shared/trieutil:go_default_library",
"@com_github_gogo_protobuf//types:go_default_library",
Expand Down
14 changes: 14 additions & 0 deletions beacon-chain/rpc/validator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,21 @@ func (as *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.
}

// Retrieve the unaggregated attestation from pool.
unaggregatedAtts := as.AttPool.UnAggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
// In case there's left over aggregated attestations in the pool.
aggregatedAtts := as.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
aggregatedAtts, err = helpers.AggregateAttestations(append(aggregatedAtts, unaggregatedAtts...))
if err != nil {
return nil, status.Errorf(codes.Internal, "Could not get aggregate attestations: %v", err)
}

// Save the aggregated attestations to the pool.
if err := as.AttPool.SaveAggregatedAttestations(aggregatedAtts); err != nil {
return nil, status.Errorf(codes.Internal, "Could not save aggregated attestations: %v", err)
}
if err := as.AttPool.DeleteUnaggregatedAttestations(unaggregatedAtts); err != nil {
return nil, status.Errorf(codes.Internal, "Could not delete unaggregated attestations: %v", err)
}

// Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
if len(aggregatedAtts) == 0 {
Expand Down
Loading