From c6439f2d55dcc4baf5380c6a58f7a25a1c110f85 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 26 Nov 2025 12:41:52 -0500 Subject: [PATCH 01/15] Skip incrementing bloom count after noop Add --- network/ip_tracker.go | 7 ++++--- network/p2p/gossip/bloom.go | 5 +++-- utils/bloom/filter.go | 16 +++++++++++++--- utils/bloom/filter_test.go | 15 ++++++++++++++- utils/bloom/hasher.go | 4 ++-- 5 files changed, 36 insertions(+), 11 deletions(-) diff --git a/network/ip_tracker.go b/network/ip_tracker.go index a660fabf25de..6dfd25a831fc 100644 --- a/network/ip_tracker.go +++ b/network/ip_tracker.go @@ -552,9 +552,10 @@ func (i *ipTracker) updateMostRecentTrackedIP(node *trackedNode, ip *ips.Claimed return } - i.bloomAdditions[ip.NodeID] = oldCount + 1 - bloom.Add(i.bloom, ip.GossipID[:], i.bloomSalt) - i.bloomMetrics.Count.Inc() + if bloom.Add(i.bloom, ip.GossipID[:], i.bloomSalt) { + i.bloomAdditions[ip.NodeID] = oldCount + 1 + i.bloomMetrics.Count.Inc() + } } // ResetBloom prunes the current bloom filter. This must be called periodically diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index 9c05c8db78e0..c46e0776215c 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -62,8 +62,9 @@ type BloomFilter struct { func (b *BloomFilter) Add(gossipable Gossipable) { h := gossipable.GossipID() - bloom.Add(b.bloom, h[:], b.salt[:]) - b.metrics.Count.Inc() + if bloom.Add(b.bloom, h[:], b.salt[:]) { + b.metrics.Count.Inc() + } } func (b *BloomFilter) Has(gossipable Gossipable) bool { diff --git a/utils/bloom/filter.go b/utils/bloom/filter.go index d139caa52d69..487db11edbe6 100644 --- a/utils/bloom/filter.go +++ b/utils/bloom/filter.go @@ -59,19 +59,29 @@ func New(numHashes, numEntries int) (*Filter, error) { }, nil } -func (f *Filter) Add(hash uint64) { +// Add adds the provided hash to the bloom filter. It returns true if the hash +// was not already present in the bloom filter. +func (f *Filter) Add(hash uint64) bool { f.lock.Lock() defer f.lock.Unlock() - _ = 1 % f.numBits // hint to the compiler that numBits is not 0 + var ( + _ = 1 % f.numBits // hint to the compiler that numBits is not 0 + accumulator byte = 1 + ) for _, seed := range f.hashSeeds { hash = bits.RotateLeft64(hash, hashRotation) ^ seed index := hash % f.numBits byteIndex := index / bitsPerByte bitIndex := index % bitsPerByte + accumulator &= f.entries[byteIndex] >> bitIndex f.entries[byteIndex] |= 1 << bitIndex } - f.count++ + added := accumulator == 0 + if added { + f.count++ + } + return added } // Count returns the number of elements that have been added to the bloom diff --git a/utils/bloom/filter_test.go b/utils/bloom/filter_test.go index f8816e2cb89d..44937f66b549 100644 --- a/utils/bloom/filter_test.go +++ b/utils/bloom/filter_test.go @@ -42,6 +42,19 @@ func TestNewErrors(t *testing.T) { } } +func TestAdd(t *testing.T) { + require := require.New(t) + + initialNumHashes, initialNumBytes := OptimalParameters(1024, 0.01) + filter, err := New(initialNumHashes, initialNumBytes) + require.NoError(err) + + require.True(filter.Add(1)) + require.Equal(1, filter.Count()) + require.False(filter.Add(1)) + require.Equal(1, filter.Count()) +} + func TestNormalUsage(t *testing.T) { require := require.New(t) @@ -61,7 +74,7 @@ func TestNormalUsage(t *testing.T) { } } - require.Equal(len(toAdd), filter.Count()) + require.LessOrEqual(filter.Count(), len(toAdd)) filterBytes := filter.Marshal() parsedFilter, err := Parse(filterBytes) diff --git a/utils/bloom/hasher.go b/utils/bloom/hasher.go index 7b6441472c92..fca81a372b14 100644 --- a/utils/bloom/hasher.go +++ b/utils/bloom/hasher.go @@ -8,8 +8,8 @@ import ( "encoding/binary" ) -func Add(f *Filter, key, salt []byte) { - f.Add(Hash(key, salt)) +func Add(f *Filter, key, salt []byte) bool { + return f.Add(Hash(key, salt)) } func Contains(c Checker, key, salt []byte) bool { From 5c296849a3429c79afcddaa0b1c929b9dc03b225 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 11:18:04 -0500 Subject: [PATCH 02/15] Implement gossip.SetWithBloomFilter for usage in the sae vm --- .../plugin/evm/atomic/txpool/mempool.go | 9 +- .../plugin/evm/atomic/vm/tx_gossip_test.go | 37 ++-- graft/coreth/plugin/evm/eth_gossiper.go | 13 +- graft/coreth/plugin/evm/gossip/handler.go | 2 +- graft/coreth/plugin/evm/tx_gossip_test.go | 37 ++-- network/p2p/gossip/bloom.go | 13 +- network/p2p/gossip/bloom_test.go | 14 +- network/p2p/gossip/gossip.go | 41 ++++- network/p2p/gossip/gossip_test.go | 77 ++++----- network/p2p/gossip/gossipable.go | 31 ---- network/p2p/gossip/handler.go | 13 +- network/p2p/gossip/set.go | 159 ++++++++++++++++++ network/p2p/gossip/set_test.go | 130 ++++++++++++++ vms/avm/network/gossip.go | 5 +- vms/platformvm/network/gossip.go | 5 +- 15 files changed, 432 insertions(+), 154 deletions(-) delete mode 100644 network/p2p/gossip/gossipable.go create mode 100644 network/p2p/gossip/set.go create mode 100644 network/p2p/gossip/set_test.go diff --git a/graft/coreth/plugin/evm/atomic/txpool/mempool.go b/graft/coreth/plugin/evm/atomic/txpool/mempool.go index a28cbdd6f8bd..3674aaa86e2d 100644 --- a/graft/coreth/plugin/evm/atomic/txpool/mempool.go +++ b/graft/coreth/plugin/evm/atomic/txpool/mempool.go @@ -15,10 +15,13 @@ import ( "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils/bloom" ) var ( - _ gossip.Set[*atomic.Tx] = (*Mempool)(nil) + _ gossip.HandlerSet[*atomic.Tx] = (*Mempool)(nil) + _ gossip.PullGossiperSet[*atomic.Tx] = (*Mempool)(nil) + _ gossip.PushGossiperSet = (*Mempool)(nil) ErrAlreadyKnown = errors.New("already known") ErrConflict = errors.New("conflict present") @@ -298,9 +301,9 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error { return nil } -func (m *Mempool) GetFilter() ([]byte, []byte) { +func (m *Mempool) BloomFilter() (*bloom.Filter, ids.ID) { m.lock.RLock() defer m.lock.RUnlock() - return m.bloom.Marshal() + return m.bloom.BloomFilter() } diff --git a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go index 05569bb87738..c0cb3026cf90 100644 --- a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go @@ -16,7 +16,6 @@ import ( "github.com/ava-labs/avalanchego/database/memdb" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest" "github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest" "github.com/ava-labs/avalanchego/ids" @@ -27,6 +26,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -113,31 +113,20 @@ func TestAtomicTxGossip(t *testing.T) { } // Ask the VM for any new transactions. We should get nothing at first. - emptyBloomFilter, err := gossip.NewBloomFilter( - prometheus.NewRegistry(), - "", - config.TxGossipBloomMinTargetElements, - config.TxGossipBloomTargetFalsePositiveRate, - config.TxGossipBloomResetFalsePositiveRate, + requestBytes, err := gossip.MarshalAppRequest( + bloom.EmptyFilter.Marshal(), + agoUtils.RandomBytes(32), ) require.NoError(err) - emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal() - request := &sdk.PullGossipRequest{ - Filter: emptyBloomFilterBytes, - Salt: agoUtils.RandomBytes(32), - } - - requestBytes, err := proto.Marshal(request) - require.NoError(err) wg := &sync.WaitGroup{} wg.Add(1) onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Empty(response.Gossip) + responseGossip, err := gossip.ParseAppResponse(responseBytes) + require.NoError(err) + require.Empty(responseGossip) wg.Done() } require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse)) @@ -166,18 +155,14 @@ func TestAtomicTxGossip(t *testing.T) { // Ask the VM for new transactions. We should get the newly issued tx. wg.Add(1) - marshaller := atomic.TxMarshaller{} onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Len(response.Gossip, 1) - - gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0]) + responseGossip, err := gossip.ParseAppResponse(responseBytes) require.NoError(err) - require.Equal(tx.ID(), gotTx.GossipID()) - + require.Equal(responseGossip, [][]byte{ + tx.Bytes(), + }) wg.Done() } require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse)) diff --git a/graft/coreth/plugin/evm/eth_gossiper.go b/graft/coreth/plugin/evm/eth_gossiper.go index b6129670e012..438b15d2a4cd 100644 --- a/graft/coreth/plugin/evm/eth_gossiper.go +++ b/graft/coreth/plugin/evm/eth_gossiper.go @@ -21,6 +21,7 @@ import ( "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p/gossip" + "github.com/ava-labs/avalanchego/utils/bloom" ethcommon "github.com/ava-labs/libevm/common" ) @@ -28,9 +29,11 @@ import ( const pendingTxsBuffer = 10 var ( - _ gossip.Gossipable = (*GossipEthTx)(nil) - _ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil) - _ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil) + _ gossip.Gossipable = (*GossipEthTx)(nil) + _ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil) + _ gossip.HandlerSet[*GossipEthTx] = (*GossipEthTxPool)(nil) + _ gossip.PullGossiperSet[*GossipEthTx] = (*GossipEthTxPool)(nil) + _ gossip.PushGossiperSet = (*GossipEthTxPool)(nil) _ eth.PushGossiper = (*EthPushGossiper)(nil) ) @@ -132,11 +135,11 @@ func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) { }) } -func (g *GossipEthTxPool) GetFilter() ([]byte, []byte) { +func (g *GossipEthTxPool) BloomFilter() (*bloom.Filter, ids.ID) { g.lock.RLock() defer g.lock.RUnlock() - return g.bloom.Marshal() + return g.bloom.BloomFilter() } type GossipEthTxMarshaller struct{} diff --git a/graft/coreth/plugin/evm/gossip/handler.go b/graft/coreth/plugin/evm/gossip/handler.go index 69c376957e1a..99529ddea120 100644 --- a/graft/coreth/plugin/evm/gossip/handler.go +++ b/graft/coreth/plugin/evm/gossip/handler.go @@ -22,7 +22,7 @@ var _ p2p.Handler = (*txGossipHandler)(nil) func NewTxGossipHandler[T gossip.Gossipable]( log logging.Logger, marshaller gossip.Marshaller[T], - mempool gossip.Set[T], + mempool gossip.HandlerSet[T], metrics gossip.Metrics, maxMessageSize int, throttlingPeriod time.Duration, diff --git a/graft/coreth/plugin/evm/tx_gossip_test.go b/graft/coreth/plugin/evm/tx_gossip_test.go index 7f08d501969f..5567ef8776f4 100644 --- a/graft/coreth/plugin/evm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/tx_gossip_test.go @@ -17,7 +17,6 @@ import ( "google.golang.org/protobuf/proto" "github.com/ava-labs/avalanchego/database/memdb" - "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/upgrade/ap0" "github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest" "github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest" @@ -29,6 +28,7 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/snowtest" "github.com/ava-labs/avalanchego/snow/validators" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/crypto/secp256k1" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -107,31 +107,20 @@ func TestEthTxGossip(t *testing.T) { } // Ask the VM for any new transactions. We should get nothing at first. - emptyBloomFilter, err := gossip.NewBloomFilter( - prometheus.NewRegistry(), - "", - config.TxGossipBloomMinTargetElements, - config.TxGossipBloomTargetFalsePositiveRate, - config.TxGossipBloomResetFalsePositiveRate, + requestBytes, err := gossip.MarshalAppRequest( + bloom.EmptyFilter.Marshal(), + agoUtils.RandomBytes(32), ) require.NoError(err) - emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal() - request := &sdk.PullGossipRequest{ - Filter: emptyBloomFilterBytes, - Salt: agoUtils.RandomBytes(32), - } - - requestBytes, err := proto.Marshal(request) - require.NoError(err) wg := &sync.WaitGroup{} wg.Add(1) onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Empty(response.Gossip) + response, err := gossip.ParseAppResponse(responseBytes) + require.NoError(err) + require.Empty(response) wg.Done() } require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse)) @@ -151,19 +140,19 @@ func TestEthTxGossip(t *testing.T) { // wait so we aren't throttled by the vm time.Sleep(5 * time.Second) - marshaller := GossipEthTxMarshaller{} // Ask the VM for new transactions. We should get the newly issued tx. wg.Add(1) onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) { require.NoError(err) - response := &sdk.PullGossipResponse{} - require.NoError(proto.Unmarshal(responseBytes, response)) - require.Len(response.Gossip, 1) + response, err := gossip.ParseAppResponse(responseBytes) + require.NoError(err) - gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0]) + txBytes, err := signedTx.MarshalBinary() require.NoError(err) - require.Equal(signedTx.Hash(), gotTx.Tx.Hash()) + require.Equal(response, [][]byte{ + txBytes, + }) wg.Done() } diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index c46e0776215c..9faa510f12c2 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -18,6 +18,8 @@ import ( // // Invariant: The returned bloom filter is not safe to reset concurrently with // other operations. However, it is otherwise safe to access concurrently. +// +// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. func NewBloomFilter( registerer prometheus.Registerer, namespace string, @@ -45,6 +47,7 @@ func NewBloomFilter( return filter, err } +// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. type BloomFilter struct { minTargetElements int targetFalsePositiveProbability float64 @@ -72,12 +75,8 @@ func (b *BloomFilter) Has(gossipable Gossipable) bool { return bloom.Contains(b.bloom, h[:], b.salt[:]) } -func (b *BloomFilter) Marshal() ([]byte, []byte) { - bloomBytes := b.bloom.Marshal() - // salt must be copied here to ensure the bytes aren't overwritten if salt - // is later modified. - salt := b.salt - return bloomBytes, salt[:] +func (b *BloomFilter) BloomFilter() (*bloom.Filter, ids.ID) { + return b.bloom, b.salt } // ResetBloomFilterIfNeeded resets a bloom filter if it breaches [targetFalsePositiveProbability]. @@ -86,6 +85,8 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) { // the same [targetFalsePositiveProbability]. // // Returns true if the bloom filter was reset. +// +// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. func ResetBloomFilterIfNeeded( bloomFilter *BloomFilter, targetElements int, diff --git a/network/p2p/gossip/bloom_test.go b/network/p2p/gossip/bloom_test.go index 0649972d4afc..4381af81e7ae 100644 --- a/network/p2p/gossip/bloom_test.go +++ b/network/p2p/gossip/bloom_test.go @@ -81,19 +81,21 @@ func TestBloomFilterRefresh(t *testing.T) { var resetCount uint64 for _, item := range tt.add { - bloomBytes, saltBytes := bloom.Marshal() - initialBloomBytes := slices.Clone(bloomBytes) - initialSaltBytes := slices.Clone(saltBytes) + bloomFilter, salt := bloom.BloomFilter() + initialBloomBytes := slices.Clone(bloomFilter.Marshal()) + initialSaltBytes := slices.Clone(salt[:]) reset, err := ResetBloomFilterIfNeeded(bloom, len(tt.add)) require.NoError(err) if reset { resetCount++ } - bloom.Add(item) + require.Equal(initialBloomBytes, bloomFilter.Marshal()) + require.Equal(initialSaltBytes, salt[:]) - require.Equal(initialBloomBytes, bloomBytes) - require.Equal(initialSaltBytes, saltBytes) + // If the bloom filter wasn't reset, adding an item may modify + // the returned bloom filter. + bloom.Add(item) } require.Equal(tt.resetCount, resetCount) diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index c03e513854df..ade88ee61684 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -18,6 +18,7 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/buffer" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -40,6 +41,7 @@ const ( var ( _ Gossiper = (*ValidatorGossiper)(nil) _ Gossiper = (*PullGossiper[Gossipable])(nil) + _ Gossiper = (*PushGossiper[Gossipable])(nil) ioTypeLabels = []string{ioLabel, typeLabel} sentPushLabels = prometheus.Labels{ @@ -75,6 +77,17 @@ var ( ErrInvalidRegossipFrequency = errors.New("re-gossip frequency cannot be negative") ) +// Gossipable is an item that can be gossiped across the network +type Gossipable interface { + GossipID() ids.ID +} + +// Marshaller handles parsing logic for a concrete Gossipable type +type Marshaller[T Gossipable] interface { + MarshalGossip(T) ([]byte, error) + UnmarshalGossip([]byte) (T, error) +} + // Gossiper gossips Gossipables to other nodes type Gossiper interface { // Gossip runs a cycle of gossip. Returns an error if we failed to gossip. @@ -191,7 +204,7 @@ func (v ValidatorGossiper) Gossip(ctx context.Context) error { func NewPullGossiper[T Gossipable]( log logging.Logger, marshaller Marshaller[T], - set Set[T], + set PullGossiperSet[T], client *p2p.Client, metrics Metrics, pollSize int, @@ -206,17 +219,27 @@ func NewPullGossiper[T Gossipable]( } } +// PullGossiperSet exposes the current bloom filter and allows adding new items +// that were not included in the filter. +type PullGossiperSet[T Gossipable] interface { + // Add adds a value to the set. Returns an error if v was not added. + Add(v T) error + // BloomFilter returns the bloom filter and its corresponding salt. + BloomFilter() (bloom *bloom.Filter, salt ids.ID) +} + type PullGossiper[T Gossipable] struct { log logging.Logger marshaller Marshaller[T] - set Set[T] + set PullGossiperSet[T] client *p2p.Client metrics Metrics pollSize int } func (p *PullGossiper[_]) Gossip(ctx context.Context) error { - msgBytes, err := MarshalAppRequest(p.set.GetFilter()) + bf, salt := p.set.BloomFilter() + msgBytes, err := MarshalAppRequest(bf.Marshal(), salt[:]) if err != nil { return err } @@ -293,7 +316,7 @@ func (p *PullGossiper[_]) handleResponse( // NewPushGossiper returns an instance of PushGossiper func NewPushGossiper[T Gossipable]( marshaller Marshaller[T], - mempool Set[T], + set PushGossiperSet, validators p2p.ValidatorSubset, client *p2p.Client, metrics Metrics, @@ -320,7 +343,7 @@ func NewPushGossiper[T Gossipable]( return &PushGossiper[T]{ marshaller: marshaller, - set: mempool, + set: set, validators: validators, client: client, metrics: metrics, @@ -336,10 +359,16 @@ func NewPushGossiper[T Gossipable]( }, nil } +// PushGossiperSet exposes whether hashes are still included in a set. +type PushGossiperSet interface { + // Has returns true if the hash is in the set. + Has(h ids.ID) bool +} + // PushGossiper broadcasts gossip to peers randomly in the network type PushGossiper[T Gossipable] struct { marshaller Marshaller[T] - set Set[T] + set PushGossiperSet validators p2p.ValidatorSubset client *p2p.Client metrics Metrics diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index cb8222ceff4f..5dbcc8e70958 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -20,7 +20,6 @@ import ( "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/snow/validators/validatorstest" - "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/constants" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/utils/set" @@ -69,29 +68,26 @@ func (marshaller) UnmarshalGossip(bytes []byte) (tx, error) { type setDouble struct { txs set.Set[tx] - bloom *BloomFilter onAdd func(tx tx) } -func (s *setDouble) Add(gossipable tx) error { - if s.txs.Contains(gossipable) { - return fmt.Errorf("%s already present", ids.ID(gossipable)) +func (s *setDouble) Add(t tx) error { + if s.txs.Contains(t) { + return fmt.Errorf("%s already present", t) } - s.txs.Add(gossipable) - s.bloom.Add(gossipable) + s.txs.Add(t) if s.onAdd != nil { - s.onAdd(gossipable) + s.onAdd(t) } - return nil } -func (s *setDouble) Has(gossipID ids.ID) bool { - return s.txs.Contains(tx(gossipID)) +func (s *setDouble) Has(h ids.ID) bool { + return s.txs.Contains(tx(h)) } -func (s *setDouble) Iterate(f func(gossipable tx) bool) { +func (s *setDouble) Iterate(f func(t tx) bool) { for tx := range s.txs { if !f(tx) { return @@ -99,8 +95,8 @@ func (s *setDouble) Iterate(f func(gossipable tx) bool) { } } -func (s *setDouble) GetFilter() ([]byte, []byte) { - return s.bloom.Marshal() +func (s *setDouble) Len() int { + return s.txs.Len() } func TestGossiperGossip(t *testing.T) { @@ -175,13 +171,17 @@ func TestGossiperGossip(t *testing.T) { ) require.NoError(err) - responseBloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05) + responseSetWithBloom, err := NewSetWithBloomFilter( + &setDouble{}, + prometheus.NewRegistry(), + "", + 1000, + 0.01, + 0.05, + ) require.NoError(err) - responseSet := &setDouble{ - bloom: responseBloom, - } for _, item := range tt.responder { - require.NoError(responseSet.Add(item)) + require.NoError(responseSetWithBloom.Add(item)) } metrics, err := NewMetrics(prometheus.NewRegistry(), "") @@ -196,7 +196,7 @@ func TestGossiperGossip(t *testing.T) { handler := NewHandler[tx]( logging.NoLog{}, marshaller, - responseSet, + responseSetWithBloom, metrics, tt.targetResponseSize, ) @@ -218,13 +218,18 @@ func TestGossiperGossip(t *testing.T) { require.NoError(err) require.NoError(requestNetwork.Connected(t.Context(), ids.EmptyNodeID, nil)) - bloom, err := NewBloomFilter(prometheus.NewRegistry(), "", 1000, 0.01, 0.05) + var requestSet setDouble + requestSetWithBloom, err := NewSetWithBloomFilter( + &requestSet, + prometheus.NewRegistry(), + "", + 1000, + 0.01, + 0.05, + ) require.NoError(err) - requestSet := &setDouble{ - bloom: bloom, - } for _, item := range tt.requester { - require.NoError(requestSet.Add(item)) + require.NoError(requestSetWithBloom.Add(item)) } requestClient := requestNetwork.NewClient( @@ -236,7 +241,7 @@ func TestGossiperGossip(t *testing.T) { gossiper := NewPullGossiper[tx]( logging.NoLog{}, marshaller, - requestSet, + requestSetWithBloom, requestClient, metrics, 1, @@ -457,20 +462,10 @@ func TestPushGossiperNew(t *testing.T) { } } -type fullSet[T Gossipable] struct{} - -func (fullSet[T]) Add(T) error { - return nil -} - -func (fullSet[T]) Has(ids.ID) bool { - return true -} - -func (fullSet[T]) Iterate(func(gossipable T) bool) {} +type hasFunc func(id ids.ID) bool -func (fullSet[_]) GetFilter() ([]byte, []byte) { - return bloom.FullFilter.Marshal(), ids.Empty[:] +func (h hasFunc) Has(id ids.ID) bool { + return h(id) } // Tests that the outgoing gossip is equivalent to what was accumulated @@ -593,7 +588,9 @@ func TestPushGossiper(t *testing.T) { gossiper, err := NewPushGossiper[tx]( marshaller, - fullSet[tx]{}, + hasFunc(func(ids.ID) bool { + return true // Never remove the items from the set + }), validators, client, metrics, diff --git a/network/p2p/gossip/gossipable.go b/network/p2p/gossip/gossipable.go deleted file mode 100644 index b535609a7a48..000000000000 --- a/network/p2p/gossip/gossipable.go +++ /dev/null @@ -1,31 +0,0 @@ -// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. -// See the file LICENSE for licensing terms. - -package gossip - -import "github.com/ava-labs/avalanchego/ids" - -// Gossipable is an item that can be gossiped across the network -type Gossipable interface { - GossipID() ids.ID -} - -// Marshaller handles parsing logic for a concrete Gossipable type -type Marshaller[T Gossipable] interface { - MarshalGossip(T) ([]byte, error) - UnmarshalGossip([]byte) (T, error) -} - -// Set holds a set of known Gossipable items -type Set[T Gossipable] interface { - // Add adds a Gossipable to the set. Returns an error if gossipable was not - // added. - Add(gossipable T) error - // Has returns true if the gossipable is in the set. - Has(gossipID ids.ID) bool - // Iterate iterates over elements until [f] returns false - Iterate(f func(gossipable T) bool) - // GetFilter returns the byte representation of bloom filter and its - // corresponding salt. - GetFilter() (bloom []byte, salt []byte) -} diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 7c016030c913..20cc31520bf3 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -18,10 +18,19 @@ import ( var _ p2p.Handler = (*Handler[Gossipable])(nil) +// HandlerSet exposes the ability to add new values to the set in response to +// pushed information and for responding to pull requests. +type HandlerSet[T Gossipable] interface { + // Add adds a value to the set. Returns an error if v was not added. + Add(v T) error + // Iterate iterates over elements until f returns false. + Iterate(f func(v T) bool) +} + func NewHandler[T Gossipable]( log logging.Logger, marshaller Marshaller[T], - set Set[T], + set HandlerSet[T], metrics Metrics, targetResponseSize int, ) *Handler[T] { @@ -39,7 +48,7 @@ type Handler[T Gossipable] struct { p2p.Handler marshaller Marshaller[T] log logging.Logger - set Set[T] + set HandlerSet[T] metrics Metrics targetResponseSize int } diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go new file mode 100644 index 000000000000..187b13454fcf --- /dev/null +++ b/network/p2p/gossip/set.go @@ -0,0 +1,159 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "crypto/rand" + "fmt" + "sync" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/bloom" + "github.com/prometheus/client_golang/prometheus" +) + +var ( + _ Set[Gossipable] = (*SetWithBloomFilter[Gossipable])(nil) + _ PullGossiperSet[Gossipable] = (*SetWithBloomFilter[Gossipable])(nil) +) + +type Set[T Gossipable] interface { + HandlerSet[T] + PushGossiperSet + // Len returns the number of items in the set. + Len() int +} + +// NewSetWithBloomFilter wraps set with a bloom filter. It is expected for all +// future additions to the provided set to go through the returned set, or be +// followed by a call to AddToBloom. +func NewSetWithBloomFilter[T Gossipable]( + set Set[T], + registerer prometheus.Registerer, + namespace string, + minTargetElements int, + targetFalsePositiveProbability, + resetFalsePositiveProbability float64, +) (*SetWithBloomFilter[T], error) { + metrics, err := bloom.NewMetrics(namespace, registerer) + if err != nil { + return nil, err + } + m := &SetWithBloomFilter[T]{ + set: set, + + minTargetElements: minTargetElements, + targetFalsePositiveProbability: targetFalsePositiveProbability, + resetFalsePositiveProbability: resetFalsePositiveProbability, + + metrics: metrics, + } + return m, m.resetBloomFilter() +} + +type SetWithBloomFilter[T Gossipable] struct { + set Set[T] + + minTargetElements int + targetFalsePositiveProbability float64 + resetFalsePositiveProbability float64 + + metrics *bloom.Metrics + + l sync.RWMutex + maxCount int + bloom *bloom.Filter + // salt is provided to eventually unblock collisions in Bloom. It's possible + // that conflicting Gossipable items collide in the bloom filter, so a salt + // is generated to eventually resolve collisions. + salt ids.ID +} + +func (s *SetWithBloomFilter[T]) Add(v T) error { + if err := s.set.Add(v); err != nil { + return err + } + return s.AddToBloom(v.GossipID()) +} + +// AddToBloom adds the provided ID to the bloom filter. This function is exposed +// to allow modifications to the inner set without going through Add. +// +// Even if an error is returned, the ID has still been added to the bloom +// filter. +func (s *SetWithBloomFilter[T]) AddToBloom(h ids.ID) error { + s.l.RLock() + bloom.Add(s.bloom, h[:], s.salt[:]) + shouldReset := s.shouldReset() + s.l.RUnlock() + s.metrics.Count.Inc() + + if !shouldReset { + return nil + } + + s.l.Lock() + defer s.l.Unlock() + + // Bloom filter was already reset by another thread + if !s.shouldReset() { + return nil + } + return s.resetBloomFilter() +} + +func (s *SetWithBloomFilter[T]) shouldReset() bool { + return s.bloom.Count() > s.maxCount +} + +// resetBloomFilter attempts to generate a new bloom filter and fill it with the +// current entries in the set. +// +// If an error is returned, the bloom filter and salt are unchanged. +func (s *SetWithBloomFilter[T]) resetBloomFilter() error { + targetElements := max(2*s.set.Len(), s.minTargetElements) + numHashes, numEntries := bloom.OptimalParameters( + targetElements, + s.targetFalsePositiveProbability, + ) + newBloom, err := bloom.New(numHashes, numEntries) + if err != nil { + return fmt.Errorf("creating new bloom: %w", err) + } + var newSalt ids.ID + if _, err := rand.Read(newSalt[:]); err != nil { + return fmt.Errorf("generating new salt: %w", err) + } + s.set.Iterate(func(v T) bool { + h := v.GossipID() + bloom.Add(newBloom, h[:], newSalt[:]) + return true + }) + + s.maxCount = bloom.EstimateCount(numHashes, numEntries, s.resetFalsePositiveProbability) + s.bloom = newBloom + s.salt = newSalt + + s.metrics.Reset(newBloom, s.maxCount) + return nil +} + +func (s *SetWithBloomFilter[T]) Has(h ids.ID) bool { + return s.set.Has(h) +} + +func (s *SetWithBloomFilter[T]) Iterate(f func(T) bool) { + s.set.Iterate(f) +} + +func (s *SetWithBloomFilter[T]) Len() int { + return s.set.Len() +} + +func (s *SetWithBloomFilter[_]) BloomFilter() (*bloom.Filter, ids.ID) { + s.l.RLock() + defer s.l.RUnlock() + + return s.bloom, s.salt +} diff --git a/network/p2p/gossip/set_test.go b/network/p2p/gossip/set_test.go new file mode 100644 index 000000000000..add3436ccfc2 --- /dev/null +++ b/network/p2p/gossip/set_test.go @@ -0,0 +1,130 @@ +// Copyright (C) 2019-2025, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package gossip + +import ( + "testing" + + "github.com/ava-labs/avalanchego/utils/bloom" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestSetWithBloomFilter_Refresh(t *testing.T) { + type ( + op struct { + add *tx + remove *tx + } + test struct { + name string + resetFalsePositiveProbability float64 + ops []op + expected []tx + expectedResetCount float64 + } + ) + tests := []test{ + { + name: "no refresh", + resetFalsePositiveProbability: 1, // maxCount = 9223372036854775807 + ops: []op{ + {add: &tx{0}}, + {add: &tx{1}}, + {add: &tx{2}}, + }, + expected: []tx{ + {0}, + {1}, + {2}, + }, + expectedResetCount: 0, + }, + { + name: "no refresh - with removals", + resetFalsePositiveProbability: 1, // maxCount = 9223372036854775807 + ops: []op{ + {add: &tx{0}}, + {add: &tx{1}}, + {add: &tx{2}}, + {remove: &tx{0}}, + {remove: &tx{1}}, + {remove: &tx{2}}, + }, + expected: []tx{ + {0}, + {1}, + {2}, + }, + expectedResetCount: 0, + }, + { + name: "refresh", + resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1 + ops: []op{ + {add: &tx{0}}, // no reset + {remove: &tx{0}}, + {add: &tx{1}}, // reset + }, + expected: []tx{ + {1}, + }, + expectedResetCount: 1, + }, + { + name: "multiple refresh", + resetFalsePositiveProbability: 0.0000000000000001, // maxCount = 1 + ops: []op{ + {add: &tx{0}}, // no reset + {remove: &tx{0}}, + {add: &tx{1}}, // reset + {remove: &tx{1}}, + {add: &tx{2}}, // reset + }, + expected: []tx{ + {2}, + }, + expectedResetCount: 2, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + require := require.New(t) + + const ( + minTargetElements = 1 + targetFalsePositiveProbability = 0.0001 + ) + var s setDouble + bs, err := NewSetWithBloomFilter( + &s, + prometheus.NewRegistry(), + "", + minTargetElements, + targetFalsePositiveProbability, + tt.resetFalsePositiveProbability, + ) + require.NoError(err) + + for _, op := range tt.ops { + if op.add != nil { + require.NoError(bs.Add(*op.add)) + } + if op.remove != nil { + s.txs.Remove(*op.remove) + } + } + + // Add one to expectedResetCount to account for the initial creation + // of the bloom filter. + require.Equal(tt.expectedResetCount+1, testutil.ToFloat64(bs.metrics.ResetCount)) + b, h := bs.BloomFilter() + for _, expected := range tt.expected { + require.True(bloom.Contains(b, expected[:], h[:])) + } + }) + } +} diff --git a/vms/avm/network/gossip.go b/vms/avm/network/gossip.go index fb2077449dae..4b78a3d9ba98 100644 --- a/vms/avm/network/gossip.go +++ b/vms/avm/network/gossip.go @@ -15,6 +15,7 @@ import ( "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/avm/txs" "github.com/ava-labs/avalanchego/vms/txs/mempool" @@ -154,9 +155,9 @@ func (g *gossipMempool) Iterate(f func(*txs.Tx) bool) { g.Mempool.Iterate(f) } -func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) { +func (g *gossipMempool) BloomFilter() (*bloom.Filter, ids.ID) { g.lock.RLock() defer g.lock.RUnlock() - return g.bloom.Marshal() + return g.bloom.BloomFilter() } diff --git a/vms/platformvm/network/gossip.go b/vms/platformvm/network/gossip.go index 006c1740c57f..575f9bdc6bf7 100644 --- a/vms/platformvm/network/gossip.go +++ b/vms/platformvm/network/gossip.go @@ -16,6 +16,7 @@ import ( "github.com/ava-labs/avalanchego/network/p2p" "github.com/ava-labs/avalanchego/network/p2p/gossip" "github.com/ava-labs/avalanchego/snow/engine/common" + "github.com/ava-labs/avalanchego/utils/bloom" "github.com/ava-labs/avalanchego/utils/logging" "github.com/ava-labs/avalanchego/vms/platformvm/txs" "github.com/ava-labs/avalanchego/vms/platformvm/txs/mempool" @@ -150,9 +151,9 @@ func (g *gossipMempool) Has(txID ids.ID) bool { return ok } -func (g *gossipMempool) GetFilter() (bloom []byte, salt []byte) { +func (g *gossipMempool) BloomFilter() (*bloom.Filter, ids.ID) { g.lock.RLock() defer g.lock.RUnlock() - return g.bloom.Marshal() + return g.bloom.BloomFilter() } From 01e4f3381d30bcfacd91e225e95cac5f733ff5ef Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 11:25:31 -0500 Subject: [PATCH 03/15] nits --- .../coreth/plugin/evm/atomic/vm/tx_gossip_test.go | 9 ++++++--- graft/coreth/plugin/evm/tx_gossip_test.go | 9 ++++++--- network/p2p/gossip/set.go | 15 +++++++++------ 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go index c0cb3026cf90..554095f327e1 100644 --- a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go @@ -160,9 +160,12 @@ func TestAtomicTxGossip(t *testing.T) { responseGossip, err := gossip.ParseAppResponse(responseBytes) require.NoError(err) - require.Equal(responseGossip, [][]byte{ - tx.Bytes(), - }) + require.Equal( + responseGossip, + [][]byte{ + tx.Bytes(), + }, + ) wg.Done() } require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse)) diff --git a/graft/coreth/plugin/evm/tx_gossip_test.go b/graft/coreth/plugin/evm/tx_gossip_test.go index 5567ef8776f4..c9b29b32a857 100644 --- a/graft/coreth/plugin/evm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/tx_gossip_test.go @@ -150,9 +150,12 @@ func TestEthTxGossip(t *testing.T) { txBytes, err := signedTx.MarshalBinary() require.NoError(err) - require.Equal(response, [][]byte{ - txBytes, - }) + require.Equal( + response, + [][]byte{ + txBytes, + }, + ) wg.Done() } diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go index 187b13454fcf..066233f04a80 100644 --- a/network/p2p/gossip/set.go +++ b/network/p2p/gossip/set.go @@ -22,6 +22,9 @@ type Set[T Gossipable] interface { HandlerSet[T] PushGossiperSet // Len returns the number of items in the set. + // + // This value should always be at least as large as the number of items that + // can be iterated over with a call to Iterate. Len() int } @@ -74,20 +77,20 @@ func (s *SetWithBloomFilter[T]) Add(v T) error { if err := s.set.Add(v); err != nil { return err } - return s.AddToBloom(v.GossipID()) + return s.addToBloom(v.GossipID()) } -// AddToBloom adds the provided ID to the bloom filter. This function is exposed -// to allow modifications to the inner set without going through Add. +// addToBloom adds the provided ID to the bloom filter. // // Even if an error is returned, the ID has still been added to the bloom // filter. -func (s *SetWithBloomFilter[T]) AddToBloom(h ids.ID) error { +func (s *SetWithBloomFilter[T]) addToBloom(h ids.ID) error { s.l.RLock() - bloom.Add(s.bloom, h[:], s.salt[:]) + if bloom.Add(s.bloom, h[:], s.salt[:]) { + s.metrics.Count.Inc() + } shouldReset := s.shouldReset() s.l.RUnlock() - s.metrics.Count.Inc() if !shouldReset { return nil From 0b09d08d0b093a140ba4b5557c3df38b5c93397e Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 11:46:23 -0500 Subject: [PATCH 04/15] fix order --- graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go | 4 ++-- graft/coreth/plugin/evm/tx_gossip_test.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go index 554095f327e1..a17e6294e6e1 100644 --- a/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go @@ -161,10 +161,10 @@ func TestAtomicTxGossip(t *testing.T) { responseGossip, err := gossip.ParseAppResponse(responseBytes) require.NoError(err) require.Equal( - responseGossip, [][]byte{ - tx.Bytes(), + tx.SignedBytes(), }, + responseGossip, ) wg.Done() } diff --git a/graft/coreth/plugin/evm/tx_gossip_test.go b/graft/coreth/plugin/evm/tx_gossip_test.go index c9b29b32a857..5b4159eeb708 100644 --- a/graft/coreth/plugin/evm/tx_gossip_test.go +++ b/graft/coreth/plugin/evm/tx_gossip_test.go @@ -151,10 +151,10 @@ func TestEthTxGossip(t *testing.T) { txBytes, err := signedTx.MarshalBinary() require.NoError(err) require.Equal( - response, [][]byte{ txBytes, }, + response, ) wg.Done() From 1ca0b4f47250ccf59d94f4c331e9da180fce270c Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 12:14:17 -0500 Subject: [PATCH 05/15] add concurrency test --- network/p2p/gossip/gossip_test.go | 20 +++++++++++++ network/p2p/gossip/set_test.go | 47 +++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+) diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index 5dbcc8e70958..91c3e61e90aa 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -67,11 +67,15 @@ func (marshaller) UnmarshalGossip(bytes []byte) (tx, error) { } type setDouble struct { + l sync.RWMutex txs set.Set[tx] onAdd func(tx tx) } func (s *setDouble) Add(t tx) error { + s.l.Lock() + defer s.l.Unlock() + if s.txs.Contains(t) { return fmt.Errorf("%s already present", t) } @@ -83,11 +87,24 @@ func (s *setDouble) Add(t tx) error { return nil } +func (s *setDouble) Remove(t tx) { + s.l.Lock() + defer s.l.Unlock() + + s.txs.Remove(t) +} + func (s *setDouble) Has(h ids.ID) bool { + s.l.RLock() + defer s.l.RUnlock() + return s.txs.Contains(tx(h)) } func (s *setDouble) Iterate(f func(t tx) bool) { + s.l.RLock() + defer s.l.RUnlock() + for tx := range s.txs { if !f(tx) { return @@ -96,6 +113,9 @@ func (s *setDouble) Iterate(f func(t tx) bool) { } func (s *setDouble) Len() int { + s.l.RLock() + defer s.l.RUnlock() + return s.txs.Len() } diff --git a/network/p2p/gossip/set_test.go b/network/p2p/gossip/set_test.go index add3436ccfc2..058c44999f94 100644 --- a/network/p2p/gossip/set_test.go +++ b/network/p2p/gossip/set_test.go @@ -4,12 +4,15 @@ package gossip import ( + "fmt" "testing" + "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/bloom" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" ) func TestSetWithBloomFilter_Refresh(t *testing.T) { @@ -128,3 +131,47 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { }) } } + +// TestSetWithBloomFilter_Concurrent tests that SetWithBloomFilter is ensures +// that the returned bloom filter is a super set of the items in the Set at the +// time it is called, even under concurrent resets. +func TestSetWithBloomFilter_Concurrent(t *testing.T) { + require := require.New(t) + + const ( + minTargetElements = 1 + targetFalsePositiveProbability = 0.01 + resetFalsePositiveProbability = 0.0000000001 // Forces frequent resets + ) + var s setDouble + bs, err := NewSetWithBloomFilter( + &s, + prometheus.NewRegistry(), + "", + minTargetElements, + targetFalsePositiveProbability, + resetFalsePositiveProbability, + ) + require.NoError(err) + + var eg errgroup.Group + for range 10 { + eg.Go(func() error { + for range 1000 { + tx := tx(ids.GenerateTestID()) + if err := bs.Add(tx); err != nil { + return err + } + + bf, salt := bs.BloomFilter() + if !bloom.Contains(bf, tx[:], salt[:]) { + return fmt.Errorf("expected to find %s in bloom filter", tx) + } + + s.Remove(tx) + } + return nil + }) + } + require.NoError(eg.Wait()) +} From 88f1fb0bcbb0d17c925b11824c942ad69db68a79 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 12:15:47 -0500 Subject: [PATCH 06/15] fix lint --- network/p2p/gossip/set.go | 3 ++- network/p2p/gossip/set_test.go | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go index 066233f04a80..6bab81520234 100644 --- a/network/p2p/gossip/set.go +++ b/network/p2p/gossip/set.go @@ -8,9 +8,10 @@ import ( "fmt" "sync" + "github.com/prometheus/client_golang/prometheus" + "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/bloom" - "github.com/prometheus/client_golang/prometheus" ) var ( diff --git a/network/p2p/gossip/set_test.go b/network/p2p/gossip/set_test.go index 058c44999f94..4bcb6ab89ed5 100644 --- a/network/p2p/gossip/set_test.go +++ b/network/p2p/gossip/set_test.go @@ -7,12 +7,13 @@ import ( "fmt" "testing" - "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/utils/bloom" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" + + "github.com/ava-labs/avalanchego/ids" + "github.com/ava-labs/avalanchego/utils/bloom" ) func TestSetWithBloomFilter_Refresh(t *testing.T) { From 690e1ae9bb8a1f4deba5ce4be5ded8acc6b18493 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 12:21:18 -0500 Subject: [PATCH 07/15] Comments --- network/p2p/gossip/set.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go index 6bab81520234..75d32896d7bd 100644 --- a/network/p2p/gossip/set.go +++ b/network/p2p/gossip/set.go @@ -74,6 +74,10 @@ type SetWithBloomFilter[T Gossipable] struct { salt ids.ID } +// Add adds v to the set and bloom filter. +// +// If adding to the bloom filter fails after successfully adding to the set, the +// value remains in the set and is still added to the bloom filter. func (s *SetWithBloomFilter[T]) Add(v T) error { if err := s.set.Add(v); err != nil { return err From 98b2978e465a4e3eae8c36da6def78ea17f91e35 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 12:33:39 -0500 Subject: [PATCH 08/15] wip --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 1ae9b6401d26..45c4c575a84b 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/DataDog/zstd v1.5.2 github.com/StephenButtolph/canoto v0.17.3 github.com/antithesishq/antithesis-sdk-go v0.3.8 - github.com/ava-labs/avalanchego/graft/coreth v0.16.0-rc.0 + github.com/ava-labs/avalanchego/graft/coreth v0.0.0 github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d github.com/btcsuite/btcd/btcutil v1.1.3 From 3ba684b1591bffacde084aa4892d47838808d640 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 12:36:16 -0500 Subject: [PATCH 09/15] wip --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 45c4c575a84b..9f9afa07d8ec 100644 --- a/go.mod +++ b/go.mod @@ -21,7 +21,7 @@ require ( github.com/DataDog/zstd v1.5.2 github.com/StephenButtolph/canoto v0.17.3 github.com/antithesishq/antithesis-sdk-go v0.3.8 - github.com/ava-labs/avalanchego/graft/coreth v0.0.0 + github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201173339-98b2978e465a github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d github.com/btcsuite/btcd/btcutil v1.1.3 From edc2d777ccacd10785ca19369cfbed0cfdf6f490 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Mon, 1 Dec 2025 12:51:52 -0500 Subject: [PATCH 10/15] ci? --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 9f9afa07d8ec..3bff67f5e91b 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,7 @@ require ( github.com/antithesishq/antithesis-sdk-go v0.3.8 github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201173339-98b2978e465a github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 - github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d + github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d github.com/btcsuite/btcd/btcutil v1.1.3 github.com/cespare/xxhash/v2 v2.3.0 github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593 diff --git a/go.sum b/go.sum index c0068716ca84..9453b5e0cad6 100644 --- a/go.sum +++ b/go.sum @@ -77,8 +77,8 @@ github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 h1:hQ15IJxY7WO github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2/go.mod h1:DqSotSn4Dx/UJV+d3svfW8raR+cH7+Ohl9BpsQ5HlGU= github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19 h1:S6oFasZsplNmw8B2S8cMJQMa62nT5ZKGzZRdCpd+5qQ= github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19/go.mod h1:GVzumIo3zR23/qGRN2AdnVkIPHcKMq/D89EGWZfMGQ0= -github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d h1:7pjEE0BXLjzQlq5uKP5B2BTl9jTgDKaOfJx2Qfb01Jo= -github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d/go.mod h1:JTvIe8YbCjHpy8vy9uZBSpDXxawNXSnIe/Wlf3I09Tk= +github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d h1:IlhCuTqhPEfpW+q/8ZlhmjflB/Onn9AhtXuRCRYa+oo= +github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d/go.mod h1:Hvl0SeW3Y/ZUgVQrfjzumterrF5T898YtkhDguq+pQA= github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= From 9d27671bd56d463215c54e27414ec1a243d97ebd Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 3 Dec 2025 11:12:20 -0500 Subject: [PATCH 11/15] Address most PR comments --- network/p2p/gossip/bloom.go | 6 +- network/p2p/gossip/bloom_test.go | 3 +- network/p2p/gossip/gossip_test.go | 48 ++++------ network/p2p/gossip/set.go | 147 +++++++++++++++++------------- network/p2p/gossip/set_test.go | 65 +++++++------ 5 files changed, 139 insertions(+), 130 deletions(-) diff --git a/network/p2p/gossip/bloom.go b/network/p2p/gossip/bloom.go index 9faa510f12c2..341284f257ca 100644 --- a/network/p2p/gossip/bloom.go +++ b/network/p2p/gossip/bloom.go @@ -19,7 +19,7 @@ import ( // Invariant: The returned bloom filter is not safe to reset concurrently with // other operations. However, it is otherwise safe to access concurrently. // -// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. +// Deprecated: [BloomSet] should be used to manage bloom filters. func NewBloomFilter( registerer prometheus.Registerer, namespace string, @@ -47,7 +47,7 @@ func NewBloomFilter( return filter, err } -// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. +// Deprecated: [BloomSet] should be used to manage bloom filters. type BloomFilter struct { minTargetElements int targetFalsePositiveProbability float64 @@ -86,7 +86,7 @@ func (b *BloomFilter) BloomFilter() (*bloom.Filter, ids.ID) { // // Returns true if the bloom filter was reset. // -// Deprecated: [SetWithBloomFilter] should be used to manage bloom filters. +// Deprecated: [BloomSet] should be used to manage bloom filters. func ResetBloomFilterIfNeeded( bloomFilter *BloomFilter, targetElements int, diff --git a/network/p2p/gossip/bloom_test.go b/network/p2p/gossip/bloom_test.go index 4381af81e7ae..03485e3d6024 100644 --- a/network/p2p/gossip/bloom_test.go +++ b/network/p2p/gossip/bloom_test.go @@ -94,7 +94,8 @@ func TestBloomFilterRefresh(t *testing.T) { require.Equal(initialSaltBytes, salt[:]) // If the bloom filter wasn't reset, adding an item may modify - // the returned bloom filter. + // the returned bloom filter, so this must be done after the + // checks. bloom.Add(item) } diff --git a/network/p2p/gossip/gossip_test.go b/network/p2p/gossip/gossip_test.go index 91c3e61e90aa..deb060a01707 100644 --- a/network/p2p/gossip/gossip_test.go +++ b/network/p2p/gossip/gossip_test.go @@ -67,14 +67,14 @@ func (marshaller) UnmarshalGossip(bytes []byte) (tx, error) { } type setDouble struct { - l sync.RWMutex + lock sync.RWMutex txs set.Set[tx] onAdd func(tx tx) } func (s *setDouble) Add(t tx) error { - s.l.Lock() - defer s.l.Unlock() + s.lock.Lock() + defer s.lock.Unlock() if s.txs.Contains(t) { return fmt.Errorf("%s already present", t) @@ -88,22 +88,22 @@ func (s *setDouble) Add(t tx) error { } func (s *setDouble) Remove(t tx) { - s.l.Lock() - defer s.l.Unlock() + s.lock.Lock() + defer s.lock.Unlock() s.txs.Remove(t) } func (s *setDouble) Has(h ids.ID) bool { - s.l.RLock() - defer s.l.RUnlock() + s.lock.RLock() + defer s.lock.RUnlock() return s.txs.Contains(tx(h)) } func (s *setDouble) Iterate(f func(t tx) bool) { - s.l.RLock() - defer s.l.RUnlock() + s.lock.RLock() + defer s.lock.RUnlock() for tx := range s.txs { if !f(tx) { @@ -113,8 +113,8 @@ func (s *setDouble) Iterate(f func(t tx) bool) { } func (s *setDouble) Len() int { - s.l.RLock() - defer s.l.RUnlock() + s.lock.RLock() + defer s.lock.RUnlock() return s.txs.Len() } @@ -191,17 +191,10 @@ func TestGossiperGossip(t *testing.T) { ) require.NoError(err) - responseSetWithBloom, err := NewSetWithBloomFilter( - &setDouble{}, - prometheus.NewRegistry(), - "", - 1000, - 0.01, - 0.05, - ) + responseBloomSet, err := NewBloomSet(&setDouble{}, BloomSetConfig{}) require.NoError(err) for _, item := range tt.responder { - require.NoError(responseSetWithBloom.Add(item)) + require.NoError(responseBloomSet.Add(item)) } metrics, err := NewMetrics(prometheus.NewRegistry(), "") @@ -216,7 +209,7 @@ func TestGossiperGossip(t *testing.T) { handler := NewHandler[tx]( logging.NoLog{}, marshaller, - responseSetWithBloom, + responseBloomSet, metrics, tt.targetResponseSize, ) @@ -239,17 +232,10 @@ func TestGossiperGossip(t *testing.T) { require.NoError(requestNetwork.Connected(t.Context(), ids.EmptyNodeID, nil)) var requestSet setDouble - requestSetWithBloom, err := NewSetWithBloomFilter( - &requestSet, - prometheus.NewRegistry(), - "", - 1000, - 0.01, - 0.05, - ) + requestBloomSet, err := NewBloomSet(&requestSet, BloomSetConfig{}) require.NoError(err) for _, item := range tt.requester { - require.NoError(requestSetWithBloom.Add(item)) + require.NoError(requestBloomSet.Add(item)) } requestClient := requestNetwork.NewClient( @@ -261,7 +247,7 @@ func TestGossiperGossip(t *testing.T) { gossiper := NewPullGossiper[tx]( logging.NoLog{}, marshaller, - requestSetWithBloom, + requestBloomSet, requestClient, metrics, 1, diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go index 75d32896d7bd..102453686592 100644 --- a/network/p2p/gossip/set.go +++ b/network/p2p/gossip/set.go @@ -8,64 +8,81 @@ import ( "fmt" "sync" - "github.com/prometheus/client_golang/prometheus" - "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/bloom" ) +const ( + DefaultMinTargetElements = 1000 + DefaultTargetFalsePositiveProbability = .01 + DefaultResetFalsePositiveProbability = .05 +) + var ( - _ Set[Gossipable] = (*SetWithBloomFilter[Gossipable])(nil) - _ PullGossiperSet[Gossipable] = (*SetWithBloomFilter[Gossipable])(nil) + _ Set[Gossipable] = (*BloomSet[Gossipable])(nil) + _ PullGossiperSet[Gossipable] = (*BloomSet[Gossipable])(nil) + + ErrBloomReset = fmt.Errorf("bloom reset") ) +type BloomSetConfig struct { + // Metrics allows exposing the current state of the bloom filter across + // additions and resets. If nil, no metrics are recorded. + Metrics *bloom.Metrics + // MinTargetElements is the minimum number of elements to target when + // creating a new bloom filter. If zero, [DefaultMinTargetElements] is used. + MinTargetElements int + // TargetFalsePositiveProbability is the target false positive probability + // when creating a new bloom filter. If zero, + // [DefaultTargetFalsePositiveProbability] is used. + TargetFalsePositiveProbability float64 + // ResetFalsePositiveProbability is the false positive probability at + // which the bloom filter is reset. If zero, + // [DefaultResetFalsePositiveProbability] is used. + ResetFalsePositiveProbability float64 +} + +func (c *BloomSetConfig) fillDefaults() { + if c.MinTargetElements == 0 { + c.MinTargetElements = DefaultMinTargetElements + } + if c.TargetFalsePositiveProbability == 0 { + c.TargetFalsePositiveProbability = DefaultTargetFalsePositiveProbability + } + if c.ResetFalsePositiveProbability == 0 { + c.ResetFalsePositiveProbability = DefaultResetFalsePositiveProbability + } +} + type Set[T Gossipable] interface { HandlerSet[T] PushGossiperSet // Len returns the number of items in the set. // - // This value should always be at least as large as the number of items that - // can be iterated over with a call to Iterate. + // This value should mast the number of items that can be iterated over with + // a call to Iterate. Len() int } -// NewSetWithBloomFilter wraps set with a bloom filter. It is expected for all -// future additions to the provided set to go through the returned set, or be -// followed by a call to AddToBloom. -func NewSetWithBloomFilter[T Gossipable]( +// NewBloomSet wraps the [Set] with a bloom filter. It is expected for all +// future additions to the provided set to go through the returned [BloomSet]. +func NewBloomSet[T Gossipable]( set Set[T], - registerer prometheus.Registerer, - namespace string, - minTargetElements int, - targetFalsePositiveProbability, - resetFalsePositiveProbability float64, -) (*SetWithBloomFilter[T], error) { - metrics, err := bloom.NewMetrics(namespace, registerer) - if err != nil { - return nil, err - } - m := &SetWithBloomFilter[T]{ + c BloomSetConfig, +) (*BloomSet[T], error) { + c.fillDefaults() + m := &BloomSet[T]{ set: set, - - minTargetElements: minTargetElements, - targetFalsePositiveProbability: targetFalsePositiveProbability, - resetFalsePositiveProbability: resetFalsePositiveProbability, - - metrics: metrics, + c: c, } - return m, m.resetBloomFilter() + return m, m.resetBloom() } -type SetWithBloomFilter[T Gossipable] struct { +type BloomSet[T Gossipable] struct { set Set[T] + c BloomSetConfig - minTargetElements int - targetFalsePositiveProbability float64 - resetFalsePositiveProbability float64 - - metrics *bloom.Metrics - - l sync.RWMutex + lock sync.RWMutex maxCount int bloom *bloom.Filter // salt is provided to eventually unblock collisions in Bloom. It's possible @@ -76,9 +93,10 @@ type SetWithBloomFilter[T Gossipable] struct { // Add adds v to the set and bloom filter. // -// If adding to the bloom filter fails after successfully adding to the set, the -// value remains in the set and is still added to the bloom filter. -func (s *SetWithBloomFilter[T]) Add(v T) error { +// If adding the inner set succeeds and resetting the bloom filter fails, +// [ErrBloomReset] is returned. However, it is still guaranteed that v has +// been added to the bloom filter. +func (s *BloomSet[T]) Add(v T) error { if err := s.set.Add(v); err != nil { return err } @@ -89,49 +107,52 @@ func (s *SetWithBloomFilter[T]) Add(v T) error { // // Even if an error is returned, the ID has still been added to the bloom // filter. -func (s *SetWithBloomFilter[T]) addToBloom(h ids.ID) error { - s.l.RLock() - if bloom.Add(s.bloom, h[:], s.salt[:]) { - s.metrics.Count.Inc() +func (s *BloomSet[T]) addToBloom(h ids.ID) error { + s.lock.RLock() + if bloom.Add(s.bloom, h[:], s.salt[:]) && s.c.Metrics != nil { + s.c.Metrics.Count.Inc() } shouldReset := s.shouldReset() - s.l.RUnlock() + s.lock.RUnlock() if !shouldReset { return nil } - s.l.Lock() - defer s.l.Unlock() + s.lock.Lock() + defer s.lock.Unlock() // Bloom filter was already reset by another thread if !s.shouldReset() { return nil } - return s.resetBloomFilter() + return s.resetBloom() } -func (s *SetWithBloomFilter[T]) shouldReset() bool { +// shouldReset expects either a read lock or a write lock to be held. +func (s *BloomSet[T]) shouldReset() bool { return s.bloom.Count() > s.maxCount } -// resetBloomFilter attempts to generate a new bloom filter and fill it with the +// resetBloom attempts to generate a new bloom filter and fill it with the // current entries in the set. // // If an error is returned, the bloom filter and salt are unchanged. -func (s *SetWithBloomFilter[T]) resetBloomFilter() error { - targetElements := max(2*s.set.Len(), s.minTargetElements) +// +// resetBloom expects a write lock to be held. +func (s *BloomSet[T]) resetBloom() error { + targetElements := max(2*s.set.Len(), s.c.MinTargetElements) numHashes, numEntries := bloom.OptimalParameters( targetElements, - s.targetFalsePositiveProbability, + s.c.TargetFalsePositiveProbability, ) newBloom, err := bloom.New(numHashes, numEntries) if err != nil { - return fmt.Errorf("creating new bloom: %w", err) + return fmt.Errorf("%w: creating new bloom: %w", ErrBloomReset, err) } var newSalt ids.ID if _, err := rand.Read(newSalt[:]); err != nil { - return fmt.Errorf("generating new salt: %w", err) + return fmt.Errorf("%w: generating new salt: %w", ErrBloomReset, err) } s.set.Iterate(func(v T) bool { h := v.GossipID() @@ -139,29 +160,31 @@ func (s *SetWithBloomFilter[T]) resetBloomFilter() error { return true }) - s.maxCount = bloom.EstimateCount(numHashes, numEntries, s.resetFalsePositiveProbability) + s.maxCount = bloom.EstimateCount(numHashes, numEntries, s.c.ResetFalsePositiveProbability) s.bloom = newBloom s.salt = newSalt - s.metrics.Reset(newBloom, s.maxCount) + if s.c.Metrics != nil { + s.c.Metrics.Reset(newBloom, s.maxCount) + } return nil } -func (s *SetWithBloomFilter[T]) Has(h ids.ID) bool { +func (s *BloomSet[T]) Has(h ids.ID) bool { return s.set.Has(h) } -func (s *SetWithBloomFilter[T]) Iterate(f func(T) bool) { +func (s *BloomSet[T]) Iterate(f func(T) bool) { s.set.Iterate(f) } -func (s *SetWithBloomFilter[T]) Len() int { +func (s *BloomSet[T]) Len() int { return s.set.Len() } -func (s *SetWithBloomFilter[_]) BloomFilter() (*bloom.Filter, ids.ID) { - s.l.RLock() - defer s.l.RUnlock() +func (s *BloomSet[_]) BloomFilter() (*bloom.Filter, ids.ID) { + s.lock.RLock() + defer s.lock.RUnlock() return s.bloom, s.salt } diff --git a/network/p2p/gossip/set_test.go b/network/p2p/gossip/set_test.go index 4bcb6ab89ed5..ff31b75a83e3 100644 --- a/network/p2p/gossip/set_test.go +++ b/network/p2p/gossip/set_test.go @@ -16,7 +16,7 @@ import ( "github.com/ava-labs/avalanchego/utils/bloom" ) -func TestSetWithBloomFilter_Refresh(t *testing.T) { +func TestBloomSet_Refresh(t *testing.T) { type ( op struct { add *tx @@ -26,7 +26,7 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { name string resetFalsePositiveProbability float64 ops []op - expected []tx + expectedInFilter []tx expectedResetCount float64 } ) @@ -39,7 +39,7 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { {add: &tx{1}}, {add: &tx{2}}, }, - expected: []tx{ + expectedInFilter: []tx{ {0}, {1}, {2}, @@ -57,7 +57,7 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { {remove: &tx{1}}, {remove: &tx{2}}, }, - expected: []tx{ + expectedInFilter: []tx{ {0}, {1}, {2}, @@ -72,7 +72,7 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { {remove: &tx{0}}, {add: &tx{1}}, // reset }, - expected: []tx{ + expectedInFilter: []tx{ {1}, }, expectedResetCount: 1, @@ -87,7 +87,7 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { {remove: &tx{1}}, {add: &tx{2}}, // reset }, - expected: []tx{ + expectedInFilter: []tx{ {2}, }, expectedResetCount: 2, @@ -100,22 +100,25 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { const ( minTargetElements = 1 - targetFalsePositiveProbability = 0.0001 + targetFalsePositiveProbability = 0.000001 ) var s setDouble - bs, err := NewSetWithBloomFilter( + m, err := bloom.NewMetrics("", prometheus.NewRegistry()) + require.NoError(err, "NewMetrics()") + bs, err := NewBloomSet( &s, - prometheus.NewRegistry(), - "", - minTargetElements, - targetFalsePositiveProbability, - tt.resetFalsePositiveProbability, + BloomSetConfig{ + Metrics: m, + MinTargetElements: minTargetElements, + TargetFalsePositiveProbability: targetFalsePositiveProbability, + ResetFalsePositiveProbability: tt.resetFalsePositiveProbability, + }, ) - require.NoError(err) + require.NoError(err, "NewBloomSet()") for _, op := range tt.ops { if op.add != nil { - require.NoError(bs.Add(*op.add)) + require.NoErrorf(bs.Add(*op.add), "%T.Add(...)", bs) } if op.remove != nil { s.txs.Remove(*op.remove) @@ -124,34 +127,30 @@ func TestSetWithBloomFilter_Refresh(t *testing.T) { // Add one to expectedResetCount to account for the initial creation // of the bloom filter. - require.Equal(tt.expectedResetCount+1, testutil.ToFloat64(bs.metrics.ResetCount)) + require.Equal(tt.expectedResetCount+1, testutil.ToFloat64(m.ResetCount), "number of resets") b, h := bs.BloomFilter() - for _, expected := range tt.expected { - require.True(bloom.Contains(b, expected[:], h[:])) + for _, expected := range tt.expectedInFilter { + require.Truef(bloom.Contains(b, expected[:], h[:]), "%T.Contains(%s)", b, expected.GossipID()) } }) } } -// TestSetWithBloomFilter_Concurrent tests that SetWithBloomFilter is ensures -// that the returned bloom filter is a super set of the items in the Set at the -// time it is called, even under concurrent resets. -func TestSetWithBloomFilter_Concurrent(t *testing.T) { +// TestBloomSet_Concurrent tests that BloomSet ensures that the returned bloom +// filter is a super set of the items in the Set at the time it is called, even +// under concurrent resets, because resets of the filter are accompanied by +// refilling from the Set. +func TestBloomSet_Concurrent(t *testing.T) { require := require.New(t) - const ( - minTargetElements = 1 - targetFalsePositiveProbability = 0.01 - resetFalsePositiveProbability = 0.0000000001 // Forces frequent resets - ) var s setDouble - bs, err := NewSetWithBloomFilter( + bs, err := NewBloomSet( &s, - prometheus.NewRegistry(), - "", - minTargetElements, - targetFalsePositiveProbability, - resetFalsePositiveProbability, + BloomSetConfig{ + MinTargetElements: 1, + TargetFalsePositiveProbability: 0.01, + ResetFalsePositiveProbability: 0.0000000001, // Forces frequent resets + }, ) require.NoError(err) From fddd265598a2b954828fe918e19ce92e667efc78 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 3 Dec 2025 11:15:50 -0500 Subject: [PATCH 12/15] Add TODOs --- network/p2p/gossip/gossip.go | 6 ++++++ network/p2p/gossip/handler.go | 3 +++ 2 files changed, 9 insertions(+) diff --git a/network/p2p/gossip/gossip.go b/network/p2p/gossip/gossip.go index ade88ee61684..35867588a29f 100644 --- a/network/p2p/gossip/gossip.go +++ b/network/p2p/gossip/gossip.go @@ -221,6 +221,9 @@ func NewPullGossiper[T Gossipable]( // PullGossiperSet exposes the current bloom filter and allows adding new items // that were not included in the filter. +// +// TODO: Consider naming this interface based on what it provides rather than +// how its used. type PullGossiperSet[T Gossipable] interface { // Add adds a value to the set. Returns an error if v was not added. Add(v T) error @@ -360,6 +363,9 @@ func NewPushGossiper[T Gossipable]( } // PushGossiperSet exposes whether hashes are still included in a set. +// +// TODO: Consider naming this interface based on what it provides rather than +// how its used. type PushGossiperSet interface { // Has returns true if the hash is in the set. Has(h ids.ID) bool diff --git a/network/p2p/gossip/handler.go b/network/p2p/gossip/handler.go index 20cc31520bf3..de405fc40893 100644 --- a/network/p2p/gossip/handler.go +++ b/network/p2p/gossip/handler.go @@ -20,6 +20,9 @@ var _ p2p.Handler = (*Handler[Gossipable])(nil) // HandlerSet exposes the ability to add new values to the set in response to // pushed information and for responding to pull requests. +// +// TODO: Consider naming this interface based on what it provides rather than +// how its used. type HandlerSet[T Gossipable] interface { // Add adds a value to the set. Returns an error if v was not added. Add(v T) error From f3bbb0d12c1163e6c934d07fc859d5b1064e4167 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 3 Dec 2025 11:16:25 -0500 Subject: [PATCH 13/15] typo --- network/p2p/gossip/set.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go index 102453686592..80a697ce2796 100644 --- a/network/p2p/gossip/set.go +++ b/network/p2p/gossip/set.go @@ -59,8 +59,8 @@ type Set[T Gossipable] interface { PushGossiperSet // Len returns the number of items in the set. // - // This value should mast the number of items that can be iterated over with - // a call to Iterate. + // This value should match the number of items that can be iterated over + // with a call to Iterate. Len() int } From fb3b4131ee73a85ae09f9ffccaafae1678640b6c Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 3 Dec 2025 11:21:46 -0500 Subject: [PATCH 14/15] manually pass t --- network/p2p/gossip/set_test.go | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/network/p2p/gossip/set_test.go b/network/p2p/gossip/set_test.go index ff31b75a83e3..bbdb90a7c675 100644 --- a/network/p2p/gossip/set_test.go +++ b/network/p2p/gossip/set_test.go @@ -96,15 +96,13 @@ func TestBloomSet_Refresh(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - require := require.New(t) - const ( minTargetElements = 1 targetFalsePositiveProbability = 0.000001 ) var s setDouble m, err := bloom.NewMetrics("", prometheus.NewRegistry()) - require.NoError(err, "NewMetrics()") + require.NoError(t, err, "NewMetrics()") bs, err := NewBloomSet( &s, BloomSetConfig{ @@ -114,11 +112,11 @@ func TestBloomSet_Refresh(t *testing.T) { ResetFalsePositiveProbability: tt.resetFalsePositiveProbability, }, ) - require.NoError(err, "NewBloomSet()") + require.NoError(t, err, "NewBloomSet()") for _, op := range tt.ops { if op.add != nil { - require.NoErrorf(bs.Add(*op.add), "%T.Add(...)", bs) + require.NoErrorf(t, bs.Add(*op.add), "%T.Add(...)", bs) } if op.remove != nil { s.txs.Remove(*op.remove) @@ -127,10 +125,10 @@ func TestBloomSet_Refresh(t *testing.T) { // Add one to expectedResetCount to account for the initial creation // of the bloom filter. - require.Equal(tt.expectedResetCount+1, testutil.ToFloat64(m.ResetCount), "number of resets") + require.Equal(t, tt.expectedResetCount+1, testutil.ToFloat64(m.ResetCount), "number of resets") b, h := bs.BloomFilter() for _, expected := range tt.expectedInFilter { - require.Truef(bloom.Contains(b, expected[:], h[:]), "%T.Contains(%s)", b, expected.GossipID()) + require.Truef(t, bloom.Contains(b, expected[:], h[:]), "%T.Contains(%s)", b, expected.GossipID()) } }) } @@ -141,8 +139,6 @@ func TestBloomSet_Refresh(t *testing.T) { // under concurrent resets, because resets of the filter are accompanied by // refilling from the Set. func TestBloomSet_Concurrent(t *testing.T) { - require := require.New(t) - var s setDouble bs, err := NewBloomSet( &s, @@ -152,7 +148,7 @@ func TestBloomSet_Concurrent(t *testing.T) { ResetFalsePositiveProbability: 0.0000000001, // Forces frequent resets }, ) - require.NoError(err) + require.NoError(t, err) var eg errgroup.Group for range 10 { @@ -173,5 +169,5 @@ func TestBloomSet_Concurrent(t *testing.T) { return nil }) } - require.NoError(eg.Wait()) + require.NoError(t, eg.Wait()) } From cb16af9defaea528424d05c9f3ded262b54b4e85 Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Wed, 3 Dec 2025 11:28:24 -0500 Subject: [PATCH 15/15] lint --- network/p2p/gossip/set.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/network/p2p/gossip/set.go b/network/p2p/gossip/set.go index 80a697ce2796..b5b28a7f6ee9 100644 --- a/network/p2p/gossip/set.go +++ b/network/p2p/gossip/set.go @@ -5,6 +5,7 @@ package gossip import ( "crypto/rand" + "errors" "fmt" "sync" @@ -22,7 +23,7 @@ var ( _ Set[Gossipable] = (*BloomSet[Gossipable])(nil) _ PullGossiperSet[Gossipable] = (*BloomSet[Gossipable])(nil) - ErrBloomReset = fmt.Errorf("bloom reset") + ErrBloomReset = errors.New("bloom reset") ) type BloomSetConfig struct {