compact restore: use closure to initialize snapshot restore checkpoint (#…
3pointer authored Dec 19, 2024
1 parent a3574aa commit 659e3e7
Showing 5 changed files with 32 additions and 14 deletions.
23 changes: 18 additions & 5 deletions br/pkg/restore/snap_client/client.go
@@ -76,6 +76,8 @@ const minBatchDdlSize = 1

 type SnapClient struct {
     restorer restore.SstRestorer
+    // Use a closure to lazy load checkpoint runner
+    getRestorerFn func(*checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer
     // Tool clients used by SnapClient
     pdClient     pd.Client
     pdHTTPClient pdhttp.Client
@@ -148,7 +150,8 @@ type SnapClient struct
     rewriteMode RewriteMode

     // checkpoint information for snapshot restore
-    checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]
+    checkpointRunner   *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]
+
     checkpointChecksum map[int64]*checkpoint.ChecksumItem
 }

@@ -168,7 +171,10 @@ func NewRestoreClient(
     }
 }

-func (rc *SnapClient) GetRestorer() restore.SstRestorer {
+func (rc *SnapClient) GetRestorer(checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer {
+    if rc.restorer == nil {
+        rc.restorer = rc.getRestorerFn(checkpointRunner)
+    }
     return rc.restorer
 }
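The rewritten GetRestorer is a lazy-initialization wrapper: the restorer is built by the stored getRestorerFn factory on the first request and cached in rc.restorer thereafter, so later calls ignore their argument. A minimal, runnable Go sketch of the same pattern (all names here are illustrative, not from the BR codebase):

    package main

    import "fmt"

    type restorer struct{ label string }

    type client struct {
        restorer      *restorer                   // cached instance, nil until first use
        getRestorerFn func(runner *int) *restorer // factory stored up front, run lazily
    }

    // GetRestorer builds the restorer on the first call and reuses it afterwards,
    // so the runner argument only matters the first time.
    func (c *client) GetRestorer(runner *int) *restorer {
        if c.restorer == nil {
            c.restorer = c.getRestorerFn(runner)
        }
        return c.restorer
    }

    func main() {
        c := &client{
            getRestorerFn: func(runner *int) *restorer {
                if runner == nil {
                    return &restorer{label: "without checkpoint"}
                }
                return &restorer{label: "with checkpoint"}
            },
        }
        n := 1
        fmt.Println(c.GetRestorer(&n).label)  // "with checkpoint": the factory ran once
        fmt.Println(c.GetRestorer(nil).label) // still "with checkpoint": cached instance
    }

As written, this pattern is not safe against two goroutines racing on the first call; nothing in the diff adds locking, so the callers presumably serialize the first GetRestorer invocation.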

@@ -389,7 +395,10 @@ func (rc *SnapClient) InitCheckpoint(
         return checkpointSetWithTableID, nil, errors.Trace(err)
     }
     rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
-    return checkpointSetWithTableID, checkpointClusterConfig, errors.Trace(err)
+    if err != nil {
+        return checkpointSetWithTableID, nil, errors.Trace(err)
+    }
+    return checkpointSetWithTableID, checkpointClusterConfig, nil
 }

 func (rc *SnapClient) WaitForFinishCheckpoint(ctx context.Context, flush bool) {
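A second fix rides along in this hunk: the old code returned checkpointClusterConfig together with errors.Trace(err), so a failed StartCheckpointRunnerForRestore could hand callers a non-nil config next to a non-nil error. The new code follows the usual Go convention, sketched below with hypothetical names:

    package main

    import (
        "errors"
        "fmt"
    )

    type config struct{ clusterID int }

    // fetchConfig is a stand-in for a call that may fail partway through and
    // still produce a half-built value.
    func fetchConfig(fail bool) (*config, error) {
        if fail {
            return &config{}, errors.New("store unavailable") // half-built value!
        }
        return &config{clusterID: 7}, nil
    }

    // loadConfig applies the convention from the hunk above: check err first
    // and return nil instead of a possibly half-initialized value.
    func loadConfig(fail bool) (*config, error) {
        cfg, err := fetchConfig(fail)
        if err != nil {
            return nil, err
        }
        return cfg, nil
    }

    func main() {
        cfg, err := loadConfig(true)
        fmt.Println(cfg, err) // <nil> store unavailable: callers cannot misuse cfg
    }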
@@ -539,15 +548,19 @@ func (rc *SnapClient) initClients(ctx context.Context, backend *backuppb.Storage
         return errors.Trace(err)
     }
     // Raw/Txn restore are not support checkpoint for now
-    rc.restorer = restore.NewSimpleSstRestorer(ctx, fileImporter, rc.workerPool, nil)
+    rc.getRestorerFn = func(checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer {
+        return restore.NewSimpleSstRestorer(ctx, fileImporter, rc.workerPool, nil)
+    }
 } else {
     // or create a fileImporter with the cluster API version
     fileImporter, err = NewSnapFileImporter(
         ctx, rc.dom.Store().GetCodec().GetAPIVersion(), TiDBFull, opt)
     if err != nil {
         return errors.Trace(err)
     }
-    rc.restorer = restore.NewMultiTablesRestorer(ctx, fileImporter, rc.workerPool, rc.checkpointRunner)
+    rc.getRestorerFn = func(checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer {
+        return restore.NewMultiTablesRestorer(ctx, fileImporter, rc.workerPool, checkpointRunner)
+    }
 }
 return nil
}
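The behavioral payoff is in the multi-table branch: the old code captured rc.checkpointRunner while initClients ran, before InitCheckpoint populates it, so the restorer was permanently wired to a nil runner. The closure instead receives the live runner as an argument when GetRestorer finally fires. A runnable toy example of eager versus deferred binding (names illustrative):

    package main

    import "fmt"

    type client struct{ runner *string }

    func main() {
        c := &client{} // runner not set yet, like rc before InitCheckpoint runs

        // Eager binding: the nil value is rendered into the string immediately.
        eager := fmt.Sprint("runner: ", c.runner)

        // Deferred binding: the closure reads the field only when invoked.
        deferred := func() string { return fmt.Sprint("runner: ", c.runner) }

        r := "checkpoint-runner"
        c.runner = &r // later initialization, playing the role of InitCheckpoint

        fmt.Println(eager)      // runner: <nil>
        fmt.Println(deferred()) // runner: 0x... (the live, non-nil pointer)
    }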
5 changes: 3 additions & 2 deletions br/pkg/restore/snap_client/tikv_sender.go
@@ -385,9 +385,10 @@ func (rc *SnapClient) RestoreSSTFiles(
     }
     })

-    retErr = rc.restorer.GoRestore(onProgress, tableIDWithFilesGroup...)
+    r := rc.GetRestorer(rc.checkpointRunner)
+    retErr = r.GoRestore(onProgress, tableIDWithFilesGroup...)
     if retErr != nil {
         return retErr
     }
-    return rc.restorer.WaitUntilFinish()
+    return r.WaitUntilFinish()
 }
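With the factory in place, the snapshot restore path materializes the restorer only here, after InitCheckpoint has had a chance to start the checkpoint runner, and drives both phases through one handle. The resulting call order, as a sketch (identifiers from the diff; argument lists abbreviated and surrounding code omitted):

    rc.InitCheckpoint(/* ... */)             // starts rc.checkpointRunner on success
    r := rc.GetRestorer(rc.checkpointRunner) // first call builds the restorer via the closure
    retErr := r.GoRestore(onProgress, tableIDWithFilesGroup...)
    if retErr != nil {
        return retErr
    }
    return r.WaitUntilFinish()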
4 changes: 2 additions & 2 deletions br/pkg/task/restore_raw.go
@@ -163,11 +163,11 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR
     defer restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulers, cfg.Online)

     start := time.Now()
-    err = client.GetRestorer().GoRestore(onProgress, restore.CreateUniqueFileSets(files))
+    err = client.GetRestorer(nil).GoRestore(onProgress, restore.CreateUniqueFileSets(files))
     if err != nil {
         return errors.Trace(err)
     }
-    err = client.GetRestorer().WaitUntilFinish()
+    err = client.GetRestorer(nil).WaitUntilFinish()
     if err != nil {
         return errors.Trace(err)
     }
4 changes: 2 additions & 2 deletions br/pkg/task/restore_txn.go
@@ -102,11 +102,11 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config)
     }
     defer restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulers, false)

-    err = client.GetRestorer().GoRestore(onProgress, restore.CreateUniqueFileSets(files))
+    err = client.GetRestorer(nil).GoRestore(onProgress, restore.CreateUniqueFileSets(files))
     if err != nil {
         return errors.Trace(err)
     }
-    err = client.GetRestorer().WaitUntilFinish()
+    err = client.GetRestorer(nil).WaitUntilFinish()
     if err != nil {
         return errors.Trace(err)
     }
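Raw and txn restore pass nil because, per the client.go hunk above, initClients installs a factory for them that ignores its checkpoint-runner argument (checkpoints are unsupported on those paths). And since GetRestorer caches its result, the second GetRestorer(nil) used for WaitUntilFinish returns the same restorer that GoRestore started. The relevant factory, repeated from the diff:

    // From the client.go hunk above: the raw/txn factory hard-codes a nil
    // checkpoint runner, so the argument passed to GetRestorer is irrelevant here.
    rc.getRestorerFn = func(checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer {
        return restore.NewSimpleSstRestorer(ctx, fileImporter, rc.workerPool, nil)
    }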
10 changes: 7 additions & 3 deletions br/tests/br_restore_checkpoint/run.sh
@@ -69,16 +69,20 @@ if [ $restore_fail -ne 1 ]; then
 fi

 # PITR with checkpoint but failed in the log restore metakv stage
-export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return(\"only-last-table-files\");\
-github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)"
+export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return(\"only-last-table-files\")"
+export GO_FAILPOINTS=$GO_FAILPOINTS";github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)"
 restore_fail=0
 run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1
 export GO_FAILPOINTS=""
 if [ $restore_fail -ne 1 ]; then
-    echo 'PITR success'
+    echo 'PITR success, but should fail'
     exit 1
 fi

+# check the snapshot restore has checkpoint data
+run_sql 'select count(*) from '"__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"'.`cpt_data`;'
+check_contains "count(*): 1"
+
 # PITR with checkpoint but failed in the log restore datakv stage
 # skip the snapshot restore stage
 export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/corrupt-files=return(\"corrupt-last-table-files\")"
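The GO_FAILPOINTS exports above toggle failpoints compiled into BR with the github.com/pingcap/failpoint library. A minimal sketch of how such a gate typically looks on the Go side; the surrounding function and the injected error are hypothetical, and only the failpoint.Inject call shape is the library's real API:

    package logclient // hypothetical package placement

    import (
        "errors"

        "github.com/pingcap/failpoint"
    )

    // saveIDMaps is a hypothetical stand-in for whatever br/pkg/restore/log_client
    // code sits behind the failed-after-id-maps-saved failpoint.
    func saveIDMaps() error {
        // ... id maps persisted successfully at this point ...

        var injected error
        failpoint.Inject("failed-after-id-maps-saved", func(_ failpoint.Value) {
            // Runs only when GO_FAILPOINTS enables this path with return(true),
            // letting the test above exercise checkpoint recovery.
            injected = errors.New("injected failure after id maps saved")
        })
        return injected
    }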
