Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ require (
github.com/DataDog/zstd v1.5.2
github.com/StephenButtolph/canoto v0.17.3
github.com/antithesishq/antithesis-sdk-go v0.3.8
github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201152440-a263a7cedcd0
github.com/ava-labs/avalanchego/graft/coreth v0.0.0-20251201173339-98b2978e465a
github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2
github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d
github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d
github.com/btcsuite/btcd/btcutil v1.1.3
github.com/cespare/xxhash/v2 v2.3.0
github.com/cockroachdb/pebble v0.0.0-20230928194634-aa077af62593
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2 h1:hQ15IJxY7WO
github.com/ava-labs/libevm v1.13.15-0.20251016142715-1bccf4f2ddb2/go.mod h1:DqSotSn4Dx/UJV+d3svfW8raR+cH7+Ohl9BpsQ5HlGU=
github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19 h1:S6oFasZsplNmw8B2S8cMJQMa62nT5ZKGzZRdCpd+5qQ=
github.com/ava-labs/simplex v0.0.0-20250919142550-9cdfff10fd19/go.mod h1:GVzumIo3zR23/qGRN2AdnVkIPHcKMq/D89EGWZfMGQ0=
github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d h1:7pjEE0BXLjzQlq5uKP5B2BTl9jTgDKaOfJx2Qfb01Jo=
github.com/ava-labs/subnet-evm v0.8.1-0.20251124174652-9114d48a927d/go.mod h1:JTvIe8YbCjHpy8vy9uZBSpDXxawNXSnIe/Wlf3I09Tk=
github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d h1:IlhCuTqhPEfpW+q/8ZlhmjflB/Onn9AhtXuRCRYa+oo=
github.com/ava-labs/subnet-evm v0.8.1-0.20251201175023-067762d6ce7d/go.mod h1:Hvl0SeW3Y/ZUgVQrfjzumterrF5T898YtkhDguq+pQA=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
Expand Down
9 changes: 6 additions & 3 deletions graft/coreth/plugin/evm/atomic/txpool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@ import (
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/utils/bloom"
)

var (
_ gossip.Set[*atomic.Tx] = (*Mempool)(nil)
_ gossip.HandlerSet[*atomic.Tx] = (*Mempool)(nil)
_ gossip.PullGossiperSet[*atomic.Tx] = (*Mempool)(nil)
_ gossip.PushGossiperSet = (*Mempool)(nil)

ErrAlreadyKnown = errors.New("already known")
ErrConflict = errors.New("conflict present")
Expand Down Expand Up @@ -298,9 +301,9 @@ func (m *Mempool) addTx(tx *atomic.Tx, local bool, force bool) error {
return nil
}

func (m *Mempool) GetFilter() ([]byte, []byte) {
func (m *Mempool) BloomFilter() (*bloom.Filter, ids.ID) {
m.lock.RLock()
defer m.lock.RUnlock()

return m.bloom.Marshal()
return m.bloom.BloomFilter()
}
40 changes: 14 additions & 26 deletions graft/coreth/plugin/evm/atomic/vm/tx_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/atomic"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest"
"github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest"
"github.com/ava-labs/avalanchego/ids"
Expand All @@ -27,6 +26,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
"github.com/ava-labs/avalanchego/snow/snowtest"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
Expand Down Expand Up @@ -113,31 +113,20 @@ func TestAtomicTxGossip(t *testing.T) {
}

// Ask the VM for any new transactions. We should get nothing at first.
emptyBloomFilter, err := gossip.NewBloomFilter(
prometheus.NewRegistry(),
"",
config.TxGossipBloomMinTargetElements,
config.TxGossipBloomTargetFalsePositiveRate,
config.TxGossipBloomResetFalsePositiveRate,
requestBytes, err := gossip.MarshalAppRequest(
bloom.EmptyFilter.Marshal(),
agoUtils.RandomBytes(32),
)
require.NoError(err)
emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal()
request := &sdk.PullGossipRequest{
Filter: emptyBloomFilterBytes,
Salt: agoUtils.RandomBytes(32),
}

requestBytes, err := proto.Marshal(request)
require.NoError(err)

wg := &sync.WaitGroup{}
wg.Add(1)
onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Empty(response.Gossip)
responseGossip, err := gossip.ParseAppResponse(responseBytes)
require.NoError(err)
require.Empty(responseGossip)
wg.Done()
}
require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse))
Expand Down Expand Up @@ -166,18 +155,17 @@ func TestAtomicTxGossip(t *testing.T) {
// Ask the VM for new transactions. We should get the newly issued tx.
wg.Add(1)

marshaller := atomic.TxMarshaller{}
onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Len(response.Gossip, 1)

gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0])
responseGossip, err := gossip.ParseAppResponse(responseBytes)
require.NoError(err)
require.Equal(tx.ID(), gotTx.GossipID())

require.Equal(
[][]byte{
tx.SignedBytes(),
},
responseGossip,
)
wg.Done()
}
require.NoError(client.AppRequest(ctx, set.Of(vm.Ctx.NodeID), requestBytes, onResponse))
Expand Down
13 changes: 8 additions & 5 deletions graft/coreth/plugin/evm/eth_gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,19 @@ import (
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
"github.com/ava-labs/avalanchego/ids"
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/ava-labs/avalanchego/utils/bloom"

ethcommon "github.com/ava-labs/libevm/common"
)

const pendingTxsBuffer = 10

var (
_ gossip.Gossipable = (*GossipEthTx)(nil)
_ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil)
_ gossip.Set[*GossipEthTx] = (*GossipEthTxPool)(nil)
_ gossip.Gossipable = (*GossipEthTx)(nil)
_ gossip.Marshaller[*GossipEthTx] = (*GossipEthTxMarshaller)(nil)
_ gossip.HandlerSet[*GossipEthTx] = (*GossipEthTxPool)(nil)
_ gossip.PullGossiperSet[*GossipEthTx] = (*GossipEthTxPool)(nil)
_ gossip.PushGossiperSet = (*GossipEthTxPool)(nil)

_ eth.PushGossiper = (*EthPushGossiper)(nil)
)
Expand Down Expand Up @@ -132,11 +135,11 @@ func (g *GossipEthTxPool) Iterate(f func(tx *GossipEthTx) bool) {
})
}

func (g *GossipEthTxPool) GetFilter() ([]byte, []byte) {
func (g *GossipEthTxPool) BloomFilter() (*bloom.Filter, ids.ID) {
g.lock.RLock()
defer g.lock.RUnlock()

return g.bloom.Marshal()
return g.bloom.BloomFilter()
}

type GossipEthTxMarshaller struct{}
Expand Down
2 changes: 1 addition & 1 deletion graft/coreth/plugin/evm/gossip/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ var _ p2p.Handler = (*txGossipHandler)(nil)
func NewTxGossipHandler[T gossip.Gossipable](
log logging.Logger,
marshaller gossip.Marshaller[T],
mempool gossip.Set[T],
mempool gossip.HandlerSet[T],
metrics gossip.Metrics,
maxMessageSize int,
throttlingPeriod time.Duration,
Expand Down
40 changes: 16 additions & 24 deletions graft/coreth/plugin/evm/tx_gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"google.golang.org/protobuf/proto"

"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/config"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/upgrade/ap0"
"github.com/ava-labs/avalanchego/graft/coreth/plugin/evm/vmtest"
"github.com/ava-labs/avalanchego/graft/coreth/utils/utilstest"
Expand All @@ -29,6 +28,7 @@ import (
"github.com/ava-labs/avalanchego/snow/engine/enginetest"
"github.com/ava-labs/avalanchego/snow/snowtest"
"github.com/ava-labs/avalanchego/snow/validators"
"github.com/ava-labs/avalanchego/utils/bloom"
"github.com/ava-labs/avalanchego/utils/crypto/secp256k1"
"github.com/ava-labs/avalanchego/utils/logging"
"github.com/ava-labs/avalanchego/utils/set"
Expand Down Expand Up @@ -107,31 +107,20 @@ func TestEthTxGossip(t *testing.T) {
}

// Ask the VM for any new transactions. We should get nothing at first.
emptyBloomFilter, err := gossip.NewBloomFilter(
prometheus.NewRegistry(),
"",
config.TxGossipBloomMinTargetElements,
config.TxGossipBloomTargetFalsePositiveRate,
config.TxGossipBloomResetFalsePositiveRate,
requestBytes, err := gossip.MarshalAppRequest(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(out of scope) Just noting that these changes are identical to the atomic ones, which suggests there may be insufficient abstraction.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah there is probably a nice test helper that should be made at some point. I really didn't want to refactor all of the testing here... Just wanted to change the tests as minimally as possible to work with the new interface

bloom.EmptyFilter.Marshal(),
agoUtils.RandomBytes(32),
)
require.NoError(err)
emptyBloomFilterBytes, _ := emptyBloomFilter.Marshal()
request := &sdk.PullGossipRequest{
Filter: emptyBloomFilterBytes,
Salt: agoUtils.RandomBytes(32),
}

requestBytes, err := proto.Marshal(request)
require.NoError(err)

wg := &sync.WaitGroup{}
wg.Add(1)
onResponse := func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Empty(response.Gossip)
response, err := gossip.ParseAppResponse(responseBytes)
require.NoError(err)
require.Empty(response)
wg.Done()
}
require.NoError(client.AppRequest(ctx, set.Of(vm.ctx.NodeID), requestBytes, onResponse))
Expand All @@ -151,19 +140,22 @@ func TestEthTxGossip(t *testing.T) {
// wait so we aren't throttled by the vm
time.Sleep(5 * time.Second)

marshaller := GossipEthTxMarshaller{}
// Ask the VM for new transactions. We should get the newly issued tx.
wg.Add(1)
onResponse = func(_ context.Context, _ ids.NodeID, responseBytes []byte, err error) {
require.NoError(err)

response := &sdk.PullGossipResponse{}
require.NoError(proto.Unmarshal(responseBytes, response))
require.Len(response.Gossip, 1)
response, err := gossip.ParseAppResponse(responseBytes)
require.NoError(err)

gotTx, err := marshaller.UnmarshalGossip(response.Gossip[0])
txBytes, err := signedTx.MarshalBinary()
require.NoError(err)
require.Equal(signedTx.Hash(), gotTx.Tx.Hash())
require.Equal(
[][]byte{
txBytes,
},
response,
)

wg.Done()
}
Expand Down
13 changes: 7 additions & 6 deletions network/p2p/gossip/bloom.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
//
// Invariant: The returned bloom filter is not safe to reset concurrently with
// other operations. However, it is otherwise safe to access concurrently.
//
// Deprecated: [BloomSet] should be used to manage bloom filters.
func NewBloomFilter(
registerer prometheus.Registerer,
namespace string,
Expand Down Expand Up @@ -45,6 +47,7 @@ func NewBloomFilter(
return filter, err
}

// Deprecated: [BloomSet] should be used to manage bloom filters.
type BloomFilter struct {
minTargetElements int
targetFalsePositiveProbability float64
Expand Down Expand Up @@ -72,12 +75,8 @@ func (b *BloomFilter) Has(gossipable Gossipable) bool {
return bloom.Contains(b.bloom, h[:], b.salt[:])
}

func (b *BloomFilter) Marshal() ([]byte, []byte) {
bloomBytes := b.bloom.Marshal()
// salt must be copied here to ensure the bytes aren't overwritten if salt
// is later modified.
salt := b.salt
return bloomBytes, salt[:]
func (b *BloomFilter) BloomFilter() (*bloom.Filter, ids.ID) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For my understanding: why ids.ID? It seems like an odd overloading of the meaning of a type.

Copy link
Contributor

@joshua-kim joshua-kim Dec 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there was a reason in the old code - I think we just went with hash = ids.ID. It would probably make more sense to make a type alias for salt or just use plain []byte

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I returned an array rather than a slice to avoid needing to do the copy.. It could just be [32]byte... We just typically use ids.ID throughout the repo for these... We could probably clean it up later

return b.bloom, b.salt
}

// ResetBloomFilterIfNeeded resets a bloom filter if it breaches [targetFalsePositiveProbability].
Expand All @@ -86,6 +85,8 @@ func (b *BloomFilter) Marshal() ([]byte, []byte) {
// the same [targetFalsePositiveProbability].
//
// Returns true if the bloom filter was reset.
//
// Deprecated: [BloomSet] should be used to manage bloom filters.
func ResetBloomFilterIfNeeded(
bloomFilter *BloomFilter,
targetElements int,
Expand Down
15 changes: 9 additions & 6 deletions network/p2p/gossip/bloom_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,22 @@ func TestBloomFilterRefresh(t *testing.T) {

var resetCount uint64
for _, item := range tt.add {
bloomBytes, saltBytes := bloom.Marshal()
initialBloomBytes := slices.Clone(bloomBytes)
initialSaltBytes := slices.Clone(saltBytes)
bloomFilter, salt := bloom.BloomFilter()
initialBloomBytes := slices.Clone(bloomFilter.Marshal())
initialSaltBytes := slices.Clone(salt[:])

reset, err := ResetBloomFilterIfNeeded(bloom, len(tt.add))
require.NoError(err)
if reset {
resetCount++
}
bloom.Add(item)
require.Equal(initialBloomBytes, bloomFilter.Marshal())
require.Equal(initialSaltBytes, salt[:])

require.Equal(initialBloomBytes, bloomBytes)
require.Equal(initialSaltBytes, saltBytes)
// If the bloom filter wasn't reset, adding an item may modify
// the returned bloom filter, so this must be done after the
// checks.
bloom.Add(item)
}

require.Equal(tt.resetCount, resetCount)
Expand Down
Loading