Skip to content

Commit

Permalink
lightning: fix panic when user cancel (#41236)
Browse files Browse the repository at this point in the history
close #41235
  • Loading branch information
lance6716 authored Feb 10, 2023
1 parent 1746f02 commit 532689c
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 1 deletion.
10 changes: 10 additions & 0 deletions br/pkg/lightning/backend/local/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/google/btree"
"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/br/pkg/lightning/backend"
"github.com/pingcap/tidb/br/pkg/lightning/backend/kv"
"github.com/pingcap/tidb/br/pkg/lightning/checkpoints"
Expand Down Expand Up @@ -1224,6 +1225,15 @@ func (w *Writer) flushKVs(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}

failpoint.Inject("orphanWriterGoRoutine", func() {
_ = common.KillMySelf()
// mimic we meet context cancel error when `addSST`
<-ctx.Done()
time.Sleep(5 * time.Second)
failpoint.Return(errors.Trace(ctx.Err()))
})

err = w.addSST(ctx, meta)
if err != nil {
return errors.Trace(err)
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/lightning.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,11 @@ func (l *Lightning) run(taskCtx context.Context, taskCfg *config.Config, o *opti
o.logger.Error("restore failed", log.ShortError(err))
return errors.Trace(err)
}

failpoint.Inject("orphanWriterGoRoutine", func() {
// don't exit too quickly to expose panic
defer time.Sleep(time.Second * 10)
})
defer procedure.Close()

err = procedure.Run(ctx)
Expand Down
14 changes: 13 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ func (tr *TableRestore) restoreEngine(
metrics, _ := metric.FromContext(ctx)

// Restore table data
ChunkLoop:
for chunkIndex, chunk := range cp.Chunks {
if rc.status != nil && rc.status.backend == config.BackendTiDB {
rc.status.FinishedFileSize.Add(chunk.Chunk.Offset - chunk.Key.Offset)
Expand All @@ -524,9 +525,15 @@ func (tr *TableRestore) restoreEngine(
}
checkFlushLock.Unlock()

failpoint.Inject("orphanWriterGoRoutine", func() {
if chunkIndex > 0 {
<-pCtx.Done()
}
})

select {
case <-pCtx.Done():
return nil, pCtx.Err()
break ChunkLoop
default:
}

Expand Down Expand Up @@ -615,6 +622,11 @@ func (tr *TableRestore) restoreEngine(
}

wg.Wait()
select {
case <-pCtx.Done():
return nil, pCtx.Err()
default:
}

// Report some statistics into the log for debugging.
totalKVSize := uint64(0)
Expand Down
5 changes: 5 additions & 0 deletions br/tests/lightning_checkpoint_chunks/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ for i in $(seq "$CHUNK_COUNT"); do
done
done

PKG="github.com/pingcap/tidb/br/pkg/lightning"
export GO_FAILPOINTS="$PKG/backend/local/orphanWriterGoRoutine=return();$PKG/restore/orphanWriterGoRoutine=return();$PKG/orphanWriterGoRoutine=return()"
# test won't panic
do_run_lightning config

# Set the failpoint to kill the lightning instance as soon as
# one file (after writing totally $ROW_COUNT rows) is imported.
# If checkpoint does work, this should kill exactly $CHUNK_COUNT instances of lightnings.
Expand Down

0 comments on commit 532689c

Please sign in to comment.