From af3d3e8cd33466f256ac68b28e68f85aba8df993 Mon Sep 17 00:00:00 2001 From: Nishant Das Date: Fri, 2 Apr 2021 20:21:19 +0800 Subject: [PATCH] Update Gossip Parameters (#8683) * add in more accurate aggregate parameters * add more param changes * more cleanup * fix order of operations * comments * remove redundant declaration * clean up better * fix up * victor's review * disable mesh scoring * disable mesh scoring Co-authored-by: Raul Jordan Co-authored-by: Victor Farazdagi --- beacon-chain/p2p/gossip_scoring_params.go | 213 ++++++++++++++---- .../p2p/gossip_scoring_params_test.go | 17 ++ beacon-chain/p2p/parameter_test.go | 22 -- beacon-chain/p2p/pubsub.go | 36 ++- 4 files changed, 211 insertions(+), 77 deletions(-) diff --git a/beacon-chain/p2p/gossip_scoring_params.go b/beacon-chain/p2p/gossip_scoring_params.go index 7f4fc8014c16..f021e47ba062 100644 --- a/beacon-chain/p2p/gossip_scoring_params.go +++ b/beacon-chain/p2p/gossip_scoring_params.go @@ -2,6 +2,7 @@ package p2p import ( "math" + "reflect" "strings" "time" @@ -10,6 +11,7 @@ import ( "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/beacon-chain/core/helpers" "github.com/prysmaticlabs/prysm/shared/params" + "github.com/sirupsen/logrus" ) const ( @@ -32,9 +34,22 @@ const ( // our voluntary exit topic. voluntaryExitWeight = 0.05 + // maxInMeshScore describes the max score a peer can attain from being in the mesh. + maxInMeshScore = 10 + // maxFirstDeliveryScore describes the max score a peer can obtain from first deliveries. + maxFirstDeliveryScore = 40 + // decayToZero specifies the terminal value that we will use when decaying // a value. decayToZero = 0.01 + + // dampeningFactor reduces the amount by which the various thresholds and caps are created. + dampeningFactor = 90 +) + +var ( + // a bool to check if we enable scoring for messages in the mesh sent for near first deliveries. + meshDeliveryIsScored = false ) func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds) { @@ -74,9 +89,9 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro case strings.Contains(topic, "beacon_block"): return defaultBlockTopicParams(), nil case strings.Contains(topic, "beacon_aggregate_and_proof"): - return defaultAggregateTopicParams(activeValidators), nil + return defaultAggregateTopicParams(activeValidators) case strings.Contains(topic, "beacon_attestation"): - return defaultAggregateSubnetTopicParams(activeValidators), nil + return defaultAggregateSubnetTopicParams(activeValidators) case strings.Contains(topic, "voluntary_exit"): return defaultVoluntaryExitTopicParams(), nil case strings.Contains(topic, "proposer_slashing"): @@ -110,96 +125,139 @@ func (s *Service) retrieveActiveValidators() (uint64, error) { return helpers.ActiveValidatorCount(bState, helpers.CurrentEpoch(bState)) } -// Based on Ben's tested parameters for lighthouse. +// Based on the lighthouse parameters. // https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c func defaultBlockTopicParams() *pubsub.TopicScoreParams { decayEpoch := time.Duration(5) blocksPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch) + meshWeight := -0.717 + if !meshDeliveryIsScored { + // Set the mesh weight as zero as a temporary measure, so as to prevent + // the average nodes from being penalised. + meshWeight = 0 + } return &pubsub.TopicScoreParams{ TopicWeight: beaconBlockWeight, - TimeInMeshWeight: 0.0324, - TimeInMeshQuantum: 1 * oneSlotDuration(), - TimeInMeshCap: 300, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), FirstMessageDeliveriesWeight: 1, FirstMessageDeliveriesDecay: scoreDecay(20 * oneEpochDuration()), FirstMessageDeliveriesCap: 23, - MeshMessageDeliveriesWeight: -0.717, + MeshMessageDeliveriesWeight: meshWeight, MeshMessageDeliveriesDecay: scoreDecay(decayEpoch * oneEpochDuration()), MeshMessageDeliveriesCap: float64(blocksPerEpoch * uint64(decayEpoch)), MeshMessageDeliveriesThreshold: float64(blocksPerEpoch*uint64(decayEpoch)) / 10, MeshMessageDeliveriesWindow: 2 * time.Second, MeshMessageDeliveriesActivation: 4 * oneEpochDuration(), - MeshFailurePenaltyWeight: -0.717, + MeshFailurePenaltyWeight: meshWeight, MeshFailurePenaltyDecay: scoreDecay(decayEpoch * oneEpochDuration()), InvalidMessageDeliveriesWeight: -140.4475, InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()), } } -func defaultAggregateTopicParams(activeValidators uint64) *pubsub.TopicScoreParams { - aggPerEpoch := aggregatorsPerSlot(activeValidators) * uint64(params.BeaconConfig().SlotsPerEpoch) +func defaultAggregateTopicParams(activeValidators uint64) (*pubsub.TopicScoreParams, error) { + // Determine the expected message rate for the particular gossip topic. + aggPerSlot := aggregatorsPerSlot(activeValidators) + firstMessageCap, err := decayLimit(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot*2/gossipSubD)) + if err != nil { + return nil, err + } + firstMessageWeight := maxFirstDeliveryScore / firstMessageCap + meshThreshold, err := decayThreshold(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot)/dampeningFactor) + if err != nil { + return nil, err + } + meshWeight := -scoreByWeight(aggregateWeight, meshThreshold) + meshCap := 4 * meshThreshold + if !meshDeliveryIsScored { + // Set the mesh weight as zero as a temporary measure, so as to prevent + // the average nodes from being penalised. + meshWeight = 0 + } return &pubsub.TopicScoreParams{ TopicWeight: aggregateWeight, - TimeInMeshWeight: 0.0324, - TimeInMeshQuantum: 1 * oneSlotDuration(), - TimeInMeshCap: 300, - FirstMessageDeliveriesWeight: 0.128, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), + FirstMessageDeliveriesWeight: firstMessageWeight, FirstMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()), - FirstMessageDeliveriesCap: 179, - MeshMessageDeliveriesWeight: -0.064, + FirstMessageDeliveriesCap: firstMessageCap, + MeshMessageDeliveriesWeight: meshWeight, MeshMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()), - MeshMessageDeliveriesCap: float64(aggPerEpoch), - MeshMessageDeliveriesThreshold: float64(aggPerEpoch / 50), + MeshMessageDeliveriesCap: meshCap, + MeshMessageDeliveriesThreshold: meshThreshold, MeshMessageDeliveriesWindow: 2 * time.Second, - MeshMessageDeliveriesActivation: 32 * oneSlotDuration(), - MeshFailurePenaltyWeight: -0.064, + MeshMessageDeliveriesActivation: 1 * oneEpochDuration(), + MeshFailurePenaltyWeight: meshWeight, MeshFailurePenaltyDecay: scoreDecay(1 * oneEpochDuration()), - InvalidMessageDeliveriesWeight: -140.4475, + InvalidMessageDeliveriesWeight: -maxScore() / aggregateWeight, InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()), - } + }, nil } -func defaultAggregateSubnetTopicParams(activeValidators uint64) *pubsub.TopicScoreParams { - topicWeight := attestationTotalWeight / float64(params.BeaconNetworkConfig().AttestationSubnetCount) - subnetWeight := activeValidators / params.BeaconNetworkConfig().AttestationSubnetCount - minimumWeight := subnetWeight / 50 +func defaultAggregateSubnetTopicParams(activeValidators uint64) (*pubsub.TopicScoreParams, error) { + subnetCount := params.BeaconNetworkConfig().AttestationSubnetCount + // Get weight for each specific subnet. + topicWeight := attestationTotalWeight / float64(subnetCount) + subnetWeight := activeValidators / subnetCount + // Determine the amount of validators expected in a subnet in a single slot. numPerSlot := time.Duration(subnetWeight / uint64(params.BeaconConfig().SlotsPerEpoch)) comsPerSlot := committeeCountPerSlot(activeValidators) - exceedsThreshold := comsPerSlot >= 2*params.BeaconNetworkConfig().AttestationSubnetCount/uint64(params.BeaconConfig().SlotsPerEpoch) + exceedsThreshold := comsPerSlot >= 2*subnetCount/uint64(params.BeaconConfig().SlotsPerEpoch) firstDecay := time.Duration(1) meshDecay := time.Duration(4) if exceedsThreshold { firstDecay = 4 meshDecay = 16 } + // Determine expected first deliveries based on the message rate. + firstMessageCap, err := decayLimit(scoreDecay(firstDecay*oneEpochDuration()), float64(numPerSlot*2/gossipSubD)) + if err != nil { + return nil, err + } + firstMessageWeight := maxFirstDeliveryScore / firstMessageCap + // Determine expected mesh deliveries based on message rate applied with a dampening factor. + meshThreshold, err := decayThreshold(scoreDecay(meshDecay*oneEpochDuration()), float64(numPerSlot)/dampeningFactor) + if err != nil { + return nil, err + } + meshWeight := -scoreByWeight(topicWeight, meshThreshold) + meshCap := 4 * meshThreshold + if !meshDeliveryIsScored { + // Set the mesh weight as zero as a temporary measure, so as to prevent + // the average nodes from being penalised. + meshWeight = 0 + } return &pubsub.TopicScoreParams{ TopicWeight: topicWeight, - TimeInMeshWeight: 0.0324, - TimeInMeshQuantum: numPerSlot, - TimeInMeshCap: 300, - FirstMessageDeliveriesWeight: 0.955, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), + FirstMessageDeliveriesWeight: firstMessageWeight, FirstMessageDeliveriesDecay: scoreDecay(firstDecay * oneEpochDuration()), - FirstMessageDeliveriesCap: 24, - MeshMessageDeliveriesWeight: -37.55, + FirstMessageDeliveriesCap: firstMessageCap, + MeshMessageDeliveriesWeight: meshWeight, MeshMessageDeliveriesDecay: scoreDecay(meshDecay * oneEpochDuration()), - MeshMessageDeliveriesCap: float64(subnetWeight), - MeshMessageDeliveriesThreshold: float64(minimumWeight), + MeshMessageDeliveriesCap: meshCap, + MeshMessageDeliveriesThreshold: meshThreshold, MeshMessageDeliveriesWindow: 2 * time.Second, - MeshMessageDeliveriesActivation: 17 * oneSlotDuration(), - MeshFailurePenaltyWeight: -37.55, + MeshMessageDeliveriesActivation: 1 * oneEpochDuration(), + MeshFailurePenaltyWeight: meshWeight, MeshFailurePenaltyDecay: scoreDecay(meshDecay * oneEpochDuration()), - InvalidMessageDeliveriesWeight: -4544, + InvalidMessageDeliveriesWeight: -maxScore() / topicWeight, InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()), - } + }, nil } func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams { return &pubsub.TopicScoreParams{ TopicWeight: attesterSlashingWeight, - TimeInMeshWeight: 0.0324, - TimeInMeshQuantum: 1 * oneSlotDuration(), - TimeInMeshCap: 300, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), FirstMessageDeliveriesWeight: 36, FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()), FirstMessageDeliveriesCap: 1, @@ -219,9 +277,9 @@ func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams { func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams { return &pubsub.TopicScoreParams{ TopicWeight: proposerSlashingWeight, - TimeInMeshWeight: 0.0324, - TimeInMeshQuantum: 1 * oneSlotDuration(), - TimeInMeshCap: 300, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), FirstMessageDeliveriesWeight: 36, FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()), FirstMessageDeliveriesCap: 1, @@ -241,9 +299,9 @@ func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams { func defaultVoluntaryExitTopicParams() *pubsub.TopicScoreParams { return &pubsub.TopicScoreParams{ TopicWeight: voluntaryExitWeight, - TimeInMeshWeight: 0.0324, - TimeInMeshQuantum: 1 * oneSlotDuration(), - TimeInMeshCap: 300, + TimeInMeshWeight: maxInMeshScore / inMeshCap(), + TimeInMeshQuantum: inMeshTime(), + TimeInMeshCap: inMeshCap(), FirstMessageDeliveriesWeight: 2, FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()), FirstMessageDeliveriesCap: 5, @@ -268,11 +326,33 @@ func oneEpochDuration() time.Duration { return time.Duration(params.BeaconConfig().SlotsPerEpoch) * oneSlotDuration() } +// determines the decay rate from the provided time period till +// the decayToZero value. Ex: ( 1 -> 0.01) func scoreDecay(totalDurationDecay time.Duration) float64 { numOfTimes := totalDurationDecay / oneSlotDuration() return math.Pow(decayToZero, 1/float64(numOfTimes)) } +// is used to determine the threshold from the decay limit with +// a provided growth rate. This applies the decay rate to a +// computed limit. +func decayThreshold(decayRate, rate float64) (float64, error) { + d, err := decayLimit(decayRate, rate) + if err != nil { + return 0, err + } + return d * decayRate, nil +} + +// decayLimit provides the value till which a decay process will +// limit till provided with an expected growth rate. +func decayLimit(decayRate, rate float64) (float64, error) { + if 1 <= decayRate { + return 0, errors.Errorf("got an invalid decayLimit rate: %f", decayRate) + } + return rate / (1 - decayRate), nil +} + func committeeCountPerSlot(activeValidators uint64) uint64 { // Use a static parameter for now rather than a dynamic one, we can use // the actual parameter later when we have figured out how to fix a circular @@ -286,3 +366,40 @@ func aggregatorsPerSlot(activeValidators uint64) uint64 { totalAggs := comms * params.BeaconConfig().TargetAggregatorsPerCommittee return totalAggs } + +// provides the relevant score by the provided weight and threshold. +func scoreByWeight(weight float64, threshold float64) float64 { + return maxScore() / (weight * threshold * threshold) +} + +// maxScore attainable by a peer. +func maxScore() float64 { + totalWeight := beaconBlockWeight + aggregateWeight + attestationTotalWeight + + attesterSlashingWeight + proposerSlashingWeight + voluntaryExitWeight + return (maxInMeshScore + maxFirstDeliveryScore) * totalWeight +} + +// denotes the unit time in mesh for scoring tallying. +func inMeshTime() time.Duration { + return 1 * oneSlotDuration() +} + +// the cap for `inMesh` time scoring. +func inMeshCap() float64 { + return float64((3600 * time.Second) / inMeshTime()) +} + +func logGossipParameters(topic string, params *pubsub.TopicScoreParams) { + // Exit early in the event the parameter struct is nil. + if params == nil { + return + } + rawParams := reflect.ValueOf(params).Elem() + numOfFields := rawParams.NumField() + + fields := make(logrus.Fields, numOfFields) + for i := 0; i < numOfFields; i++ { + fields[reflect.TypeOf(params).Elem().Field(i).Name] = rawParams.Field(i).Interface() + } + log.WithFields(fields).Debugf("Topic Parameters for %s", topic) +} diff --git a/beacon-chain/p2p/gossip_scoring_params_test.go b/beacon-chain/p2p/gossip_scoring_params_test.go index 706d64948629..f6e51d04f01f 100644 --- a/beacon-chain/p2p/gossip_scoring_params_test.go +++ b/beacon-chain/p2p/gossip_scoring_params_test.go @@ -4,6 +4,7 @@ import ( "context" "testing" + pubsub "github.com/libp2p/go-libp2p-pubsub" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing" pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1" @@ -60,3 +61,19 @@ func TestCorrect_ActiveValidatorsCount(t *testing.T) { assert.NoError(t, err, "genesis state not retrieved") assert.Equal(t, int(params.BeaconConfig().MinGenesisActiveValidatorCount)+100, int(vals), "mainnet genesis active count isn't accurate") } + +func TestLoggingParameters(t *testing.T) { + logGossipParameters("testing", nil) + logGossipParameters("testing", &pubsub.TopicScoreParams{}) + // Test out actual gossip parameters. + logGossipParameters("testing", defaultBlockTopicParams()) + p, err := defaultAggregateSubnetTopicParams(10000) + assert.NoError(t, err) + logGossipParameters("testing", p) + p, err = defaultAggregateTopicParams(10000) + assert.NoError(t, err) + logGossipParameters("testing", p) + logGossipParameters("testing", defaultAttesterSlashingTopicParams()) + logGossipParameters("testing", defaultProposerSlashingTopicParams()) + logGossipParameters("testing", defaultVoluntaryExitTopicParams()) +} diff --git a/beacon-chain/p2p/parameter_test.go b/beacon-chain/p2p/parameter_test.go index d64d202d9b57..219ae6152624 100644 --- a/beacon-chain/p2p/parameter_test.go +++ b/beacon-chain/p2p/parameter_test.go @@ -2,33 +2,11 @@ package p2p import ( "testing" - "time" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/prysmaticlabs/prysm/shared/testutil/assert" ) -const ( - // overlay parameters - gossipSubD = 8 // topic stable mesh target count - gossipSubDlo = 6 // topic stable mesh low watermark - gossipSubDhi = 12 // topic stable mesh high watermark - - // gossip parameters - gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses - gossipSubMcacheGossip = 3 // number of windows to gossip about - gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs - - // fanout ttl - gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds - - // heartbeat interval - gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds - - // misc - randomSubD = 6 // random gossip target -) - func TestOverlayParameters(t *testing.T) { setPubSubParameters() assert.Equal(t, gossipSubD, pubsub.GossipSubD, "gossipSubD") diff --git a/beacon-chain/p2p/pubsub.go b/beacon-chain/p2p/pubsub.go index 61c210d9cd0f..71cb603406ea 100644 --- a/beacon-chain/p2p/pubsub.go +++ b/beacon-chain/p2p/pubsub.go @@ -15,6 +15,27 @@ import ( "github.com/prysmaticlabs/prysm/shared/params" ) +const ( + // overlay parameters + gossipSubD = 8 // topic stable mesh target count + gossipSubDlo = 6 // topic stable mesh low watermark + gossipSubDhi = 12 // topic stable mesh high watermark + + // gossip parameters + gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses + gossipSubMcacheGossip = 3 // number of windows to gossip about + gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs + + // fanout ttl + gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds + + // heartbeat interval + gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds + + // misc + randomSubD = 6 // random gossip target +) + // JoinTopic will join PubSub topic, if not already joined. func (s *Service) JoinTopic(topic string, opts ...pubsub.TopicOpt) (*pubsub.Topic, error) { s.joinedTopicsLock.Lock() @@ -80,10 +101,12 @@ func (s *Service) SubscribeToTopic(topic string, opts ...pubsub.SubOpt) (*pubsub if err != nil { return nil, err } + if scoringParams != nil { if err := topicHandle.SetScoreParams(scoringParams); err != nil { return nil, err } + logGossipParameters(topic, scoringParams) } } return topicHandle.Subscribe(opts...) @@ -124,13 +147,12 @@ func msgIDFunction(pmsg *pubsub_pb.Message) string { } func setPubSubParameters() { - heartBeatInterval := 700 * time.Millisecond - pubsub.GossipSubDlo = 6 - pubsub.GossipSubD = 8 - pubsub.GossipSubHeartbeatInterval = heartBeatInterval - pubsub.GossipSubHistoryLength = 6 - pubsub.GossipSubHistoryGossip = 3 - pubsub.TimeCacheDuration = 550 * heartBeatInterval + pubsub.GossipSubDlo = gossipSubDlo + pubsub.GossipSubD = gossipSubD + pubsub.GossipSubHeartbeatInterval = gossipSubHeartbeatInterval + pubsub.GossipSubHistoryLength = gossipSubMcacheLen + pubsub.GossipSubHistoryGossip = gossipSubMcacheGossip + pubsub.TimeCacheDuration = 550 * gossipSubHeartbeatInterval // Set a larger gossip history to ensure that slower // messages have a longer time to be propagated. This