From f46d4e6ca28a8bc7838c823556b225042cccb78e Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 20 Dec 2024 17:45:06 +0200 Subject: [PATCH 1/8] txpool: introduce run method --- cmd/txpool/main.go | 36 ++-- cmd/utils/flags.go | 7 +- eth/backend.go | 84 ++++----- eth/ethconfig/config.go | 2 - eth/ethconfig/gen_config.go | 6 - turbo/stages/mock/mock_sentry.go | 37 +++- txnprovider/provider.go | 6 + txnprovider/txpool/fetch_test.go | 11 +- txnprovider/txpool/pool.go | 137 ++++++++++++-- txnprovider/txpool/pool_db.go | 74 ++++++++ txnprovider/txpool/send.go | 4 +- txnprovider/txpool/txpool_grpc_server.go | 8 +- .../txpool/txpoolutil/all_components.go | 174 ------------------ 13 files changed, 303 insertions(+), 283 deletions(-) delete mode 100644 txnprovider/txpool/txpoolutil/all_components.go diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go index c476f29a2db..aff3aab4a12 100644 --- a/cmd/txpool/main.go +++ b/cmd/txpool/main.go @@ -28,6 +28,7 @@ import ( libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" + "github.com/erigontech/erigon-lib/common/paths" "github.com/erigontech/erigon-lib/direct" "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" @@ -38,16 +39,13 @@ import ( "github.com/erigontech/erigon-lib/kv/remotedbserver" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cmd/rpcdaemon/rpcdaemontest" + "github.com/erigontech/erigon/cmd/utils" "github.com/erigontech/erigon/consensus/misc" "github.com/erigontech/erigon/ethdb/privateapi" - "github.com/erigontech/erigon/txnprovider/txpool" - "github.com/erigontech/erigon/txnprovider/txpool/txpoolcfg" - "github.com/erigontech/erigon/txnprovider/txpool/txpoolutil" - - "github.com/erigontech/erigon-lib/common/paths" - "github.com/erigontech/erigon/cmd/utils" "github.com/erigontech/erigon/turbo/debug" "github.com/erigontech/erigon/turbo/logging" + "github.com/erigontech/erigon/txnprovider/txpool" + "github.com/erigontech/erigon/txnprovider/txpool/txpoolcfg" ) var ( @@ -183,26 +181,32 @@ func doTxpool(ctx context.Context, logger log.Logger) error { cfg.TracedSenders[i] = string(sender[:]) } - newTxs := make(chan txpool.Announcements, 1024) - defer close(newTxs) - txPoolDB, txPool, fetch, send, txpoolGrpcServer, err := txpoolutil.AllComponents(ctx, cfg, - kvcache.New(cacheConfig), newTxs, coreDB, sentryClients, kvClient, misc.Eip1559FeeCalculator, logger) + notifyMiner := func() {} + txPool, txpoolGrpcServer, err := txpool.Assemble( + ctx, + cfg, + coreDB, + kvcache.New(cacheConfig), + sentryClients, + kvClient, + misc.Eip1559FeeCalculator, + notifyMiner, + logger, + ) if err != nil { return err } - defer txPoolDB.Close() - fetch.ConnectCore() - fetch.ConnectSentries() miningGrpcServer := privateapi.NewMiningServer(ctx, &rpcdaemontest.IsMiningMock{}, nil, logger) - grpcServer, err := txpool.StartGrpc(txpoolGrpcServer, miningGrpcServer, txpoolApiAddr, nil, logger) if err != nil { return err } - notifyMiner := func() {} - txpool.MainLoop(ctx, txPool, newTxs, send, txpoolGrpcServer.NewSlotsStreams, notifyMiner) + err = txPool.Run(ctx) + if err != nil { + return err + } grpcServer.GracefulStop() return nil diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index d1a9b1a882b..e4441767158 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -1562,6 +1562,9 @@ func setTxPool(ctx *cli.Context, dbDir string, fullCfg *ethconfig.Config) { if ctx.IsSet(DbWriteMapFlag.Name) { cfg.MdbxWriteMap = ctx.Bool(DbWriteMapFlag.Name) } + if ctx.IsSet(TxPoolGossipDisableFlag.Name) { + cfg.NoGossip = ctx.Bool(TxPoolGossipDisableFlag.Name) + } cfg.LogEvery = 3 * time.Minute cfg.CommitEvery = libcommon.RandomizeDuration(ctx.Duration(TxPoolCommitEveryFlag.Name)) cfg.DBDir = dbDir @@ -1985,10 +1988,6 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C if ctx.IsSet(TrustedSetupFile.Name) { libkzg.SetTrustedSetupFilePath(ctx.String(TrustedSetupFile.Name)) } - - if ctx.IsSet(TxPoolGossipDisableFlag.Name) { - cfg.DisableTxPoolGossip = ctx.Bool(TxPoolGossipDisableFlag.Name) - } } // SetDNSDiscoveryDefaults configures DNS discovery with the given URL if diff --git a/eth/backend.go b/eth/backend.go index 587eca7d4fd..079f127871b 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -134,7 +134,6 @@ import ( "github.com/erigontech/erigon/txnprovider/shutter" "github.com/erigontech/erigon/txnprovider/txpool" "github.com/erigontech/erigon/txnprovider/txpool/txpoolcfg" - "github.com/erigontech/erigon/txnprovider/txpool/txpoolutil" ) // Config contains the configuration options of the ETH protocol. @@ -193,16 +192,12 @@ type Ethereum struct { waitForStageLoopStop chan struct{} waitForMiningStop chan struct{} - txPoolDB kv.RwDB - txPool *txpool.TxPool - newTxs chan txpool.Announcements - txPoolFetch *txpool.Fetch - txPoolSend *txpool.Send - txPoolGrpcServer txpoolproto.TxpoolServer - shutterPool *shutter.Pool - notifyMiningAboutNewTxs chan struct{} - forkValidator *engine_helpers.ForkValidator - downloader *downloader.Downloader + txPool *txpool.TxPool + txPoolGrpcServer txpoolproto.TxpoolServer + shutterPool *shutter.Pool + blockBuilderNotifyNewTxns chan struct{} + forkValidator *engine_helpers.ForkValidator + downloader *downloader.Downloader agg *libstate.Aggregator blockSnapshots *freezeblocks.RoSnapshots @@ -644,31 +639,45 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger return nil, err } - var txnProvider txnprovider.TxnProvider - config.TxPool.NoGossip = config.DisableTxPoolGossip - var miningRPC txpoolproto.MiningServer stateDiffClient := direct.NewStateDiffClientDirect(kvRPC) + var txnProvider txnprovider.TxnProvider if config.TxPool.Disable { backend.txPoolGrpcServer = &txpool.GrpcDisabled{} } else { - backend.newTxs = make(chan txpool.Announcements, 1024) - backend.txPoolDB, backend.txPool, backend.txPoolFetch, backend.txPoolSend, backend.txPoolGrpcServer, err = txpoolutil.AllComponents( - ctx, config.TxPool, kvcache.NewDummy(), backend.newTxs, backend.chainDB, backend.sentriesClient.Sentries(), stateDiffClient, misc.Eip1559FeeCalculator, logger, + sentries := backend.sentriesClient.Sentries() + blockBuilderNotifyNewTxns := func() { + select { + case backend.blockBuilderNotifyNewTxns <- struct{}{}: + default: + } + } + backend.txPool, backend.txPoolGrpcServer, err = txpool.Assemble( + ctx, + config.TxPool, + backend.chainDB, + kvcache.NewDummy(), + sentries, + stateDiffClient, + misc.Eip1559FeeCalculator, + blockBuilderNotifyNewTxns, + logger, ) if err != nil { return nil, err } + txnProvider = backend.txPool } if config.Shutter.Enabled { if config.TxPool.Disable { panic("can't enable shutter pool when devp2p txpool is disabled") } - backend.shutterPool = shutter.NewPool(logger, config.Shutter, txnProvider) + + backend.shutterPool = shutter.NewPool(logger, config.Shutter, backend.txPool) txnProvider = backend.shutterPool } - backend.notifyMiningAboutNewTxs = make(chan struct{}, 1) + backend.blockBuilderNotifyNewTxns = make(chan struct{}, 1) backend.miningSealingQuit = make(chan struct{}) backend.pendingBlocks = make(chan *types.Block, 1) backend.minedBlocks = make(chan *types.Block, 1) @@ -794,7 +803,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger agg.SetSnapshotBuildSema(blockSnapBuildSema) blockRetire := freezeblocks.NewBlockRetire(1, dirs, blockReader, blockWriter, backend.chainDB, heimdallStore, bridgeStore, backend.chainConfig, config, backend.notifications.Events, blockSnapBuildSema, logger) - miningRPC = privateapi.NewMiningServer(ctx, backend, ethashApi, logger) + var miningRPC txpoolproto.MiningServer = privateapi.NewMiningServer(ctx, backend, ethashApi, logger) var creds credentials.TransportCredentials if stack.Config().PrivateApiAddr != "" { @@ -824,26 +833,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger if currentBlock == nil { currentBlock = genesis } - // We start the transaction pool on startup, for a couple of reasons: - // 1) Hive tests requires us to do so and starting it from eth_sendRawTransaction is not viable as we have not enough data - // to initialize it properly. - // 2) we cannot propose for block 1 regardless. - - if !config.TxPool.Disable { - backend.txPoolFetch.ConnectCore() - backend.txPoolFetch.ConnectSentries() - var newTxsBroadcaster *txpool.NewSlotsStreams - if casted, ok := backend.txPoolGrpcServer.(*txpool.GrpcServer); ok { - newTxsBroadcaster = casted.NewSlotsStreams - } - go txpool.MainLoop(backend.sentryCtx, backend.txPool, backend.newTxs, backend.txPoolSend, newTxsBroadcaster, - func() { - select { - case backend.notifyMiningAboutNewTxs <- struct{}{}: - default: - } - }) - } go func() { defer debug.LogPanic() @@ -1283,8 +1272,8 @@ func (s *Ethereum) StartMining(ctx context.Context, db kv.RwDB, stateDiffClient // block info in the state channel hasWork = true - case <-s.notifyMiningAboutNewTxs: - //log.Warn("[dbg] notifyMiningAboutNewTxs") + case <-s.blockBuilderNotifyNewTxns: + //log.Warn("[dbg] blockBuilderNotifyNewTxns") // Skip mining based on new txn notif for bor consensus hasWork = s.chainConfig.Bor == nil @@ -1601,6 +1590,14 @@ func (s *Ethereum) Start() error { } } + if s.txPool != nil { + // We start the transaction pool on startup, for a couple of reasons: + // 1) Hive tests requires us to do so and starting it from eth_sendRawTransaction is not viable as we have not enough data + // to initialize it properly. + // 2) we cannot propose for block 1 regardless. + s.bgComponentsEg.Go(func() error { return s.txPool.Run(s.sentryCtx) }) + } + if s.shutterPool != nil { s.bgComponentsEg.Go(func() error { return s.shutterPool.Run(s.sentryCtx) }) } @@ -1642,9 +1639,6 @@ func (s *Ethereum) Stop() error { for _, sentryServer := range s.sentryServers { sentryServer.Close() } - if s.txPoolDB != nil { - s.txPoolDB.Close() - } if s.agg != nil { s.agg.Close() } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index 8ea5f2bbab1..f762a043602 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -257,8 +257,6 @@ type Config struct { SilkwormRpcLogDumpResponse bool SilkwormRpcNumWorkers uint32 SilkwormRpcJsonCompatibility bool - - DisableTxPoolGossip bool } type Sync struct { diff --git a/eth/ethconfig/gen_config.go b/eth/ethconfig/gen_config.go index 94abe593f55..7a5d015e379 100644 --- a/eth/ethconfig/gen_config.go +++ b/eth/ethconfig/gen_config.go @@ -67,7 +67,6 @@ func (c Config) MarshalTOML() (interface{}, error) { SilkwormRpcLogDumpResponse bool SilkwormRpcNumWorkers uint32 SilkwormRpcJsonCompatibility bool - DisableTxPoolGossip bool } var enc Config enc.Genesis = c.Genesis @@ -114,7 +113,6 @@ func (c Config) MarshalTOML() (interface{}, error) { enc.SilkwormRpcLogDumpResponse = c.SilkwormRpcLogDumpResponse enc.SilkwormRpcNumWorkers = c.SilkwormRpcNumWorkers enc.SilkwormRpcJsonCompatibility = c.SilkwormRpcJsonCompatibility - enc.DisableTxPoolGossip = c.DisableTxPoolGossip return &enc, nil } @@ -165,7 +163,6 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { SilkwormRpcLogDumpResponse *bool SilkwormRpcNumWorkers *uint32 SilkwormRpcJsonCompatibility *bool - DisableTxPoolGossip *bool } var dec Config if err := unmarshal(&dec); err != nil { @@ -303,8 +300,5 @@ func (c *Config) UnmarshalTOML(unmarshal func(interface{}) error) error { if dec.SilkwormRpcJsonCompatibility != nil { c.SilkwormRpcJsonCompatibility = *dec.SilkwormRpcJsonCompatibility } - if dec.DisableTxPoolGossip != nil { - c.DisableTxPoolGossip = *dec.DisableTxPoolGossip - } return nil } diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 3be2cb7f03a..2aea4677c37 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -30,7 +30,9 @@ import ( "github.com/c2h5oh/datasize" lru "github.com/hashicorp/golang-lru/arc/v2" "github.com/holiman/uint256" + "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" + "golang.org/x/sync/errgroup" "golang.org/x/sync/semaphore" "google.golang.org/protobuf/types/known/emptypb" @@ -137,13 +139,11 @@ type MockSentry struct { BlockReader services.FullBlockReader ReceiptsReader *receipts.Generator posStagedSync *stagedsync.Sync + bgComponentsEg errgroup.Group } func (ms *MockSentry) Close() { ms.cancel() - if ms.txPoolDB != nil { - ms.txPoolDB.Close() - } if ms.Engine != nil { ms.Engine.Close() } @@ -156,6 +156,8 @@ func (ms *MockSentry) Close() { if ms.DB != nil { ms.DB.Close() } + err := ms.bgComponentsEg.Wait() + require.Equal(ms.tb, err, context.Canceled) } // Stream returns stream, waiting if necessary @@ -345,24 +347,41 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK pragueTime := mock.ChainConfig.PragueTime maxBlobsPerBlock := mock.ChainConfig.GetMaxBlobsPerBlock() mock.txPoolDB = memdb.NewWithLabel(tmpdir, kv.TxPoolDB) - mock.TxPool, err = txpool.New(newTxs, mock.txPoolDB, mock.DB, poolCfg, kvcache.NewDummy(), *chainID, shanghaiTime, nil /* agraBlock */, cancunTime, pragueTime, maxBlobsPerBlock, nil, logger) + stateChangesClient := direct.NewStateDiffClientDirect(erigonGrpcServeer) + newSlotsStreams := &txpool.NewSlotsStreams{} + mock.TxPool, err = txpool.New( + ctx, + newTxs, + mock.txPoolDB, + mock.DB, poolCfg, + kvcache.NewDummy(), + *chainID, shanghaiTime, + nil, /* agraBlock */ + cancunTime, + pragueTime, + maxBlobsPerBlock, + nil, /* feeCalculator */ + sentries, + stateChangesClient, + func() {}, /* builderNotifyNewTxns */ + newSlotsStreams, + logger, + ) if err != nil { tb.Fatal(err) } - stateChangesClient := direct.NewStateDiffClientDirect(erigonGrpcServeer) - mock.TxPoolFetch = txpool.NewFetch(mock.Ctx, sentries, mock.TxPool, stateChangesClient, mock.DB, mock.txPoolDB, *chainID, logger) mock.TxPoolFetch.SetWaitGroup(&mock.ReceiveWg) - mock.TxPoolSend = txpool.NewSend(mock.Ctx, sentries, mock.TxPool, logger) - mock.TxPoolGrpcServer = txpool.NewGrpcServer(mock.Ctx, mock.TxPool, mock.txPoolDB, *chainID, logger) + mock.TxPoolSend = txpool.NewSend(mock.Ctx, sentries, logger) + mock.TxPoolGrpcServer = txpool.NewGrpcServer(mock.Ctx, mock.TxPool, mock.txPoolDB, newSlotsStreams, *chainID, logger) mock.TxPoolFetch.ConnectCore() mock.StreamWg.Add(1) mock.TxPoolFetch.ConnectSentries() mock.StreamWg.Wait() - go txpool.MainLoop(mock.Ctx, mock.TxPool, newTxs, mock.TxPoolSend, mock.TxPoolGrpcServer.NewSlotsStreams, func() {}) + mock.bgComponentsEg.Go(func() error { return mock.TxPool.Run(ctx) }) } // Committed genesis will be shared between download and mock sentry diff --git a/txnprovider/provider.go b/txnprovider/provider.go index eff293a7996..036239e9ac3 100644 --- a/txnprovider/provider.go +++ b/txnprovider/provider.go @@ -26,6 +26,12 @@ import ( ) type TxnProvider interface { + // ProvideTxns provides transactions ready to be included in a block for block building. Available request options: + // - WithParentBlockNum + // - WithAmount + // - WithGasTarget + // - WithBlobGasTarget + // - WithTxnIdsFilter ProvideTxns(ctx context.Context, opts ...ProvideOption) ([]types.Transaction, error) } diff --git a/txnprovider/txpool/fetch_test.go b/txnprovider/txpool/fetch_test.go index 441404fa743..0ca827a859e 100644 --- a/txnprovider/txpool/fetch_test.go +++ b/txnprovider/txpool/fetch_test.go @@ -24,13 +24,14 @@ import ( "sync" "testing" - "github.com/erigontech/erigon-lib/kv" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/common/u256" "github.com/erigontech/erigon-lib/direct" "github.com/erigontech/erigon-lib/gointerfaces" @@ -102,7 +103,7 @@ func TestSendTxPropagate(t *testing.T) { }).AnyTimes() m := NewMockSentry(ctx, sentryServer) - send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, nil, log.New()) + send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, log.New()) send.BroadcastPooledTxns(testRlps(2), 100) send.AnnouncePooledTxns([]byte{0, 1}, []uint32{10, 15}, toHashes(1, 42), 100) @@ -132,7 +133,7 @@ func TestSendTxPropagate(t *testing.T) { Times(times) m := NewMockSentry(ctx, sentryServer) - send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, nil, log.New()) + send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, log.New()) list := make(Hashes, p2pTxPacketLimit*3) for i := 0; i < len(list); i += 32 { b := []byte(fmt.Sprintf("%x", i)) @@ -167,7 +168,7 @@ func TestSendTxPropagate(t *testing.T) { Times(times) m := NewMockSentry(ctx, sentryServer) - send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, nil, log.New()) + send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, log.New()) send.BroadcastPooledTxns(testRlps(2), 100) send.AnnouncePooledTxns([]byte{0, 1}, []uint32{10, 15}, toHashes(1, 42), 100) @@ -207,7 +208,7 @@ func TestSendTxPropagate(t *testing.T) { }).AnyTimes() m := NewMockSentry(ctx, sentryServer) - send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, nil, log.New()) + send := NewSend(ctx, []sentryproto.SentryClient{direct.NewSentryClientDirect(direct.ETH68, m)}, log.New()) expectPeers := toPeerIDs(1, 2, 42) send.PropagatePooledTxnsToPeersList(expectPeers, []byte{0, 1}, []uint32{10, 15}, toHashes(1, 42)) diff --git a/txnprovider/txpool/pool.go b/txnprovider/txpool/pool.go index 2402f0de5f2..4b81822ca83 100644 --- a/txnprovider/txpool/pool.go +++ b/txnprovider/txpool/pool.go @@ -30,6 +30,7 @@ import ( "sync/atomic" "time" + "github.com/c2h5oh/datasize" gokzg4844 "github.com/crate-crypto/go-kzg-4844" mapset "github.com/deckarep/golang-set/v2" "github.com/go-stack/stack" @@ -48,6 +49,7 @@ import ( "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" + sentry "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/kv/kvcache" @@ -144,6 +146,10 @@ type TxPool struct { isPostPrague atomic.Bool maxBlobsPerBlock uint64 feeCalculator FeeCalculator + p2pFetcher *Fetch + p2pSender *Send + newSlotsStreams *NewSlotsStreams + builderNotifyNewTxns func() logger log.Logger } @@ -151,7 +157,92 @@ type FeeCalculator interface { CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice, blockGasLimit uint64, err error) } +func Assemble( + ctx context.Context, + cfg txpoolcfg.Config, + chainDB kv.RwDB, + cache kvcache.Cache, + sentryClients []sentry.SentryClient, + stateChangesClient StateChangesClient, + feeCalculator FeeCalculator, + builderNotifyNewTxns func(), + logger log.Logger, +) (*TxPool, txpoolproto.TxpoolServer, error) { + opts := mdbx.New(kv.TxPoolDB, logger).Path(cfg.DBDir). + WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }). + WriteMergeThreshold(3 * 8192). + PageSize(16 * datasize.KB). + GrowthStep(16 * datasize.MB). + DirtySpace(uint64(128 * datasize.MB)). + MapSize(1 * datasize.TB). + WriteMap(cfg.MdbxWriteMap) + + if cfg.MdbxPageSize > 0 { + opts = opts.PageSize(cfg.MdbxPageSize) + } + if cfg.MdbxDBSizeLimit > 0 { + opts = opts.MapSize(cfg.MdbxDBSizeLimit) + } + if cfg.MdbxGrowthStep > 0 { + opts = opts.GrowthStep(cfg.MdbxGrowthStep) + } + + poolDB, err := opts.Open(ctx) + if err != nil { + return nil, nil, err + } + + chainConfig, _, err := SaveChainConfigIfNeed(ctx, chainDB, poolDB, true, logger) + if err != nil { + return nil, nil, err + } + + chainID, _ := uint256.FromBig(chainConfig.ChainID) + maxBlobsPerBlock := chainConfig.GetMaxBlobsPerBlock() + + shanghaiTime := chainConfig.ShanghaiTime + var agraBlock *big.Int + if chainConfig.Bor != nil { + agraBlock = chainConfig.Bor.GetAgraBlock() + } + cancunTime := chainConfig.CancunTime + pragueTime := chainConfig.PragueTime + if cfg.OverridePragueTime != nil { + pragueTime = cfg.OverridePragueTime + } + + newTxns := make(chan Announcements, 1024) + newSlotsStreams := &NewSlotsStreams{} + pool, err := New( + ctx, + newTxns, + poolDB, + chainDB, + cfg, + cache, + *chainID, + shanghaiTime, + agraBlock, + cancunTime, + pragueTime, + maxBlobsPerBlock, + feeCalculator, + sentryClients, + stateChangesClient, + builderNotifyNewTxns, + newSlotsStreams, + logger, + ) + if err != nil { + return nil, nil, err + } + + grpcServer := NewGrpcServer(ctx, pool, poolDB, newSlotsStreams, *chainID, logger) + return pool, grpcServer, nil +} + func New( + ctx context.Context, newTxns chan Announcements, poolDB kv.RwDB, coreDB kv.RoDB, @@ -164,6 +255,10 @@ func New( pragueTime *big.Int, maxBlobsPerBlock uint64, feeCalculator FeeCalculator, + sentryClients []sentry.SentryClient, + stateChangesClient StateChangesClient, + builderNotifyNewTxns func(), + newSlotsStreams *NewSlotsStreams, logger log.Logger, ) (*TxPool, error) { localsHistory, err := simplelru.NewLRU[string, struct{}](10_000, nil) @@ -212,6 +307,8 @@ func New( minedBlobTxnsByHash: map[string]*metaTxn{}, maxBlobsPerBlock: maxBlobsPerBlock, feeCalculator: feeCalculator, + builderNotifyNewTxns: builderNotifyNewTxns, + newSlotsStreams: newSlotsStreams, logger: logger, } @@ -244,6 +341,8 @@ func New( res.pragueTime = &pragueTimeU64 } + res.p2pFetcher = NewFetch(ctx, sentryClients, res, stateChangesClient, coreDB, poolDB, chainID, logger) + res.p2pSender = NewSend(ctx, sentryClients, logger) return res, nil } @@ -1731,7 +1830,7 @@ func (p *TxPool) promote(pendingBaseFee uint64, pendingBlobFee uint64, announcem } } -// MainLoop - does: +// Run - does: // send pending byHash to p2p: // - new byHash // - all pooled byHash to recently connected peers @@ -1739,7 +1838,11 @@ func (p *TxPool) promote(pendingBaseFee uint64, pendingBlobFee uint64, announcem // // promote/demote transactions // reorgs -func MainLoop(ctx context.Context, p *TxPool, newTxns chan Announcements, send *Send, newSlotsStreams *NewSlotsStreams, notifyMiningAboutNewSlots func()) { +func (p *TxPool) Run(ctx context.Context) error { + defer p.poolDB.Close() + p.p2pFetcher.ConnectCore() + p.p2pFetcher.ConnectSentries() + syncToNewPeersEvery := time.NewTicker(p.cfg.SyncToNewPeersEvery) defer syncToNewPeersEvery.Stop() processRemoteTxnsEvery := time.NewTicker(p.cfg.ProcessRemoteTxnsEvery) @@ -1751,14 +1854,18 @@ func MainLoop(ctx context.Context, p *TxPool, newTxns chan Announcements, send * if err := p.Start(ctx); err != nil { p.logger.Error("[txpool] Failed to start", "err", err) - return + return err } for { select { case <-ctx.Done(): - _, _ = p.flush(ctx) - return + err := ctx.Err() + _, flushErr := p.flush(ctx) + if flushErr != nil { + err = fmt.Errorf("%w: %w", flushErr, err) + } + return err case <-logEvery.C: p.logStats() case <-processRemoteTxnsEvery.C: @@ -1785,11 +1892,11 @@ func MainLoop(ctx context.Context, p *TxPool, newTxns chan Announcements, send * writeToDBBytesCounter.SetUint64(written) p.logger.Debug("[txpool] Commit", "written_kb", written/1024, "in", time.Since(t)) } - case announcements := <-newTxns: + case announcements := <-p.newPendingTxns: go func() { for i := 0; i < 16; i++ { // drain more events from channel, then merge and dedup them select { - case a := <-newTxns: + case a := <-p.newPendingTxns: announcements.AppendOther(a) continue default: @@ -1803,7 +1910,7 @@ func MainLoop(ctx context.Context, p *TxPool, newTxns chan Announcements, send * announcements = announcements.DedupCopy() - notifyMiningAboutNewSlots() + p.builderNotifyNewTxns() if p.cfg.NoGossip { // drain newTxns for emptying newTxn channel @@ -1868,17 +1975,17 @@ func MainLoop(ctx context.Context, p *TxPool, newTxns chan Announcements, send * p.logger.Error("[txpool] collect info to propagate", "err", err) return } - if newSlotsStreams != nil { - newSlotsStreams.Broadcast(&txpoolproto.OnAddReply{RplTxs: slotsRlp}, p.logger) + if p.newSlotsStreams != nil { + p.newSlotsStreams.Broadcast(&txpoolproto.OnAddReply{RplTxs: slotsRlp}, p.logger) } // broadcast local transactions const localTxnsBroadcastMaxPeers uint64 = 10 - txnSentTo := send.BroadcastPooledTxns(localTxnRlps, localTxnsBroadcastMaxPeers) + txnSentTo := p.p2pSender.BroadcastPooledTxns(localTxnRlps, localTxnsBroadcastMaxPeers) for i, peer := range txnSentTo { p.logger.Trace("Local txn broadcast", "txHash", hex.EncodeToString(broadcastHashes.At(i)), "to peer", peer) } - hashSentTo := send.AnnouncePooledTxns(localTxnTypes, localTxnSizes, localTxnHashes, localTxnsBroadcastMaxPeers*2) + hashSentTo := p.p2pSender.AnnouncePooledTxns(localTxnTypes, localTxnSizes, localTxnHashes, localTxnsBroadcastMaxPeers*2) for i := 0; i < localTxnHashes.Len(); i++ { hash := localTxnHashes.At(i) p.logger.Trace("Local txn announced", "txHash", hex.EncodeToString(hash), "to peer", hashSentTo[i], "baseFee", p.pendingBaseFee.Load()) @@ -1886,8 +1993,8 @@ func MainLoop(ctx context.Context, p *TxPool, newTxns chan Announcements, send * // broadcast remote transactions const remoteTxnsBroadcastMaxPeers uint64 = 3 - send.BroadcastPooledTxns(remoteTxnRlps, remoteTxnsBroadcastMaxPeers) - send.AnnouncePooledTxns(remoteTxnTypes, remoteTxnSizes, remoteTxnHashes, remoteTxnsBroadcastMaxPeers*2) + p.p2pSender.BroadcastPooledTxns(remoteTxnRlps, remoteTxnsBroadcastMaxPeers) + p.p2pSender.AnnouncePooledTxns(remoteTxnTypes, remoteTxnSizes, remoteTxnHashes, remoteTxnsBroadcastMaxPeers*2) }() case <-syncToNewPeersEvery.C: // new peer newPeers := p.recentlyConnectedPeers.GetAndClean() @@ -1904,7 +2011,7 @@ func MainLoop(ctx context.Context, p *TxPool, newTxns chan Announcements, send * var types []byte var sizes []uint32 types, sizes, hashes = p.AppendAllAnnouncements(types, sizes, hashes[:0]) - go send.PropagatePooledTxnsToPeersList(newPeers, types, sizes, hashes) + go p.p2pSender.PropagatePooledTxnsToPeersList(newPeers, types, sizes, hashes) propagateToNewPeerTimer.ObserveDuration(t) } } diff --git a/txnprovider/txpool/pool_db.go b/txnprovider/txpool/pool_db.go index 4c81a71faae..f180826d4fc 100644 --- a/txnprovider/txpool/pool_db.go +++ b/txnprovider/txpool/pool_db.go @@ -18,13 +18,17 @@ package txpool import ( "bytes" + "context" "encoding/binary" "encoding/json" + "errors" "fmt" + "time" "github.com/erigontech/erigon-lib/chain" "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/log/v3" ) var PoolChainConfigKey = []byte("chain_config") @@ -96,3 +100,73 @@ func PutChainConfig(tx kv.Putter, cc *chain.Config, buf []byte) error { } return nil } + +func SaveChainConfigIfNeed( + ctx context.Context, + coreDB kv.RoDB, + poolDB kv.RwDB, + force bool, + logger log.Logger, +) (cc *chain.Config, blockNum uint64, err error) { + if err = poolDB.View(ctx, func(tx kv.Tx) error { + cc, err = ChainConfig(tx) + if err != nil { + return err + } + blockNum, err = LastSeenBlock(tx) + if err != nil { + return err + } + return nil + }); err != nil { + return nil, 0, err + } + if cc != nil && !force { + if cc.ChainID.Uint64() == 0 { + return nil, 0, errors.New("wrong chain config") + } + return cc, blockNum, nil + } + + for { + if err = coreDB.View(ctx, func(tx kv.Tx) error { + cc, err = chain.GetConfig(tx, nil) + if err != nil { + return err + } + n, err := chain.CurrentBlockNumber(tx) + if err != nil { + return err + } + if n != nil { + blockNum = *n + } + return nil + }); err != nil { + logger.Error("cant read chain config from core db", "err", err) + time.Sleep(5 * time.Second) + continue + } else if cc == nil { + logger.Error("cant read chain config from core db") + time.Sleep(5 * time.Second) + continue + } + break + } + + if err = poolDB.Update(ctx, func(tx kv.RwTx) error { + if err = PutChainConfig(tx, cc, nil); err != nil { + return err + } + if err = PutLastSeenBlock(tx, blockNum, nil); err != nil { + return err + } + return nil + }); err != nil { + return nil, 0, err + } + if cc.ChainID.Uint64() == 0 { + return nil, 0, errors.New("wrong chain config") + } + return cc, blockNum, nil +} diff --git a/txnprovider/txpool/send.go b/txnprovider/txpool/send.go index 38c72d5301b..97271d3a846 100644 --- a/txnprovider/txpool/send.go +++ b/txnprovider/txpool/send.go @@ -35,16 +35,14 @@ import ( // does not initiate any messages by self type Send struct { ctx context.Context - pool Pool wg *sync.WaitGroup sentryClients []sentryproto.SentryClient // sentry clients that will be used for accessing the network logger log.Logger } -func NewSend(ctx context.Context, sentryClients []sentryproto.SentryClient, pool Pool, logger log.Logger) *Send { +func NewSend(ctx context.Context, sentryClients []sentryproto.SentryClient, logger log.Logger) *Send { return &Send{ ctx: ctx, - pool: pool, sentryClients: sentryClients, logger: logger, } diff --git a/txnprovider/txpool/txpool_grpc_server.go b/txnprovider/txpool/txpool_grpc_server.go index 4b384075444..72fdb977007 100644 --- a/txnprovider/txpool/txpool_grpc_server.go +++ b/txnprovider/txpool/txpool_grpc_server.go @@ -103,14 +103,14 @@ type GrpcServer struct { ctx context.Context txPool txPool db kv.RoDB - NewSlotsStreams *NewSlotsStreams + newSlotsStreams *NewSlotsStreams chainID uint256.Int logger log.Logger } -func NewGrpcServer(ctx context.Context, txPool txPool, db kv.RoDB, chainID uint256.Int, logger log.Logger) *GrpcServer { - return &GrpcServer{ctx: ctx, txPool: txPool, db: db, NewSlotsStreams: &NewSlotsStreams{}, chainID: chainID, logger: logger} +func NewGrpcServer(ctx context.Context, txPool txPool, db kv.RoDB, newSlotsStreams *NewSlotsStreams, chainID uint256.Int, logger log.Logger) *GrpcServer { + return &GrpcServer{ctx: ctx, txPool: txPool, db: db, newSlotsStreams: newSlotsStreams, chainID: chainID, logger: logger} } func (s *GrpcServer) Version(context.Context, *emptypb.Empty) (*typesproto.VersionReply, error) { @@ -248,7 +248,7 @@ func mapDiscardReasonToProto(reason txpoolcfg.DiscardReason) txpool_proto.Import func (s *GrpcServer) OnAdd(req *txpool_proto.OnAddRequest, stream txpool_proto.Txpool_OnAddServer) error { s.logger.Info("New txns subscriber joined") //txpool.Loop does send messages to this streams - remove := s.NewSlotsStreams.Add(stream) + remove := s.newSlotsStreams.Add(stream) defer remove() select { case <-stream.Context().Done(): diff --git a/txnprovider/txpool/txpoolutil/all_components.go b/txnprovider/txpool/txpoolutil/all_components.go deleted file mode 100644 index f4870598354..00000000000 --- a/txnprovider/txpool/txpoolutil/all_components.go +++ /dev/null @@ -1,174 +0,0 @@ -// Copyright 2021 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package txpoolutil - -import ( - "context" - "errors" - "math/big" - "time" - - "github.com/c2h5oh/datasize" - "github.com/holiman/uint256" - - "github.com/erigontech/erigon-lib/chain" - "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/kv/kvcache" - "github.com/erigontech/erigon-lib/kv/mdbx" - "github.com/erigontech/erigon-lib/log/v3" - "github.com/erigontech/erigon/txnprovider/txpool" - "github.com/erigontech/erigon/txnprovider/txpool/txpoolcfg" -) - -func SaveChainConfigIfNeed(ctx context.Context, coreDB kv.RoDB, txPoolDB kv.RwDB, force bool, logger log.Logger) (cc *chain.Config, blockNum uint64, err error) { - if err = txPoolDB.View(ctx, func(tx kv.Tx) error { - cc, err = txpool.ChainConfig(tx) - if err != nil { - return err - } - blockNum, err = txpool.LastSeenBlock(tx) - if err != nil { - return err - } - return nil - }); err != nil { - return nil, 0, err - } - if cc != nil && !force { - if cc.ChainID.Uint64() == 0 { - return nil, 0, errors.New("wrong chain config") - } - return cc, blockNum, nil - } - - for { - if err = coreDB.View(ctx, func(tx kv.Tx) error { - cc, err = chain.GetConfig(tx, nil) - if err != nil { - return err - } - n, err := chain.CurrentBlockNumber(tx) - if err != nil { - return err - } - if n != nil { - blockNum = *n - } - return nil - }); err != nil { - logger.Error("cant read chain config from core db", "err", err) - time.Sleep(5 * time.Second) - continue - } else if cc == nil { - logger.Error("cant read chain config from core db") - time.Sleep(5 * time.Second) - continue - } - break - } - - if err = txPoolDB.Update(ctx, func(tx kv.RwTx) error { - if err = txpool.PutChainConfig(tx, cc, nil); err != nil { - return err - } - if err = txpool.PutLastSeenBlock(tx, blockNum, nil); err != nil { - return err - } - return nil - }); err != nil { - return nil, 0, err - } - if cc.ChainID.Uint64() == 0 { - return nil, 0, errors.New("wrong chain config") - } - return cc, blockNum, nil -} - -func AllComponents(ctx context.Context, cfg txpoolcfg.Config, cache kvcache.Cache, newTxns chan txpool.Announcements, chainDB kv.RoDB, - sentryClients []sentryproto.SentryClient, stateChangesClient txpool.StateChangesClient, feeCalculator txpool.FeeCalculator, logger log.Logger) (kv.RwDB, *txpool.TxPool, *txpool.Fetch, *txpool.Send, *txpool.GrpcServer, error) { - opts := mdbx.New(kv.TxPoolDB, logger).Path(cfg.DBDir). - WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }). - WriteMergeThreshold(3 * 8192). - PageSize(16 * datasize.KB). - GrowthStep(16 * datasize.MB). - DirtySpace(uint64(128 * datasize.MB)). - MapSize(1 * datasize.TB). - WriteMap(cfg.MdbxWriteMap) - - if cfg.MdbxPageSize > 0 { - opts = opts.PageSize(cfg.MdbxPageSize) - } - if cfg.MdbxDBSizeLimit > 0 { - opts = opts.MapSize(cfg.MdbxDBSizeLimit) - } - if cfg.MdbxGrowthStep > 0 { - opts = opts.GrowthStep(cfg.MdbxGrowthStep) - } - - txPoolDB, err := opts.Open(ctx) - - if err != nil { - return nil, nil, nil, nil, nil, err - } - - chainConfig, _, err := SaveChainConfigIfNeed(ctx, chainDB, txPoolDB, true, logger) - if err != nil { - return nil, nil, nil, nil, nil, err - } - - chainID, _ := uint256.FromBig(chainConfig.ChainID) - maxBlobsPerBlock := chainConfig.GetMaxBlobsPerBlock() - - shanghaiTime := chainConfig.ShanghaiTime - var agraBlock *big.Int - if chainConfig.Bor != nil { - agraBlock = chainConfig.Bor.GetAgraBlock() - } - cancunTime := chainConfig.CancunTime - pragueTime := chainConfig.PragueTime - if cfg.OverridePragueTime != nil { - pragueTime = cfg.OverridePragueTime - } - - txPool, err := txpool.New( - newTxns, - txPoolDB, - chainDB, - cfg, - cache, - *chainID, - shanghaiTime, - agraBlock, - cancunTime, - pragueTime, - maxBlobsPerBlock, - feeCalculator, - logger, - ) - if err != nil { - return nil, nil, nil, nil, nil, err - } - - fetch := txpool.NewFetch(ctx, sentryClients, txPool, stateChangesClient, chainDB, txPoolDB, *chainID, logger) - //fetch.ConnectCore() - //fetch.ConnectSentries() - - send := txpool.NewSend(ctx, sentryClients, txPool, logger) - txpoolGrpcServer := txpool.NewGrpcServer(ctx, txPool, txPoolDB, *chainID, logger) - return txPoolDB, txPool, fetch, send, txpoolGrpcServer, nil -} From d8d53ec2d4339afe90eb784132d6c22b3e962155 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 20 Dec 2024 18:18:20 +0200 Subject: [PATCH 2/8] fix tests --- txnprovider/txpool/pool.go | 4 +- txnprovider/txpool/pool_fuzz_test.go | 6 +-- txnprovider/txpool/pool_test.go | 64 ++++++++++++++-------------- 3 files changed, 37 insertions(+), 37 deletions(-) diff --git a/txnprovider/txpool/pool.go b/txnprovider/txpool/pool.go index 4b81822ca83..cae6ac110e4 100644 --- a/txnprovider/txpool/pool.go +++ b/txnprovider/txpool/pool.go @@ -346,7 +346,7 @@ func New( return res, nil } -func (p *TxPool) Start(ctx context.Context) error { +func (p *TxPool) start(ctx context.Context) error { if p.started.Load() { return nil } @@ -1852,7 +1852,7 @@ func (p *TxPool) Run(ctx context.Context) error { logEvery := time.NewTicker(p.cfg.LogEvery) defer logEvery.Stop() - if err := p.Start(ctx); err != nil { + if err := p.start(ctx); err != nil { p.logger.Error("[txpool] Failed to start", "err", err) return err } diff --git a/txnprovider/txpool/pool_fuzz_test.go b/txnprovider/txpool/pool_fuzz_test.go index 3beb109c7db..b557e40900f 100644 --- a/txnprovider/txpool/pool_fuzz_test.go +++ b/txnprovider/txpool/pool_fuzz_test.go @@ -325,10 +325,10 @@ func FuzzOnNewBlocks(f *testing.F) { cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) - err = pool.Start(ctx) + err = pool.start(ctx) assert.NoError(err) pool.senders.senderIDs = senderIDs @@ -551,7 +551,7 @@ func FuzzOnNewBlocks(f *testing.F) { check(p2pReceived, TxnSlots{}, "after_flush") checkNotify(p2pReceived, TxnSlots{}, "after_flush") - p2, err := New(ch, db, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + p2, err := New(ctx, ch, db, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) p2.senders = pool.senders // senders are not persisted diff --git a/txnprovider/txpool/pool_test.go b/txnprovider/txpool/pool_test.go index 3d2f15b8db6..33a29e4b719 100644 --- a/txnprovider/txpool/pool_test.go +++ b/txnprovider/txpool/pool_test.go @@ -49,16 +49,15 @@ import ( func TestNonceFromAddress(t *testing.T) { assert, require := assert.New(t), require.New(t) ch := make(chan Announcements, 100) - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) db := memdb.NewTestPoolDB(t) - cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) require.True(pool != nil) - ctx := context.Background() var stateVersionID uint64 = 0 pendingBaseFee := uint64(200000) // start blocks from 0, set empty hash - then kvcache will also work on this @@ -172,13 +171,13 @@ func TestReplaceWithHigherFee(t *testing.T) { ch := make(chan Announcements, 100) coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) db := memdb.NewTestPoolDB(t) - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) require.NotEqual(nil, pool) - ctx := context.Background() var stateVersionID uint64 = 0 pendingBaseFee := uint64(200000) // start blocks from 0, set empty hash - then kvcache will also work on this @@ -289,13 +288,13 @@ func TestReverseNonces(t *testing.T) { ch := make(chan Announcements, 100) coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) db := memdb.NewTestPoolDB(t) - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) require.True(pool != nil) - ctx := context.Background() var stateVersionID uint64 = 0 pendingBaseFee := uint64(1_000_000) // start blocks from 0, set empty hash - then kvcache will also work on this @@ -413,13 +412,13 @@ func TestTxnPoke(t *testing.T) { ch := make(chan Announcements, 100) coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) db := memdb.NewTestPoolDB(t) - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) require.True(pool != nil) - ctx := context.Background() var stateVersionID uint64 = 0 pendingBaseFee := uint64(200000) // start blocks from 0, set empty hash - then kvcache will also work on this @@ -703,10 +702,11 @@ func TestShanghaiValidateTxn(t *testing.T) { shanghaiTime = big.NewInt(0) } + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) cache := &kvcache.DummyCache{} - pool, err := New(ch, nil, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, logger) + pool, err := New(ctx, ch, nil, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, logger) asrt.NoError(err) - ctx := context.Background() tx, err := coreDB.BeginRw(ctx) defer tx.Rollback() asrt.NoError(err) @@ -745,17 +745,17 @@ func TestShanghaiValidateTxn(t *testing.T) { func TestSetCodeTxnValidationWithLargeAuthorizationValues(t *testing.T) { maxUint256 := new(uint256.Int).SetAllOne() - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) ch := make(chan Announcements, 1) coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) cfg := txpoolcfg.DefaultConfig chainID := *maxUint256 cache := &kvcache.DummyCache{} logger := log.New() - pool, err := New(ch, nil, coreDB, cfg, cache, chainID, common.Big0 /* shanghaiTime */, nil, /* agraBlock */ - common.Big0 /* cancunTime */, common.Big0 /* pragueTime */, fixedgas.DefaultMaxBlobsPerBlock, nil, logger) + pool, err := New(ctx, ch, nil, coreDB, cfg, cache, chainID, common.Big0 /* shanghaiTime */, nil, /* agraBlock */ + common.Big0 /* cancunTime */, common.Big0 /* pragueTime */, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, logger) assert.NoError(t, err) - ctx := context.Background() tx, err := coreDB.BeginRw(ctx) defer tx.Rollback() assert.NoError(t, err) @@ -797,13 +797,13 @@ func TestBlobTxnReplacement(t *testing.T) { ch := make(chan Announcements, 5) coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) db := memdb.NewTestPoolDB(t) - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) require.True(pool != nil) - ctx := context.Background() var stateVersionID uint64 = 0 h1 := gointerfaces.ConvertHashToH256([32]byte{}) @@ -977,13 +977,13 @@ func TestDropRemoteAtNoGossip(t *testing.T) { logger := log.New() sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - txnPool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, logger) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + txnPool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, logger) assert.NoError(err) require.True(txnPool != nil) - ctx := context.Background() - - err = txnPool.Start(ctx) + err = txnPool.start(ctx) assert.NoError(err) var stateVersionID uint64 = 0 @@ -1078,15 +1078,15 @@ func TestBlobSlots(t *testing.T) { coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) db := memdb.NewTestPoolDB(t) cfg := txpoolcfg.DefaultConfig - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) //Setting limits for blobs in the pool cfg.TotalBlobPoolLimit = 20 sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) require.True(pool != nil) - ctx := context.Background() var stateVersionID uint64 = 0 h1 := gointerfaces.ConvertHashToH256([32]byte{}) @@ -1152,15 +1152,15 @@ func TestBlobSlots(t *testing.T) { func TestGasLimitChanged(t *testing.T) { assert, require := assert.New(t), require.New(t) ch := make(chan Announcements, 100) - + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) coreDB, _ := temporaltest.NewTestDB(t, datadir.New(t.TempDir())) db := memdb.NewTestPoolDB(t) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) assert.NoError(err) require.True(pool != nil) - ctx := context.Background() var stateVersionID uint64 = 0 pendingBaseFee := uint64(200000) // start blocks from 0, set empty hash - then kvcache will also work on this From c6e6eddd8f927f06b2cd41fcb0de76c4930528b0 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 20 Dec 2024 18:41:59 +0200 Subject: [PATCH 3/8] fix tests --- turbo/stages/mock/mock_sentry.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 2aea4677c37..a08b6384c2e 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -156,8 +156,9 @@ func (ms *MockSentry) Close() { if ms.DB != nil { ms.DB.Close() } - err := ms.bgComponentsEg.Wait() - require.Equal(ms.tb, err, context.Canceled) + if err := ms.bgComponentsEg.Wait(); err != nil { + require.Equal(ms.tb, context.Canceled, err) // upon waiting for clean exit we should get ctx cancelled + } } // Stream returns stream, waiting if necessary From a79d63052edc57724a8f4b1ccdea1202356a4296 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 20 Dec 2024 21:35:37 +0200 Subject: [PATCH 4/8] fix tests --- cmd/txpool/main.go | 2 - eth/backend.go | 2 - turbo/stages/mock/mock_sentry.go | 71 ++++++------------ txnprovider/txpool/assemble.go | 123 +++++++++++++++++++++++++++++++ txnprovider/txpool/fetch.go | 20 +++-- txnprovider/txpool/fetch_test.go | 7 +- txnprovider/txpool/options.go | 69 +++++++++++++++++ txnprovider/txpool/pool.go | 96 ++---------------------- txnprovider/txpool/send.go | 8 +- 9 files changed, 239 insertions(+), 159 deletions(-) create mode 100644 txnprovider/txpool/assemble.go create mode 100644 txnprovider/txpool/options.go diff --git a/cmd/txpool/main.go b/cmd/txpool/main.go index aff3aab4a12..eba6b7a2632 100644 --- a/cmd/txpool/main.go +++ b/cmd/txpool/main.go @@ -40,7 +40,6 @@ import ( "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/cmd/rpcdaemon/rpcdaemontest" "github.com/erigontech/erigon/cmd/utils" - "github.com/erigontech/erigon/consensus/misc" "github.com/erigontech/erigon/ethdb/privateapi" "github.com/erigontech/erigon/turbo/debug" "github.com/erigontech/erigon/turbo/logging" @@ -189,7 +188,6 @@ func doTxpool(ctx context.Context, logger log.Logger) error { kvcache.New(cacheConfig), sentryClients, kvClient, - misc.Eip1559FeeCalculator, notifyMiner, logger, ) diff --git a/eth/backend.go b/eth/backend.go index 079f127871b..0679b5eb85e 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -87,7 +87,6 @@ import ( "github.com/erigontech/erigon/consensus/clique" "github.com/erigontech/erigon/consensus/ethash" "github.com/erigontech/erigon/consensus/merge" - "github.com/erigontech/erigon/consensus/misc" "github.com/erigontech/erigon/core" "github.com/erigontech/erigon/core/rawdb" "github.com/erigontech/erigon/core/rawdb/blockio" @@ -658,7 +657,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger kvcache.NewDummy(), sentries, stateDiffClient, - misc.Eip1559FeeCalculator, blockBuilderNotifyNewTxns, logger, ) diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index a08b6384c2e..406a0543feb 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -29,7 +29,6 @@ import ( "github.com/c2h5oh/datasize" lru "github.com/hashicorp/golang-lru/arc/v2" - "github.com/holiman/uint256" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" "golang.org/x/sync/errgroup" @@ -46,6 +45,7 @@ import ( proto_downloader "github.com/erigontech/erigon-lib/gointerfaces/downloaderproto" execution "github.com/erigontech/erigon-lib/gointerfaces/executionproto" proto_sentry "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" + "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto" ptypes "github.com/erigontech/erigon-lib/gointerfaces/typesproto" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/kv/kvcache" @@ -127,11 +127,8 @@ type MockSentry struct { Notifications *shards.Notifications // TxPool - TxPoolFetch *txpool.Fetch - TxPoolSend *txpool.Send - TxPoolGrpcServer *txpool.GrpcServer TxPool *txpool.TxPool - txPoolDB kv.RwDB + TxPoolGrpcServer txpoolproto.TxpoolServer HistoryV3 bool agg *libstate.Aggregator @@ -321,6 +318,17 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK if tb != nil { tb.Cleanup(mock.Close) } + + // Committed genesis will be shared between download and mock sentry + _, mock.Genesis, err = core.CommitGenesisBlock(mock.DB, gspec, datadir.New(tmpdir), mock.Log) + if _, ok := err.(*chain.ConfigCompatError); err != nil && !ok { + if tb != nil { + tb.Fatal(err) + } else { + panic(err) + } + } + blockWriter := blockio.NewBlockWriter() mock.Address = crypto.PubkeyToAddress(mock.Key.PublicKey) @@ -336,66 +344,33 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK blockPropagator := func(Ctx context.Context, header *types.Header, body *types.RawBody, td *big.Int) {} if !cfg.TxPool.Disable { poolCfg := txpoolcfg.DefaultConfig - newTxs := make(chan txpool.Announcements, 1024) - if tb != nil { - tb.Cleanup(func() { - close(newTxs) - }) - } - chainID, _ := uint256.FromBig(mock.ChainConfig.ChainID) - shanghaiTime := mock.ChainConfig.ShanghaiTime - cancunTime := mock.ChainConfig.CancunTime - pragueTime := mock.ChainConfig.PragueTime - maxBlobsPerBlock := mock.ChainConfig.GetMaxBlobsPerBlock() - mock.txPoolDB = memdb.NewWithLabel(tmpdir, kv.TxPoolDB) stateChangesClient := direct.NewStateDiffClientDirect(erigonGrpcServeer) - newSlotsStreams := &txpool.NewSlotsStreams{} - mock.TxPool, err = txpool.New( + mock.TxPool, mock.TxPoolGrpcServer, err = txpool.Assemble( ctx, - newTxs, - mock.txPoolDB, - mock.DB, poolCfg, + poolCfg, + mock.DB, kvcache.NewDummy(), - *chainID, shanghaiTime, - nil, /* agraBlock */ - cancunTime, - pragueTime, - maxBlobsPerBlock, - nil, /* feeCalculator */ sentries, stateChangesClient, func() {}, /* builderNotifyNewTxns */ - newSlotsStreams, logger, + txpool.WithP2PFetcherWg(&mock.ReceiveWg), + txpool.WithP2PSenderWg(nil), + txpool.WithFeeCalculator(nil), + txpool.WithPoolDBInitializer(func(_ context.Context, _ txpoolcfg.Config, _ log.Logger) (kv.RwDB, error) { + return memdb.NewWithLabel(tmpdir, kv.TxPoolDB), nil + }), ) if err != nil { tb.Fatal(err) } - mock.TxPoolFetch = txpool.NewFetch(mock.Ctx, sentries, mock.TxPool, stateChangesClient, mock.DB, mock.txPoolDB, *chainID, logger) - mock.TxPoolFetch.SetWaitGroup(&mock.ReceiveWg) - mock.TxPoolSend = txpool.NewSend(mock.Ctx, sentries, logger) - mock.TxPoolGrpcServer = txpool.NewGrpcServer(mock.Ctx, mock.TxPool, mock.txPoolDB, newSlotsStreams, *chainID, logger) - - mock.TxPoolFetch.ConnectCore() mock.StreamWg.Add(1) - mock.TxPoolFetch.ConnectSentries() - mock.StreamWg.Wait() - mock.bgComponentsEg.Go(func() error { return mock.TxPool.Run(ctx) }) + mock.StreamWg.Wait() } - // Committed genesis will be shared between download and mock sentry - _, mock.Genesis, err = core.CommitGenesisBlock(mock.DB, gspec, datadir.New(tmpdir), mock.Log) - if _, ok := err.(*chain.ConfigCompatError); err != nil && !ok { - if tb != nil { - tb.Fatal(err) - } else { - panic(err) - } - } latestBlockBuiltStore := builder.NewLatestBlockBuiltStore() - inMemoryExecution := func(txc wrap.TxContainer, header *types.Header, body *types.RawBody, unwindPoint uint64, headersChain []*types.Header, bodiesChain []*types.RawBody, notifications *shards.Notifications) error { terseLogger := log.New() diff --git a/txnprovider/txpool/assemble.go b/txnprovider/txpool/assemble.go new file mode 100644 index 00000000000..77332be8baf --- /dev/null +++ b/txnprovider/txpool/assemble.go @@ -0,0 +1,123 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package txpool + +import ( + "context" + "math/big" + + "github.com/c2h5oh/datasize" + "github.com/holiman/uint256" + + "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" + "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto" + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/kvcache" + "github.com/erigontech/erigon-lib/kv/mdbx" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon/txnprovider/txpool/txpoolcfg" +) + +func Assemble( + ctx context.Context, + cfg txpoolcfg.Config, + chainDB kv.RwDB, + cache kvcache.Cache, + sentryClients []sentryproto.SentryClient, + stateChangesClient StateChangesClient, + builderNotifyNewTxns func(), + logger log.Logger, + opts ...Option, +) (*TxPool, txpoolproto.TxpoolServer, error) { + options := applyOpts(opts...) + poolDB, err := options.poolDBInitializer(ctx, cfg, logger) + if err != nil { + return nil, nil, err + } + + chainConfig, _, err := SaveChainConfigIfNeed(ctx, chainDB, poolDB, true, logger) + if err != nil { + return nil, nil, err + } + + chainID, _ := uint256.FromBig(chainConfig.ChainID) + maxBlobsPerBlock := chainConfig.GetMaxBlobsPerBlock() + + shanghaiTime := chainConfig.ShanghaiTime + var agraBlock *big.Int + if chainConfig.Bor != nil { + agraBlock = chainConfig.Bor.GetAgraBlock() + } + cancunTime := chainConfig.CancunTime + pragueTime := chainConfig.PragueTime + if cfg.OverridePragueTime != nil { + pragueTime = cfg.OverridePragueTime + } + + newTxns := make(chan Announcements, 1024) + newSlotsStreams := &NewSlotsStreams{} + pool, err := New( + ctx, + newTxns, + poolDB, + chainDB, + cfg, + cache, + *chainID, + shanghaiTime, + agraBlock, + cancunTime, + pragueTime, + maxBlobsPerBlock, + sentryClients, + stateChangesClient, + builderNotifyNewTxns, + newSlotsStreams, + logger, + opts..., + ) + if err != nil { + return nil, nil, err + } + + grpcServer := NewGrpcServer(ctx, pool, poolDB, newSlotsStreams, *chainID, logger) + return pool, grpcServer, nil +} + +type poolDBInitializer func(ctx context.Context, cfg txpoolcfg.Config, logger log.Logger) (kv.RwDB, error) + +var defaultPoolDBInitializer = func(ctx context.Context, cfg txpoolcfg.Config, logger log.Logger) (kv.RwDB, error) { + opts := mdbx.New(kv.TxPoolDB, logger). + Path(cfg.DBDir). + WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }). + WriteMergeThreshold(3 * 8192). + PageSize(16 * datasize.KB). + GrowthStep(16 * datasize.MB). + DirtySpace(uint64(128 * datasize.MB)). + MapSize(1 * datasize.TB). + WriteMap(cfg.MdbxWriteMap) + if cfg.MdbxPageSize > 0 { + opts = opts.PageSize(cfg.MdbxPageSize) + } + if cfg.MdbxDBSizeLimit > 0 { + opts = opts.MapSize(cfg.MdbxDBSizeLimit) + } + if cfg.MdbxGrowthStep > 0 { + opts = opts.GrowthStep(cfg.MdbxGrowthStep) + } + return opts.Open(ctx) +} diff --git a/txnprovider/txpool/fetch.go b/txnprovider/txpool/fetch.go index 21838e85798..1e11b37f706 100644 --- a/txnprovider/txpool/fetch.go +++ b/txnprovider/txpool/fetch.go @@ -43,7 +43,6 @@ import ( type Fetch struct { ctx context.Context // Context used for cancellation and closing of the fetcher pool Pool // Transaction pool implementation - coreDB kv.RoDB db kv.RwDB stateChangesClient StateChangesClient wg *sync.WaitGroup // used for synchronisation in the tests (nil when not in tests) @@ -62,17 +61,26 @@ type StateChangesClient interface { // NewFetch creates a new fetch object that will work with given sentry clients. Since the // SentryClient here is an interface, it is suitable for mocking in tests (mock will need // to implement all the functions of the SentryClient interface). -func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Pool, stateChangesClient StateChangesClient, coreDB kv.RoDB, db kv.RwDB, - chainID uint256.Int, logger log.Logger) *Fetch { +func NewFetch( + ctx context.Context, + sentryClients []sentry.SentryClient, + pool Pool, + stateChangesClient StateChangesClient, + db kv.RwDB, + chainID uint256.Int, + logger log.Logger, + opts ...Option, +) *Fetch { + options := applyOpts(opts...) f := &Fetch{ ctx: ctx, sentryClients: sentryClients, pool: pool, - coreDB: coreDB, db: db, stateChangesClient: stateChangesClient, stateChangesParseCtx: NewTxnParseContext(chainID).ChainIDRequired(), //TODO: change ctx if rules changed pooledTxnsParseCtx: NewTxnParseContext(chainID).ChainIDRequired(), + wg: options.p2pFetcherWg, logger: logger, } f.pooledTxnsParseCtx.ValidateRLP(f.pool.ValidateSerializedTxn) @@ -81,10 +89,6 @@ func NewFetch(ctx context.Context, sentryClients []sentry.SentryClient, pool Poo return f } -func (f *Fetch) SetWaitGroup(wg *sync.WaitGroup) { - f.wg = wg -} - func (f *Fetch) threadSafeParsePooledTxn(cb func(*TxnParseContext) error) error { f.pooledTxnsParseCtxLock.Lock() defer f.pooledTxnsParseCtxLock.Unlock() diff --git a/txnprovider/txpool/fetch_test.go b/txnprovider/txpool/fetch_test.go index 0ca827a859e..0cbfb79fba5 100644 --- a/txnprovider/txpool/fetch_test.go +++ b/txnprovider/txpool/fetch_test.go @@ -54,9 +54,8 @@ func TestFetch(t *testing.T) { m := NewMockSentry(ctx, sentryServer) sentryClient := direct.NewSentryClientDirect(direct.ETH67, m) - fetch := NewFetch(ctx, []sentryproto.SentryClient{sentryClient}, pool, remoteKvClient, nil, nil, *u256.N1, log.New()) var wg sync.WaitGroup - fetch.SetWaitGroup(&wg) + fetch := NewFetch(ctx, []sentryproto.SentryClient{sentryClient}, pool, remoteKvClient, nil, *u256.N1, log.New(), WithP2PFetcherWg(&wg)) m.StreamWg.Add(2) fetch.ConnectSentries() m.StreamWg.Wait() @@ -232,7 +231,7 @@ func decodeHex(in string) []byte { func TestOnNewBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - coreDB, db := memdb.NewTestDB(t, kv.ChainDB), memdb.NewTestDB(t, kv.TxPoolDB) + _, db := memdb.NewTestDB(t, kv.ChainDB), memdb.NewTestDB(t, kv.TxPoolDB) ctrl := gomock.NewController(t) stream := remote.NewMockKV_StateChangesClient(ctrl) @@ -287,7 +286,7 @@ func TestOnNewBlock(t *testing.T) { }). Times(1) - fetch := NewFetch(ctx, nil, pool, stateChanges, coreDB, db, *u256.N1, log.New()) + fetch := NewFetch(ctx, nil, pool, stateChanges, db, *u256.N1, log.New()) err := fetch.handleStateChanges(ctx, stateChanges) assert.ErrorIs(t, io.EOF, err) assert.Equal(t, 3, len(minedTxns.Txns)) diff --git a/txnprovider/txpool/options.go b/txnprovider/txpool/options.go new file mode 100644 index 00000000000..79f47369511 --- /dev/null +++ b/txnprovider/txpool/options.go @@ -0,0 +1,69 @@ +// Copyright 2024 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package txpool + +import ( + "sync" + + "github.com/erigontech/erigon/consensus/misc" +) + +type Option func(*options) + +func WithFeeCalculator(f FeeCalculator) Option { + return func(o *options) { + o.feeCalculator = f + } +} + +func WithPoolDBInitializer(init poolDBInitializer) Option { + return func(o *options) { + o.poolDBInitializer = init + } +} + +func WithP2PFetcherWg(wg *sync.WaitGroup) Option { + return func(o *options) { + o.p2pFetcherWg = wg + } +} + +func WithP2PSenderWg(wg *sync.WaitGroup) Option { + return func(o *options) { + o.p2pSenderWg = wg + } +} + +type options struct { + feeCalculator FeeCalculator + poolDBInitializer poolDBInitializer + p2pSenderWg *sync.WaitGroup + p2pFetcherWg *sync.WaitGroup +} + +func applyOpts(opts ...Option) options { + overriddenDefaults := defaultOptions + for _, opt := range opts { + opt(&overriddenDefaults) + } + return overriddenDefaults +} + +var defaultOptions = options{ + poolDBInitializer: defaultPoolDBInitializer, + feeCalculator: misc.Eip1559FeeCalculator, +} diff --git a/txnprovider/txpool/pool.go b/txnprovider/txpool/pool.go index cae6ac110e4..32205474a02 100644 --- a/txnprovider/txpool/pool.go +++ b/txnprovider/txpool/pool.go @@ -30,7 +30,6 @@ import ( "sync/atomic" "time" - "github.com/c2h5oh/datasize" gokzg4844 "github.com/crate-crypto/go-kzg-4844" mapset "github.com/deckarep/golang-set/v2" "github.com/go-stack/stack" @@ -157,90 +156,6 @@ type FeeCalculator interface { CurrentFees(chainConfig *chain.Config, db kv.Getter) (baseFee uint64, blobFee uint64, minBlobGasPrice, blockGasLimit uint64, err error) } -func Assemble( - ctx context.Context, - cfg txpoolcfg.Config, - chainDB kv.RwDB, - cache kvcache.Cache, - sentryClients []sentry.SentryClient, - stateChangesClient StateChangesClient, - feeCalculator FeeCalculator, - builderNotifyNewTxns func(), - logger log.Logger, -) (*TxPool, txpoolproto.TxpoolServer, error) { - opts := mdbx.New(kv.TxPoolDB, logger).Path(cfg.DBDir). - WithTableCfg(func(defaultBuckets kv.TableCfg) kv.TableCfg { return kv.TxpoolTablesCfg }). - WriteMergeThreshold(3 * 8192). - PageSize(16 * datasize.KB). - GrowthStep(16 * datasize.MB). - DirtySpace(uint64(128 * datasize.MB)). - MapSize(1 * datasize.TB). - WriteMap(cfg.MdbxWriteMap) - - if cfg.MdbxPageSize > 0 { - opts = opts.PageSize(cfg.MdbxPageSize) - } - if cfg.MdbxDBSizeLimit > 0 { - opts = opts.MapSize(cfg.MdbxDBSizeLimit) - } - if cfg.MdbxGrowthStep > 0 { - opts = opts.GrowthStep(cfg.MdbxGrowthStep) - } - - poolDB, err := opts.Open(ctx) - if err != nil { - return nil, nil, err - } - - chainConfig, _, err := SaveChainConfigIfNeed(ctx, chainDB, poolDB, true, logger) - if err != nil { - return nil, nil, err - } - - chainID, _ := uint256.FromBig(chainConfig.ChainID) - maxBlobsPerBlock := chainConfig.GetMaxBlobsPerBlock() - - shanghaiTime := chainConfig.ShanghaiTime - var agraBlock *big.Int - if chainConfig.Bor != nil { - agraBlock = chainConfig.Bor.GetAgraBlock() - } - cancunTime := chainConfig.CancunTime - pragueTime := chainConfig.PragueTime - if cfg.OverridePragueTime != nil { - pragueTime = cfg.OverridePragueTime - } - - newTxns := make(chan Announcements, 1024) - newSlotsStreams := &NewSlotsStreams{} - pool, err := New( - ctx, - newTxns, - poolDB, - chainDB, - cfg, - cache, - *chainID, - shanghaiTime, - agraBlock, - cancunTime, - pragueTime, - maxBlobsPerBlock, - feeCalculator, - sentryClients, - stateChangesClient, - builderNotifyNewTxns, - newSlotsStreams, - logger, - ) - if err != nil { - return nil, nil, err - } - - grpcServer := NewGrpcServer(ctx, pool, poolDB, newSlotsStreams, *chainID, logger) - return pool, grpcServer, nil -} - func New( ctx context.Context, newTxns chan Announcements, @@ -254,13 +169,14 @@ func New( cancunTime *big.Int, pragueTime *big.Int, maxBlobsPerBlock uint64, - feeCalculator FeeCalculator, sentryClients []sentry.SentryClient, stateChangesClient StateChangesClient, builderNotifyNewTxns func(), newSlotsStreams *NewSlotsStreams, logger log.Logger, + opts ...Option, ) (*TxPool, error) { + options := applyOpts(opts...) localsHistory, err := simplelru.NewLRU[string, struct{}](10_000, nil) if err != nil { return nil, err @@ -306,7 +222,7 @@ func New( minedBlobTxnsByBlock: map[uint64][]*metaTxn{}, minedBlobTxnsByHash: map[string]*metaTxn{}, maxBlobsPerBlock: maxBlobsPerBlock, - feeCalculator: feeCalculator, + feeCalculator: options.feeCalculator, builderNotifyNewTxns: builderNotifyNewTxns, newSlotsStreams: newSlotsStreams, logger: logger, @@ -341,8 +257,8 @@ func New( res.pragueTime = &pragueTimeU64 } - res.p2pFetcher = NewFetch(ctx, sentryClients, res, stateChangesClient, coreDB, poolDB, chainID, logger) - res.p2pSender = NewSend(ctx, sentryClients, logger) + res.p2pFetcher = NewFetch(ctx, sentryClients, res, stateChangesClient, poolDB, chainID, logger, opts...) + res.p2pSender = NewSend(ctx, sentryClients, logger, opts...) return res, nil } @@ -1861,7 +1777,7 @@ func (p *TxPool) Run(ctx context.Context) error { select { case <-ctx.Done(): err := ctx.Err() - _, flushErr := p.flush(ctx) + _, flushErr := p.flush(context.Background()) // need background ctx since the other one is cancelled if flushErr != nil { err = fmt.Errorf("%w: %w", flushErr, err) } diff --git a/txnprovider/txpool/send.go b/txnprovider/txpool/send.go index 97271d3a846..721269e6c5c 100644 --- a/txnprovider/txpool/send.go +++ b/txnprovider/txpool/send.go @@ -40,18 +40,16 @@ type Send struct { logger log.Logger } -func NewSend(ctx context.Context, sentryClients []sentryproto.SentryClient, logger log.Logger) *Send { +func NewSend(ctx context.Context, sentryClients []sentryproto.SentryClient, logger log.Logger, opts ...Option) *Send { + options := applyOpts(opts...) return &Send{ ctx: ctx, sentryClients: sentryClients, logger: logger, + wg: options.p2pSenderWg, } } -func (f *Send) SetWaitGroup(wg *sync.WaitGroup) { - f.wg = wg -} - const ( // This is the target size for the packs of transactions or announcements. A // pack can get larger than this if a single transactions exceeds this size. From e491ac2089065620f0e533c1c347efbb491ebb06 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 20 Dec 2024 21:38:39 +0200 Subject: [PATCH 5/8] tidy --- txnprovider/txpool/fetch_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/txnprovider/txpool/fetch_test.go b/txnprovider/txpool/fetch_test.go index 0cbfb79fba5..3f6f0e00f1a 100644 --- a/txnprovider/txpool/fetch_test.go +++ b/txnprovider/txpool/fetch_test.go @@ -30,14 +30,13 @@ import ( "google.golang.org/grpc" "google.golang.org/protobuf/types/known/emptypb" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/common/u256" "github.com/erigontech/erigon-lib/direct" "github.com/erigontech/erigon-lib/gointerfaces" remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" "github.com/erigontech/erigon-lib/gointerfaces/typesproto" + "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/kv/memdb" "github.com/erigontech/erigon-lib/log/v3" ) @@ -74,7 +73,7 @@ func TestFetch(t *testing.T) { wg.Wait() } -func TestSendTxPropagate(t *testing.T) { +func TestSendTxnPropagate(t *testing.T) { ctx, cancelFn := context.WithCancel(context.Background()) defer cancelFn() t.Run("few remote byHash", func(t *testing.T) { From 9b207517ce353f67c4b3de506cde1ae194730230 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 20 Dec 2024 21:41:25 +0200 Subject: [PATCH 6/8] tidy --- txnprovider/txpool/pool.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/txnprovider/txpool/pool.go b/txnprovider/txpool/pool.go index 32205474a02..6a6b8bbadf5 100644 --- a/txnprovider/txpool/pool.go +++ b/txnprovider/txpool/pool.go @@ -48,7 +48,7 @@ import ( "github.com/erigontech/erigon-lib/gointerfaces" "github.com/erigontech/erigon-lib/gointerfaces/grpcutil" remote "github.com/erigontech/erigon-lib/gointerfaces/remoteproto" - sentry "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" + "github.com/erigontech/erigon-lib/gointerfaces/sentryproto" "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto" "github.com/erigontech/erigon-lib/kv" "github.com/erigontech/erigon-lib/kv/kvcache" @@ -169,7 +169,7 @@ func New( cancunTime *big.Int, pragueTime *big.Int, maxBlobsPerBlock uint64, - sentryClients []sentry.SentryClient, + sentryClients []sentryproto.SentryClient, stateChangesClient StateChangesClient, builderNotifyNewTxns func(), newSlotsStreams *NewSlotsStreams, From b6a4068fb497ee42fbca866ac2ff6e6a8a02bee8 Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Fri, 20 Dec 2024 22:01:19 +0200 Subject: [PATCH 7/8] fix tests --- txnprovider/txpool/pool_fuzz_test.go | 4 ++-- txnprovider/txpool/pool_test.go | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/txnprovider/txpool/pool_fuzz_test.go b/txnprovider/txpool/pool_fuzz_test.go index b557e40900f..72ebf61c204 100644 --- a/txnprovider/txpool/pool_fuzz_test.go +++ b/txnprovider/txpool/pool_fuzz_test.go @@ -325,7 +325,7 @@ func FuzzOnNewBlocks(f *testing.F) { cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) err = pool.start(ctx) @@ -551,7 +551,7 @@ func FuzzOnNewBlocks(f *testing.F) { check(p2pReceived, TxnSlots{}, "after_flush") checkNotify(p2pReceived, TxnSlots{}, "after_flush") - p2, err := New(ctx, ch, db, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + p2, err := New(ctx, ch, db, coreDB, txpoolcfg.DefaultConfig, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) p2.senders = pool.senders // senders are not persisted diff --git a/txnprovider/txpool/pool_test.go b/txnprovider/txpool/pool_test.go index 33a29e4b719..fc945fc2822 100644 --- a/txnprovider/txpool/pool_test.go +++ b/txnprovider/txpool/pool_test.go @@ -55,7 +55,7 @@ func TestNonceFromAddress(t *testing.T) { db := memdb.NewTestPoolDB(t) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) require.True(pool != nil) var stateVersionID uint64 = 0 @@ -175,7 +175,7 @@ func TestReplaceWithHigherFee(t *testing.T) { t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) require.NotEqual(nil, pool) var stateVersionID uint64 = 0 @@ -292,7 +292,7 @@ func TestReverseNonces(t *testing.T) { t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) require.True(pool != nil) var stateVersionID uint64 = 0 @@ -416,7 +416,7 @@ func TestTxnPoke(t *testing.T) { t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) require.True(pool != nil) var stateVersionID uint64 = 0 @@ -705,7 +705,7 @@ func TestShanghaiValidateTxn(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) cache := &kvcache.DummyCache{} - pool, err := New(ctx, ch, nil, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, logger) + pool, err := New(ctx, ch, nil, coreDB, cfg, cache, *u256.N1, shanghaiTime, nil /* agraBlock */, nil /* cancunTime */, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, logger, WithFeeCalculator(nil)) asrt.NoError(err) tx, err := coreDB.BeginRw(ctx) defer tx.Rollback() @@ -754,7 +754,7 @@ func TestSetCodeTxnValidationWithLargeAuthorizationValues(t *testing.T) { cache := &kvcache.DummyCache{} logger := log.New() pool, err := New(ctx, ch, nil, coreDB, cfg, cache, chainID, common.Big0 /* shanghaiTime */, nil, /* agraBlock */ - common.Big0 /* cancunTime */, common.Big0 /* pragueTime */, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, logger) + common.Big0 /* cancunTime */, common.Big0 /* pragueTime */, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, logger, WithFeeCalculator(nil)) assert.NoError(t, err) tx, err := coreDB.BeginRw(ctx) defer tx.Rollback() @@ -801,7 +801,7 @@ func TestBlobTxnReplacement(t *testing.T) { t.Cleanup(cancel) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) require.True(pool != nil) var stateVersionID uint64 = 0 @@ -979,7 +979,7 @@ func TestDropRemoteAtNoGossip(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - txnPool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, logger) + txnPool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, big.NewInt(0), big.NewInt(0), nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, logger, WithFeeCalculator(nil)) assert.NoError(err) require.True(txnPool != nil) @@ -1084,7 +1084,7 @@ func TestBlobSlots(t *testing.T) { cfg.TotalBlobPoolLimit = 20 sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, common.Big0, nil, common.Big0, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) require.True(pool != nil) var stateVersionID uint64 = 0 @@ -1158,7 +1158,7 @@ func TestGasLimitChanged(t *testing.T) { db := memdb.NewTestPoolDB(t) cfg := txpoolcfg.DefaultConfig sendersCache := kvcache.New(kvcache.DefaultCoherentConfig) - pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, nil, func() {}, nil, log.New()) + pool, err := New(ctx, ch, db, coreDB, cfg, sendersCache, *u256.N1, nil, nil, nil, nil, fixedgas.DefaultMaxBlobsPerBlock, nil, nil, func() {}, nil, log.New(), WithFeeCalculator(nil)) assert.NoError(err) require.True(pool != nil) var stateVersionID uint64 = 0 From 2f1f6a1f9d4f244ea60f47b737828043150cd34d Mon Sep 17 00:00:00 2001 From: taratorio <94537774+taratorio@users.noreply.github.com> Date: Mon, 6 Jan 2025 14:30:51 +0000 Subject: [PATCH 8/8] tidy --- eth/backend.go | 24 +++++++++++------------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/eth/backend.go b/eth/backend.go index b467282059f..08da323e42c 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -270,14 +270,17 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger // kv_remote architecture does blocks on stream.Send - means current architecture require unlimited amount of txs to provide good throughput backend := &Ethereum{ - sentryCtx: ctx, - sentryCancel: ctxCancel, - config: config, - networkID: config.NetworkID, - etherbase: config.Miner.Etherbase, - waitForStageLoopStop: make(chan struct{}), - waitForMiningStop: make(chan struct{}), - logger: logger, + sentryCtx: ctx, + sentryCancel: ctxCancel, + config: config, + networkID: config.NetworkID, + etherbase: config.Miner.Etherbase, + waitForStageLoopStop: make(chan struct{}), + waitForMiningStop: make(chan struct{}), + blockBuilderNotifyNewTxns: make(chan struct{}, 1), + miningSealingQuit: make(chan struct{}), + minedBlocks: make(chan *types.Block, 1), + logger: logger, stopNode: func() error { return stack.Close() }, @@ -676,11 +679,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger txnProvider = backend.shutterPool } - backend.blockBuilderNotifyNewTxns = make(chan struct{}, 1) - backend.miningSealingQuit = make(chan struct{}) - backend.pendingBlocks = make(chan *types.Block, 1) - backend.minedBlocks = make(chan *types.Block, 1) - miner := stagedsync.NewMiningState(&config.Miner) backend.pendingBlocks = miner.PendingResultCh