Skip to content
This repository has been archived by the owner on Jul 24, 2024. It is now read-only.

Commit

Permalink
Merge branch 'release-5.0' into release-5.0-d5e5cdc7676e
Browse files Browse the repository at this point in the history
  • Loading branch information
glorv authored Apr 14, 2021
2 parents 568f2aa + b47663f commit 81cc4d8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 15 deletions.
35 changes: 22 additions & 13 deletions pkg/lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -1346,21 +1346,26 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
// if index-engine checkpoint is lower than `CheckpointStatusClosed`, there must be
// data-engines that need to be restore or import. Otherwise, all data-engines should
// be finished already.

if indexEngineCp.Status < checkpoints.CheckpointStatusClosed {
indexWorker := rc.indexWorkers.Apply()
defer rc.indexWorkers.Recycle(indexWorker)

indexEngine, err := rc.backend.OpenEngine(ctx, tr.tableName, indexEngineID, rc.ts)
if err != nil {
return errors.Trace(err)
}

// The table checkpoint status less than `CheckpointStatusIndexImported` implies
// that index engine checkpoint status less than `CheckpointStatusImported`.
// So the index engine must be found in above process
if indexEngine == nil {
return errors.Errorf("table checkpoint status %v incompitable with index engine checkpoint status %v",
cp.Status, indexEngineCp.Status)
// import backend can't reopen engine if engine is closed, so
// only open index engine if any data engines don't finish writing.
var indexEngine *backend.OpenedEngine
var err error
for engineID, engine := range cp.Engines {
if engineID == indexEngineID {
continue
}
if engine.Status < checkpoints.CheckpointStatusAllWritten {
indexEngine, err = rc.backend.OpenEngine(ctx, tr.tableName, indexEngineID, rc.ts)
if err != nil {
return errors.Trace(err)
}
break
}
}

logTask := tr.logger.Begin(zap.InfoLevel, "import whole table")
Expand Down Expand Up @@ -1443,8 +1448,12 @@ func (tr *TableRestore) restoreEngines(pCtx context.Context, rc *Controller, cp
return errors.Trace(restoreErr)
}

closedIndexEngine, restoreErr = indexEngine.Close(ctx)
rc.saveStatusCheckpoint(tr.tableName, indexEngineID, err, checkpoints.CheckpointStatusClosed)
if indexEngine != nil {
closedIndexEngine, restoreErr = indexEngine.Close(ctx)
} else {
closedIndexEngine, restoreErr = rc.backend.UnsafeCloseEngine(ctx, tr.tableName, indexEngineID)
}
rc.saveStatusCheckpoint(tr.tableName, indexEngineID, restoreErr, checkpoints.CheckpointStatusClosed)
} else if indexEngineCp.Status == checkpoints.CheckpointStatusClosed {
// If index engine file has been closed but not imported only if context cancel occurred
// when `importKV()` execution, so `UnsafeCloseEngine` and continue import it.
Expand Down
3 changes: 1 addition & 2 deletions tests/lightning_checkpoint_engines/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ for BACKEND in importer local; do
done

echo "******** Verify checkpoint no-op ********"
# cleanup importer data directory to avoid open-engine failure in import backend.
rm -rf $TEST_DIR/importer/*
# all engines should have been imported here.
do_run_lightning $BACKEND config

run_sql 'SELECT count(*), sum(c) FROM cpeng.a'
Expand Down

0 comments on commit 81cc4d8

Please sign in to comment.