Skip to content

Commit

Permalink
Update Gossip Parameters (#8683)
Browse files Browse the repository at this point in the history
* add in more accurate aggregate parameters

* add more param changes

* more cleanup

* fix order of operations

* comments

* remove redundant declaration

* clean up better

* fix up

* victor's review

* disable mesh scoring

* disable mesh scoring

Co-authored-by: Raul Jordan <raul@prysmaticlabs.com>
Co-authored-by: Victor Farazdagi <simple.square@gmail.com>
  • Loading branch information
3 people authored Apr 2, 2021
1 parent 8784390 commit af3d3e8
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 77 deletions.
213 changes: 165 additions & 48 deletions beacon-chain/p2p/gossip_scoring_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"math"
"reflect"
"strings"
"time"

Expand All @@ -10,6 +11,7 @@ import (
"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)

const (
Expand All @@ -32,9 +34,22 @@ const (
// our voluntary exit topic.
voluntaryExitWeight = 0.05

// maxInMeshScore describes the max score a peer can attain from being in the mesh.
maxInMeshScore = 10
// maxFirstDeliveryScore describes the max score a peer can obtain from first deliveries.
maxFirstDeliveryScore = 40

// decayToZero specifies the terminal value that we will use when decaying
// a value.
decayToZero = 0.01

// dampeningFactor reduces the amount by which the various thresholds and caps are created.
dampeningFactor = 90
)

var (
// a bool to check if we enable scoring for messages in the mesh sent for near first deliveries.
meshDeliveryIsScored = false
)

func peerScoringParams() (*pubsub.PeerScoreParams, *pubsub.PeerScoreThresholds) {
Expand Down Expand Up @@ -74,9 +89,9 @@ func (s *Service) topicScoreParams(topic string) (*pubsub.TopicScoreParams, erro
case strings.Contains(topic, "beacon_block"):
return defaultBlockTopicParams(), nil
case strings.Contains(topic, "beacon_aggregate_and_proof"):
return defaultAggregateTopicParams(activeValidators), nil
return defaultAggregateTopicParams(activeValidators)
case strings.Contains(topic, "beacon_attestation"):
return defaultAggregateSubnetTopicParams(activeValidators), nil
return defaultAggregateSubnetTopicParams(activeValidators)
case strings.Contains(topic, "voluntary_exit"):
return defaultVoluntaryExitTopicParams(), nil
case strings.Contains(topic, "proposer_slashing"):
Expand Down Expand Up @@ -110,96 +125,139 @@ func (s *Service) retrieveActiveValidators() (uint64, error) {
return helpers.ActiveValidatorCount(bState, helpers.CurrentEpoch(bState))
}

// Based on Ben's tested parameters for lighthouse.
// Based on the lighthouse parameters.
// https://gist.github.com/blacktemplar/5c1862cb3f0e32a1a7fb0b25e79e6e2c

func defaultBlockTopicParams() *pubsub.TopicScoreParams {
decayEpoch := time.Duration(5)
blocksPerEpoch := uint64(params.BeaconConfig().SlotsPerEpoch)
meshWeight := -0.717
if !meshDeliveryIsScored {
// Set the mesh weight as zero as a temporary measure, so as to prevent
// the average nodes from being penalised.
meshWeight = 0
}
return &pubsub.TopicScoreParams{
TopicWeight: beaconBlockWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: 1 * oneSlotDuration(),
TimeInMeshCap: 300,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 1,
FirstMessageDeliveriesDecay: scoreDecay(20 * oneEpochDuration()),
FirstMessageDeliveriesCap: 23,
MeshMessageDeliveriesWeight: -0.717,
MeshMessageDeliveriesWeight: meshWeight,
MeshMessageDeliveriesDecay: scoreDecay(decayEpoch * oneEpochDuration()),
MeshMessageDeliveriesCap: float64(blocksPerEpoch * uint64(decayEpoch)),
MeshMessageDeliveriesThreshold: float64(blocksPerEpoch*uint64(decayEpoch)) / 10,
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 4 * oneEpochDuration(),
MeshFailurePenaltyWeight: -0.717,
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(decayEpoch * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
}
}

func defaultAggregateTopicParams(activeValidators uint64) *pubsub.TopicScoreParams {
aggPerEpoch := aggregatorsPerSlot(activeValidators) * uint64(params.BeaconConfig().SlotsPerEpoch)
func defaultAggregateTopicParams(activeValidators uint64) (*pubsub.TopicScoreParams, error) {
// Determine the expected message rate for the particular gossip topic.
aggPerSlot := aggregatorsPerSlot(activeValidators)
firstMessageCap, err := decayLimit(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot*2/gossipSubD))
if err != nil {
return nil, err
}
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
meshThreshold, err := decayThreshold(scoreDecay(1*oneEpochDuration()), float64(aggPerSlot)/dampeningFactor)
if err != nil {
return nil, err
}
meshWeight := -scoreByWeight(aggregateWeight, meshThreshold)
meshCap := 4 * meshThreshold
if !meshDeliveryIsScored {
// Set the mesh weight as zero as a temporary measure, so as to prevent
// the average nodes from being penalised.
meshWeight = 0
}
return &pubsub.TopicScoreParams{
TopicWeight: aggregateWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: 1 * oneSlotDuration(),
TimeInMeshCap: 300,
FirstMessageDeliveriesWeight: 0.128,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: firstMessageWeight,
FirstMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
FirstMessageDeliveriesCap: 179,
MeshMessageDeliveriesWeight: -0.064,
FirstMessageDeliveriesCap: firstMessageCap,
MeshMessageDeliveriesWeight: meshWeight,
MeshMessageDeliveriesDecay: scoreDecay(1 * oneEpochDuration()),
MeshMessageDeliveriesCap: float64(aggPerEpoch),
MeshMessageDeliveriesThreshold: float64(aggPerEpoch / 50),
MeshMessageDeliveriesCap: meshCap,
MeshMessageDeliveriesThreshold: meshThreshold,
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 32 * oneSlotDuration(),
MeshFailurePenaltyWeight: -0.064,
MeshMessageDeliveriesActivation: 1 * oneEpochDuration(),
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(1 * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -140.4475,
InvalidMessageDeliveriesWeight: -maxScore() / aggregateWeight,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
}
}, nil
}

func defaultAggregateSubnetTopicParams(activeValidators uint64) *pubsub.TopicScoreParams {
topicWeight := attestationTotalWeight / float64(params.BeaconNetworkConfig().AttestationSubnetCount)
subnetWeight := activeValidators / params.BeaconNetworkConfig().AttestationSubnetCount
minimumWeight := subnetWeight / 50
func defaultAggregateSubnetTopicParams(activeValidators uint64) (*pubsub.TopicScoreParams, error) {
subnetCount := params.BeaconNetworkConfig().AttestationSubnetCount
// Get weight for each specific subnet.
topicWeight := attestationTotalWeight / float64(subnetCount)
subnetWeight := activeValidators / subnetCount
// Determine the amount of validators expected in a subnet in a single slot.
numPerSlot := time.Duration(subnetWeight / uint64(params.BeaconConfig().SlotsPerEpoch))
comsPerSlot := committeeCountPerSlot(activeValidators)
exceedsThreshold := comsPerSlot >= 2*params.BeaconNetworkConfig().AttestationSubnetCount/uint64(params.BeaconConfig().SlotsPerEpoch)
exceedsThreshold := comsPerSlot >= 2*subnetCount/uint64(params.BeaconConfig().SlotsPerEpoch)
firstDecay := time.Duration(1)
meshDecay := time.Duration(4)
if exceedsThreshold {
firstDecay = 4
meshDecay = 16
}
// Determine expected first deliveries based on the message rate.
firstMessageCap, err := decayLimit(scoreDecay(firstDecay*oneEpochDuration()), float64(numPerSlot*2/gossipSubD))
if err != nil {
return nil, err
}
firstMessageWeight := maxFirstDeliveryScore / firstMessageCap
// Determine expected mesh deliveries based on message rate applied with a dampening factor.
meshThreshold, err := decayThreshold(scoreDecay(meshDecay*oneEpochDuration()), float64(numPerSlot)/dampeningFactor)
if err != nil {
return nil, err
}
meshWeight := -scoreByWeight(topicWeight, meshThreshold)
meshCap := 4 * meshThreshold
if !meshDeliveryIsScored {
// Set the mesh weight as zero as a temporary measure, so as to prevent
// the average nodes from being penalised.
meshWeight = 0
}
return &pubsub.TopicScoreParams{
TopicWeight: topicWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: numPerSlot,
TimeInMeshCap: 300,
FirstMessageDeliveriesWeight: 0.955,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: firstMessageWeight,
FirstMessageDeliveriesDecay: scoreDecay(firstDecay * oneEpochDuration()),
FirstMessageDeliveriesCap: 24,
MeshMessageDeliveriesWeight: -37.55,
FirstMessageDeliveriesCap: firstMessageCap,
MeshMessageDeliveriesWeight: meshWeight,
MeshMessageDeliveriesDecay: scoreDecay(meshDecay * oneEpochDuration()),
MeshMessageDeliveriesCap: float64(subnetWeight),
MeshMessageDeliveriesThreshold: float64(minimumWeight),
MeshMessageDeliveriesCap: meshCap,
MeshMessageDeliveriesThreshold: meshThreshold,
MeshMessageDeliveriesWindow: 2 * time.Second,
MeshMessageDeliveriesActivation: 17 * oneSlotDuration(),
MeshFailurePenaltyWeight: -37.55,
MeshMessageDeliveriesActivation: 1 * oneEpochDuration(),
MeshFailurePenaltyWeight: meshWeight,
MeshFailurePenaltyDecay: scoreDecay(meshDecay * oneEpochDuration()),
InvalidMessageDeliveriesWeight: -4544,
InvalidMessageDeliveriesWeight: -maxScore() / topicWeight,
InvalidMessageDeliveriesDecay: scoreDecay(50 * oneEpochDuration()),
}
}, nil
}

func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: attesterSlashingWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: 1 * oneSlotDuration(),
TimeInMeshCap: 300,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 36,
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
FirstMessageDeliveriesCap: 1,
Expand All @@ -219,9 +277,9 @@ func defaultAttesterSlashingTopicParams() *pubsub.TopicScoreParams {
func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: proposerSlashingWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: 1 * oneSlotDuration(),
TimeInMeshCap: 300,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 36,
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
FirstMessageDeliveriesCap: 1,
Expand All @@ -241,9 +299,9 @@ func defaultProposerSlashingTopicParams() *pubsub.TopicScoreParams {
func defaultVoluntaryExitTopicParams() *pubsub.TopicScoreParams {
return &pubsub.TopicScoreParams{
TopicWeight: voluntaryExitWeight,
TimeInMeshWeight: 0.0324,
TimeInMeshQuantum: 1 * oneSlotDuration(),
TimeInMeshCap: 300,
TimeInMeshWeight: maxInMeshScore / inMeshCap(),
TimeInMeshQuantum: inMeshTime(),
TimeInMeshCap: inMeshCap(),
FirstMessageDeliveriesWeight: 2,
FirstMessageDeliveriesDecay: scoreDecay(100 * oneEpochDuration()),
FirstMessageDeliveriesCap: 5,
Expand All @@ -268,11 +326,33 @@ func oneEpochDuration() time.Duration {
return time.Duration(params.BeaconConfig().SlotsPerEpoch) * oneSlotDuration()
}

// determines the decay rate from the provided time period till
// the decayToZero value. Ex: ( 1 -> 0.01)
func scoreDecay(totalDurationDecay time.Duration) float64 {
numOfTimes := totalDurationDecay / oneSlotDuration()
return math.Pow(decayToZero, 1/float64(numOfTimes))
}

// is used to determine the threshold from the decay limit with
// a provided growth rate. This applies the decay rate to a
// computed limit.
func decayThreshold(decayRate, rate float64) (float64, error) {
d, err := decayLimit(decayRate, rate)
if err != nil {
return 0, err
}
return d * decayRate, nil
}

// decayLimit provides the value till which a decay process will
// limit till provided with an expected growth rate.
func decayLimit(decayRate, rate float64) (float64, error) {
if 1 <= decayRate {
return 0, errors.Errorf("got an invalid decayLimit rate: %f", decayRate)
}
return rate / (1 - decayRate), nil
}

func committeeCountPerSlot(activeValidators uint64) uint64 {
// Use a static parameter for now rather than a dynamic one, we can use
// the actual parameter later when we have figured out how to fix a circular
Expand All @@ -286,3 +366,40 @@ func aggregatorsPerSlot(activeValidators uint64) uint64 {
totalAggs := comms * params.BeaconConfig().TargetAggregatorsPerCommittee
return totalAggs
}

// provides the relevant score by the provided weight and threshold.
func scoreByWeight(weight float64, threshold float64) float64 {
return maxScore() / (weight * threshold * threshold)
}

// maxScore attainable by a peer.
func maxScore() float64 {
totalWeight := beaconBlockWeight + aggregateWeight + attestationTotalWeight +
attesterSlashingWeight + proposerSlashingWeight + voluntaryExitWeight
return (maxInMeshScore + maxFirstDeliveryScore) * totalWeight
}

// denotes the unit time in mesh for scoring tallying.
func inMeshTime() time.Duration {
return 1 * oneSlotDuration()
}

// the cap for `inMesh` time scoring.
func inMeshCap() float64 {
return float64((3600 * time.Second) / inMeshTime())
}

func logGossipParameters(topic string, params *pubsub.TopicScoreParams) {
// Exit early in the event the parameter struct is nil.
if params == nil {
return
}
rawParams := reflect.ValueOf(params).Elem()
numOfFields := rawParams.NumField()

fields := make(logrus.Fields, numOfFields)
for i := 0; i < numOfFields; i++ {
fields[reflect.TypeOf(params).Elem().Field(i).Name] = rawParams.Field(i).Interface()
}
log.WithFields(fields).Debugf("Topic Parameters for %s", topic)
}
17 changes: 17 additions & 0 deletions beacon-chain/p2p/gossip_scoring_params_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"testing"

pubsub "github.com/libp2p/go-libp2p-pubsub"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
dbutil "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
Expand Down Expand Up @@ -60,3 +61,19 @@ func TestCorrect_ActiveValidatorsCount(t *testing.T) {
assert.NoError(t, err, "genesis state not retrieved")
assert.Equal(t, int(params.BeaconConfig().MinGenesisActiveValidatorCount)+100, int(vals), "mainnet genesis active count isn't accurate")
}

func TestLoggingParameters(t *testing.T) {
logGossipParameters("testing", nil)
logGossipParameters("testing", &pubsub.TopicScoreParams{})
// Test out actual gossip parameters.
logGossipParameters("testing", defaultBlockTopicParams())
p, err := defaultAggregateSubnetTopicParams(10000)
assert.NoError(t, err)
logGossipParameters("testing", p)
p, err = defaultAggregateTopicParams(10000)
assert.NoError(t, err)
logGossipParameters("testing", p)
logGossipParameters("testing", defaultAttesterSlashingTopicParams())
logGossipParameters("testing", defaultProposerSlashingTopicParams())
logGossipParameters("testing", defaultVoluntaryExitTopicParams())
}
22 changes: 0 additions & 22 deletions beacon-chain/p2p/parameter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,11 @@ package p2p

import (
"testing"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
)

const (
// overlay parameters
gossipSubD = 8 // topic stable mesh target count
gossipSubDlo = 6 // topic stable mesh low watermark
gossipSubDhi = 12 // topic stable mesh high watermark

// gossip parameters
gossipSubMcacheLen = 6 // number of windows to retain full messages in cache for `IWANT` responses
gossipSubMcacheGossip = 3 // number of windows to gossip about
gossipSubSeenTTL = 550 // number of heartbeat intervals to retain message IDs

// fanout ttl
gossipSubFanoutTTL = 60000000000 // TTL for fanout maps for topics we are not subscribed to but have published to, in nano seconds

// heartbeat interval
gossipSubHeartbeatInterval = 700 * time.Millisecond // frequency of heartbeat, milliseconds

// misc
randomSubD = 6 // random gossip target
)

func TestOverlayParameters(t *testing.T) {
setPubSubParameters()
assert.Equal(t, gossipSubD, pubsub.GossipSubD, "gossipSubD")
Expand Down
Loading

0 comments on commit af3d3e8

Please sign in to comment.