Skip to content

Commit

Permalink
Add Spans To Attestation Caches (#8556)
Browse files Browse the repository at this point in the history
* add spans

* preston's comments

* add span
  • Loading branch information
nisdas authored Mar 5, 2021
1 parent 32f2f71 commit 067a519
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 40 deletions.
1 change: 1 addition & 0 deletions beacon-chain/operations/attestations/kv/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ go_library(
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_prysmaticlabs_go_bitfield//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@io_opencensus_go//trace:go_default_library",
],
)

Expand Down
27 changes: 20 additions & 7 deletions beacon-chain/operations/attestations/kv/aggregated.go
Original file line number Diff line number Diff line change
@@ -1,36 +1,46 @@
package kv

import (
"context"

"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
attaggregation "github.com/prysmaticlabs/prysm/shared/aggregation/attestations"
log "github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

// AggregateUnaggregatedAttestations aggregates the unaggregated attestations and saves the
// newly aggregated attestations in the pool.
// It tracks the unaggregated attestations that weren't able to aggregate to prevent
// the deletion of unaggregated attestations in the pool.
func (c *AttCaches) AggregateUnaggregatedAttestations() error {
func (c *AttCaches) AggregateUnaggregatedAttestations(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregateUnaggregatedAttestations")
defer span.End()
unaggregatedAtts, err := c.UnaggregatedAttestations()
if err != nil {
return err
}
return c.aggregateUnaggregatedAttestations(unaggregatedAtts)
return c.aggregateUnaggregatedAttestations(ctx, unaggregatedAtts)
}

// AggregateUnaggregatedAttestationsBySlotIndex aggregates the unaggregated attestations and saves
// newly aggregated attestations in the pool. Unaggregated attestations are filtered by slot and
// committee index.
func (c *AttCaches) AggregateUnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) error {
unaggregatedAtts := c.UnaggregatedAttestationsBySlotIndex(slot, committeeIndex)
return c.aggregateUnaggregatedAttestations(unaggregatedAtts)
func (c *AttCaches) AggregateUnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregateUnaggregatedAttestationsBySlotIndex")
defer span.End()
unaggregatedAtts := c.UnaggregatedAttestationsBySlotIndex(ctx, slot, committeeIndex)
return c.aggregateUnaggregatedAttestations(ctx, unaggregatedAtts)
}

func (c *AttCaches) aggregateUnaggregatedAttestations(unaggregatedAtts []*ethpb.Attestation) error {
func (c *AttCaches) aggregateUnaggregatedAttestations(ctx context.Context, unaggregatedAtts []*ethpb.Attestation) error {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.aggregateUnaggregatedAttestations")
defer span.End()

attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation, len(unaggregatedAtts))
for _, att := range unaggregatedAtts {
attDataRoot, err := att.Data.HashTreeRoot()
Expand Down Expand Up @@ -158,7 +168,10 @@ func (c *AttCaches) AggregatedAttestations() []*ethpb.Attestation {

// AggregatedAttestationsBySlotIndex returns the aggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) AggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
func (c *AttCaches) AggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.AggregatedAttestationsBySlotIndex")
defer span.End()

atts := make([]*ethpb.Attestation, 0)

c.aggregatedAttLock.RLock()
Expand Down
40 changes: 21 additions & 19 deletions beacon-chain/operations/attestations/kv/aggregated_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kv

import (
"context"
"sort"
"testing"

Expand Down Expand Up @@ -31,10 +32,10 @@ func TestKV_Aggregated_AggregateUnaggregatedAttestations(t *testing.T) {
att8 := testutil.HydrateAttestation(&ethpb.Attestation{Data: &ethpb.AttestationData{Slot: 2}, AggregationBits: bitfield.Bitlist{0b1001}, Signature: sig2.Marshal()})
atts := []*ethpb.Attestation{att1, att2, att3, att4, att5, att6, att7, att8}
require.NoError(t, cache.SaveUnaggregatedAttestations(atts))
require.NoError(t, cache.AggregateUnaggregatedAttestations())
require.NoError(t, cache.AggregateUnaggregatedAttestations(context.Background()))

require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(1, 0)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(2, 0)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(context.Background(), 1, 0)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(context.Background(), 2, 0)), "Did not aggregate correctly")
}

func TestKV_Aggregated_AggregateUnaggregatedAttestationsBySlotIndex(t *testing.T) {
Expand Down Expand Up @@ -63,32 +64,33 @@ func TestKV_Aggregated_AggregateUnaggregatedAttestationsBySlotIndex(t *testing.T
{AggregationBits: bitfield.Bitlist{0b1010}, Data: genData(2, 3), Signature: genSign()},
{AggregationBits: bitfield.Bitlist{0b1100}, Data: genData(2, 4), Signature: genSign()},
}
ctx := context.Background()

// Make sure that no error is produced if aggregation is requested on empty unaggregated list.
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(2, 3))
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(1, 2)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(1, 2)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(1, 3)), "Did not aggregate correctly")
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 2, 3))
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 2)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 2)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 3)), "Did not aggregate correctly")

// Persist unaggregated attestations, and aggregate on per slot/committee index base.
require.NoError(t, cache.SaveUnaggregatedAttestations(atts))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(2, 3))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 1, 2))
require.NoError(t, cache.AggregateUnaggregatedAttestationsBySlotIndex(ctx, 2, 3))

// Committee attestations at a slot should be aggregated.
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(1, 2)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(1, 2)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 2)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 2)), "Did not aggregate correctly")
// Committee attestations haven't been aggregated.
require.Equal(t, 2, len(cache.UnaggregatedAttestationsBySlotIndex(1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(1, 3)), "Did not aggregate correctly")
require.Equal(t, 2, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 3)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 1, 3)), "Did not aggregate correctly")
// Committee at a second slot is aggregated.
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(2, 3)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(2, 3)), "Did not aggregate correctly")
require.Equal(t, 0, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 2, 3)))
require.Equal(t, 1, len(cache.AggregatedAttestationsBySlotIndex(ctx, 2, 3)), "Did not aggregate correctly")
// The second committee at second slot is not aggregated.
require.Equal(t, 1, len(cache.UnaggregatedAttestationsBySlotIndex(2, 4)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(2, 4)), "Did not aggregate correctly")
require.Equal(t, 1, len(cache.UnaggregatedAttestationsBySlotIndex(ctx, 2, 4)))
require.Equal(t, 0, len(cache.AggregatedAttestationsBySlotIndex(ctx, 2, 4)), "Did not aggregate correctly")
}

func TestKV_Aggregated_SaveAggregatedAttestation(t *testing.T) {
Expand Down
8 changes: 7 additions & 1 deletion beacon-chain/operations/attestations/kv/unaggregated.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package kv

import (
"context"

"github.com/pkg/errors"
types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
stateTrie "github.com/prysmaticlabs/prysm/beacon-chain/state"
"go.opencensus.io/trace"
)

// SaveUnaggregatedAttestation saves an unaggregated attestation in cache.
Expand Down Expand Up @@ -68,7 +71,10 @@ func (c *AttCaches) UnaggregatedAttestations() ([]*ethpb.Attestation, error) {

// UnaggregatedAttestationsBySlotIndex returns the unaggregated attestations in cache,
// filtered by committee index and slot.
func (c *AttCaches) UnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
func (c *AttCaches) UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation {
ctx, span := trace.StartSpan(ctx, "operations.attestations.kv.UnaggregatedAttestationsBySlotIndex")
defer span.End()

atts := make([]*ethpb.Attestation, 0)

c.unAggregateAttLock.RLock()
Expand Down
9 changes: 5 additions & 4 deletions beacon-chain/operations/attestations/kv/unaggregated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kv

import (
"bytes"
"context"
"sort"
"testing"

Expand Down Expand Up @@ -242,11 +243,11 @@ func TestKV_Unaggregated_UnaggregatedAttestationsBySlotIndex(t *testing.T) {
for _, att := range atts {
require.NoError(t, cache.SaveUnaggregatedAttestation(att))
}

returned := cache.UnaggregatedAttestationsBySlotIndex(1, 1)
ctx := context.Background()
returned := cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att1}, returned)
returned = cache.UnaggregatedAttestationsBySlotIndex(1, 2)
returned = cache.UnaggregatedAttestationsBySlotIndex(ctx, 1, 2)
assert.DeepEqual(t, []*ethpb.Attestation{att2}, returned)
returned = cache.UnaggregatedAttestationsBySlotIndex(2, 1)
returned = cache.UnaggregatedAttestationsBySlotIndex(ctx, 2, 1)
assert.DeepEqual(t, []*ethpb.Attestation{att3}, returned)
}
10 changes: 6 additions & 4 deletions beacon-chain/operations/attestations/pool.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package attestations

import (
"context"

types "github.com/prysmaticlabs/eth2-types"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations/kv"
Expand All @@ -12,20 +14,20 @@ import (
// aggregator actor.
type Pool interface {
// For Aggregated attestations
AggregateUnaggregatedAttestations() error
AggregateUnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) error
AggregateUnaggregatedAttestations(ctx context.Context) error
AggregateUnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) error
SaveAggregatedAttestation(att *ethpb.Attestation) error
SaveAggregatedAttestations(atts []*ethpb.Attestation) error
AggregatedAttestations() []*ethpb.Attestation
AggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
AggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
DeleteAggregatedAttestation(att *ethpb.Attestation) error
HasAggregatedAttestation(att *ethpb.Attestation) (bool, error)
AggregatedAttestationCount() int
// For unaggregated attestations.
SaveUnaggregatedAttestation(att *ethpb.Attestation) error
SaveUnaggregatedAttestations(atts []*ethpb.Attestation) error
UnaggregatedAttestations() ([]*ethpb.Attestation, error)
UnaggregatedAttestationsBySlotIndex(slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
UnaggregatedAttestationsBySlotIndex(ctx context.Context, slot types.Slot, committeeIndex types.CommitteeIndex) []*ethpb.Attestation
DeleteUnaggregatedAttestation(att *ethpb.Attestation) error
DeleteSeenUnaggregatedAttestations() (int, error)
UnaggregatedAttestationCount() int
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/operations/attestations/prepare_forkchoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts")
defer span.End()

if err := s.pool.AggregateUnaggregatedAttestations(); err != nil {
if err := s.pool.AggregateUnaggregatedAttestations(ctx); err != nil {
return err
}
atts := append(s.pool.AggregatedAttestations(), s.pool.BlockAttestations()...)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestBatchAttestations_Multiple(t *testing.T) {
require.NoError(t, err)

wanted = append(wanted, aggregated...)
require.NoError(t, s.pool.AggregateUnaggregatedAttestations())
require.NoError(t, s.pool.AggregateUnaggregatedAttestations(context.Background()))
received := s.pool.ForkchoiceAttestations()

sort.Slice(received, func(i, j int) bool {
Expand Down
6 changes: 3 additions & 3 deletions beacon-chain/rpc/validator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,14 @@ func (vs *Server) SubmitAggregateSelectionProof(ctx context.Context, req *ethpb.
return nil, status.Errorf(codes.InvalidArgument, "Validator is not an aggregator")
}

if err := vs.AttPool.AggregateUnaggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex); err != nil {
if err := vs.AttPool.AggregateUnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex); err != nil {
return nil, status.Errorf(codes.Internal, "Could not aggregate unaggregated attestations")
}
aggregatedAtts := vs.AttPool.AggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
aggregatedAtts := vs.AttPool.AggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)

// Filter out the best aggregated attestation (ie. the one with the most aggregated bits).
if len(aggregatedAtts) == 0 {
aggregatedAtts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(req.Slot, req.CommitteeIndex)
aggregatedAtts = vs.AttPool.UnaggregatedAttestationsBySlotIndex(ctx, req.Slot, req.CommitteeIndex)
if len(aggregatedAtts) == 0 {
return nil, status.Errorf(codes.NotFound, "Could not find attestation for slot and committee in pool")
}
Expand Down

0 comments on commit 067a519

Please sign in to comment.