Skip to content

Commit

Permalink
Add topic scoring & reduce pubsub spam
Browse files Browse the repository at this point in the history
1. Add topic scoring to both the GPBFT topic and the manifest topic.
2. Hash the data + topic for GPBFT messages.
3. Hash the data + sender + topic for manifest sever messages. We need
to include the sender because we reject messages from invalid senders.
4. In the manifest validator, drop messages with old sequence numbers.
  • Loading branch information
Stebalien committed Aug 14, 2024
1 parent 4db84c3 commit 544544d
Show file tree
Hide file tree
Showing 8 changed files with 197 additions and 17 deletions.
8 changes: 6 additions & 2 deletions cmd/f3/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ var manifestServeCmd = cli.Command{
&cli.DurationFlag{
Name: "publishInterval",
Usage: "The interval at which manifest is published on pubsub.",
Value: 20 * time.Second,
Value: 2 * pubsub.TimeCacheDuration,
},
},

Expand Down Expand Up @@ -205,7 +205,11 @@ var manifestServeCmd = cli.Command{
return fmt.Errorf("loading initial manifest: %w", err)
}

pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true))
pubSub, err := pubsub.NewGossipSub(c.Context, host,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds),
)
if err != nil {
return fmt.Errorf("initialzing pubsub: %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/f3/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/filecoin-project/go-f3/cmd/f3/msgdump"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
leveldb "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -152,7 +153,11 @@ var observerCmd = cli.Command{
// Connect to bootstrappers once as soon as we start.
connectToBootstrappers()

pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true))
pubSub, err := pubsub.NewGossipSub(c.Context, host,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds),
)
if err != nil {
return fmt.Errorf("initialzing pubsub: %w", err)
}
Expand Down Expand Up @@ -241,10 +246,13 @@ func observeManifest(ctx context.Context, manif *manifest.Manifest, pubSub *pubs
return fmt.Errorf("registering topic validator: %w", err)
}

topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(psutil.PubsubMsgIdHashData))
if err != nil {
return fmt.Errorf("joining topic: %w", err)
}
if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
return fmt.Errorf("failed to set topic params: %w", err)
}

sub, err := topic.Subscribe()
if err != nil {
Expand Down
43 changes: 43 additions & 0 deletions cmd/f3/pusub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)

func init() {
Expand All @@ -20,3 +21,45 @@ func init() {
pubsub.GossipSubHistoryLength = 10
pubsub.GossipSubGossipFactor = 0.1
}

// Borrowed from lotus
var PubsubPeerScoreParams = &pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 { return 0 },
AppSpecificWeight: 1,

// This sets the IP colocation threshold to 5 peers before we apply penalties
IPColocationFactorThreshold: 5,
IPColocationFactorWeight: -100,
IPColocationFactorWhitelist: nil,

// P7: behavioural penalties, decay after 1hr
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),

DecayInterval: pubsub.DefaultDecayInterval,
DecayToZero: pubsub.DefaultDecayToZero,

// this retains non-positive scores for 6 hours
RetainScore: 6 * time.Hour,

// topic parameters
Topics: make(map[string]*pubsub.TopicScoreParams),
}

var PubsubPeerScoreThresholds = &pubsub.PeerScoreThresholds{
GossipThreshold: GossipScoreThreshold,
PublishThreshold: PublishScoreThreshold,
GraylistThreshold: GraylistScoreThreshold,
AcceptPXThreshold: AcceptPXScoreThreshold,
OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold,
}

// Borrowed from lotus
const (
GossipScoreThreshold = -500
PublishScoreThreshold = -1000
GraylistScoreThreshold = -2500
AcceptPXScoreThreshold = 1000
OpportunisticGraftScoreThreshold = 3.5
)
2 changes: 1 addition & 1 deletion f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"golang.org/x/sync/errgroup"
)

const ManifestSenderTimeout = 30 * time.Second
var ManifestSenderTimeout = 2 * pubsub.TimeCacheDuration

func TestF3Simple(t *testing.T) {
t.Parallel()
Expand Down
9 changes: 8 additions & 1 deletion host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
"go.opentelemetry.io/otel/metric"

Expand Down Expand Up @@ -329,11 +330,17 @@ func (h *gpbftRunner) setupPubsub() error {
// Force the default (sender + seqno) message de-duplication mechanism instead of hashing
// the message (as lotus does) as we need to be able to re-broadcast duplicate messages with
// the same content.
topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(psutil.PubsubMsgIdHashData))
if err != nil {
return fmt.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err)
}

if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Error("failed to set topic score params", "error", err)
}

h.topic = topic

return nil
}

Expand Down
94 changes: 94 additions & 0 deletions internal/psutil/psutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package psutil

import (
"encoding/binary"
"time"

"golang.org/x/crypto/blake2b"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// Generate a pubsub ID from the message topic + data.
func PubsubMsgIdHashData(m *pubsub_pb.Message) string {
hasher, err := blake2b.New256(nil)
if err != nil {
panic("failed to construct hasher")
}

topic := []byte(m.GetTopic())
if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
panic(err)
}
if _, err := hasher.Write(topic); err != nil {
panic(err)
}

hash := blake2b.Sum256(m.Data)
return string(hash[:])
}

// Generate a pubsub ID from the message topic + sender + data.
func PubsubMsgIdHashDataAndSender(m *pubsub_pb.Message) string {
hasher, err := blake2b.New256(nil)
if err != nil {
panic("failed to construct hasher")
}

topic := []byte(m.GetTopic())
if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
panic(err)
}
if _, err := hasher.Write(topic); err != nil {
panic(err)
}
if err := binary.Write(hasher, binary.BigEndian, uint32(len(m.From))); err != nil {
panic(err)
}
if _, err := hasher.Write(m.From); err != nil {
panic(err)
}

hash := blake2b.Sum256(m.Data)
return string(hash[:])
}

// Borrowed from lotus
var PubsubTopicScoreParams = &pubsub.TopicScoreParams{
// expected > 400 msgs/second on average.
//
TopicWeight: 0.1, // max cap is 5, single invalid message is -100

// 1 tick per second, maxes at 1 hour
// XXX
TimeInMeshWeight: 0.0002778, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,

// NOTE: Gives weight to the peer that tends to deliver first.
// deliveries decay after 10min, cap at 100 tx
FirstMessageDeliveriesWeight: 0.5, // max value is 50
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute),
FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes

// Mesh Delivery Failure is currently turned off for messages
// This is on purpose as the network is still too small, which results in
// asymmetries and potential unmeshing from negative scores.
// // tracks deliveries in the last minute
// // penalty activates at 1 min and expects 2.5 txs
// MeshMessageDeliveriesWeight: -16, // max penalty is -100
// MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute),
// MeshMessageDeliveriesCap: 100, // 100 txs in a minute
// MeshMessageDeliveriesThreshold: 2.5, // 60/12/2 txs/minute
// MeshMessageDeliveriesWindow: 10 * time.Millisecond,
// MeshMessageDeliveriesActivation: time.Minute,

// // decays after 5min
// MeshFailurePenaltyWeight: -16,
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute),

// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand Down Expand Up @@ -35,6 +37,7 @@ type DynamicManifestProvider struct {

initialManifest *Manifest
manifestChanges chan *Manifest
sequenceNumber atomic.Uint64
}

// ManifestUpdateMessage updates the GPBFT manifest.
Expand Down Expand Up @@ -96,23 +99,24 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
return err
}

// Force the default (sender + seqno) message de-duplication mechanism instead of hashing
// the message (as lotus does) as validation depends on the sender, not the contents of the
// message.
manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
// Use the message hash as the message ID to reduce the chances of routing cycles. We ensure
// our rebroadcast interval is greater than our cache timeout.
manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.PubsubMsgIdHashDataAndSender))
if err != nil {
return fmt.Errorf("could not join manifest pubsub topic: %w", err)
}

if err := manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Error("failed to set topic score params", "error", err)
}

manifestSub, err := manifestTopic.Subscribe()
if err != nil {
return fmt.Errorf("subscribing to manifest pubsub topic: %w", err)
}

var msgSeqNumber uint64
var currentManifest *Manifest
if mBytes, err := m.ds.Get(startCtx, latestManifestKey); errors.Is(err, datastore.ErrNotFound) {
msgSeqNumber = 0
currentManifest = m.initialManifest
} else if err != nil {
return fmt.Errorf("error while checking saved manifest")
Expand All @@ -123,7 +127,7 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
return fmt.Errorf("decoding saved manifest: %w", err)
}

msgSeqNumber = update.MessageSequence
m.sequenceNumber.Store(update.MessageSequence)
currentManifest = &update.Manifest
}

Expand Down Expand Up @@ -164,10 +168,13 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
continue
}

if update.MessageSequence <= msgSeqNumber {
log.Debugw("discarded manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", msgSeqNumber)
oldSeq := m.sequenceNumber.Load()

if update.MessageSequence <= oldSeq {
log.Debugw("discarded manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", oldSeq)
continue
}
m.sequenceNumber.Store(update.MessageSequence)

if err := update.Manifest.Validate(); err != nil {
log.Errorw("received invalid manifest, discarded", "error", err)
Expand All @@ -180,7 +187,6 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) error {
}

log.Infow("received manifest update", "seqNo", update.MessageSequence)
msgSeqNumber = update.MessageSequence

oldManifest := currentManifest
manifestCopy := update.Manifest
Expand Down Expand Up @@ -227,6 +233,11 @@ func (m *DynamicManifestProvider) registerTopicValidator() error {
return pubsub.ValidationReject
}

// Only allow the latest sequence number through.
if update.MessageSequence < m.sequenceNumber.Load() {
return pubsub.ValidationIgnore
}

// TODO: Any additional validation?
// Expect a sequence number that is over our current sequence number.
// Expect an BootstrapEpoch over the BootstrapEpoch of the current manifests?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -30,6 +32,14 @@ type ManifestSender struct {
}

func NewManifestSender(ctx context.Context, h host.Host, ps *pubsub.PubSub, firstManifest *Manifest, publishInterval time.Duration) (*ManifestSender, error) {
// The rebroadcast interval must be larger than the time cache duration. Default to 2x just in case.
if minInterval := 2 * pubsub.TimeCacheDuration; publishInterval < minInterval {
log.Warnf("manifest sender publish interval is too short (%s), increasing to 2x the time-cache duration %s",
publishInterval, minInterval,
)
publishInterval = minInterval
}

clk := clock.GetClock(ctx)
m := &ManifestSender{
manifest: firstManifest,
Expand All @@ -43,10 +53,13 @@ func NewManifestSender(ctx context.Context, h host.Host, ps *pubsub.PubSub, firs
}

var err error
m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.PubsubMsgIdHashDataAndSender))
if err != nil {
return nil, fmt.Errorf("could not join on pubsub topic: %s: %w", ManifestPubSubTopicName, err)
}
if err := m.manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
return nil, fmt.Errorf("could not join set topic params: %s: %w", ManifestPubSubTopicName, err)
}

// Record one-off attributes about the sender for easier runtime debugging.
metrics.senderInfo.Record(ctx, 1, metric.WithAttributes(
Expand Down

0 comments on commit 544544d

Please sign in to comment.