Skip to content

Commit 39b7456

Browse files
Implement gossip.SetWithBloomFilter (#4632)
1 parent b57ece0 commit 39b7456

File tree

17 files changed

+533
-158
lines changed

17 files changed

+533
-158
lines changed

go.mod

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ require (
2121
github.com/DataDog/zstd v1.5.2
2222
github.com/StephenButtolph/canoto v0.17.3
2323
github.com/antithesishq/antithesis-sdk-go v0.3.8
24-
github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201152440-a263a7cedcd0
24+
github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201173339-98b2978e465a
2525
github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2
26-
github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d
26+
github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d
2727
github.com/btcsuite/btcd/btcutil v1.1.3
2828
github.com/cespare/xxhash/v2 v2.3.0
2929
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,8 +77,8 @@ github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 h1:hQ15IJxY7WO
7777
github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2/go.mod h1:DqSotSn4Dx/UJV+d3svfW8raR+cH7+Ohl9BpsQ5HlGU=
7878
github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19 h1:S6oFasZsplNmw8B2S8cMJQMa62nT5ZKGzZRdCpd+5qQ=
7979
github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19/go.mod h1:GVzumIo3zR23/qGRN2AdnVkIPHcKMq/D89EGWZfMGQ0=
80-
github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d h1:7pjEE0BXLjzQlq5uKP5B2BTl9jTgDKaOfJx2Qfb01Jo=
81-
github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d/go.mod h1:JTvIe8YbCjHpy8vy9uZBSpDXxawNXSnIe/Wlf3I09Tk=
80+
github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d h1:IlhCuTqhPEfpW+q/8ZlhmjflB/Onn9AhtXuRCRYa+oo=
81+
github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d/go.mod h1:Hvl0SeW3Y/ZUgVQrfjzumterrF5T898YtkhDguq+pQA=
8282
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
8383
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
8484
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=

graft/coreth/plugin/evm/atomic/txpool/mempool.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,13 @@ import (
1515
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
1616
"github.com/ava-labs/avalanchego/ids"
1717
"github.com/ava-labs/avalanchego/network/p2p/gossip"
18+
"github.com/ava-labs/avalanchego/utils/bloom"
1819
)
1920

2021
var (
21-
_ gossip.Set[*atomic.Tx] = (*Mempool)(nil)
22+
_ gossip.HandlerSet[*atomic.Tx] = (*Mempool)(nil)
23+
_ gossip.PullGossiperSet[*atomic.Tx] = (*Mempool)(nil)
24+
_ gossip.PushGossiperSet = (*Mempool)(nil)
2225

2326
ErrAlreadyKnown = errors.New("already known")
2427
ErrConflict = errors.New("conflict present")
@@ -298,9 +301,9 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error {
298301
return nil
299302
}
300303

301-
func (m *Mempool) GetFilter() ([]byte, []byte) {
304+
func (m *Mempool) BloomFilter() (*bloom.Filter, ids.ID) {
302305
m.lock.RLock()
303306
defer m.lock.RUnlock()
304307

305-
return m.bloom.Marshal()
308+
return m.bloom.BloomFilter()
306309
}

graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go

Lines changed: 14 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616

1717
"github.com/ava-labs/avalanchego/database/memdb"
1818
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic"
19-
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
2019
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest"
2120
"github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest"
2221
"github.com/ava-labs/avalanchego/ids"
@@ -27,6 +26,7 @@ import (
2726
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
2827
"github.com/ava-labs/avalanchego/snow/snowtest"
2928
"github.com/ava-labs/avalanchego/snow/validators"
29+
"github.com/ava-labs/avalanchego/utils/bloom"
3030
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
3131
"github.com/ava-labs/avalanchego/utils/logging"
3232
"github.com/ava-labs/avalanchego/utils/set"
@@ -113,31 +113,20 @@ func TestAtomicTxGossip(t *testing.T) {
113113
}
114114

115115
// Ask the VM for any new transactions. We should get nothing at first.
116-
emptyBloomFilter, err := gossip.NewBloomFilter(
117-
prometheus.NewRegistry(),
118-
"",
119-
config.TxGossipBloomMinTargetElements,
120-
config.TxGossipBloomTargetFalsePositiveRate,
121-
config.TxGossipBloomResetFalsePositiveRate,
116+
requestBytes, err := gossip.MarshalAppRequest(
117+
bloom.EmptyFilter.Marshal(),
118+
agoUtils.RandomBytes(32),
122119
)
123120
require.NoError(err)
124-
emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal()
125-
request := &sdk.PullGossipRequest{
126-
Filter: emptyBloomFilterBytes,
127-
Salt: agoUtils.RandomBytes(32),
128-
}
129-
130-
requestBytes, err := proto.Marshal(request)
131-
require.NoError(err)
132121

133122
wg := &sync.WaitGroup{}
134123
wg.Add(1)
135124
onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
136125
require.NoError(err)
137126

138-
response := &sdk.PullGossipResponse{}
139-
require.NoError(proto.Unmarshal(responseBytes, response))
140-
require.Empty(response.Gossip)
127+
responseGossip, err := gossip.ParseAppResponse(responseBytes)
128+
require.NoError(err)
129+
require.Empty(responseGossip)
141130
wg.Done()
142131
}
143132
require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse))
@@ -166,18 +155,17 @@ func TestAtomicTxGossip(t *testing.T) {
166155
// Ask the VM for new transactions. We should get the newly issued tx.
167156
wg.Add(1)
168157

169-
marshaller := atomic.TxMarshaller{}
170158
onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
171159
require.NoError(err)
172160

173-
response := &sdk.PullGossipResponse{}
174-
require.NoError(proto.Unmarshal(responseBytes, response))
175-
require.Len(response.Gossip, 1)
176-
177-
gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0])
161+
responseGossip, err := gossip.ParseAppResponse(responseBytes)
178162
require.NoError(err)
179-
require.Equal(tx.ID(), gotTx.GossipID())
180-
163+
require.Equal(
164+
[][]byte{
165+
tx.SignedBytes(),
166+
},
167+
responseGossip,
168+
)
181169
wg.Done()
182170
}
183171
require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse))

graft/coreth/plugin/evm/eth_gossiper.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,19 @@ import (
2121
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
2222
"github.com/ava-labs/avalanchego/ids"
2323
"github.com/ava-labs/avalanchego/network/p2p/gossip"
24+
"github.com/ava-labs/avalanchego/utils/bloom"
2425

2526
ethcommon "github.com/ava-labs/libevm/common"
2627
)
2728

2829
const pendingTxsBuffer = 10
2930

3031
var (
31-
_ gossip.Gossipable = (*GossipEthTx)(nil)
32-
_ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil)
33-
_ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil)
32+
_ gossip.Gossipable = (*GossipEthTx)(nil)
33+
_ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil)
34+
_ gossip.HandlerSet[*GossipEthTx] = (*GossipEthTxPool)(nil)
35+
_ gossip.PullGossiperSet[*GossipEthTx] = (*GossipEthTxPool)(nil)
36+
_ gossip.PushGossiperSet = (*GossipEthTxPool)(nil)
3437

3538
_ eth.PushGossiper = (*EthPushGossiper)(nil)
3639
)
@@ -132,11 +135,11 @@ func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) {
132135
})
133136
}
134137

135-
func (g *GossipEthTxPool) GetFilter() ([]byte, []byte) {
138+
func (g *GossipEthTxPool) BloomFilter() (*bloom.Filter, ids.ID) {
136139
g.lock.RLock()
137140
defer g.lock.RUnlock()
138141

139-
return g.bloom.Marshal()
142+
return g.bloom.BloomFilter()
140143
}
141144

142145
type GossipEthTxMarshaller struct{}

graft/coreth/plugin/evm/gossip/handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ var _ p2p.Handler = (*txGossipHandler)(nil)
2222
func NewTxGossipHandler[T gossip.Gossipable](
2323
log logging.Logger,
2424
marshaller gossip.Marshaller[T],
25-
mempool gossip.Set[T],
25+
mempool gossip.HandlerSet[T],
2626
metrics gossip.Metrics,
2727
maxMessageSize int,
2828
throttlingPeriod time.Duration,

graft/coreth/plugin/evm/tx_gossip_test.go

Lines changed: 16 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ import (
1717
"google.golang.org/protobuf/proto"
1818

1919
"github.com/ava-labs/avalanchego/database/memdb"
20-
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
2120
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/upgrade/ap0"
2221
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest"
2322
"github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest"
@@ -29,6 +28,7 @@ import (
2928
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
3029
"github.com/ava-labs/avalanchego/snow/snowtest"
3130
"github.com/ava-labs/avalanchego/snow/validators"
31+
"github.com/ava-labs/avalanchego/utils/bloom"
3232
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
3333
"github.com/ava-labs/avalanchego/utils/logging"
3434
"github.com/ava-labs/avalanchego/utils/set"
@@ -107,31 +107,20 @@ func TestEthTxGossip(t *testing.T) {
107107
}
108108

109109
// Ask the VM for any new transactions. We should get nothing at first.
110-
emptyBloomFilter, err := gossip.NewBloomFilter(
111-
prometheus.NewRegistry(),
112-
"",
113-
config.TxGossipBloomMinTargetElements,
114-
config.TxGossipBloomTargetFalsePositiveRate,
115-
config.TxGossipBloomResetFalsePositiveRate,
110+
requestBytes, err := gossip.MarshalAppRequest(
111+
bloom.EmptyFilter.Marshal(),
112+
agoUtils.RandomBytes(32),
116113
)
117114
require.NoError(err)
118-
emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal()
119-
request := &sdk.PullGossipRequest{
120-
Filter: emptyBloomFilterBytes,
121-
Salt: agoUtils.RandomBytes(32),
122-
}
123-
124-
requestBytes, err := proto.Marshal(request)
125-
require.NoError(err)
126115

127116
wg := &sync.WaitGroup{}
128117
wg.Add(1)
129118
onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
130119
require.NoError(err)
131120

132-
response := &sdk.PullGossipResponse{}
133-
require.NoError(proto.Unmarshal(responseBytes, response))
134-
require.Empty(response.Gossip)
121+
response, err := gossip.ParseAppResponse(responseBytes)
122+
require.NoError(err)
123+
require.Empty(response)
135124
wg.Done()
136125
}
137126
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
@@ -151,19 +140,22 @@ func TestEthTxGossip(t *testing.T) {
151140
// wait so we aren't throttled by the vm
152141
time.Sleep(5 * time.Second)
153142

154-
marshaller := GossipEthTxMarshaller{}
155143
// Ask the VM for new transactions. We should get the newly issued tx.
156144
wg.Add(1)
157145
onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
158146
require.NoError(err)
159147

160-
response := &sdk.PullGossipResponse{}
161-
require.NoError(proto.Unmarshal(responseBytes, response))
162-
require.Len(response.Gossip, 1)
148+
response, err := gossip.ParseAppResponse(responseBytes)
149+
require.NoError(err)
163150

164-
gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0])
151+
txBytes, err := signedTx.MarshalBinary()
165152
require.NoError(err)
166-
require.Equal(signedTx.Hash(), gotTx.Tx.Hash())
153+
require.Equal(
154+
[][]byte{
155+
txBytes,
156+
},
157+
response,
158+
)
167159

168160
wg.Done()
169161
}

network/p2p/gossip/bloom.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ import (
1818
//
1919
// Invariant: The returned bloom filter is not safe to reset concurrently with
2020
// other operations. However, it is otherwise safe to access concurrently.
21+
//
22+
// Deprecated: [BloomSet] should be used to manage bloom filters.
2123
func NewBloomFilter(
2224
registerer prometheus.Registerer,
2325
namespace string,
@@ -45,6 +47,7 @@ func NewBloomFilter(
4547
return filter, err
4648
}
4749

50+
// Deprecated: [BloomSet] should be used to manage bloom filters.
4851
type BloomFilter struct {
4952
minTargetElements int
5053
targetFalsePositiveProbability float64
@@ -72,12 +75,8 @@ func (b *BloomFilter) Has(gossipable Gossipable) bool {
7275
return bloom.Contains(b.bloom, h[:], b.salt[:])
7376
}
7477

75-
func (b *BloomFilter) Marshal() ([]byte, []byte) {
76-
bloomBytes := b.bloom.Marshal()
77-
// salt must be copied here to ensure the bytes aren't overwritten if salt
78-
// is later modified.
79-
salt := b.salt
80-
return bloomBytes, salt[:]
78+
func (b *BloomFilter) BloomFilter() (*bloom.Filter, ids.ID) {
79+
return b.bloom, b.salt
8180
}
8281

8382
// ResetBloomFilterIfNeeded resets a bloom filter if it breaches [targetFalsePositiveProbability].
@@ -86,6 +85,8 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) {
8685
// the same [targetFalsePositiveProbability].
8786
//
8887
// Returns true if the bloom filter was reset.
88+
//
89+
// Deprecated: [BloomSet] should be used to manage bloom filters.
8990
func ResetBloomFilterIfNeeded(
9091
bloomFilter *BloomFilter,
9192
targetElements int,

network/p2p/gossip/bloom_test.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -77,19 +77,22 @@ func TestBloomFilterRefresh(t *testing.T) {
7777

7878
var resetCount uint64
7979
for _, item := range tt.add {
80-
bloomBytes, saltBytes := bloom.Marshal()
81-
initialBloomBytes := slices.Clone(bloomBytes)
82-
initialSaltBytes := slices.Clone(saltBytes)
80+
bloomFilter, salt := bloom.BloomFilter()
81+
initialBloomBytes := slices.Clone(bloomFilter.Marshal())
82+
initialSaltBytes := slices.Clone(salt[:])
8383

8484
reset, err := ResetBloomFilterIfNeeded(bloom, len(tt.add))
8585
require.NoError(err)
8686
if reset {
8787
resetCount++
8888
}
89-
bloom.Add(item)
89+
require.Equal(initialBloomBytes, bloomFilter.Marshal())
90+
require.Equal(initialSaltBytes, salt[:])
9091

91-
require.Equal(initialBloomBytes, bloomBytes)
92-
require.Equal(initialSaltBytes, saltBytes)
92+
// If the bloom filter wasn't reset, adding an item may modify
93+
// the returned bloom filter, so this must be done after the
94+
// checks.
95+
bloom.Add(item)
9396
}
9497

9598
require.Equal(tt.resetCount, resetCount)

0 commit comments

Comments
 (0)