Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cmd, core, eth: journal local transactions to disk #14784

Merged
merged 4 commits into from
Jul 28, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ var (
utils.EthashDatasetsInMemoryFlag,
utils.EthashDatasetsOnDiskFlag,
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolRejournalFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolAccountSlotsFlag,
Expand Down
2 changes: 2 additions & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ var AppHelpFlagGroups = []flagGroup{
Name: "TRANSACTION POOL",
Flags: []cli.Flag{
utils.TxPoolNoLocalsFlag,
utils.TxPoolJournalFlag,
utils.TxPoolRejournalFlag,
utils.TxPoolPriceLimitFlag,
utils.TxPoolPriceBumpFlag,
utils.TxPoolAccountSlotsFlag,
Expand Down
16 changes: 16 additions & 0 deletions cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,16 @@ var (
Name: "txpool.nolocals",
Usage: "Disables price exemptions for locally submitted transactions",
}
TxPoolJournalFlag = cli.StringFlag{
Name: "txpool.journal",
Usage: "Disk journal for local transaction to survive node restarts",
Value: core.DefaultTxPoolConfig.Journal,
}
TxPoolRejournalFlag = cli.DurationFlag{
Name: "txpool.rejournal",
Usage: "Time interval to regenerate the local transaction journal",
Value: core.DefaultTxPoolConfig.Rejournal,
}
TxPoolPriceLimitFlag = cli.Uint64Flag{
Name: "txpool.pricelimit",
Usage: "Minimum gas price limit to enforce for acceptance into the pool",
Expand Down Expand Up @@ -838,6 +848,12 @@ func setTxPool(ctx *cli.Context, cfg *core.TxPoolConfig) {
if ctx.GlobalIsSet(TxPoolNoLocalsFlag.Name) {
cfg.NoLocals = ctx.GlobalBool(TxPoolNoLocalsFlag.Name)
}
if ctx.GlobalIsSet(TxPoolJournalFlag.Name) {
cfg.Journal = ctx.GlobalString(TxPoolJournalFlag.Name)
}
if ctx.GlobalIsSet(TxPoolRejournalFlag.Name) {
cfg.Rejournal = ctx.GlobalDuration(TxPoolRejournalFlag.Name)
}
if ctx.GlobalIsSet(TxPoolPriceLimitFlag.Name) {
cfg.PriceLimit = ctx.GlobalUint64(TxPoolPriceLimitFlag.Name)
}
Expand Down
150 changes: 150 additions & 0 deletions core/tx_journal.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2017 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.

package core

import (
"errors"
"io"
"os"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
)

// errNoActiveJournal is returned if a transaction is attempted to be inserted
// into the journal, but no such file is currently open.
var errNoActiveJournal = errors.New("no active journal")

// txJournal is a rotating log of transactions with the aim of storing locally
// created transactions to allow non-executed ones to survive node restarts.
type txJournal struct {
path string // Filesystem path to store the transactions at
writer io.WriteCloser // Output stream to write new transactions into
}

// newTxJournal creates a new transaction journal to
func newTxJournal(path string) *txJournal {
return &txJournal{
path: path,
}
}

// load parses a transaction journal dump from disk, loading its contents into
// the specified pool.
func (journal *txJournal) load(add func(*types.Transaction) error) error {
// Skip the parsing if the journal file doens't exist at all
if _, err := os.Stat(journal.path); os.IsNotExist(err) {
return nil
}
// Open the journal for loading any past transactions
input, err := os.Open(journal.path)
if err != nil {
return err
}
defer input.Close()

// Inject all transactions from the journal into the pool
stream := rlp.NewStream(input, 0)
total, dropped := 0, 0

var failure error
for {
// Parse the next transaction and terminate on error
tx := new(types.Transaction)
if err = stream.Decode(tx); err != nil {
if err != io.EOF {
failure = err
}
break
}
// Import the transaction and bump the appropriate progress counters
total++
if err = add(tx); err != nil {
log.Debug("Failed to add journaled transaction", "err", err)
dropped++
continue
}
}
log.Info("Loaded local transaction journal", "transactions", total, "dropped", dropped)

return failure
}

// insert adds the specified transaction to the local disk journal.
func (journal *txJournal) insert(tx *types.Transaction) error {
if journal.writer == nil {
return errNoActiveJournal
}
if err := rlp.Encode(journal.writer, tx); err != nil {
return err
}
return nil
}

// rotate regenerates the transaction journal based on the current contents of
// the transaction pool.
func (journal *txJournal) rotate(all map[common.Address]types.Transactions) error {
// Close the current journal (if any is open)
if journal.writer != nil {
if err := journal.writer.Close(); err != nil {
return err
}
journal.writer = nil
}
// Generate a new journal with the contents of the current pool
replacement, err := os.OpenFile(journal.path+".new", os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0755)
if err != nil {
return err
}
journaled := 0
for _, txs := range all {
for _, tx := range txs {
if err = rlp.Encode(replacement, tx); err != nil {
replacement.Close()
return err
}
}
journaled += len(txs)
}
replacement.Close()

// Replace the live journal with the newly generated one
if err = os.Rename(journal.path+".new", journal.path); err != nil {
return err
}
sink, err := os.OpenFile(journal.path, os.O_WRONLY|os.O_APPEND, 0755)
if err != nil {
return err
}
journal.writer = sink
log.Info("Regenerated local transaction journal", "transactions", journaled, "accounts", len(all))

return nil
}

// close flushes the transaction journal contents to disk and closes the file.
func (journal *txJournal) close() error {
var err error

if journal.writer != nil {
err = journal.writer.Close()
journal.writer = nil
}
return err
}
Loading