From 46c4bde80ffe1e7c1def1cab46c08bf10ce8f90a Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Mon, 16 Sep 2019 10:03:21 +0000 Subject: [PATCH 01/17] set max block generation interval to 1 second --- miner/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner/worker.go b/miner/worker.go index aa9a1a970614..f6cef1836df7 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1161,7 +1161,7 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) num := parent.Number() num.Add(num, common.Big1) ts := w.ancestorTimes(num) - if dt, pt := w.throttleMining(ts); dt > 0 && pt < 5 { + if dt, pt := w.throttleMining(ts); dt > 0 && pt < 1 { // sleep 1 second here to prevent unnecessary checks log.Info("Metadium: too many blocks", "ahead", dt) time.Sleep(time.Second) From 8570863e76d8ce4d33edcb5cf38ec42cba7cadda Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Mon, 16 Sep 2019 10:45:20 +0000 Subject: [PATCH 02/17] 1. moved some of transaction verification out of lock: tx_pool.go:preValidate. 2. added tx_pool.go:ResolveSenders. It resolves sender accounts from transactions concurrently with worker threads --- core/tx_pool.go | 117 +++++++++++++++++++++++------- core/tx_sender_resolver.go | 115 +++++++++++++++++++++++++++++ core/types/transaction_signing.go | 11 +++ eth/handler.go | 6 ++ eth/protocol.go | 3 + 5 files changed, 226 insertions(+), 26 deletions(-) create mode 100644 core/tx_sender_resolver.go diff --git a/core/tx_pool.go b/core/tx_pool.go index a165d9149679..54ee6961dc4c 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -21,6 +21,7 @@ import ( "fmt" "math" "math/big" + "runtime" "sort" "sync" "time" @@ -227,6 +228,8 @@ type TxPool struct { all *txLookup // All transactions to allow lookups priced *txPricedList // All transactions sorted by price + senderResolver *SenderResolver // ecrecover helper + wg sync.WaitGroup // for shutdown sync homestead bool @@ -240,16 +243,17 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block // Create the transaction pool with its initial settings pool := &TxPool{ - config: config, - chainconfig: chainconfig, - chain: chain, - signer: types.NewEIP155Signer(chainconfig.ChainID), - pending: make(map[common.Address]*txList), - queue: make(map[common.Address]*txList), - beats: make(map[common.Address]time.Time), - all: newTxLookup(), - chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), - gasPrice: new(big.Int).SetUint64(config.PriceLimit), + config: config, + chainconfig: chainconfig, + chain: chain, + signer: types.NewEIP155Signer(chainconfig.ChainID), + pending: make(map[common.Address]*txList), + queue: make(map[common.Address]*txList), + beats: make(map[common.Address]time.Time), + all: newTxLookup(), + chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), + gasPrice: new(big.Int).SetUint64(config.PriceLimit), + senderResolver: NewSenderResolver(runtime.NumCPU()*2), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -259,6 +263,9 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block pool.priced = newTxPricedList(pool.all) pool.reset(nil, chain.CurrentBlock().Header()) + // Start the ecrecover helper + go pool.senderResolver.Run() + // If local transactions and journaling is enabled, load from disk if !config.NoLocals && config.Journal != "" { pool.journal = newTxJournal(config.Journal) @@ -437,7 +444,7 @@ func (pool *TxPool) reset(oldHead, newHead *types.Header) { // Inject any transactions discarded due to reorgs log.Debug("Reinjecting stale transactions", "count", len(reinject)) senderCacher.recover(pool.signer, reinject) - pool.addTxsLocked(reinject, false) + pool.addTxsLocked(reinject, false, nil) // validate the pool of pending transactions, this will remove // any transactions that have been included in the block or @@ -467,6 +474,10 @@ func (pool *TxPool) Stop() { if pool.journal != nil { pool.journal.close() } + + // Stop ecrecover helper + pool.senderResolver.Stop() + log.Info("Transaction pool stopped") } @@ -611,9 +622,12 @@ func (pool *TxPool) local() map[common.Address]types.Transactions { return txs } -// validateTx checks whether a transaction is valid according to the consensus -// rules and adheres to some heuristic limits of the local node (price and size). -func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { +// Do the most of validation outside of mutex +func preValidateTx(tx *types.Transaction, signer types.Signer, maxGas uint64, homestead bool) error { + // Check nonce limit + if params.NonceLimit != 0 && tx.Nonce() > params.NonceLimit { + return fmt.Errorf("Too many transactions (%d) for an account", params.NonceLimit) + } // Heuristic limit, reject transactions over 32KB to prevent DOS attacks if tx.Size() > params.MaxTransactionSize { return ErrOversizedData @@ -624,9 +638,27 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { return ErrNegativeValue } // Ensure the transaction doesn't exceed the current block limit gas. - if pool.currentMaxGas < tx.Gas() { + if maxGas < tx.Gas() { return ErrGasLimit } + // Make sure the transaction is signed properly + _, err := types.Sender(signer, tx) + if err != nil { + return ErrInvalidSender + } + intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, homestead) + if err != nil { + return err + } + if tx.Gas() < intrGas { + return ErrIntrinsicGas + } + return nil +} + +// validateTx checks whether a transaction is valid according to the consensus +// rules and adheres to some heuristic limits of the local node (price and size). +func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { // Make sure the transaction is signed properly from, err := types.Sender(pool.signer, tx) if err != nil { @@ -646,13 +678,6 @@ func (pool *TxPool) validateTx(tx *types.Transaction, local bool) error { if pool.currentState.GetBalance(from).Cmp(tx.Cost()) < 0 { return ErrInsufficientFunds } - intrGas, err := IntrinsicGas(tx.Data(), tx.To() == nil, pool.homestead) - if err != nil { - return err - } - if tx.Gas() < intrGas { - return ErrIntrinsicGas - } return nil } @@ -824,6 +849,9 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // the sender as a local one in the mean time, ensuring it goes around the local // pricing constraints. func (pool *TxPool) AddLocal(tx *types.Transaction) error { + var txs []*types.Transaction + txs = append(txs, tx) + pool.ResolveSenders(pool.signer, txs, false) return pool.addTx(tx, !pool.config.NoLocals) } @@ -838,6 +866,7 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { + pool.ResolveSenders(pool.signer, txs, false) return pool.addTxs(txs, !pool.config.NoLocals) } @@ -845,11 +874,16 @@ func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { // If the senders are not among the locally tracked ones, full pricing constraints // will apply. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { + pool.ResolveSenders(pool.signer, txs, false) return pool.addTxs(txs, false) } // addTx enqueues a single transaction into the pool if it is valid. func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { + if err := preValidateTx(tx, pool.signer, pool.currentMaxGas, pool.homestead); err != nil { + return err + } + pool.mu.Lock() defer pool.mu.Unlock() @@ -868,20 +902,35 @@ func (pool *TxPool) addTx(tx *types.Transaction, local bool) error { // addTxs attempts to queue a batch of transactions if they are valid. func (pool *TxPool) addTxs(txs []*types.Transaction, local bool) []error { + errs := make([]error, len(txs)) + for i, tx := range txs { + if err := preValidateTx(tx, pool.signer, pool.currentMaxGas, pool.homestead); err != nil { + errs[i] = err + } + } + pool.mu.Lock() defer pool.mu.Unlock() - return pool.addTxsLocked(txs, local) + pool.addTxsLocked(txs, local, errs) + return errs } // addTxsLocked attempts to queue a batch of transactions if they are valid, // whilst assuming the transaction pool lock is already held. -func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { +func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool, errs []error) { // Add the batch of transactions, tracking the accepted ones dirty := make(map[common.Address]struct{}) - errs := make([]error, len(txs)) + if errs == nil { + errs = make([]error, len(txs)) + } for i, tx := range txs { + if errs[i] != nil { + // already rejected + continue + } + var replace bool if replace, errs[i] = pool.add(tx, local); errs[i] == nil && !replace { from, _ := types.Sender(pool.signer, tx) // already validated @@ -896,7 +945,7 @@ func (pool *TxPool) addTxsLocked(txs []*types.Transaction, local bool) []error { } pool.promoteExecutables(addrs) } - return errs + return } // Status returns the status (unknown/pending/queued) of a batch of transactions @@ -1316,3 +1365,19 @@ func (t *txLookup) Remove(hash common.Hash) { delete(t.all, hash) } + +// Resolve senders +func (t *txLookup) ResolveSenders(signer types.Signer, txs []*types.Transaction) []*common.Address { + t.lock.Lock() + defer t.lock.Unlock() + + addrs := make([]*common.Address, len(txs)) + for i, d := range txs { + if tx, ok := t.all[d.Hash()]; ok { + if addr, err := types.Sender(signer, tx); err == nil { + addrs[i] = &addr + } + } + } + return addrs +} diff --git a/core/tx_sender_resolver.go b/core/tx_sender_resolver.go new file mode 100644 index 000000000000..2a2350e624c7 --- /dev/null +++ b/core/tx_sender_resolver.go @@ -0,0 +1,115 @@ +// tx_sender_resolver + +package core + +import ( + _ "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// SenderResolver resolves sender accounts from transactions concurrently +// with worker threads +type SenderResolver struct { + jobs chan func() + busy chan interface{} +} + +// NewSenderResolver creates a new sender resolver worker pool +func NewSenderResolver(count int) *SenderResolver { + return &SenderResolver{ + jobs: make(chan func(), count), + busy: make(chan interface{}, count), + } +} + +// sender resolver main loop +func (s *SenderResolver) Run() { + eor := false + for { + select { + case f := <-s.jobs: + if f == nil { + eor = true + } else { + go func() { + s.busy <- struct{}{} + defer func() { + <-s.busy + }() + f() + }() + } + } + if eor { + break + } + } +} + +// Stop stops sender resolver +func (s *SenderResolver) Stop() { + s.jobs <- nil +} + +// Post a new sender resolver task +func (s *SenderResolver) Post(f func()) { + s.jobs <- f +} + +// ResolveSenders resolves sender accounts from given transactions +// concurrently using SenderResolver worker pool. If 'checkPool' is true, +// it checks the pending pool first +func (pool *TxPool) ResolveSenders(signer types.Signer, txs []*types.Transaction, checkPool bool) []*common.Address { + var total, by_ecrecover, failed int64 + total = int64(len(txs)) + ot := time.Now() + + var addrs []*common.Address + if checkPool { + addrs = pool.all.ResolveSenders(signer, txs) + } else { + addrs = make([]*common.Address, len(txs)) + } + + var wg sync.WaitGroup + for i, addr := range addrs { + if addr != nil { + types.SetSender(signer, txs[i], *addr) + continue + } + atomic.AddInt64(&by_ecrecover, 1) + + wg.Add(1) + + f := func(ix int) { + if from, err := types.Sender(signer, txs[ix]); err == nil { + addrs[ix] = &from + } else { + atomic.AddInt64(&failed, 1) + } + wg.Done() + } + pool.senderResolver.Post(func() { f(i) }) + } + + wg.Wait() + + if total > 1 { + dt := float64(time.Now().Sub(ot) / time.Millisecond) + if dt <= 0 { + dt = 1 + } + ps := float64(total) * 1000.0 / dt + _ = ps + //fmt.Printf("=== %d/%d/%d : took %.3f ms %.3f/sec\n", total, total-by_ecrecover, failed, dt, ps) + } + + return addrs +} + +// EOF diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go index 63132048ee72..820b7c19491a 100644 --- a/core/types/transaction_signing.go +++ b/core/types/transaction_signing.go @@ -88,6 +88,17 @@ func Sender(signer Signer, tx *Transaction) (common.Address, error) { return addr, nil } +// set sender +func SetSender(signer Signer, tx *Transaction, from common.Address) { + if sc := tx.from.Load(); sc != nil { + sigCache := sc.(sigCache) + if sigCache.signer.Equal(signer) { + return + } + } + tx.from.Store(sigCache{signer: signer, from: from}) +} + // Signer encapsulates transaction signature handling. Note that this interface is not a // stable API and may change at any time to accommodate new protocol rules. type Signer interface { diff --git a/eth/handler.go b/eth/handler.go index 68d86ad00796..0d693240b2c9 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -597,6 +597,12 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if filter { transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) } + if len(transactions) > 0 { + signer := types.MakeSigner(pm.chainconfig, pm.blockchain.CurrentBlock().Number()) + for _, txs := range transactions { + pm.txpool.ResolveSenders(signer, txs, true) + } + } if len(transactions) > 0 || len(uncles) > 0 || !filter { err := pm.downloader.DeliverBodies(p.id, transactions, uncles) if err != nil { diff --git a/eth/protocol.go b/eth/protocol.go index 30d5fc59ebde..7943c844a615 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -115,6 +115,9 @@ type txPool interface { // SubscribeNewTxsEvent should return an event subscription of // NewTxsEvent and send events to the given channel. SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription + + // Resolve senders from transaction pool + ResolveSenders(types.Signer, []*types.Transaction, bool) []*common.Address } // statusData is the network packet for the status message. From 6bf600bde9d0e019ca031c64826b357701abf4c9 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 18 Sep 2019 07:14:05 +0000 Subject: [PATCH 03/17] added lru cache library --- {metadium/miner => common/lru}/lrucache.go | 34 ++++++++++++++++++---- 1 file changed, 29 insertions(+), 5 deletions(-) rename {metadium/miner => common/lru}/lrucache.go (64%) diff --git a/metadium/miner/lrucache.go b/common/lru/lrucache.go similarity index 64% rename from metadium/miner/lrucache.go rename to common/lru/lrucache.go index 3aae40572b21..03edffcd81d0 100644 --- a/metadium/miner/lrucache.go +++ b/common/lru/lrucache.go @@ -1,6 +1,6 @@ // lrucache.go -package miner +package lru import ( "container/list" @@ -9,22 +9,31 @@ import ( type LruCache struct { lock *sync.RWMutex - max int - count int + max int // max count + count int // current count + fifo bool // if true, element order is not updated on access lru *list.List data map[interface{}]interface{} } -func NewLruCache(max int) *LruCache { +// NewLruCache creates LruCache +func NewLruCache(max int, fifo bool) *LruCache { return &LruCache{ lock: &sync.RWMutex{}, max: max, count: 0, + fifo: fifo, lru: list.New(), data: map[interface{}]interface{}{}, } } +// Count returns the current count of elements +func (c *LruCache) Count() int { + return c.count +} + +// Put adds a key-value pair func (c *LruCache) Put(key, value interface{}) { c.lock.Lock() defer c.lock.Unlock() @@ -49,6 +58,7 @@ func (c *LruCache) Put(key, value interface{}) { } } +// Get returns a value with the given key if present, nil otherwise. func (c *LruCache) Get(key interface{}) interface{} { c.lock.RLock() defer c.lock.RUnlock() @@ -58,11 +68,14 @@ func (c *LruCache) Get(key interface{}) interface{} { return nil } else { e := _e.(*list.Element) - c.lru.MoveToFront(e) + if !c.fifo { + c.lru.MoveToFront(e) + } return e.Value.([]interface{})[1] } } +// Exists checks if a key exists. func (c *LruCache) Exists(key interface{}) bool { c.lock.RLock() defer c.lock.RUnlock() @@ -70,6 +83,7 @@ func (c *LruCache) Exists(key interface{}) bool { return ok } +// Del deletes a key-value pair if present. It returns true iff it's present. func (c *LruCache) Del(key interface{}) bool { c.lock.Lock() defer c.lock.Unlock() @@ -87,4 +101,14 @@ func (c *LruCache) Del(key interface{}) bool { } } +// Clear resets the lru +func (c *LruCache) Clear() { + c.lock.Lock() + defer c.lock.Unlock() + + c.count = 0 + c.lru = list.New() + c.data = map[interface{}]interface{}{} +} + // EOF From 5aed92de0e500869af4c0cf07622a894631e1877 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 18 Sep 2019 07:14:48 +0000 Subject: [PATCH 04/17] added batch library --- common/batch/batch.go | 149 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 149 insertions(+) create mode 100644 common/batch/batch.go diff --git a/common/batch/batch.go b/common/batch/batch.go new file mode 100644 index 000000000000..990f270c8a8b --- /dev/null +++ b/common/batch/batch.go @@ -0,0 +1,149 @@ +// batch.go + +package batch + +import ( + "sync/atomic" + "time" + + "github.com/ethereum/go-ethereum/log" +) + +type Batch struct { + toInterval time.Duration + timeout time.Duration + batchCount int + ch chan interface{} + count int32 + f func(interface{}, int) error +} + +func NewBatch(toInterval, timeout time.Duration, batchCount int, f func(interface{}, int) error) *Batch { + return &Batch{ + toInterval: toInterval, + timeout: timeout, + batchCount: batchCount, + ch: make(chan interface{}, batchCount*10), + count: 0, + f: f, + } +} + +func (b *Batch) Run() { + var ( + data []interface{} + lt time.Time = time.Now() // last time + ln int = 0 // last count + ) + + timer := time.NewTimer(0) + <-timer.C // drain the initial timeout + + eod := false + for { + itstimer := false + fire := false + + select { + case d := <-b.ch: + atomic.AddInt32(&b.count, -1) + if d == nil { + eod = true + } else { + data = append(data, d) + } + case <-timer.C: + itstimer = true + } + last := b.count == 0 + + if eod { + break + } + + // when to fire + // 1. timer fired + // 1.1 no count change + // 1.2 more than 50 ms passed from the initial + // 2. count >= 100 + + if !itstimer { + if ln == 0 { + lt = time.Now() + ln = len(data) + timer.Stop() + timer.Reset(b.toInterval) + } else if len(data) >= b.batchCount { + fire = true + } + } else if last { + et := time.Since(lt) + if (len(data) == ln && et > b.toInterval) || et > b.timeout { + fire = true + } + } + + if fire { + if len(data) < b.batchCount { + // do it + e := b.f(data, len(data)) + if e != nil { + log.Error("Metadium Server", "Failed", e) + } else { + log.Debug("Metadium Server", "Count", len(data)) + } + data = nil + } else { + for { + if len(data) < b.batchCount { + break + } + + // do it + e := b.f(data, b.batchCount) + if e != nil { + log.Error("Metadium Server", "Failed", e) + } else { + log.Debug("Metadium Server", "Count", b.batchCount) + } + data = data[b.batchCount:] + } + } + } + + lt = time.Now() + ln = len(data) + + if itstimer && ln != 0 { + timer.Reset(b.toInterval) + } else if !itstimer && ln == 0 { + timer.Stop() + } + } + + // got eod, flush the remaining data + for len(data) > 0 { + l := len(data) + if l > b.batchCount { + l = b.batchCount + } + e := b.f(data, l) + if e != nil { + log.Error("Metadium Server", "Failed", e) + } else { + log.Debug("Metadium Server", "Count", l) + } + data = data[l:] + } +} + +func (b *Batch) Stop() { + b.ch <- nil +} + +func (b *Batch) Put(data interface{}) { + b.ch <- data + atomic.AddInt32(&b.count, 1) +} + +// EOF From b1cc4054fb01ed10b48bc920c30d2e4f82df66c9 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 18 Sep 2019 07:16:41 +0000 Subject: [PATCH 05/17] overhauled sender resolver routine --- core/tx_pool.go | 30 +++-------- core/tx_sender_resolver.go | 88 ++++++++++++++++++------------- core/types/transaction_signing.go | 14 +++++ eth/handler.go | 4 +- eth/peer.go | 26 +++++++-- eth/protocol.go | 2 +- miner/worker.go | 2 +- 7 files changed, 98 insertions(+), 68 deletions(-) diff --git a/core/tx_pool.go b/core/tx_pool.go index 54ee6961dc4c..7547a75d2437 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -39,6 +39,9 @@ import ( const ( // chainHeadChanSize is the size of channel listening to ChainHeadEvent. chainHeadChanSize = 10 + + // tx -> address cache size + tx2addrCacheSize = 1024000 ) var ( @@ -253,7 +256,7 @@ func NewTxPool(config TxPoolConfig, chainconfig *params.ChainConfig, chain block all: newTxLookup(), chainHeadCh: make(chan ChainHeadEvent, chainHeadChanSize), gasPrice: new(big.Int).SetUint64(config.PriceLimit), - senderResolver: NewSenderResolver(runtime.NumCPU()*2), + senderResolver: NewSenderResolver(runtime.NumCPU()*2, tx2addrCacheSize ), } pool.locals = newAccountSet(pool.signer) for _, addr := range config.Locals { @@ -849,9 +852,7 @@ func (pool *TxPool) promoteTx(addr common.Address, hash common.Hash, tx *types.T // the sender as a local one in the mean time, ensuring it goes around the local // pricing constraints. func (pool *TxPool) AddLocal(tx *types.Transaction) error { - var txs []*types.Transaction - txs = append(txs, tx) - pool.ResolveSenders(pool.signer, txs, false) + pool.ResolveSender(pool.signer, tx) return pool.addTx(tx, !pool.config.NoLocals) } @@ -859,6 +860,7 @@ func (pool *TxPool) AddLocal(tx *types.Transaction) error { // sender is not among the locally tracked ones, full pricing constraints will // apply. func (pool *TxPool) AddRemote(tx *types.Transaction) error { + pool.ResolveSender(pool.signer, tx) return pool.addTx(tx, false) } @@ -866,7 +868,7 @@ func (pool *TxPool) AddRemote(tx *types.Transaction) error { // marking the senders as a local ones in the mean time, ensuring they go around // the local pricing constraints. func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { - pool.ResolveSenders(pool.signer, txs, false) + pool.ResolveSenders(pool.signer, txs) return pool.addTxs(txs, !pool.config.NoLocals) } @@ -874,7 +876,7 @@ func (pool *TxPool) AddLocals(txs []*types.Transaction) []error { // If the senders are not among the locally tracked ones, full pricing constraints // will apply. func (pool *TxPool) AddRemotes(txs []*types.Transaction) []error { - pool.ResolveSenders(pool.signer, txs, false) + pool.ResolveSenders(pool.signer, txs) return pool.addTxs(txs, false) } @@ -1365,19 +1367,3 @@ func (t *txLookup) Remove(hash common.Hash) { delete(t.all, hash) } - -// Resolve senders -func (t *txLookup) ResolveSenders(signer types.Signer, txs []*types.Transaction) []*common.Address { - t.lock.Lock() - defer t.lock.Unlock() - - addrs := make([]*common.Address, len(txs)) - for i, d := range txs { - if tx, ok := t.all[d.Hash()]; ok { - if addr, err := types.Sender(signer, tx); err == nil { - addrs[i] = &addr - } - } - } - return addrs -} diff --git a/core/tx_sender_resolver.go b/core/tx_sender_resolver.go index 2a2350e624c7..65eb7e7e3f5e 100644 --- a/core/tx_sender_resolver.go +++ b/core/tx_sender_resolver.go @@ -3,27 +3,37 @@ package core import ( - _ "fmt" + "fmt" "sync" "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core/types" ) +// job structure is needed because param is necessary due to evaluation order +// uncertainty with closure and channel. +type job struct { + f func(interface{}) + param interface{} +} + // SenderResolver resolves sender accounts from transactions concurrently // with worker threads type SenderResolver struct { - jobs chan func() - busy chan interface{} + tx2addr *lru.LruCache + jobs chan *job + busy chan interface{} } // NewSenderResolver creates a new sender resolver worker pool -func NewSenderResolver(count int) *SenderResolver { +func NewSenderResolver(concurrency, cacheSize int) *SenderResolver { return &SenderResolver{ - jobs: make(chan func(), count), - busy: make(chan interface{}, count), + tx2addr: lru.NewLruCache(cacheSize, true), + jobs: make(chan *job, concurrency), + busy: make(chan interface{}, concurrency), } } @@ -32,8 +42,8 @@ func (s *SenderResolver) Run() { eor := false for { select { - case f := <-s.jobs: - if f == nil { + case j := <-s.jobs: + if j == nil { eor = true } else { go func() { @@ -41,7 +51,7 @@ func (s *SenderResolver) Run() { defer func() { <-s.busy }() - f() + j.f(j.param) }() } } @@ -57,59 +67,63 @@ func (s *SenderResolver) Stop() { } // Post a new sender resolver task -func (s *SenderResolver) Post(f func()) { - s.jobs <- f +func (s *SenderResolver) Post(f func(interface{}), p interface{}) { + s.jobs <- &job{f: f, param: p} } // ResolveSenders resolves sender accounts from given transactions -// concurrently using SenderResolver worker pool. If 'checkPool' is true, -// it checks the pending pool first -func (pool *TxPool) ResolveSenders(signer types.Signer, txs []*types.Transaction, checkPool bool) []*common.Address { - var total, by_ecrecover, failed int64 - total = int64(len(txs)) +// concurrently using SenderResolver worker pool. +func (pool *TxPool) ResolveSenders(signer types.Signer, txs []*types.Transaction) { ot := time.Now() - - var addrs []*common.Address - if checkPool { - addrs = pool.all.ResolveSenders(signer, txs) - } else { - addrs = make([]*common.Address, len(txs)) - } + s := pool.senderResolver + var total, by_ecrecover, failed int64 = int64(len(txs)), 0, 0 var wg sync.WaitGroup - for i, addr := range addrs { - if addr != nil { - types.SetSender(signer, txs[i], *addr) + for _, tx := range txs { + hash := tx.Hash() + if addr := types.GetSender(signer, tx); addr != nil { + s.tx2addr.Put(hash, *addr) continue } - atomic.AddInt64(&by_ecrecover, 1) - wg.Add(1) + data := s.tx2addr.Get(hash) + if data != nil { + types.SetSender(signer, tx, data.(common.Address)) + continue + } - f := func(ix int) { - if from, err := types.Sender(signer, txs[ix]); err == nil { - addrs[ix] = &from + wg.Add(1) + atomic.AddInt64(&by_ecrecover, 1) + s.Post(func(param interface{}) { + t := param.(*types.Transaction) + if from, err := types.Sender(signer, t); err == nil { + s.tx2addr.Put(t.Hash(), from) } else { atomic.AddInt64(&failed, 1) } wg.Done() - } - pool.senderResolver.Post(func() { f(i) }) + }, tx) } wg.Wait() - if total > 1 { + if false && total > 1 { dt := float64(time.Now().Sub(ot) / time.Millisecond) if dt <= 0 { dt = 1 } ps := float64(total) * 1000.0 / dt - _ = ps - //fmt.Printf("=== %d/%d/%d : took %.3f ms %.3f/sec\n", total, total-by_ecrecover, failed, dt, ps) + fmt.Printf("=== %d/%d/%d : took %.3f ms %.3f/sec %d\n", total, total-by_ecrecover, failed, dt, ps, s.tx2addr.Count()) } - return addrs + return +} + +// ResolveSender resolves sender address from a transaction +func (pool *TxPool) ResolveSender(signer types.Signer, tx *types.Transaction) { + var txs []*types.Transaction + txs = append(txs, tx) + pool.ResolveSenders(signer, txs) } // EOF diff --git a/core/types/transaction_signing.go b/core/types/transaction_signing.go index 820b7c19491a..9e73dc3bb4e5 100644 --- a/core/types/transaction_signing.go +++ b/core/types/transaction_signing.go @@ -99,6 +99,20 @@ func SetSender(signer Signer, tx *Transaction, from common.Address) { tx.from.Store(sigCache{signer: signer, from: from}) } +// get sender +func GetSender(signer Signer, tx *Transaction) *common.Address { + if sc := tx.from.Load(); sc != nil { + sigCache := sc.(sigCache) + // If the signer used to derive from in a previous + // call is not the same as used current, invalidate + // the cache. + if sigCache.signer.Equal(signer) { + return &sigCache.from + } + } + return nil +} + // Signer encapsulates transaction signature handling. Note that this interface is not a // stable API and may change at any time to accommodate new protocol rules. type Signer interface { diff --git a/eth/handler.go b/eth/handler.go index 0d693240b2c9..76c894857586 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -51,7 +51,7 @@ const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. - txChanSize = 40960 + txChanSize = 102400 // minimim number of peers to broadcast new blocks to minBroadcastPeers = 4 @@ -600,7 +600,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { if len(transactions) > 0 { signer := types.MakeSigner(pm.chainconfig, pm.blockchain.CurrentBlock().Number()) for _, txs := range transactions { - pm.txpool.ResolveSenders(signer, txs, true) + pm.txpool.ResolveSenders(signer, txs) } } if len(transactions) > 0 || len(uncles) > 0 || !filter { diff --git a/eth/peer.go b/eth/peer.go index 4ae7ead45960..ad76f8f6a98f 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -25,6 +25,7 @@ import ( mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/batch" "github.com/ethereum/go-ethereum/core/types" metaapi "github.com/ethereum/go-ethereum/metadium/api" "github.com/ethereum/go-ethereum/p2p" @@ -38,13 +39,13 @@ var ( ) const ( - maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownTxs = 1024000 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) // maxQueuedTxs is the maximum number of transaction lists to queue up before // dropping broadcasts. This is a sensitive number as a transaction list might // contain a single transaction, or thousands. - maxQueuedTxs = 128 + maxQueuedTxs = 40960 // maxQueuedProps is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few @@ -113,13 +114,28 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { // and transaction broadcasts into the remote peer. The goal is to have an async // writer that does not lock up node internals. func (p *peer) broadcast() { + b := batch.NewBatch(time.Millisecond * 10, time.Millisecond * 30, 1000, + func(data interface{}, count int) error { + var txs []*types.Transaction + for _, i := range data.([]interface{}) { + txs = append(txs, i.(*types.Transaction)) + } + + if err := p.SendTransactions(txs); err != nil { + return err + } + p.Log().Trace("Broadcast transactions", "count", len(txs)) + return nil + }) + defer b.Stop() + go b.Run() + for { select { case txs := <-p.queuedTxs: - if err := p.SendTransactions(txs); err != nil { - return + for _, i := range txs { + b.Put(i) } - p.Log().Trace("Broadcast transactions", "count", len(txs)) case prop := <-p.queuedProps: if err := p.SendNewBlock(prop.block, prop.td); err != nil { diff --git a/eth/protocol.go b/eth/protocol.go index 7943c844a615..c370594f7bf4 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -117,7 +117,7 @@ type txPool interface { SubscribeNewTxsEvent(chan<- core.NewTxsEvent) event.Subscription // Resolve senders from transaction pool - ResolveSenders(types.Signer, []*types.Transaction, bool) []*common.Address + ResolveSenders(types.Signer, []*types.Transaction) } // statusData is the network packet for the status message. diff --git a/miner/worker.go b/miner/worker.go index f6cef1836df7..57f54173538f 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -43,7 +43,7 @@ const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. - txChanSize = 4096 + txChanSize = 102400 // chainHeadChanSize is the size of channel listening to ChainHeadEvent. chainHeadChanSize = 10 From b8d83b11e6c2c99b2f859536848d2429988d5968 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 18 Sep 2019 07:35:34 +0000 Subject: [PATCH 06/17] fixed renamed package name --- miner/tx_prefetch.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/miner/tx_prefetch.go b/miner/tx_prefetch.go index d1e3316420d2..218f1a62660a 100644 --- a/miner/tx_prefetch.go +++ b/miner/tx_prefetch.go @@ -7,15 +7,15 @@ import ( "math" "time" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/log" - metaminer "github.com/ethereum/go-ethereum/metadium/miner" ) var ( - doneTxs = metaminer.NewLruCache(5000) + doneTxs = lru.NewLruCache(5000, false) ) func tx_prefetch(w *worker, to *TxOrderer, numWorkers int) { From f76d5a560797fbf5bc148bcb8c032c329a5d1ebb Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 18 Sep 2019 08:31:31 +0000 Subject: [PATCH 07/17] replaced faulty mapset.Set with lrucache --- eth/peer.go | 36 +++++++++++++++--------------------- 1 file changed, 15 insertions(+), 21 deletions(-) diff --git a/eth/peer.go b/eth/peer.go index ad76f8f6a98f..6e935bafc968 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -23,9 +23,9 @@ import ( "sync" "time" - mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/batch" + "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core/types" metaapi "github.com/ethereum/go-ethereum/metadium/api" "github.com/ethereum/go-ethereum/p2p" @@ -87,8 +87,8 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer + knownTxs *lru.LruCache // Set of transaction hashes known to be known by this peer + knownBlocks *lru.LruCache // Set of block hashes known to be known by this peer queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer queuedProps chan *propEvent // Queue of blocks to broadcast to the peer queuedAnns chan *types.Block // Queue of blocks to announce to the peer @@ -101,8 +101,8 @@ func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { rw: rw, version: version, id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), - knownTxs: mapset.NewSet(), - knownBlocks: mapset.NewSet(), + knownTxs: lru.NewLruCache(maxKnownTxs, true), + knownBlocks: lru.NewLruCache(maxKnownBlocks, true), queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), queuedProps: make(chan *propEvent, maxQueuedProps), queuedAnns: make(chan *types.Block, maxQueuedAnns), @@ -194,27 +194,21 @@ func (p *peer) SetHead(hash common.Hash, td *big.Int) { // never be propagated to this particular peer. func (p *peer) MarkBlock(hash common.Hash) { // If we reached the memory allowance, drop a previously known block hash - for p.knownBlocks.Cardinality() >= maxKnownBlocks { - p.knownBlocks.Pop() - } - p.knownBlocks.Add(hash) + p.knownBlocks.Put(hash, true) } // MarkTransaction marks a transaction as known for the peer, ensuring that it // will never be propagated to this particular peer. func (p *peer) MarkTransaction(hash common.Hash) { // If we reached the memory allowance, drop a previously known transaction hash - for p.knownTxs.Cardinality() >= maxKnownTxs { - p.knownTxs.Pop() - } - p.knownTxs.Add(hash) + p.knownTxs.Put(hash, true) } // SendTransactions sends transactions to the peer and includes the hashes // in its transaction hash set for future reference. func (p *peer) SendTransactions(txs types.Transactions) error { for _, tx := range txs { - p.knownTxs.Add(tx.Hash()) + p.knownTxs.Put(tx.Hash(), true) } return p2p.Send(p.rw, TxMsg, txs) } @@ -225,7 +219,7 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { select { case p.queuedTxs <- txs: for _, tx := range txs { - p.knownTxs.Add(tx.Hash()) + p.knownTxs.Put(tx.Hash(), true) } default: p.Log().Debug("Dropping transaction propagation", "count", len(txs)) @@ -243,7 +237,7 @@ func (p *peer) resendPendingTxs(txs map[common.Address]types.Transactions) { // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { for _, hash := range hashes { - p.knownBlocks.Add(hash) + p.knownBlocks.Put(hash, true) } request := make(newBlockHashesData, len(hashes)) for i := 0; i < len(hashes); i++ { @@ -259,7 +253,7 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error func (p *peer) AsyncSendNewBlockHash(block *types.Block) { select { case p.queuedAnns <- block: - p.knownBlocks.Add(block.Hash()) + p.knownBlocks.Put(block.Hash(), true) default: p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) } @@ -267,7 +261,7 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) { // SendNewBlock propagates an entire block to a remote peer. func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { - p.knownBlocks.Add(block.Hash()) + p.knownBlocks.Put(block.Hash(), true) return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) } @@ -276,7 +270,7 @@ func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { select { case p.queuedProps <- &propEvent{block: block, td: td}: - p.knownBlocks.Add(block.Hash()) + p.knownBlocks.Put(block.Hash(), true) default: p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) } @@ -522,7 +516,7 @@ func (ps *peerSet) PeersWithoutBlock(hash common.Hash) []*peer { list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownBlocks.Contains(hash) { + if !p.knownBlocks.Exists(hash) { list = append(list, p) } } @@ -537,7 +531,7 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { list := make([]*peer, 0, len(ps.peers)) for _, p := range ps.peers { - if !p.knownTxs.Contains(hash) { + if !p.knownTxs.Exists(hash) { list = append(list, p) } } From fab71b0e0d1f68484229ccb5a6b1857fe337e1f5 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 18 Sep 2019 13:20:43 +0000 Subject: [PATCH 08/17] Added TxExMsg message type to send derived sender address along with transaction itself to avoid doing signature verification. If it comes from non-trusted node, signature verification is enforced. --- core/types/transaction.go | 59 +++++++++++++++++++++++++++++++++++++++ eth/handler.go | 28 +++++++++++++++++++ eth/peer.go | 6 +++- eth/protocol.go | 6 ++-- 4 files changed, 96 insertions(+), 3 deletions(-) diff --git a/core/types/transaction.go b/core/types/transaction.go index ba3d5de91de1..4cfa639ba22f 100644 --- a/core/types/transaction.go +++ b/core/types/transaction.go @@ -43,6 +43,11 @@ type Transaction struct { from atomic.Value } +type TransactionEx struct { + Tx *Transaction + From common.Address `json:"from" rlp:"nil"` +} + type txdata struct { AccountNonce uint64 `json:"nonce" gencodec:"required"` Price *big.Int `json:"gasPrice" gencodec:"required"` @@ -212,6 +217,60 @@ func (tx *Transaction) Size() common.StorageSize { return common.StorageSize(c) } +// Convert []*Transaction to []*TransactionEx +func Txs2TxExs(txs []*Transaction) []*TransactionEx { + var out []*TransactionEx + for _, i := range txs { + var from common.Address + if sc := i.from.Load(); sc != nil { + from = sc.(sigCache).from + } + j := &TransactionEx{ + Tx: i, + From: from, + } + out = append(out, j) + } + return out +} + +// Convert []*TransactionEx to []*Transaction +func TxExs2Txs(signer Signer, txs []*TransactionEx, trustIt bool) []*Transaction { + var out []*Transaction + for _, i := range txs { + if trustIt { + i.Tx.from.Store(sigCache{signer: signer, from: i.From}) + } + out = append(out, i.Tx) + } + return out +} + +// EncodeRLP implements rlp.Encoder +func (tx *TransactionEx) EncodeRLP(w io.Writer) error { + if err := rlp.Encode(w, &tx.Tx.data); err != nil { + return err + } + var from common.Address + if sc := tx.Tx.from.Load(); sc != nil { + from = sc.(sigCache).from + } + return rlp.Encode(w, &from) +} + +// DecodeRLP implements rlp.Decoder +func (tx *TransactionEx) DecodeRLP(s *rlp.Stream) error { + tx.Tx = &Transaction{} + _, size, _ := s.Kind() + err := s.Decode(&tx.Tx.data) + if err != nil { + return err + } else { + tx.Tx.size.Store(common.StorageSize(rlp.ListSize(size))) + } + return s.Decode(&tx.From) +} + // AsMessage returns the transaction as a core.Message. // // AsMessage requires a signer to derive the sender. diff --git a/eth/handler.go b/eth/handler.go index 76c894857586..486ca6b8a320 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -832,6 +832,34 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { }() return nil + case p.version >= eth64 && msg.Code == TxExMsg: + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if atomic.LoadUint32(&pm.acceptTxs) == 0 { + break + } + // Transactions can be processed, parse all of them and deliver to the pool + var txexs []*types.TransactionEx + if err := msg.Decode(&txexs); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + + // Metadium: it's non-blocking now + go func() error { + signer := types.MakeSigner(pm.chainconfig, pm.blockchain.CurrentBlock().Number()) + txs := types.TxExs2Txs(signer, txexs, metaminer.IsPartner(p.ID().String())) + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return errResp(ErrDecode, "transaction %d is nil", i) + } + p.MarkTransaction(tx.Hash()) + } + //pm.txpool.AddRemotes(txs) + remoteTxCh <- txs + return nil + }() + return nil + // Metadium: leader wants to get left-over transactions if any case p.version >= eth63 && msg.Code == GetPendingTxsMsg: if !metaminer.IsPartner(p.ID().String()) { diff --git a/eth/peer.go b/eth/peer.go index 6e935bafc968..2e67dd41ff6f 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -210,7 +210,11 @@ func (p *peer) SendTransactions(txs types.Transactions) error { for _, tx := range txs { p.knownTxs.Put(tx.Hash(), true) } - return p2p.Send(p.rw, TxMsg, txs) + if p.version >= eth64 { + return p2p.Send(p.rw, TxExMsg, types.Txs2TxExs(txs)) + } else { + return p2p.Send(p.rw, TxMsg, txs) + } } // AsyncSendTransactions queues list of transactions propagation to a remote diff --git a/eth/protocol.go b/eth/protocol.go index c370594f7bf4..aafdb8f4d723 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -32,16 +32,17 @@ import ( const ( eth62 = 62 eth63 = 63 + eth64 = 64 ) // ProtocolName is the official short name of the protocol used during capability negotiation. var ProtocolName = "meta" // ProtocolVersions are the supported versions of the eth protocol (first is primary). -var ProtocolVersions = []uint{eth63, eth62} +var ProtocolVersions = []uint{eth64, eth63, eth62} // ProtocolLengths are the number of implemented message corresponding to different protocol versions. -var ProtocolLengths = []uint64{22, 8} +var ProtocolLengths = []uint64{23, 22, 8} const ProtocolMaxMsgSize = 100 * 1024 * 1024 // Maximum cap on the size of a protocol message @@ -69,6 +70,7 @@ const ( StatusExMsg = 0x13 EtcdAddMemberMsg = 0x14 EtcdClusterMsg = 0x15 + TxExMsg = 0x16 ) type errCode int From 0c149101c285643bd5e7891cf046ab4053ef5536 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Thu, 12 Sep 2019 08:35:21 +0000 Subject: [PATCH 09/17] added admin.etcdGetWork() and admin.etcdDeleteWork() commands --- eth/api.go | 10 ++++++++++ internal/web3ext/web3ext.go | 10 ++++++++++ metadium/admin.go | 2 ++ metadium/api/api.go | 2 ++ metadium/etcdutil.go | 14 ++++++++++++++ 5 files changed, 38 insertions(+) diff --git a/eth/api.go b/eth/api.go index 05277d8998d5..c0a110b353e5 100644 --- a/eth/api.go +++ b/eth/api.go @@ -299,6 +299,16 @@ func (api *PrivateAdminAPI) EtcdMoveLeader(name string) error { return metaapi.EtcdMoveLeader(name) } +// Get the latest logged work +func (api *PrivateAdminAPI) EtcdGetWork() (string, error) { + return metaapi.EtcdGetWork() +} + +// Remove the latest logged work +func (api *PrivateAdminAPI) EtcdDeleteWork() error { + return metaapi.EtcdDeleteWork() +} + // Synchronize with the peer func (api *PrivateAdminAPI) SynchroniseWith(id enode.ID) error { return api.eth.protocolManager.SynchroniseWith(id) diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 0ff683aa56b7..02ad5ef043c8 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -249,6 +249,16 @@ web3._extend({ params: 1, inputFormatter: [null] }), + new web3._extend.Method({ + name: 'etcdGetWork', + call: 'admin_etcdGetWork', + params: 0 + }), + new web3._extend.Method({ + name: 'etcdDeleteWork', + call: 'admin_etcdDeleteWork', + params: 0 + }), ], properties: [ new web3._extend.Property({ diff --git a/metadium/admin.go b/metadium/admin.go index 25e8bc797ce3..237b530815fb 100644 --- a/metadium/admin.go +++ b/metadium/admin.go @@ -1441,6 +1441,8 @@ func init() { metaapi.EtcdRemoveMember = EtcdRemoveMember metaapi.EtcdJoin = EtcdJoin metaapi.EtcdMoveLeader = EtcdMoveLeader + metaapi.EtcdGetWork = EtcdGetWork + metaapi.EtcdDeleteWork = EtcdDeleteWork } /* EOF */ diff --git a/metadium/api/api.go b/metadium/api/api.go index d07464a7ea45..b3215388ea17 100644 --- a/metadium/api/api.go +++ b/metadium/api/api.go @@ -39,6 +39,8 @@ var ( EtcdRemoveMember func(name string) (string, error) EtcdJoin func(cluster string) error EtcdMoveLeader func(name string) error + EtcdGetWork func() (string, error) + EtcdDeleteWork func() error ) func SetMsgChannel(ch chan interface{}) { diff --git a/metadium/etcdutil.go b/metadium/etcdutil.go index 439989fef0dc..bfa563c46e43 100644 --- a/metadium/etcdutil.go +++ b/metadium/etcdutil.go @@ -639,4 +639,18 @@ func EtcdJoin(name string) error { return admin.etcdJoin(name) } +func EtcdGetWork() (string, error) { + if admin == nil { + return "", ErrNotRunning + } + return admin.etcdGet("metadium-work") +} + +func EtcdDeleteWork() error { + if admin == nil { + return ErrNotRunning + } + return admin.etcdDelete("metadium-work") +} + /* EOF */ From 601f34eb91e5a925ee961b48e88b717e4e8c4e75 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Fri, 20 Sep 2019 03:05:34 +0000 Subject: [PATCH 10/17] added hub subsystem --- cmd/geth/main.go | 1 + cmd/geth/usage.go | 1 + cmd/utils/flags.go | 8 ++ eth/handler.go | 2 +- eth/peer.go | 38 ++++++ metadium/admin.go | 16 +++ metadium/miner/miner.go | 9 ++ metadium/scripts/gmet.sh | 1 + msgq/msgq.go | 269 ++++++++++++++++++++++++++++++++++++++ params/protocol_params.go | 1 + 10 files changed, 345 insertions(+), 1 deletion(-) create mode 100644 msgq/msgq.go diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 880073934dad..4f427f373dc7 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -183,6 +183,7 @@ var ( utils.PrefetchCount, utils.LogFlag, utils.MaxTxsPerBlock, + utils.Hub, } ) diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 5e313d4d7e9b..b12e3116569c 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -250,6 +250,7 @@ var AppHelpFlagGroups = []flagGroup{ utils.PrefetchCount, utils.LogFlag, utils.MaxTxsPerBlock, + utils.Hub, }, }, { diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 391e52339326..115ef8e97219 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -690,6 +690,11 @@ var ( Usage: "Max # of transactions in a block", Value: params.MaxTxsPerBlock, } + Hub = cli.StringFlag{ + Name: "hub", + Usage: "Id of message hub", + Value: params.Hub, + } ) // MakeDataDir retrieves the currently requested data directory, terminating @@ -1385,6 +1390,9 @@ func SetMetadiumConfig(ctx *cli.Context, stack *node.Node, cfg *eth.Config) { if ctx.GlobalIsSet(MaxTxsPerBlock.Name) { params.MaxTxsPerBlock = ctx.GlobalInt(MaxTxsPerBlock.Name) } + if ctx.GlobalIsSet(Hub.Name) { + params.Hub = ctx.GlobalString(Hub.Name) + } if params.ConsensusMethod == params.ConsensusInvalid { params.ConsensusMethod = params.ConsensusPoW diff --git a/eth/handler.go b/eth/handler.go index 486ca6b8a320..f6fb24a88a49 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -998,7 +998,7 @@ func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { // Broadcast transactions to a batch of peers not knowing about it for _, tx := range txs { - peers := pm.peers.PeersWithoutTx(tx.Hash()) + peers := pm.peers.PeersWithoutTx2(tx.Hash()) for _, peer := range peers { txset[peer] = append(txset[peer], tx) } diff --git a/eth/peer.go b/eth/peer.go index 2e67dd41ff6f..808fc1c8149e 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -28,7 +28,9 @@ import ( "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core/types" metaapi "github.com/ethereum/go-ethereum/metadium/api" + metaminer "github.com/ethereum/go-ethereum/metadium/miner" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rlp" ) @@ -542,6 +544,42 @@ func (ps *peerSet) PeersWithoutTx(hash common.Hash) []*peer { return list } +// hub subsystem +var ( + isHub int = -1 // -1: unset, 1: hub, 0: not hub +) + +// PeersWithoutTx2 retrieves a list of peers that do not have a given transaction +// in their set of known hashes. +func (ps *peerSet) PeersWithoutTx2(hash common.Hash) []*peer { + if !metaminer.AmPartner() { + return ps.PeersWithoutTx(hash) + } + + if isHub == -1 { + isHub = metaminer.AmHub(params.Hub) + } + + if isHub == 0 { + // send it to the hub if it did not come from there + ps.lock.RLock() + for _, p := range ps.peers { + if p.id == params.Hub { + var list []*peer + if !p.knownTxs.Exists(hash) { + list = append(list, p) + } + ps.lock.RUnlock() + return list + } + } + ps.lock.RUnlock() + } + + // fall back + return ps.PeersWithoutTx(hash) +} + // BestPeer retrieves the known peer with the currently highest total difficulty. func (ps *peerSet) BestPeer() *peer { ps.lock.RLock() diff --git a/metadium/admin.go b/metadium/admin.go index 237b530815fb..9bf6289bf822 100644 --- a/metadium/admin.go +++ b/metadium/admin.go @@ -1043,6 +1043,21 @@ func IsPartner(id string) bool { return true } +// id is v4 id +func AmHub(id string) int { + if admin == nil || admin.self == nil { + return -1 + } + + admin.lock.Lock() + defer admin.lock.Unlock() + if strings.HasPrefix(strings.ToUpper(admin.self.Id), strings.ToUpper(id)) { + return 1 + } else { + return 0 + } +} + func (ma *metaAdmin) pendingEmpty() bool { type txpool_status struct { Pending hexutil.Uint `json:"pending"` @@ -1425,6 +1440,7 @@ func init() { metaminer.IsMinerFunc = IsMiner metaminer.AmPartnerFunc = AmPartner metaminer.IsPartnerFunc = IsPartner + metaminer.AmHubFunc = AmHub metaminer.LogBlockFunc = LogBlock metaminer.SuggestGasPriceFunc = suggestGasPrice metaminer.CalculateRewardsFunc = calculateRewards diff --git a/metadium/miner/miner.go b/metadium/miner/miner.go index 17fc9bd0ff50..ce8828542e03 100644 --- a/metadium/miner/miner.go +++ b/metadium/miner/miner.go @@ -14,6 +14,7 @@ var ( IsMinerFunc func() bool AmPartnerFunc func() bool IsPartnerFunc func(string) bool + AmHubFunc func(string) int LogBlockFunc func(int64, common.Hash) CalculateRewardsFunc func(*big.Int, *big.Int, *big.Int, func(common.Address, *big.Int)) (*common.Address, []byte, error) VerifyRewardsFunc func(*big.Int, string) error @@ -48,6 +49,14 @@ func AmPartner() bool { } } +func AmHub(id string) int { + if AmHubFunc == nil { + return -1 + } else { + return AmHubFunc(id) + } +} + func LogBlock(height int64, hash common.Hash) { if LogBlockFunc != nil { LogBlockFunc(height, hash) diff --git a/metadium/scripts/gmet.sh b/metadium/scripts/gmet.sh index 5488a4135dc8..02f8dff37806 100755 --- a/metadium/scripts/gmet.sh +++ b/metadium/scripts/gmet.sh @@ -175,6 +175,7 @@ function start () OPTS="$COINBASE $DISCOVER $RPCOPT $BOOT_NODES $NONCE_LIMIT $TESTNET ${GMET_OPTS}" [ "$PORT" = "" ] || OPTS="${OPTS} --port $(($PORT + 1))" + [ "$HUB" = "" ] || OPTS="${OPTS} --hub ${HUB}" [ -d "$d/logs" ] || mkdir -p $d/logs diff --git a/msgq/msgq.go b/msgq/msgq.go new file mode 100644 index 000000000000..9faa175ce08e --- /dev/null +++ b/msgq/msgq.go @@ -0,0 +1,269 @@ +// msgq.go + +package msgq + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// subscriber information +type subscriber struct { + name string // name is to identify a subscriber + ix int64 // the last index that's sent to this suscriber + e chan bool // set if a new data is posted + done bool // set when unsubscribed + f func(data interface{}) error +} + +// MsgQ implements a simple pubsub system +type MsgQ struct { + lock *sync.RWMutex + min, max int // minimum and maximum number of entries + ix2data map[int64]interface{} + head, tail int64 + subscribers map[string]*subscriber +} + +// NewMsgQ creates a new Msgq +func NewMsgQ(min, max int) *MsgQ { + return &MsgQ{ + lock: &sync.RWMutex{}, + min: min, + max: max, + ix2data: map[int64]interface{}{}, + head: 0, + tail: -1, + subscribers: map[string]*subscriber{}, + } +} + +// Destroy is no-op for now +func (q *MsgQ) Destroy() { + return +} + +// CountOfSubscribers returns the number of current subscribers +func (q *MsgQ) CountOfSubscribers() int { + q.lock.RLock() + count := len(q.subscribers) + defer q.lock.RUnlock() + return count +} + +// Post adds a new data +func (q *MsgQ) Post(data interface{}) { + q.lock.Lock() + n := q.tail - q.head + 1 + ix := q.tail + 1 + q.ix2data[ix] = data + q.tail = ix + if n <= 0 { + q.head = ix + } + n += 1 + bNeedTrimming := false + if n%100 == 0 && n > int64(q.min) { + bNeedTrimming = true + } + q.lock.Unlock() + + q.lock.RLock() + for _, s := range q.subscribers { + select { + case s.e <- true: + break + default: + break + } + } + q.lock.RUnlock() + + if bNeedTrimming { + q.Trim() + } +} + +// Trim cleans up old messages +func (q *MsgQ) Trim() { + ix := int64(-1) + + q.lock.RLock() + n := q.tail - q.head + 1 + max := q.tail - int64(q.max) + + if n <= int64(q.min) { + q.lock.RUnlock() + return + } + + for _, s := range q.subscribers { + if ix == -1 || s.ix < ix { + ix = s.ix + } + } + + pass := (ix == -1 || ix == q.head) && ix >= max + q.lock.RUnlock() + if pass { + return + } + + q.lock.Lock() + if ix < max { + ix = max + } + if ix >= q.head { + for i := q.head; i <= ix; i++ { + delete(q.ix2data, i) + } + q.head = ix + 1 + } + for _, s := range q.subscribers { + if s.ix != -1 && s.ix < ix { + s.ix = ix + } + } + q.lock.Unlock() +} + +// Subscribe adds a new subscriber +func (q *MsgQ) Subscribe(name string, f func(data interface{}) error) error { + q.lock.Lock() + if _, ok := q.subscribers[name]; ok { + q.lock.Unlock() + return errors.New("Already Exists") + } + + s := &subscriber{ + name: name, + ix: -1, + e: make(chan bool, 1), + done: false, + f: f, + } + q.subscribers[name] = s + + go func(q *MsgQ, s *subscriber) { + for !s.done { + <-s.e + fmt.Printf(" !!! got event\n") + + for { + six := s.ix + 1 + if six < q.head { + six = q.head + } + eix := q.tail + + if six > eix { + break + } + + for i := six; i <= eix; i++ { + var ( + d interface{} + ok bool + ) + q.lock.RLock() + d, ok = q.ix2data[i] + q.lock.RUnlock() + if !ok { + continue + } else { + e := s.f(d) + if e != nil { + s.done = true + break + } + } + } + s.ix = eix + } + } + }(q, s) + + q.lock.Unlock() + + return nil +} + +// Unsubscribe remoted the named subscriber +func (q *MsgQ) Unsubscribe(name string) error { + var err error + + q.lock.Lock() + if s, ok := q.subscribers[name]; !ok { + err = errors.New("Not Found") + } else { + s.done = true + select { + case s.e <- true: + break + default: + break + } + } + delete(q.subscribers, name) + q.lock.Unlock() + return err +} + +// msgq test +func msgqTest() error { + q := NewMsgQ(10, 100) + + for x := 1; x <= 100; x++ { + name := fmt.Sprintf("%d", x) + q.Subscribe(name, func(data interface{}) error { + if s, ok := data.(string); ok { + fmt.Printf("%s: %s\n", name, s) + } + return nil + }) + } + + i := 0 + for ; i < 50; i++ { + q.Post(fmt.Sprintf("%d", i)) + } + + time.Sleep(2 * time.Second) + + for ; i < 100; i++ { + q.Post(fmt.Sprintf("%d", i)) + } + + time.Sleep(2 * time.Second) + + for x := 2; x <= 100; x++ { + name := fmt.Sprintf("%d", x) + q.Unsubscribe(name) + } + + time.Sleep(2 * time.Second) + + for ; i < 200; i++ { + q.Post(fmt.Sprintf("%d", i)) + } + + time.Sleep(10 * time.Second) + + for x := 1; x <= 1; x++ { + name := fmt.Sprintf("%d", x) + q.Unsubscribe(name) + } + + for { + if q.CountOfSubscribers() == 0 { + break + } + time.Sleep(1 * time.Second) + } + + return nil +} + +// EOF diff --git a/params/protocol_params.go b/params/protocol_params.go index 8499b9c3044b..86408172781f 100644 --- a/params/protocol_params.go +++ b/params/protocol_params.go @@ -116,4 +116,5 @@ var ( UseRocksDb int = 1 // LevelDB (0) or RocksDB (1) PrefetchCount int = 0 // Transaction Prefetch count for faster db read MaxTxsPerBlock int = 5000 // Max # of transactions in a block + Hub string = "" // Hub's id ) From 5db209d932aefc1be0bc5b02f5a2a5b899c0de24 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Fri, 20 Sep 2019 04:25:54 +0000 Subject: [PATCH 11/17] reduced maxKnownTxs per peer to 102400 --- eth/peer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/eth/peer.go b/eth/peer.go index 808fc1c8149e..f6b340e6061f 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -41,7 +41,7 @@ var ( ) const ( - maxKnownTxs = 1024000 // Maximum transactions hashes to keep in the known list (prevent DOS) + maxKnownTxs = 102400 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) // maxQueuedTxs is the maximum number of transaction lists to queue up before From e3d8b735eb227c3b3627b09a2dde84cc9920978e Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Fri, 20 Sep 2019 20:29:40 +0900 Subject: [PATCH 12/17] added maxtxsperblock command line option --- metadium/scripts/gmet.sh | 1 + 1 file changed, 1 insertion(+) diff --git a/metadium/scripts/gmet.sh b/metadium/scripts/gmet.sh index 02f8dff37806..5e30c665d100 100755 --- a/metadium/scripts/gmet.sh +++ b/metadium/scripts/gmet.sh @@ -176,6 +176,7 @@ function start () OPTS="$COINBASE $DISCOVER $RPCOPT $BOOT_NODES $NONCE_LIMIT $TESTNET ${GMET_OPTS}" [ "$PORT" = "" ] || OPTS="${OPTS} --port $(($PORT + 1))" [ "$HUB" = "" ] || OPTS="${OPTS} --hub ${HUB}" + [ "MAX_TXS_PER_BLOCK" = "" ] || OPTS="${OPTS} --maxtxsperblock ${MAX_TXS_PER_BLOCK}" [ -d "$d/logs" ] || mkdir -p $d/logs From b96f7893ed2ff4616492c443f91a6765e840e27e Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Sun, 22 Sep 2019 16:34:37 +0900 Subject: [PATCH 13/17] fixed script error in gmet.sh --- metadium/scripts/gmet.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadium/scripts/gmet.sh b/metadium/scripts/gmet.sh index 5e30c665d100..d36e3402956d 100755 --- a/metadium/scripts/gmet.sh +++ b/metadium/scripts/gmet.sh @@ -176,7 +176,7 @@ function start () OPTS="$COINBASE $DISCOVER $RPCOPT $BOOT_NODES $NONCE_LIMIT $TESTNET ${GMET_OPTS}" [ "$PORT" = "" ] || OPTS="${OPTS} --port $(($PORT + 1))" [ "$HUB" = "" ] || OPTS="${OPTS} --hub ${HUB}" - [ "MAX_TXS_PER_BLOCK" = "" ] || OPTS="${OPTS} --maxtxsperblock ${MAX_TXS_PER_BLOCK}" + [ "$MAX_TXS_PER_BLOCK" = "" ] || OPTS="${OPTS} --maxtxsperblock ${MAX_TXS_PER_BLOCK}" [ -d "$d/logs" ] || mkdir -p $d/logs From 1a2af5b9a55c571f315668e5497f30ae1d5c1e38 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Mon, 23 Sep 2019 08:48:29 +0900 Subject: [PATCH 14/17] fixed occasional crashes --- miner/worker.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/miner/worker.go b/miner/worker.go index 57f54173538f..9c7fe466fa80 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -793,7 +793,7 @@ func (w *worker) commitTransactions(txs *types.TransactionsByPriceAndNonce, coin return atomic.LoadInt32(interrupt) == commitInterruptNewHead } // If we don't have enough gas for any further transactions then we're done - if w.current.gasPool.Gas() < params.TxGas { + if w.current.gasPool == nil || w.current.gasPool.Gas() < params.TxGas { log.Trace("Not enough gas for further transactions", "have", w.current.gasPool, "want", params.TxGas) break } From fff6df257e0ace88a5b596c0bddf679063a0edc3 Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Mon, 23 Sep 2019 08:49:11 +0900 Subject: [PATCH 15/17] increased block announcement queue to reduce the chance of losing them --- eth/peer.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/eth/peer.go b/eth/peer.go index f6b340e6061f..5e2f97d20395 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -52,12 +52,12 @@ const ( // maxQueuedProps is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few // that might cover uncles should be enough. - maxQueuedProps = 4 + maxQueuedProps = 1024 // maxQueuedAnns is the maximum number of block announcements to queue up before // dropping broadcasts. Similarly to block propagations, there's no point to queue // above some healthy uncle limit, so use that. - maxQueuedAnns = 4 + maxQueuedAnns = 1024 handshakeTimeout = 5 * time.Second ) @@ -134,11 +134,6 @@ func (p *peer) broadcast() { for { select { - case txs := <-p.queuedTxs: - for _, i := range txs { - b.Put(i) - } - case prop := <-p.queuedProps: if err := p.SendNewBlock(prop.block, prop.td); err != nil { return @@ -151,6 +146,11 @@ func (p *peer) broadcast() { } p.Log().Trace("Announced block", "number", block.Number(), "hash", block.Hash()) + case txs := <-p.queuedTxs: + for _, i := range txs { + b.Put(i) + } + case <-p.term: return } From 59bb27fac7ba97863ceeebfe11ac7018c6e52d3f Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Tue, 24 Sep 2019 17:47:31 +0900 Subject: [PATCH 16/17] removed local vs. remote account bias as it affects performance negatively --- miner/worker.go | 37 ++++++++++++++++++++++--------------- 1 file changed, 22 insertions(+), 15 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index 9c7fe466fa80..a12034539455 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1278,24 +1278,31 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) w.updateSnapshot() return } - // Split the pending transactions into locals and remotes - localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending - for _, account := range w.eth.TxPool().Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs - } - } - if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs) + if !metaminer.IsPoW() { // Metadium + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending) if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { return } - } - if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs) - if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { - return + } else { + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending + for _, account := range w.eth.TxPool().Locals() { + if txs := remoteTxs[account]; len(txs) > 0 { + delete(remoteTxs, account) + localTxs[account] = txs + } + } + if len(localTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs) + if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { + return + } + } + if len(remoteTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs) + if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { + return + } } } w.commit(uncles, w.fullTaskHook, true, tstart) From a038d3da0bc55c2f6d8b0372f04611d29bd1e33a Mon Sep 17 00:00:00 2001 From: Uh Sado Date: Wed, 25 Sep 2019 06:04:00 +0000 Subject: [PATCH 17/17] reverted b81b486dc1a7d1cb21d30d04d9cec78f64d0b951 --- miner/worker.go | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/miner/worker.go b/miner/worker.go index a12034539455..9c7fe466fa80 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -1278,31 +1278,24 @@ func (w *worker) commitNewWork(interrupt *int32, noempty bool, timestamp int64) w.updateSnapshot() return } - if !metaminer.IsPoW() { // Metadium - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, pending) + // Split the pending transactions into locals and remotes + localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending + for _, account := range w.eth.TxPool().Locals() { + if txs := remoteTxs[account]; len(txs) > 0 { + delete(remoteTxs, account) + localTxs[account] = txs + } + } + if len(localTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs) if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { return } - } else { - // Split the pending transactions into locals and remotes - localTxs, remoteTxs := make(map[common.Address]types.Transactions), pending - for _, account := range w.eth.TxPool().Locals() { - if txs := remoteTxs[account]; len(txs) > 0 { - delete(remoteTxs, account) - localTxs[account] = txs - } - } - if len(localTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, localTxs) - if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { - return - } - } - if len(remoteTxs) > 0 { - txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs) - if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { - return - } + } + if len(remoteTxs) > 0 { + txs := types.NewTransactionsByPriceAndNonce(w.current.signer, remoteTxs) + if w.commitTransactions(txs, w.coinbase, interrupt, nil, nil) { + return } } w.commit(uncles, w.fullTaskHook, true, tstart)