From 92233734f7c5f31db0316683df3b50c0937fad84 Mon Sep 17 00:00:00 2001 From: nisdas Date: Tue, 17 Nov 2020 13:00:40 +0800 Subject: [PATCH] fix --- beacon-chain/flags/base.go | 4 ++++ beacon-chain/flags/config.go | 5 +++++ beacon-chain/main.go | 1 + beacon-chain/sync/metrics.go | 15 ++++++++++++--- beacon-chain/sync/subscriber.go | 19 ++++++++++++++----- beacon-chain/usage.go | 1 + 6 files changed, 37 insertions(+), 8 deletions(-) diff --git a/beacon-chain/flags/base.go b/beacon-chain/flags/base.go index d266b7955714..001a48acbb5a 100644 --- a/beacon-chain/flags/base.go +++ b/beacon-chain/flags/base.go @@ -130,6 +130,10 @@ var ( Name: "enable-debug-rpc-endpoints", Usage: "Enables the debug rpc service, containing utility endpoints such as /eth/v1alpha1/beacon/state.", } + SubscribeToAllSubnets = &cli.BoolFlag{ + Name: "subscribe-all-subnets", + Usage: "Subscribe to all possible attestation subnets.", + } // HistoricalSlasherNode is a set of beacon node flags required for performing historical detection with a slasher. HistoricalSlasherNode = &cli.BoolFlag{ Name: "historical-slasher-node", diff --git a/beacon-chain/flags/config.go b/beacon-chain/flags/config.go index cec524954622..f593ae85b031 100644 --- a/beacon-chain/flags/config.go +++ b/beacon-chain/flags/config.go @@ -12,6 +12,7 @@ type GlobalFlags struct { HeadSync bool DisableSync bool DisableDiscv5 bool + SubscribeToAllSubnets bool MinimumSyncPeers int BlockBatchLimit int BlockBatchLimitBurstFactor int @@ -44,6 +45,10 @@ func ConfigureGlobalFlags(ctx *cli.Context) { log.Warn("Using Disable Sync flag, using this flag on a live network might lead to adverse consequences.") cfg.DisableSync = true } + if ctx.Bool(SubscribeToAllSubnets.Name) { + log.Warn("Subscribing to All Attestation Subnets") + cfg.SubscribeToAllSubnets = true + } cfg.DisableDiscv5 = ctx.Bool(DisableDiscv5.Name) cfg.BlockBatchLimit = ctx.Int(BlockBatchLimit.Name) cfg.BlockBatchLimitBurstFactor = ctx.Int(BlockBatchLimitBurstFactor.Name) diff --git a/beacon-chain/main.go b/beacon-chain/main.go index 9d5343aa1102..90f720c25a07 100644 --- a/beacon-chain/main.go +++ b/beacon-chain/main.go @@ -50,6 +50,7 @@ var appFlags = []cli.Flag{ flags.InteropGenesisTimeFlag, flags.SlotsPerArchivedPoint, flags.EnableDebugRPCEndpoints, + flags.SubscribeToAllSubnets, flags.EnableBackupWebhookFlag, flags.BackupWebhookOutputDir, flags.HistoricalSlasherNode, diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go index 2b3353902a15..bf68843f59d8 100644 --- a/beacon-chain/sync/metrics.go +++ b/beacon-chain/sync/metrics.go @@ -8,6 +8,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/shared/params" ) @@ -66,11 +67,19 @@ func (s *Service) updateMetrics() { if err != nil { log.WithError(err).Debugf("Could not compute fork digest") } + indices := s.aggregatorSubnetIndices(s.chain.CurrentSlot()) attTopic := p2p.GossipTypeMapping[reflect.TypeOf(&pb.Attestation{})] attTopic += s.p2p.Encoding().ProtocolSuffix() - for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { - formattedTopic := fmt.Sprintf(attTopic, digest, i) - topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) + if flags.Get().SubscribeToAllSubnets { + for i := uint64(0); i < params.BeaconNetworkConfig().AttestationSubnetCount; i++ { + formattedTopic := fmt.Sprintf(attTopic, digest, i) + topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) + } + } else { + for _, committeeIdx := range indices { + formattedTopic := fmt.Sprintf(attTopic, digest, committeeIdx) + topicPeerCount.WithLabelValues(formattedTopic).Set(float64(len(s.p2p.PubSub().ListPeers(formattedTopic)))) + } } // We update all other gossip topics. diff --git a/beacon-chain/sync/subscriber.go b/beacon-chain/sync/subscriber.go index 57e7ef416d52..e6e45afb3876 100644 --- a/beacon-chain/sync/subscriber.go +++ b/beacon-chain/sync/subscriber.go @@ -12,6 +12,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" pubsub "github.com/libp2p/go-libp2p-pubsub" pb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" + "github.com/prysmaticlabs/prysm/beacon-chain/flags" "github.com/prysmaticlabs/prysm/beacon-chain/p2p" "github.com/prysmaticlabs/prysm/shared/messagehandler" "github.com/prysmaticlabs/prysm/shared/p2putils" @@ -65,11 +66,19 @@ func (s *Service) registerSubscribers() { s.validateAttesterSlashing, s.attesterSlashingSubscriber, ) - s.subscribeDynamicWithSubnets( - "/eth2/%x/beacon_attestation_%d", - s.validateCommitteeIndexBeaconAttestation, /* validator */ - s.committeeIndexBeaconAttestationSubscriber, /* message handler */ - ) + if flags.Get().SubscribeToAllSubnets { + s.subscribeStaticWithSubnets( + "/eth2/%x/beacon_attestation_%d", + s.validateCommitteeIndexBeaconAttestation, /* validator */ + s.committeeIndexBeaconAttestationSubscriber, /* message handler */ + ) + } else { + s.subscribeDynamicWithSubnets( + "/eth2/%x/beacon_attestation_%d", + s.validateCommitteeIndexBeaconAttestation, /* validator */ + s.committeeIndexBeaconAttestationSubscriber, /* message handler */ + ) + } } // subscribe to a given topic with a given validator and subscription handler. diff --git a/beacon-chain/usage.go b/beacon-chain/usage.go index ef0c97ea95d2..df599b3fb99e 100644 --- a/beacon-chain/usage.go +++ b/beacon-chain/usage.go @@ -105,6 +105,7 @@ var appHelpFlagGroups = []flagGroup{ flags.BlockBatchLimit, flags.BlockBatchLimitBurstFactor, flags.EnableDebugRPCEndpoints, + flags.SubscribeToAllSubnets, flags.HistoricalSlasherNode, flags.ChainID, flags.NetworkID,