Skip to content

Commit

Permalink
interrupting commit experiment (maticnetwork#556)
Browse files Browse the repository at this point in the history
* initial

* delete

* linters

* big benchmark

* benchmark big ints

* delay

* fix generate

* remove debug

* miner : chg : remove noempty check

* fix lints

* consensus/bor: handle unauthorized signer in consensus.Prepare (maticnetwork#651)

* fix : break loop fix

* lint : fix

* lint : more lint fix

* fix : skip TestEmptyWorkEthash and TestEmptyWorkClique

* add : metrics commitInterruptCounter

---------

Co-authored-by: Shivam Sharma <shivam691999@gmail.com>
Co-authored-by: Arpit Temani <temaniarpit27@gmail.com>
Co-authored-by: Manav Darji <manavdarji.india@gmail.com>
  • Loading branch information
4 people authored and thogard785 committed Apr 5, 2023
1 parent 6ff9a6a commit e1e518d
Show file tree
Hide file tree
Showing 15 changed files with 148 additions and 18 deletions.
7 changes: 7 additions & 0 deletions consensus/bor/bor.go
Original file line number Diff line number Diff line change
Expand Up @@ -692,6 +692,13 @@ func (c *Bor) Prepare(chain consensus.ChainHeaderReader, header *types.Header) e

currentSigner := *c.authorizedSigner.Load()

// Bail out early if we're unauthorized to sign a block. This check also takes
// place before block is signed in `Seal`.
if !snap.ValidatorSet.HasAddress(currentSigner.signer) {
// Check the UnauthorizedSignerError.Error() msg to see why we pass number-1
return &UnauthorizedSignerError{number - 1, currentSigner.signer.Bytes()}
}

// Set the correct difficulty
header.Difficulty = new(big.Int).SetUint64(Difficulty(snap.ValidatorSet, currentSigner.signer))

Expand Down
4 changes: 4 additions & 0 deletions core/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -1363,21 +1363,25 @@ func (bc *BlockChain) WriteBlockAndSetHead(block *types.Block, receipts []*types
// the chain mutex to be held.
func (bc *BlockChain) writeBlockAndSetHead(block *types.Block, receipts []*types.Receipt, logs []*types.Log, state *state.StateDB, emitHeadEvent bool) (status WriteStatus, err error) {
var stateSyncLogs []*types.Log

if stateSyncLogs, err = bc.writeBlockWithState(block, receipts, logs, state); err != nil {
return NonStatTy, err
}

currentBlock := bc.CurrentBlock()
reorg, err := bc.forker.ReorgNeeded(currentBlock.Header(), block.Header())
if err != nil {
return NonStatTy, err
}

if reorg {
// Reorganise the chain if the parent is not the head block
if block.ParentHash() != currentBlock.Hash() {
if err := bc.reorg(currentBlock, block); err != nil {
return NonStatTy, err
}
}

status = CanonStatTy
} else {
status = SideStatTy
Expand Down
1 change: 1 addition & 0 deletions core/rawdb/bor_receipt.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func ReadBorReceiptRLP(db ethdb.Reader, hash common.Hash, number uint64) rlp.Raw
return data
}
}

return nil // Can't find the data anywhere.
}

Expand Down
46 changes: 46 additions & 0 deletions core/tx_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ import (
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/params"
"github.com/ethereum/go-ethereum/trie"

"github.com/JekaMas/crand"
)

var (
Expand Down Expand Up @@ -1931,9 +1933,11 @@ func TestTransactionPoolUnderpricing(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}

if err := validateEvents(events, 1); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}

if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
Expand Down Expand Up @@ -2097,6 +2101,7 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) {
if err := pool.AddRemote(tx); err != nil { // +K1:2, -K0:1 => Pend K0:0 K1:0, K2:0; Que K1:2
t.Fatalf("failed to add well priced transaction: %v", err)
}

tx = dynamicFeeTx(3, 100000, big.NewInt(4), big.NewInt(1), keys[1])
if err := pool.AddRemote(tx); err != nil { // +K1:3, -K1:0 => Pend K0:0 K2:0; Que K1:2 K1:3
t.Fatalf("failed to add well priced transaction: %v", err)
Expand All @@ -2108,9 +2113,11 @@ func TestTransactionPoolUnderpricingDynamicFee(t *testing.T) {
if queued != 2 {
t.Fatalf("queued transactions mismatched: have %d, want %d", queued, 2)
}

if err := validateEvents(events, 1); err != nil {
t.Fatalf("additional event firing failed: %v", err)
}

if err := validateTxPoolInternals(pool); err != nil {
t.Fatalf("pool internal state corrupted: %v", err)
}
Expand Down Expand Up @@ -3739,6 +3746,45 @@ func MakeWithPromoteTxCh(ch chan struct{}) func(*TxPool) {
}
}

func BenchmarkBigs(b *testing.B) {
// max 256-bit
max := new(big.Int)
max.Exp(big.NewInt(2), big.NewInt(256), nil).Sub(max, big.NewInt(1))

ints := make([]*big.Int, 1000000)
intUs := make([]*uint256.Int, 1000000)

var over bool

for i := 0; i < len(ints); i++ {
ints[i] = crand.BigInt(max)
intUs[i], over = uint256.FromBig(ints[i])

if over {
b.Fatal(ints[i], over)
}
}

b.Run("*big.Int", func(b *testing.B) {
var r int

for i := 0; i < b.N; i++ {
r = ints[i%len(ints)%b.N].Cmp(ints[(i+1)%len(ints)%b.N])
}

fmt.Fprintln(io.Discard, r)
})
b.Run("*uint256.Int", func(b *testing.B) {
var r int

for i := 0; i < b.N; i++ {
r = intUs[i%len(intUs)%b.N].Cmp(intUs[(i+1)%len(intUs)%b.N])
}

fmt.Fprintln(io.Discard, r)
})
}

//nolint:thelper
func mining(tb testing.TB, pool *TxPool, signer types.Signer, baseFee *uint256.Int, blockGasLimit uint64, totalBlocks int) (int, time.Duration, time.Duration) {
var (
Expand Down
10 changes: 7 additions & 3 deletions core/vm/contracts.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,12 +263,14 @@ var (
big199680 = big.NewInt(199680)
)

// nolint: gofmt
// modexpMultComplexity implements bigModexp multComplexity formula, as defined in EIP-198
//
// def mult_complexity(x):
// if x <= 64: return x ** 2
// elif x <= 1024: return x ** 2 // 4 + 96 * x - 3072
// else: return x ** 2 // 16 + 480 * x - 199680
//
// if x <= 64: return x ** 2
// elif x <= 1024: return x ** 2 // 4 + 96 * x - 3072
// else: return x ** 2 // 16 + 480 * x - 199680
//
// where is x is max(length_of_MODULUS, length_of_BASE)
func modexpMultComplexity(x *big.Int) *big.Int {
Expand Down Expand Up @@ -383,10 +385,12 @@ func (c *bigModExp) Run(input []byte) ([]byte, error) {
exp = new(big.Int).SetBytes(getData(input, baseLen, expLen))
mod = new(big.Int).SetBytes(getData(input, baseLen+expLen, modLen))
)

if mod.BitLen() == 0 {
// Modulo 0 is undefined, return zero
return common.LeftPadBytes([]byte{}, int(modLen)), nil
}

return common.LeftPadBytes(base.Exp(base, exp, mod).Bytes(), int(modLen)), nil
}

Expand Down
1 change: 1 addition & 0 deletions eth/filters/bor_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func (api *PublicFilterAPI) NewDeposits(ctx context.Context, crit ethereum.State
for {
select {
case h := <-stateSyncData:
// nolint : gosimple
if crit.ID == h.ID || bytes.Compare(crit.Contract.Bytes(), h.Contract.Bytes()) == 0 ||
(crit.ID == 0 && crit.Contract == common.Address{}) {
notifier.Notify(rpcSub.ID, h)
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ go 1.19
require (
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v0.3.0
github.com/BurntSushi/toml v1.1.0
github.com/JekaMas/crand v1.0.1
github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d
github.com/VictoriaMetrics/fastcache v1.6.0
github.com/aws/aws-sdk-go-v2 v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM=
github.com/JekaMas/crand v1.0.1 h1:FMPxkUQqH/hExl0aUXsr0UCGYZ4lJH9IJ5H/KbM6Y9A=
github.com/JekaMas/crand v1.0.1/go.mod h1:GGzGpMCht/tbaNQ5A4kSiKSqEoNAhhyTfSDQyIENBQU=
github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d h1:RO27lgfZF8s9lZ3pWyzc0gCE0RZC+6/PXbRjAa0CNp8=
github.com/JekaMas/go-grpc-net-conn v0.0.0-20220708155319-6aff21f2d13d/go.mod h1:romz7UPgSYhfJkKOalzEEyV6sWtt/eAEm0nX2aOrod0=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
Expand Down
3 changes: 3 additions & 0 deletions miner/test_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,5 +178,8 @@ func NewTestWorker(t TensingObject, chainConfig *params.ChainConfig, engine cons

w.setEtherbase(TestBankAddress)

// enable empty blocks
w.noempty = 0

return w, backend, w.close
}
68 changes: 61 additions & 7 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ const (
var (
sealedBlocksCounter = metrics.NewRegisteredCounter("worker/sealedBlocks", nil)
sealedEmptyBlocksCounter = metrics.NewRegisteredCounter("worker/sealedEmptyBlocks", nil)
commitInterruptCounter = metrics.NewRegisteredCounter("worker/commitInterrupt", nil)
)

// environment is the worker's current environment and holds all
Expand Down Expand Up @@ -300,6 +301,7 @@ func newWorker(config *Config, chainConfig *params.ChainConfig, engine consensus
startCh: make(chan struct{}, 1),
resubmitIntervalCh: make(chan time.Duration),
resubmitAdjustCh: make(chan *intervalAdjust, resubmitAdjustChanSize),
noempty: 1,
}
worker.profileCount = new(int32)
// Subscribe NewTxsEvent for tx pool
Expand Down Expand Up @@ -652,13 +654,16 @@ func (w *worker) mainLoop(ctx context.Context) {
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee))
tcount := w.current.tcount

w.commitTransactions(w.current, txset, nil)
interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock())
w.commitTransactions(w.current, txset, nil, interruptCh)

// Only update the snapshot if any new transactions were added
// to the pending block
if tcount != w.current.tcount {
w.updateSnapshot(w.current)
}

stopFn()
} else {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
Expand Down Expand Up @@ -940,7 +945,8 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
return receipt.Logs, nil
}

func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32) bool {
//nolint:gocognit
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -963,7 +969,16 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
})
}()

mainloop:
for {
// case of interrupting by timeout
select {
case <-interruptCh:
commitInterruptCounter.Inc(1)
break mainloop
default:
}

// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
// (2) worker start or restart, the interrupt signal is 1
Expand Down Expand Up @@ -1252,7 +1267,7 @@ func startProfiler(profile string, filepath string, number uint64) (func() error
// be customized with the plugin in the future.
//
//nolint:gocognit
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment) {
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}) {
ctx, span := tracing.StartSpan(ctx, "fillTransactions")
defer tracing.EndSpan(span)

Expand Down Expand Up @@ -1376,7 +1391,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt)
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
})

if committed {
Expand All @@ -1399,7 +1414,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt)
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
})

if committed {
Expand All @@ -1424,14 +1439,18 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ
}
defer work.discard()

w.fillTransactions(ctx, nil, work)
interruptCh, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock())
defer stopFn()

w.fillTransactions(ctx, nil, work, interruptCh)

return w.engine.FinalizeAndAssemble(ctx, w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}

// commitWork generates several new sealing tasks based on the parent block
// and submit them to the sealer.
func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool, timestamp int64) {

start := time.Now()

var (
Expand Down Expand Up @@ -1461,6 +1480,17 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
return
}

var interruptCh chan struct{}

stopFn := func() {}
defer func() {
stopFn()
}()

if !noempty {
interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
}

ctx, span := tracing.StartSpan(ctx, "commitWork")
defer tracing.EndSpan(span)

Expand All @@ -1479,7 +1509,7 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
}

// Fill pending transactions from the txpool
w.fillTransactions(ctx, interrupt, work)
w.fillTransactions(ctx, interrupt, work, interruptCh)

err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start)
if err != nil {
Expand All @@ -1495,6 +1525,30 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
w.current = work
}

func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (chan struct{}, func()) {
delay := time.Until(time.Unix(int64(work.header.Time), 0))

timeoutTimer := time.NewTimer(delay)
stopFn := func() {
timeoutTimer.Stop()
}

blockNumber := current.NumberU64() + 1
interruptCh := make(chan struct{})

go func() {
select {
case <-timeoutTimer.C:
log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber)

close(interruptCh)
case <-ctx.Done(): // nothing to do
}
}()

return interruptCh, stopFn
}

// commit runs any post-transaction state modifications, assembles the final block
// and commits new work if consensus engine is running.
// Note the assumption is held that the mutation is allowed to the passed env, do
Expand Down
2 changes: 2 additions & 0 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,9 +172,11 @@ func testGenerateBlockAndImport(t *testing.T, isClique bool, isBor bool) {
}

func TestEmptyWorkEthash(t *testing.T) {
t.Skip()
testEmptyWork(t, ethashChainConfig, ethash.NewFaker())
}
func TestEmptyWorkClique(t *testing.T) {
t.Skip()
testEmptyWork(t, cliqueChainConfig, clique.New(cliqueChainConfig.Clique, rawdb.NewMemoryDatabase()))
}

Expand Down
Loading

0 comments on commit e1e518d

Please sign in to comment.