Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add topic scoring & reduce pubsub spam #573

Merged
merged 9 commits into from
Aug 14, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions cmd/f3/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ var manifestServeCmd = cli.Command{
&cli.DurationFlag{
Name: "publishInterval",
Usage: "The interval at which manifest is published on pubsub.",
Value: 20 * time.Second,
Value: 2 * pubsub.TimeCacheDuration,
},
},

Expand Down Expand Up @@ -205,7 +205,11 @@ var manifestServeCmd = cli.Command{
return fmt.Errorf("loading initial manifest: %w", err)
}

pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true))
pubSub, err := pubsub.NewGossipSub(c.Context, host,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds),
)
if err != nil {
return fmt.Errorf("initialzing pubsub: %w", err)
}
Expand Down
12 changes: 10 additions & 2 deletions cmd/f3/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

"github.com/filecoin-project/go-f3/cmd/f3/msgdump"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
leveldb "github.com/ipfs/go-ds-leveldb"
logging "github.com/ipfs/go-log/v2"
Expand Down Expand Up @@ -152,7 +153,11 @@ var observerCmd = cli.Command{
// Connect to bootstrappers once as soon as we start.
connectToBootstrappers()

pubSub, err := pubsub.NewGossipSub(c.Context, host, pubsub.WithPeerExchange(true))
pubSub, err := pubsub.NewGossipSub(c.Context, host,
pubsub.WithPeerExchange(true),
pubsub.WithFloodPublish(true),
pubsub.WithPeerScore(PubsubPeerScoreParams, PubsubPeerScoreThresholds),
)
if err != nil {
return fmt.Errorf("initialzing pubsub: %w", err)
}
Expand Down Expand Up @@ -241,10 +246,13 @@ func observeManifest(ctx context.Context, manif *manifest.Manifest, pubSub *pubs
return fmt.Errorf("registering topic validator: %w", err)
}

topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
topic, err := pubSub.Join(manif.PubSubTopic(), pubsub.WithTopicMessageIdFn(psutil.GPBFTMessageIdFn))
if err != nil {
return fmt.Errorf("joining topic: %w", err)
}
if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
return fmt.Errorf("failed to set topic params: %w", err)
}

sub, err := topic.Subscribe()
if err != nil {
Expand Down
43 changes: 43 additions & 0 deletions cmd/f3/pusub.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)

func init() {
Expand All @@ -20,3 +21,45 @@ func init() {
pubsub.GossipSubHistoryLength = 10
pubsub.GossipSubGossipFactor = 0.1
}

// Borrowed from lotus
var PubsubPeerScoreParams = &pubsub.PeerScoreParams{
AppSpecificScore: func(p peer.ID) float64 { return 0 },
AppSpecificWeight: 1,

// This sets the IP colocation threshold to 5 peers before we apply penalties
IPColocationFactorThreshold: 5,
IPColocationFactorWeight: -100,
IPColocationFactorWhitelist: nil,

// P7: behavioural penalties, decay after 1hr
BehaviourPenaltyThreshold: 6,
BehaviourPenaltyWeight: -10,
BehaviourPenaltyDecay: pubsub.ScoreParameterDecay(time.Hour),

DecayInterval: pubsub.DefaultDecayInterval,
DecayToZero: pubsub.DefaultDecayToZero,

// this retains non-positive scores for 6 hours
RetainScore: 6 * time.Hour,

// topic parameters
Topics: make(map[string]*pubsub.TopicScoreParams),
}

var PubsubPeerScoreThresholds = &pubsub.PeerScoreThresholds{
GossipThreshold: GossipScoreThreshold,
PublishThreshold: PublishScoreThreshold,
GraylistThreshold: GraylistScoreThreshold,
AcceptPXThreshold: AcceptPXScoreThreshold,
OpportunisticGraftThreshold: OpportunisticGraftScoreThreshold,
}

// Borrowed from lotus
const (
GossipScoreThreshold = -500
PublishScoreThreshold = -1000
GraylistScoreThreshold = -2500
AcceptPXScoreThreshold = 1000
OpportunisticGraftScoreThreshold = 3.5
)
9 changes: 8 additions & 1 deletion f3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
"github.com/filecoin-project/go-f3/sim/signing"

Expand All @@ -25,7 +26,13 @@ import (
"golang.org/x/sync/errgroup"
)

const ManifestSenderTimeout = 30 * time.Second
func init() {
// Hash-based deduplication breaks fast rebroadcast, even if we set the time window to be
// really short because gossipsub has a minimum 1m cache scan interval.
psutil.GPBFTMessageIdFn = pubsub.DefaultMsgIdFn
}

var ManifestSenderTimeout = 2 * pubsub.TimeCacheDuration

func TestF3Simple(t *testing.T) {
t.Parallel()
Expand Down
9 changes: 8 additions & 1 deletion host.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"
"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/filecoin-project/go-f3/manifest"
"go.opentelemetry.io/otel/metric"

Expand Down Expand Up @@ -329,11 +330,17 @@ func (h *gpbftRunner) setupPubsub() error {
// Force the default (sender + seqno) message de-duplication mechanism instead of hashing
// the message (as lotus does) as we need to be able to re-broadcast duplicate messages with
// the same content.
topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
topic, err := h.pubsub.Join(pubsubTopicName, pubsub.WithTopicMessageIdFn(psutil.GPBFTMessageIdFn))
if err != nil {
return fmt.Errorf("could not join on pubsub topic: %s: %w", pubsubTopicName, err)
}

if err := topic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Infow("failed to set topic score params", "error", err)
}

h.topic = topic

return nil
}

Expand Down
97 changes: 97 additions & 0 deletions internal/psutil/psutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package psutil

import (
"encoding/binary"
"time"

"golang.org/x/crypto/blake2b"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

var ManifestMessageIdFn = pubsubMsgIdHashDataAndSender
var GPBFTMessageIdFn = pubsubMsgIdHashData

// Generate a pubsub ID from the message topic + data.
func pubsubMsgIdHashData(m *pubsub_pb.Message) string {
hasher, err := blake2b.New256(nil)
if err != nil {
panic("failed to construct hasher")

Check warning on line 20 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L17-L20

Added lines #L17 - L20 were not covered by tests
}

topic := []byte(m.GetTopic())
if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
panic(err)

Check warning on line 25 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L23-L25

Added lines #L23 - L25 were not covered by tests
}
if _, err := hasher.Write(topic); err != nil {
panic(err)

Check warning on line 28 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L27-L28

Added lines #L27 - L28 were not covered by tests
}

hash := blake2b.Sum256(m.Data)
return string(hash[:])

Check warning on line 32 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L31-L32

Added lines #L31 - L32 were not covered by tests
}

// Generate a pubsub ID from the message topic + sender + data.
func pubsubMsgIdHashDataAndSender(m *pubsub_pb.Message) string {
hasher, err := blake2b.New256(nil)
if err != nil {
panic("failed to construct hasher")

Check warning on line 39 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L39

Added line #L39 was not covered by tests
}

topic := []byte(m.GetTopic())
if err := binary.Write(hasher, binary.BigEndian, uint32(len(topic))); err != nil {
panic(err)

Check warning on line 44 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L44

Added line #L44 was not covered by tests
}
if _, err := hasher.Write(topic); err != nil {
panic(err)

Check warning on line 47 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L47

Added line #L47 was not covered by tests
}
if err := binary.Write(hasher, binary.BigEndian, uint32(len(m.From))); err != nil {
panic(err)

Check warning on line 50 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L50

Added line #L50 was not covered by tests
}
if _, err := hasher.Write(m.From); err != nil {
panic(err)

Check warning on line 53 in internal/psutil/psutil.go

View check run for this annotation

Codecov / codecov/patch

internal/psutil/psutil.go#L53

Added line #L53 was not covered by tests
}

hash := blake2b.Sum256(m.Data)
return string(hash[:])
}

// Borrowed from lotus
var PubsubTopicScoreParams = &pubsub.TopicScoreParams{
// expected > 400 msgs/second on average.
//
TopicWeight: 0.1, // max cap is 5, single invalid message is -100

// 1 tick per second, maxes at 1 hour
// XXX
TimeInMeshWeight: 0.0002778, // ~1/3600
TimeInMeshQuantum: time.Second,
TimeInMeshCap: 1,

// NOTE: Gives weight to the peer that tends to deliver first.
// deliveries decay after 10min, cap at 100 tx
FirstMessageDeliveriesWeight: 0.5, // max value is 50
FirstMessageDeliveriesDecay: pubsub.ScoreParameterDecay(10 * time.Minute),
FirstMessageDeliveriesCap: 100, // 100 messages in 10 minutes

// Mesh Delivery Failure is currently turned off for messages
// This is on purpose as the network is still too small, which results in
// asymmetries and potential unmeshing from negative scores.
// // tracks deliveries in the last minute
// // penalty activates at 1 min and expects 2.5 txs
// MeshMessageDeliveriesWeight: -16, // max penalty is -100
// MeshMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Minute),
// MeshMessageDeliveriesCap: 100, // 100 txs in a minute
// MeshMessageDeliveriesThreshold: 2.5, // 60/12/2 txs/minute
// MeshMessageDeliveriesWindow: 10 * time.Millisecond,
// MeshMessageDeliveriesActivation: time.Minute,

// // decays after 5min
// MeshFailurePenaltyWeight: -16,
// MeshFailurePenaltyDecay: pubsub.ScoreParameterDecay(5 * time.Minute),

// invalid messages decay after 1 hour
InvalidMessageDeliveriesWeight: -1000,
InvalidMessageDeliveriesDecay: pubsub.ScoreParameterDecay(time.Hour),
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/filecoin-project/go-f3/internal/psutil"
"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -20,7 +22,7 @@

var _ ManifestProvider = (*DynamicManifestProvider)(nil)

const ManifestPubSubTopicName = "/f3/manifests/0.0.1"
const ManifestPubSubTopicName = "/f3/manifests/0.0.2"

// DynamicManifestProvider is a manifest provider that allows
// the manifest to be changed at runtime.
Expand All @@ -35,6 +37,7 @@

initialManifest *Manifest
manifestChanges chan *Manifest
sequenceNumber atomic.Uint64
}

// ManifestUpdateMessage updates the GPBFT manifest.
Expand Down Expand Up @@ -96,23 +99,24 @@
return err
}

// Force the default (sender + seqno) message de-duplication mechanism instead of hashing
// the message (as lotus does) as validation depends on the sender, not the contents of the
// message.
manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
// Use the message hash as the message ID to reduce the chances of routing cycles. We ensure
// our rebroadcast interval is greater than our cache timeout.
manifestTopic, err := m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.ManifestMessageIdFn))
if err != nil {
return fmt.Errorf("could not join manifest pubsub topic: %w", err)
}

if err := manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Infow("failed to set topic score params", "error", err)
}

manifestSub, err := manifestTopic.Subscribe()
if err != nil {
return fmt.Errorf("subscribing to manifest pubsub topic: %w", err)
}

var msgSeqNumber uint64
var currentManifest *Manifest
if mBytes, err := m.ds.Get(startCtx, latestManifestKey); errors.Is(err, datastore.ErrNotFound) {
msgSeqNumber = 0
currentManifest = m.initialManifest
} else if err != nil {
return fmt.Errorf("error while checking saved manifest")
Expand All @@ -123,7 +127,7 @@
return fmt.Errorf("decoding saved manifest: %w", err)
}

msgSeqNumber = update.MessageSequence
m.sequenceNumber.Store(update.MessageSequence)
currentManifest = &update.Manifest
}

Expand Down Expand Up @@ -164,10 +168,13 @@
continue
}

if update.MessageSequence <= msgSeqNumber {
log.Debugw("discarded manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", msgSeqNumber)
oldSeq := m.sequenceNumber.Load()

if update.MessageSequence <= oldSeq {
log.Debugw("discarded manifest update", "newSeqNo", update.MessageSequence, "oldSeqNo", oldSeq)

Check warning on line 174 in manifest/dynamic_manifest_provider.go

View check run for this annotation

Codecov / codecov/patch

manifest/dynamic_manifest_provider.go#L174

Added line #L174 was not covered by tests
continue
}
m.sequenceNumber.Store(update.MessageSequence)

if err := update.Manifest.Validate(); err != nil {
log.Errorw("received invalid manifest, discarded", "error", err)
Expand All @@ -180,7 +187,6 @@
}

log.Infow("received manifest update", "seqNo", update.MessageSequence)
msgSeqNumber = update.MessageSequence

oldManifest := currentManifest
manifestCopy := update.Manifest
Expand Down Expand Up @@ -227,6 +233,11 @@
return pubsub.ValidationReject
}

// Only allow the latest sequence number through.
if update.MessageSequence < m.sequenceNumber.Load() {
return pubsub.ValidationIgnore

Check warning on line 238 in manifest/dynamic_manifest_provider.go

View check run for this annotation

Codecov / codecov/patch

manifest/dynamic_manifest_provider.go#L238

Added line #L238 was not covered by tests
}

// TODO: Any additional validation?
// Expect a sequence number that is over our current sequence number.
// Expect an BootstrapEpoch over the BootstrapEpoch of the current manifests?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

"github.com/filecoin-project/go-f3/internal/clock"
"github.com/filecoin-project/go-f3/internal/psutil"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -43,10 +45,13 @@ func NewManifestSender(ctx context.Context, h host.Host, ps *pubsub.PubSub, firs
}

var err error
m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(pubsub.DefaultMsgIdFn))
m.manifestTopic, err = m.pubsub.Join(ManifestPubSubTopicName, pubsub.WithTopicMessageIdFn(psutil.ManifestMessageIdFn))
if err != nil {
return nil, fmt.Errorf("could not join on pubsub topic: %s: %w", ManifestPubSubTopicName, err)
}
if err := m.manifestTopic.SetScoreParams(psutil.PubsubTopicScoreParams); err != nil {
log.Infow("could not set topic score params", "error", err)
}

// Record one-off attributes about the sender for easier runtime debugging.
metrics.senderInfo.Record(ctx, 1, metric.WithAttributes(
Expand Down
2 changes: 1 addition & 1 deletion manifest/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ func (m *Manifest) PubSubTopic() string {
}

func PubSubTopicFromNetworkName(nn gpbft.NetworkName) string {
return "/f3/granite/0.0.1/" + string(nn)
return "/f3/granite/0.0.2/" + string(nn)
}

func (m *Manifest) GpbftOptions() []gpbft.Option {
Expand Down
Loading
Loading