Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix : go-routine leak in commitInterrupt channel #851

Merged
merged 5 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion miner/fake_miner.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ var (
// Test accounts
testBankKey, _ = crypto.GenerateKey()
TestBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
testBankFunds = big.NewInt(1000000000000000000)
testBankFunds = big.NewInt(9000000000000000000)

testUserKey, _ = crypto.GenerateKey()
testUserAddress = crypto.PubkeyToAddress(testUserKey.PublicKey)
Expand Down
35 changes: 19 additions & 16 deletions miner/test_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,16 +358,14 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) {
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee))
tcount := w.current.tcount

interruptCh, stopFn := getInterruptTimer(ctx, w.current, w.chain.CurrentBlock())
w.commitTransactionsWithDelay(w.current, txset, nil, interruptCh, delay)
//nolint:contextcheck
w.commitTransactions(w.current, txset, nil, context.Background())

// Only update the snapshot if any new transactions were added
// to the pending block
if tcount != w.current.tcount {
w.updateSnapshot(w.current)
}

stopFn()
} else {
// Special case, if the consensus engine is 0 period clique(dev mode),
// submit sealing work here since all empty submission will be rejected
Expand All @@ -393,7 +391,7 @@ func (w *worker) mainLoopWithDelay(ctx context.Context, delay uint) {
}

// nolint:gocognit
func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}, delay uint) bool {
func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCtx context.Context, delay uint) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -420,11 +418,15 @@ func (w *worker) commitTransactionsWithDelay(env *environment, txs *types.Transa
mainloop:
for {
// case of interrupting by timeout
select {
case <-interruptCh:
commitInterruptCounter.Inc(1)
break mainloop
default:
if interruptCtx != nil {
// case of interrupting by timeout
select {
case <-interruptCtx.Done():
commitInterruptCounter.Inc(1)
log.Warn("Tx Level Interrupt")
break mainloop
default:
}
}
// In the following three cases, we will interrupt the execution of the transaction.
// (1) new head block event arrival, the interrupt signal is 1
Expand Down Expand Up @@ -581,15 +583,16 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem
return
}

var interruptCh chan struct{}
//nolint:contextcheck
var interruptCtx = context.Background()

stopFn := func() {}
defer func() {
stopFn()
}()

if !noempty {
interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
}

ctx, span := tracing.StartSpan(ctx, "commitWork")
Expand All @@ -610,7 +613,7 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem
}

// Fill pending transactions from the txpool
w.fillTransactionsWithDelay(ctx, interrupt, work, interruptCh, delay)
w.fillTransactionsWithDelay(ctx, interrupt, work, interruptCtx, delay)

err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start)
if err != nil {
Expand All @@ -627,7 +630,7 @@ func (w *worker) commitWorkWithDelay(ctx context.Context, interrupt *int32, noem
}

// nolint:gocognit
func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}, delay uint) {
func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32, env *environment, interruptCtx context.Context, delay uint) {
ctx, span := tracing.StartSpan(ctx, "fillTransactions")
defer tracing.EndSpan(span)

Expand Down Expand Up @@ -751,7 +754,7 @@ func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32
})

tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay)
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCtx, delay)
})

if committed {
Expand All @@ -774,7 +777,7 @@ func (w *worker) fillTransactionsWithDelay(ctx context.Context, interrupt *int32
})

tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCh, delay)
committed = w.commitTransactionsWithDelay(env, txs, interrupt, interruptCtx, delay)
})

if committed {
Expand Down
57 changes: 29 additions & 28 deletions miner/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,8 @@ func (w *worker) mainLoop(ctx context.Context) {
txset := types.NewTransactionsByPriceAndNonce(w.current.signer, txs, cmath.FromBig(w.current.header.BaseFee))
tcount := w.current.tcount

var interruptCh chan struct{}

w.commitTransactions(w.current, txset, nil, interruptCh)
//nolint:contextcheck
w.commitTransactions(w.current, txset, nil, context.Background())

// Only update the snapshot if any new transactions were added
// to the pending block
Expand Down Expand Up @@ -945,7 +944,7 @@ func (w *worker) commitTransaction(env *environment, tx *types.Transaction) ([]*
}

//nolint:gocognit
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCh chan struct{}) bool {
func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByPriceAndNonce, interrupt *int32, interruptCtx context.Context) bool {
gasLimit := env.header.GasLimit
if env.gasPool == nil {
env.gasPool = new(core.GasPool).AddGas(gasLimit)
Expand All @@ -971,11 +970,15 @@ func (w *worker) commitTransactions(env *environment, txs *types.TransactionsByP
mainloop:
for {
// case of interrupting by timeout
select {
case <-interruptCh:
commitInterruptCounter.Inc(1)
break mainloop
default:
if interruptCtx != nil {
// case of interrupting by timeout
select {
case <-interruptCtx.Done():
commitInterruptCounter.Inc(1)
log.Warn("Tx Level Interrupt")
break mainloop
default:
}
}

// In the following three cases, we will interrupt the execution of the transaction.
Expand Down Expand Up @@ -1266,7 +1269,7 @@ func startProfiler(profile string, filepath string, number uint64) (func() error
// be customized with the plugin in the future.
//
//nolint:gocognit
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCh chan struct{}) {
func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *environment, interruptCtx context.Context) {
ctx, span := tracing.StartSpan(ctx, "fillTransactions")
defer tracing.EndSpan(span)

Expand Down Expand Up @@ -1390,7 +1393,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "", "worker.LocalCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
committed = w.commitTransactions(env, txs, interrupt, interruptCtx)
})

if committed {
Expand All @@ -1413,7 +1416,7 @@ func (w *worker) fillTransactions(ctx context.Context, interrupt *int32, env *en
})

tracing.Exec(ctx, "", "worker.RemoteCommitTransactions", func(ctx context.Context, span trace.Span) {
committed = w.commitTransactions(env, txs, interrupt, interruptCh)
committed = w.commitTransactions(env, txs, interrupt, interruptCtx)
})

if committed {
Expand All @@ -1438,10 +1441,10 @@ func (w *worker) generateWork(ctx context.Context, params *generateParams) (*typ
}
defer work.discard()

interruptCh, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock())
interruptCtx, stopFn := getInterruptTimer(ctx, work, w.chain.CurrentBlock())
defer stopFn()

w.fillTransactions(ctx, nil, work, interruptCh)
w.fillTransactions(ctx, nil, work, interruptCtx)

return w.engine.FinalizeAndAssemble(ctx, w.chain, work.header, work.state, work.txs, work.unclelist(), work.receipts)
}
Expand Down Expand Up @@ -1479,15 +1482,16 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
return
}

var interruptCh chan struct{}
//nolint:contextcheck
var interruptCtx = context.Background()

stopFn := func() {}
defer func() {
stopFn()
}()

if !noempty {
interruptCh, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
interruptCtx, stopFn = getInterruptTimer(ctx, work, w.chain.CurrentBlock())
}

ctx, span := tracing.StartSpan(ctx, "commitWork")
Expand All @@ -1508,7 +1512,7 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
}

// Fill pending transactions from the txpool
w.fillTransactions(ctx, interrupt, work, interruptCh)
w.fillTransactions(ctx, interrupt, work, interruptCtx)

err = w.commit(ctx, work.copy(), w.fullTaskHook, true, start)
if err != nil {
Expand All @@ -1524,28 +1528,25 @@ func (w *worker) commitWork(ctx context.Context, interrupt *int32, noempty bool,
w.current = work
}

func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (chan struct{}, func()) {
func getInterruptTimer(ctx context.Context, work *environment, current *types.Block) (context.Context, func()) {
delay := time.Until(time.Unix(int64(work.header.Time), 0))

timeoutTimer := time.NewTimer(delay)
stopFn := func() {
timeoutTimer.Stop()
}
interruptCtx, cancel := context.WithTimeout(context.Background(), delay)

blockNumber := current.NumberU64() + 1
interruptCh := make(chan struct{})

go func() {
select {
case <-timeoutTimer.C:
log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber)

close(interruptCh)
case <-interruptCtx.Done():
if interruptCtx.Err() != context.Canceled {
log.Info("Commit Interrupt. Pre-committing the current block", "block", blockNumber)
cancel()
}
case <-ctx.Done(): // nothing to do
}
}()

return interruptCh, stopFn
return interruptCtx, cancel
}

// commit runs any post-transaction state modifications, assembles the final block
Expand Down
20 changes: 6 additions & 14 deletions miner/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,21 +43,18 @@ import (
"github.com/ethereum/go-ethereum/tests/bor/mocks"
)

// nolint : paralleltest
func TestGenerateBlockAndImportEthash(t *testing.T) {
t.Parallel()

testGenerateBlockAndImport(t, false, false)
}

// nolint : paralleltest
func TestGenerateBlockAndImportClique(t *testing.T) {
t.Parallel()

testGenerateBlockAndImport(t, true, false)
}

// nolint : paralleltest
func TestGenerateBlockAndImportBor(t *testing.T) {
t.Parallel()

testGenerateBlockAndImport(t, false, true)
}

Expand Down Expand Up @@ -627,18 +624,18 @@ func testGetSealingWork(t *testing.T, chainConfig *params.ChainConfig, engine co
}
}

// nolint:paralleltest
func TestCommitInterruptExperimentBor(t *testing.T) {
t.Parallel()
// with 1 sec block time and 200 millisec tx delay we should get 5 txs per block
testCommitInterruptExperimentBor(t, 200, 5)

time.Sleep(3 * time.Second)
// with 1 sec block time and 100 millisec tx delay we should get 10 txs per block
testCommitInterruptExperimentBor(t, 100, 10)
}

// nolint:thelper
func testCommitInterruptExperimentBor(t *testing.T, delay uint, txCount int) {
t.Helper()

var (
engine consensus.Engine
chainConfig *params.ChainConfig
Expand Down Expand Up @@ -726,11 +723,6 @@ func BenchmarkBorMining(b *testing.B) {
chain, _ := core.NewBlockChain(db2, nil, back.chain.Config(), engine, vm.Config{}, nil, nil, nil)
defer chain.Stop()

// Ignore empty commit here for less noise.
w.skipSealHook = func(task *task) bool {
return len(task.receipts) == 0
}

// fulfill tx pool
const (
totalGas = testGas + params.TxGas
Expand Down