Skip to content

Commit

Permalink
core/txpool: Support journaling remote transactions with --txpool.jou…
Browse files Browse the repository at this point in the history
…rnalremotes
  • Loading branch information
ajsutton committed Jun 2, 2023
1 parent bdab05c commit d619a93
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 13 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ var (
utils.TxPoolLocalsFlag,
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolJournalRemotesFlag,
utils.TxPoolRejournalFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
Expand Down
8 changes: 8 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,11 @@ var (
Value: txpool.DefaultConfig.Journal,
Category: flags.TxPoolCategory,
}
TxPoolJournalRemotesFlag = &cli.BoolFlag{
Name: "txpool.journalremotes",
Usage: "Includes remote transactions in the journal",
Category: flags.TxPoolCategory,
}
TxPoolRejournalFlag = &cli.DurationFlag{
Name: "txpool.rejournal",
Usage: "Time interval to regenerate the local transaction journal",
Expand Down Expand Up @@ -1625,6 +1630,9 @@ func setTxPool(ctx *cli.Context, cfg *txpool.Config) {
if ctx.IsSet(TxPoolJournalFlag.Name) {
cfg.Journal = ctx.String(TxPoolJournalFlag.Name)
}
if ctx.IsSet(TxPoolJournalRemotesFlag.Name) {
cfg.JournalRemote = ctx.Bool(TxPoolJournalRemotesFlag.Name)
}
if ctx.IsSet(TxPoolRejournalFlag.Name) {
cfg.Rejournal = ctx.Duration(TxPoolRejournalFlag.Name)
}
Expand Down
37 changes: 31 additions & 6 deletions core/txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,10 @@ type Config struct {
Journal string // Journal of local transactions to survive node restarts
Rejournal time.Duration // Time interval to regenerate the local transaction journal

// JournalRemote controls whether journaling includes remote transactions or not.
// When true, all transactions loaded from the journal are treated as remote.
JournalRemote bool

PriceLimit uint64 // Minimum gas price to enforce for acceptance into the pool
PriceBump uint64 // Minimum price bump percentage to replace an already existing transaction (nonce)

Expand Down Expand Up @@ -330,14 +334,18 @@ func NewTxPool(config Config, chainconfig *params.ChainConfig, chain blockChain)
pool.wg.Add(1)
go pool.scheduleReorgLoop()

// If local transactions and journaling is enabled, load from disk
if !config.NoLocals && config.Journal != "" {
// If journaling is enabled and has transactions to journal, load from disk
if (!config.NoLocals || config.JournalRemote) && config.Journal != "" {
pool.journal = newTxJournal(config.Journal)

if err := pool.journal.load(pool.AddLocals); err != nil {
add := pool.AddLocals
if config.JournalRemote {
add = pool.AddRemotesSync // Use sync version to match pool.AddLocals
}
if err := pool.journal.load(add); err != nil {
log.Warn("Failed to load transaction journal", "err", err)
}
if err := pool.journal.rotate(pool.local()); err != nil {
if err := pool.journal.rotate(pool.toJournal()); err != nil {
log.Warn("Failed to rotate transaction journal", "err", err)
}
}
Expand Down Expand Up @@ -420,7 +428,7 @@ func (pool *TxPool) loop() {
case <-journal.C:
if pool.journal != nil {
pool.mu.Lock()
if err := pool.journal.rotate(pool.local()); err != nil {
if err := pool.journal.rotate(pool.toJournal()); err != nil {
log.Warn("Failed to rotate local tx journal", "err", err)
}
pool.mu.Unlock()
Expand Down Expand Up @@ -600,6 +608,23 @@ func (pool *TxPool) local() map[common.Address]types.Transactions {
return txs
}

// toJournal retrieves all transactions that should be included in the journal,
// grouped by origin account and sorted by nonce.
// The returned transaction set is a copy and can be freely modified by calling code.
func (pool *TxPool) toJournal() map[common.Address]types.Transactions {
if !pool.config.JournalRemote {
return pool.local()
}
txs := make(map[common.Address]types.Transactions)
for addr, pending := range pool.pending {
txs[addr] = append(txs[addr], pending.Flatten()...)
}
for addr, queued := range pool.queue {
txs[addr] = append(txs[addr], queued.Flatten()...)
}
return txs
}

// validateTxBasics checks whether a transaction is valid according to the consensus
// rules, but does not check state-dependent validation such as sufficient balance.
// This check is meant as an early check which only needs to be performed once,
Expand Down Expand Up @@ -904,7 +929,7 @@ func (pool *TxPool) enqueueTx(hash common.Hash, tx *types.Transaction, local boo
// deemed to have been sent from a local account.
func (pool *TxPool) journalTx(from common.Address, tx *types.Transaction) {
// Only journal if it's enabled and the transaction is local
if pool.journal == nil || !pool.locals.contains(from) {
if pool.journal == nil || (!pool.config.JournalRemote && !pool.locals.contains(from)) {
return
}
if err := pool.journal.insert(tx); err != nil {
Expand Down
28 changes: 21 additions & 7 deletions core/txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2255,10 +2255,13 @@ func TestReplacementDynamicFee(t *testing.T) {

// Tests that local transactions are journaled to disk, but remote transactions
// get discarded between restarts.
func TestJournaling(t *testing.T) { testJournaling(t, false) }
func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true) }
func TestJournaling(t *testing.T) { testJournaling(t, false, false) }
func TestJournalingNoLocals(t *testing.T) { testJournaling(t, true, false) }

func testJournaling(t *testing.T, nolocals bool) {
func TestJournalingRemotes(t *testing.T) { testJournaling(t, false, true) }
func TestJournalingRemotesNoLocals(t *testing.T) { testJournaling(t, true, true) }

func testJournaling(t *testing.T, nolocals bool, journalRemotes bool) {
t.Parallel()

// Create a temporary file for the journal
Expand All @@ -2279,6 +2282,7 @@ func testJournaling(t *testing.T, nolocals bool) {

config := testTxPoolConfig
config.NoLocals = nolocals
config.JournalRemote = journalRemotes
config.Journal = journal
config.Rejournal = time.Second

Expand Down Expand Up @@ -2325,10 +2329,14 @@ func testJournaling(t *testing.T, nolocals bool) {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
if nolocals {
if nolocals && !journalRemotes {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
} else if journalRemotes {
if pending != 3 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 3)
}
} else {
if pending != 2 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 2)
Expand All @@ -2348,10 +2356,16 @@ func testJournaling(t *testing.T, nolocals bool) {
pool = NewTxPool(config, params.TestChainConfig, blockchain)

pending, queued = pool.Stats()
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
if journalRemotes {
if pending != 1 { // Remove the 2 replaced local transactions, but preserve the remote
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 1)
}
} else {
if pending != 0 {
t.Fatalf("pending transactions mismatched: have %d, want %d", pending, 0)
}
}
if nolocals {
if nolocals && !journalRemotes {
if queued != 0 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 0)
}
Expand Down

0 comments on commit d619a93

Please sign in to comment.