Skip to content

Commit

Permalink
Pubsub: Broadcast attestations to committee based subnets (#4316)
Browse files Browse the repository at this point in the history
* Working on un-aggregated pubsub topics

* update subscriber to call pool

* checkpointing

* fix

* untested message validation

* minor fixes

* rename slotsSinceGenesis to slotsSince

* some progress on a unit test, subscribe is not being called still...

* dont change topic

* need to set the data on the message

* restore topic

* fixes

* some helpful parameter changes for mainnet operations

* lint

* Terence feedback

* unskip e2e

* Unit test for validate committee index beacon attestation

* PR feedbacK

Co-authored-by: prylabs-bulldozer[bot] <58059840+prylabs-bulldozer[bot]@users.noreply.github.com>
  • Loading branch information
prestonvanloon and prylabs-bulldozer[bot] authored Dec 30, 2019
1 parent ef09782 commit fad01c7
Show file tree
Hide file tree
Showing 25 changed files with 461 additions and 66 deletions.
3 changes: 3 additions & 0 deletions beacon-chain/blockchain/chain_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (s *Service) HeadState(ctx context.Context) (*pb.BeaconState, error) {

// HeadValidatorsIndices returns a list of active validator indices from the head view of a given epoch.
func (s *Service) HeadValidatorsIndices(epoch uint64) ([]uint64, error) {
if s.headState == nil {
return []uint64{}, nil
}
return helpers.ActiveValidatorIndices(s.headState, epoch)
}

Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,9 @@ func (ms *ChainService) ReceiveBlockNoPubsubForkchoice(ctx context.Context, bloc

// HeadSlot mocks HeadSlot method in chain service.
func (ms *ChainService) HeadSlot() uint64 {
if ms.State == nil {
return 0
}
return ms.State.Slot

}
Expand Down Expand Up @@ -170,6 +173,9 @@ func (ms *ChainService) ReceiveAttestationNoPubsub(context.Context, *ethpb.Attes

// HeadValidatorsIndices mocks the same method in the chain service.
func (ms *ChainService) HeadValidatorsIndices(epoch uint64) ([]uint64, error) {
if ms.State == nil {
return []uint64{}, nil
}
return helpers.ActiveValidatorIndices(ms.State, epoch)
}

Expand Down
6 changes: 6 additions & 0 deletions beacon-chain/core/helpers/slot_epoch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package helpers

import (
"fmt"
"time"

pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/params"
Expand Down Expand Up @@ -100,3 +101,8 @@ func VerifySlotTime(genesisTime uint64, slot uint64) error {
}
return nil
}

// SlotsSince computes the number of time slots that have occurred since the given timestamp.
func SlotsSince(time time.Time) uint64 {
return uint64(roughtime.Since(time).Seconds()) / params.BeaconConfig().SecondsPerSlot
}
1 change: 1 addition & 0 deletions beacon-chain/p2p/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ go_test(
"@com_github_libp2p_go_libp2p_pubsub//:go_default_library",
"@com_github_libp2p_go_libp2p_swarm//testing:go_default_library",
"@com_github_multiformats_go_multiaddr//:go_default_library",
"@com_github_prysmaticlabs_ethereumapis//eth/v1alpha1:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
],
)
27 changes: 23 additions & 4 deletions beacon-chain/p2p/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ package p2p
import (
"bytes"
"context"
"fmt"
"reflect"

"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/hashutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"go.opencensus.io/trace"
Expand All @@ -20,11 +22,20 @@ var ErrMessageNotMapped = errors.New("message type is not mapped to a PubSub top
func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
ctx, span := trace.StartSpan(ctx, "p2p.Broadcast")
defer span.End()
topic, ok := GossipTypeMapping[reflect.TypeOf(msg)]
if !ok {
traceutil.AnnotateError(span, ErrMessageNotMapped)
return ErrMessageNotMapped

var topic string
switch msg.(type) {
case *eth.Attestation:
topic = attestationToTopic(msg.(*eth.Attestation))
default:
var ok bool
topic, ok = GossipTypeMapping[reflect.TypeOf(msg)]
if !ok {
traceutil.AnnotateError(span, ErrMessageNotMapped)
return ErrMessageNotMapped
}
}

span.AddAttributes(trace.StringAttribute("topic", topic))

buf := new(bytes.Buffer)
Expand All @@ -47,3 +58,11 @@ func (s *Service) Broadcast(ctx context.Context, msg proto.Message) error {
}
return nil
}

const attestationSubnetTopicFormat = "/eth2/committee_index%d_beacon_attestation"
func attestationToTopic(att *eth.Attestation) string {
if att == nil || att.Data == nil {
return ""
}
return fmt.Sprintf(attestationSubnetTopicFormat, att.Data.CommitteeIndex)
}
49 changes: 49 additions & 0 deletions beacon-chain/p2p/broadcaster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/gogo/protobuf/proto"
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
testpb "github.com/prysmaticlabs/prysm/proto/testing"
"github.com/prysmaticlabs/prysm/shared/testutil"
Expand Down Expand Up @@ -82,3 +83,51 @@ func TestService_Broadcast_ReturnsErr_TopicNotMapped(t *testing.T) {
t.Fatalf("Expected error %v, got %v", ErrMessageNotMapped, err)
}
}

func TestService_Attestation_Subnet(t *testing.T) {
if gtm := GossipTypeMapping[reflect.TypeOf(&eth.Attestation{})]; gtm != attestationSubnetTopicFormat {
t.Errorf("Constant is out of date. Wanted %s, got %s", attestationSubnetTopicFormat, gtm)
}

tests := []struct {
att *eth.Attestation
topic string
}{
{
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 0,
},
},
topic: "/eth2/committee_index0_beacon_attestation",
},
{
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 11,
},
},
topic: "/eth2/committee_index11_beacon_attestation",
},
{
att: &eth.Attestation{
Data: &eth.AttestationData{
CommitteeIndex: 55,
},
},
topic: "/eth2/committee_index55_beacon_attestation",
},
{
att: &eth.Attestation{},
topic: "",
},
{
topic: "",
},
}
for _, tt := range tests {
if res := attestationToTopic(tt.att); res != tt.topic {
t.Errorf("Wrong topic, got %s wanted %s", res, tt.topic)
}
}
}
12 changes: 6 additions & 6 deletions beacon-chain/p2p/gossip_topic_mappings.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,12 @@ import (
// GossipTopicMappings represent the protocol ID to protobuf message type map for easy
// lookup.
var GossipTopicMappings = map[string]proto.Message{
"/eth2/beacon_block": &pb.BeaconBlock{},
"/eth2/beacon_attestation": &pb.Attestation{},
"/eth2/voluntary_exit": &pb.VoluntaryExit{},
"/eth2/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/attester_slashing": &pb.AttesterSlashing{},
"/eth2/beacon_aggregate_and_proof": &pb.AggregateAttestationAndProof{},
"/eth2/beacon_block": &pb.BeaconBlock{},
"/eth2/committee_index%d_beacon_attestation": &pb.Attestation{},
"/eth2/voluntary_exit": &pb.VoluntaryExit{},
"/eth2/proposer_slashing": &pb.ProposerSlashing{},
"/eth2/attester_slashing": &pb.AttesterSlashing{},
"/eth2/beacon_aggregate_and_proof": &pb.AggregateAttestationAndProof{},
}

// GossipTypeMapping is the inverse of GossipTopicMappings so that an arbitrary protobuf message
Expand Down
4 changes: 3 additions & 1 deletion beacon-chain/sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ go_library(
"service.go",
"subscriber.go",
"subscriber_beacon_aggregate_proof.go",
"subscriber_beacon_attestation.go",
"subscriber_beacon_blocks.go",
"subscriber_committee_index_beacon_attestation.go",
"subscriber_handlers.go",
"validate_aggregate_proof.go",
"validate_attester_slashing.go",
"validate_beacon_attestation.go",
"validate_beacon_blocks.go",
"validate_committee_index_beacon_attestation.go",
"validate_proposer_slashing.go",
"validate_voluntary_exit.go",
],
Expand Down Expand Up @@ -84,11 +84,13 @@ go_test(
"rpc_test.go",
"subscriber_beacon_aggregate_proof_test.go",
"subscriber_beacon_blocks_test.go",
"subscriber_committee_index_beacon_attestation_test.go",
"subscriber_test.go",
"validate_aggregate_proof_test.go",
"validate_attester_slashing_test.go",
"validate_beacon_attestation_test.go",
"validate_beacon_blocks_test.go",
"validate_committee_index_beacon_attestation_test.go",
"validate_proposer_slashing_test.go",
"validate_voluntary_exit_test.go",
],
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ go_library(
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/mathutil:go_default_library",
"//shared/params:go_default_library",
"//shared/roughtime:go_default_library",
"@com_github_libp2p_go_libp2p_core//peer:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
Expand All @@ -41,6 +40,7 @@ go_test(
tags = ["race_on"],
deps = [
"//beacon-chain/blockchain/testing:go_default_library",
"//beacon-chain/core/helpers:go_default_library",
"//beacon-chain/db/testing:go_default_library",
"//beacon-chain/p2p/peers:go_default_library",
"//beacon-chain/p2p/testing:go_default_library",
Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/sync/initial-sync/round_robin.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func (s *Service) roundRobinSync(genesis time.Time) error {

log.Debug("Synced to finalized epoch - now syncing blocks up to current head")

if s.chain.HeadSlot() == slotsSinceGenesis(genesis) {
if s.chain.HeadSlot() == helpers.SlotsSince(genesis) {
return nil
}

Expand All @@ -234,11 +234,11 @@ func (s *Service) roundRobinSync(genesis time.Time) error {
best = s.bestPeer()
root, _, _ = s.bestFinalized()
}
for head := slotsSinceGenesis(genesis); s.chain.HeadSlot() < head; {
for head := helpers.SlotsSince(genesis); s.chain.HeadSlot() < head; {
req := &p2ppb.BeaconBlocksByRangeRequest{
HeadBlockRoot: root,
StartSlot: s.chain.HeadSlot() + 1,
Count: mathutil.Min(slotsSinceGenesis(genesis)-s.chain.HeadSlot()+1, 256),
Count: mathutil.Min(helpers.SlotsSince(genesis)-s.chain.HeadSlot()+1, 256),
Step: 1,
}

Expand Down Expand Up @@ -363,7 +363,7 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncing
if rate == 0 {
rate = 1
}
timeRemaining := time.Duration(float64(slotsSinceGenesis(genesis)-blk.Slot)/rate) * time.Second
timeRemaining := time.Duration(float64(helpers.SlotsSince(genesis)-blk.Slot)/rate) * time.Second
log.WithField(
"peers",
fmt.Sprintf("%d/%d", len(syncingPeers), len(s.p2p.Peers().Connected())),
Expand All @@ -373,7 +373,7 @@ func (s *Service) logSyncStatus(genesis time.Time, blk *eth.BeaconBlock, syncing
).Infof(
"Processing block %d/%d - estimated time remaining %s",
blk.Slot,
slotsSinceGenesis(genesis),
helpers.SlotsSince(genesis),
timeRemaining,
)
}
5 changes: 3 additions & 2 deletions beacon-chain/sync/initial-sync/round_robin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/prysmaticlabs/go-ssz"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"

eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand Down Expand Up @@ -372,8 +373,8 @@ func makeGenesisTime(currentSlot uint64) time.Time {
func TestMakeGenesisTime(t *testing.T) {
currentSlot := uint64(64)
gt := makeGenesisTime(currentSlot)
if slotsSinceGenesis(gt) != currentSlot {
t.Fatalf("Wanted %d, got %d", currentSlot, slotsSinceGenesis(gt))
if helpers.SlotsSince(gt) != currentSlot {
t.Fatalf("Wanted %d, got %d", currentSlot, helpers.SlotsSince(gt))
}
}

Expand Down
6 changes: 1 addition & 5 deletions beacon-chain/sync/initial-sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/roughtime"
)

Expand Down Expand Up @@ -101,7 +100,7 @@ func (s *Service) Start() {
time.Sleep(roughtime.Until(genesis))
}
s.chainStarted = true
currentSlot := slotsSinceGenesis(genesis)
currentSlot := helpers.SlotsSince(genesis)
if helpers.SlotToEpoch(currentSlot) == 0 {
log.Info("Chain started within the last epoch - not syncing")
s.synced = true
Expand Down Expand Up @@ -154,6 +153,3 @@ func (s *Service) Syncing() bool {
return !s.synced
}

func slotsSinceGenesis(genesisTime time.Time) uint64 {
return uint64(roughtime.Since(genesisTime).Seconds()) / params.BeaconConfig().SecondsPerSlot
}
30 changes: 13 additions & 17 deletions beacon-chain/sync/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,6 @@ func (r *Service) registerSubscribers() {
r.validateBeaconBlockPubSub,
r.beaconBlockSubscriber,
)
r.subscribe(
"/eth2/beacon_attestation",
r.validateBeaconAttestation,
r.beaconAttestationSubscriber,
)
r.subscribe(
"/eth2/beacon_aggregate_and_proof",
r.validateAggregateAndProof,
Expand All @@ -92,13 +87,12 @@ func (r *Service) registerSubscribers() {
r.validateAttesterSlashing,
r.attesterSlashingSubscriber,
)
// TODO(4154): Uncomment.
//r.subscribeDynamic(
// "/eth2/committee_index/%d_beacon_attestation",
// r.currentCommitteeIndex, /* determineSubsLen */
// noopValidator, /* validator */
// r.committeeIndexBeaconAttestationSubscriber, /* message handler */
//)
r.subscribeDynamic(
"/eth2/committee_index%d_beacon_attestation",
r.currentCommitteeIndex, /* determineSubsLen */
r.validateCommitteeIndexBeaconAttestation, /* validator */
r.committeeIndexBeaconAttestationSubscriber, /* message handler */
)
}

// subscribe to a given topic with a given validator and subscription handler.
Expand All @@ -108,7 +102,10 @@ func (r *Service) subscribe(topic string, validator pubsub.Validator, handle sub
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topic))
}
return r.subscribeWithBase(base, topic, validator, handle)
}

func (r *Service) subscribeWithBase(base proto.Message, topic string, validator pubsub.Validator, handle subHandler) *pubsub.Subscription {
topic += r.p2p.Encoding().ProtocolSuffix()
log := log.WithField("topic", topic)

Expand Down Expand Up @@ -152,7 +149,7 @@ func (r *Service) subscribe(topic string, validator pubsub.Validator, handle sub
if err := handle(ctx, msg.ValidatorData.(proto.Message)); err != nil {
traceutil.AnnotateError(span, err)
log.WithError(err).Error("Failed to handle p2p pubsub")
messageFailedProcessingCounter.WithLabelValues(topic + r.p2p.Encoding().ProtocolSuffix()).Inc()
messageFailedProcessingCounter.WithLabelValues(topic).Inc()
return
}
}
Expand All @@ -171,7 +168,7 @@ func (r *Service) subscribe(topic string, validator pubsub.Validator, handle sub
continue
}

messageReceivedCounter.WithLabelValues(topic + r.p2p.Encoding().ProtocolSuffix()).Inc()
messageReceivedCounter.WithLabelValues(topic).Inc()

go pipeline(msg)
}
Expand Down Expand Up @@ -204,7 +201,6 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i
if base == nil {
panic(fmt.Sprintf("%s is not mapped to any message in GossipTopicMappings", topicFormat))
}
topicFormat += r.p2p.Encoding().ProtocolSuffix()

var subscriptions []*pubsub.Subscription

Expand All @@ -227,8 +223,8 @@ func (r *Service) subscribeDynamic(topicFormat string, determineSubsLen func() i
sub.Cancel()
}
} else if len(subscriptions) < wantedSubs { // Increase topics
for i := len(subscriptions) - 1; i < wantedSubs; i++ {
sub := r.subscribe(fmt.Sprintf(topicFormat, i), validate, handle)
for i := len(subscriptions); i < wantedSubs; i++ {
sub := r.subscribeWithBase(base, fmt.Sprintf(topicFormat, i), validate, handle)
subscriptions = append(subscriptions, sub)
}
}
Expand Down
Loading

0 comments on commit fad01c7

Please sign in to comment.