diff --git a/cmd/geth/main.go b/cmd/geth/main.go index e118b137dd55..86ceba9f65e5 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -80,6 +80,7 @@ var ( utils.TxPoolLocalsFlag, utils.TxPoolNoLocalsFlag, utils.TxPoolJournalFlag, + utils.TxPoolJournalRemotesFlag, utils.TxPoolRejournalFlag, utils.TxPoolPriceLimitFlag, utils.TxPoolPriceBumpFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 0d3641afcce6..c55375ae19b8 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -406,6 +406,11 @@ var ( Value: txpool.DefaultConfig.Journal, Category: flags.TxPoolCategory, } + TxPoolJournalRemotesFlag = &cli.BoolFlag{ + Name: "txpool.journalremotes", + Usage: "Includes remote transactions in the journal", + Category: flags.TxPoolCategory, + } TxPoolRejournalFlag = &cli.DurationFlag{ Name: "txpool.rejournal", Usage: "Time interval to regenerate the local transaction journal", @@ -1625,6 +1630,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) { if ctx.IsSet(TxPoolJournalFlag.Name) { cfg.Journal = ctx.String(TxPoolJournalFlag.Name) } + if ctx.IsSet(TxPoolJournalRemotesFlag.Name) { + cfg.JournalRemote = ctx.Bool(TxPoolJournalRemotesFlag.Name) + } if ctx.IsSet(TxPoolRejournalFlag.Name) { cfg.Rejournal = ctx.Duration(TxPoolRejournalFlag.Name) } diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 3793ecc31c69..e16af651a83f 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -172,6 +172,10 @@ type Config struct { Journal string // Journal of local transactions to survive node restarts Rejournal time.Duration // Time interval to regenerate the local transaction journal + // JournalRemote controls whether journaling includes remote transactions or not. + // When true, all transactions loaded from the journal are treated as remote. + JournalRemote bool + PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce) @@ -330,14 +334,18 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain) pool.wg.Add(1) go pool.scheduleReorgLoop() - // If local transactions and journaling is enabled, load from disk - if !config.NoLocals && config.Journal != "" { + // If journaling is enabled and has transactions to journal, load from disk + if (!config.NoLocals || config.JournalRemote) && config.Journal != "" { pool.journal = newTxJournal(config.Journal) - if err := pool.journal.load(pool.AddLocals); err != nil { + add := pool.AddLocals + if config.JournalRemote { + add = pool.AddRemotesSync // Use sync version to match pool.AddLocals + } + if err := pool.journal.load(add); err != nil { log.Warn("Failed to load transaction journal", "err", err) } - if err := pool.journal.rotate(pool.local()); err != nil { + if err := pool.journal.rotate(pool.toJournal()); err != nil { log.Warn("Failed to rotate transaction journal", "err", err) } } @@ -420,7 +428,7 @@ func (pool *TxPool) loop() { case <-journal.C: if pool.journal != nil { pool.mu.Lock() - if err := pool.journal.rotate(pool.local()); err != nil { + if err := pool.journal.rotate(pool.toJournal()); err != nil { log.Warn("Failed to rotate local tx journal", "err", err) } pool.mu.Unlock() @@ -600,6 +608,23 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { return txs } +// toJournal retrieves all transactions that should be included in the journal, +// grouped by origin account and sorted by nonce. +// The returned transaction set is a copy and can be freely modified by calling code. +func (pool *TxPool) toJournal() map[common.Address]types.Transactions { + if !pool.config.JournalRemote { + return pool.local() + } + txs := make(map[common.Address]types.Transactions) + for addr, pending := range pool.pending { + txs[addr] = append(txs[addr], pending.Flatten()...) + } + for addr, queued := range pool.queue { + txs[addr] = append(txs[addr], queued.Flatten()...) + } + return txs +} + // validateTxBasics checks whether a transaction is valid according to the consensus // rules, but does not check state-dependent validation such as sufficient balance. // This check is meant as an early check which only needs to be performed once, @@ -904,7 +929,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo // deemed to have been sent from a local account. func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) { // Only journal if it's enabled and the transaction is local - if pool.journal == nil || !pool.locals.contains(from) { + if pool.journal == nil || (!pool.config.JournalRemote && !pool.locals.contains(from)) { return } if err := pool.journal.insert(tx); err != nil { diff --git a/core/txpool/txpool_test.go b/core/txpool/txpool_test.go index a4889fa62f59..92b249edcb60 100644 --- a/core/txpool/txpool_test.go +++ b/core/txpool/txpool_test.go @@ -2255,10 +2255,13 @@ func TestReplacementDynamicFee(t *testing.T) { // Tests that local transactions are journaled to disk, but remote transactions // get discarded between restarts. -func TestJournaling(t *testing.T) { testJournaling(t, false) } -func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) } +func TestJournaling(t *testing.T) { testJournaling(t, false, false) } +func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true, false) } -func testJournaling(t *testing.T, nolocals bool) { +func TestJournalingRemotes(t *testing.T) { testJournaling(t, false, true) } +func TestJournalingRemotesNoLocals(t *testing.T) { testJournaling(t, true, true) } + +func testJournaling(t *testing.T, nolocals bool, journalRemotes bool) { t.Parallel() // Create a temporary file for the journal @@ -2279,6 +2282,7 @@ func testJournaling(t *testing.T, nolocals bool) { config := testTxPoolConfig config.NoLocals = nolocals + config.JournalRemote = journalRemotes config.Journal = journal config.Rejournal = time.Second @@ -2325,10 +2329,14 @@ func testJournaling(t *testing.T, nolocals bool) { if queued != 0 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) } - if nolocals { + if nolocals && !journalRemotes { if pending != 0 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) } + } else if journalRemotes { + if pending != 3 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3) + } } else { if pending != 2 { t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2) @@ -2348,10 +2356,16 @@ func testJournaling(t *testing.T, nolocals bool) { pool = NewTxPool(config, params.TestChainConfig, blockchain) pending, queued = pool.Stats() - if pending != 0 { - t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + if journalRemotes { + if pending != 1 { // Remove the 2 replaced local transactions, but preserve the remote + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1) + } + } else { + if pending != 0 { + t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0) + } } - if nolocals { + if nolocals && !journalRemotes { if queued != 0 { t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0) }