diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go
index b02402005aa43..b70348aaa5fd9 100644
--- a/br/pkg/checkpoint/checkpoint_test.go
+++ b/br/pkg/checkpoint/checkpoint_test.go
@@ -628,6 +628,9 @@ func TestCheckpointCompactedRestoreRunner(t *testing.T) {
 		respCount++
 	}
 
+	exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+	require.True(t, exists)
+
 	_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CustomSSTRestoreCheckpointDatabaseName, checker)
 	require.NoError(t, err)
 	require.Equal(t, 3, respCount)
@@ -635,7 +638,7 @@ func TestCheckpointCompactedRestoreRunner(t *testing.T) {
 	err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
 	require.NoError(t, err)
 
-	exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+	exists = checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
 	require.False(t, exists)
 	exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CustomSSTRestoreCheckpointDatabaseName))
 	require.False(t, exists)
diff --git a/br/pkg/checkpoint/restore.go b/br/pkg/checkpoint/restore.go
index c05ec3df8c2f2..2e55cc3eb81c2 100644
--- a/br/pkg/checkpoint/restore.go
+++ b/br/pkg/checkpoint/restore.go
@@ -170,8 +170,10 @@ func ExistsSstRestoreCheckpoint(
 	dom *domain.Domain,
 	dbName string,
 ) bool {
+	// we only check the existence of the checkpoint data table
+	// because the checkpoint metadata is not used for restore
 	return dom.InfoSchema().
-		TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointMetaTableName))
+		TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointDataTableName))
 }
 
 func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error {
diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go
index ec3e3539324ad..bfd977afe1fa6 100644
--- a/br/pkg/restore/log_client/client.go
+++ b/br/pkg/restore/log_client/client.go
@@ -152,6 +152,7 @@ func NewSstRestoreManager(
 	storeCount uint,
 	createCheckpointSessionFn func() (glue.Session, error),
 ) (*SstRestoreManager, error) {
+	var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]
 	// This poolSize is similar to full restore, as both workflows are comparable.
 	// The poolSize should be greater than concurrencyPerStore multiplied by the number of stores.
 	poolSize := concurrencyPerStore * 32 * storeCount
@@ -166,14 +167,12 @@ func NewSstRestoreManager(
 		return nil, errors.Trace(err)
 	}
 	if se != nil {
-		checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+		checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		s.checkpointRunner = checkpointRunner
 	}
 
-	// TODO implement checkpoint
-	s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, nil)
+	s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, checkpointRunner)
 	return s, nil
 }
@@ -182,14 +181,13 @@ type LogClient struct {
 	logRestoreManager *LogRestoreManager
 	sstRestoreManager *SstRestoreManager
 
-	cipher              *backuppb.CipherInfo
-	pdClient            pd.Client
-	pdHTTPClient        pdhttp.Client
-	clusterID           uint64
-	dom                 *domain.Domain
-	tlsConf             *tls.Config
-	keepaliveConf       keepalive.ClientParameters
-	concurrencyPerStore uint
+	cipher        *backuppb.CipherInfo
+	pdClient      pd.Client
+	pdHTTPClient  pdhttp.Client
+	clusterID     uint64
+	dom           *domain.Domain
+	tlsConf       *tls.Config
+	keepaliveConf keepalive.ClientParameters
 
 	rawKVClient *rawkv.RawKVBatchClient
 	storage     storage.ExternalStorage
@@ -263,6 +261,7 @@ func (rc *LogClient) RestoreCompactedSstFiles(
 	// Collect all items from the iterator in advance to avoid blocking during restoration.
 	// This approach ensures that we have all necessary data ready for processing,
 	// preventing any potential delays caused by waiting for the iterator to yield more items.
+	start := time.Now()
 	for r := compactionsIter.TryNext(ctx); !r.Finished; r = compactionsIter.TryNext(ctx) {
 		if r.Err != nil {
 			return r.Err
@@ -295,6 +294,13 @@ func (rc *LogClient) RestoreCompactedSstFiles(
 		}
 	}()
 
+	log.Info("[Compacted SST Restore] Start to restore SST files",
+		zap.Int("sst-file-count", len(backupFileSets)), zap.Duration("iterate-take", time.Since(start)))
+	start = time.Now()
+	defer func() {
+		log.Info("[Compacted SST Restore] Restore SST files finished", zap.Duration("restore-take", time.Since(start)))
+	}()
+
 	// To optimize performance and minimize cross-region downloads,
 	// we are currently opting for a single restore approach instead of batch restoration.
 	// This decision is similar to the handling of raw and txn restores,
@@ -422,7 +428,7 @@ func (rc *LogClient) InitClients(
 	opt := snapclient.NewSnapFileImporterOptions(
 		rc.cipher, metaClient, importCli, backend,
-		snapclient.RewriteModeKeyspace, stores, rc.concurrencyPerStore, createCallBacks, closeCallBacks,
+		snapclient.RewriteModeKeyspace, stores, concurrencyPerStore, createCallBacks, closeCallBacks,
 	)
 	snapFileImporter, err := snapclient.NewSnapFileImporter(
 		ctx, rc.dom.Store().GetCodec().GetAPIVersion(), snapclient.TiDBCompcated, opt)
@@ -442,9 +448,24 @@ func (rc *LogClient) InitClients(
 
 func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore(
 	ctx context.Context,
 ) (map[string]struct{}, error) {
-	// get sst checkpoint to skip repeated files
 	sstCheckpointSets := make(map[string]struct{})
-	// TODO initial checkpoint
+
+	if checkpoint.ExistsSstRestoreCheckpoint(ctx, rc.dom, checkpoint.CustomSSTRestoreCheckpointDatabaseName) {
+		// we need to load the checkpoint data for the following restore
+		execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()
+		_, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx, checkpoint.CustomSSTRestoreCheckpointDatabaseName, func(tableID int64, v checkpoint.RestoreValueType) {
+			sstCheckpointSets[v.Name] = struct{}{}
+		})
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+	} else {
+		// initialize the checkpoint metadata since it is the first time to restore.
+		err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.unsafeSession, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil)
+		if err != nil {
+			return nil, errors.Trace(err)
+		}
+	}
 	return sstCheckpointSets, nil
 }
diff --git a/br/pkg/restore/restorer.go b/br/pkg/restore/restorer.go
index 4e36086eee1c2..9d999af9c09fc 100644
--- a/br/pkg/restore/restorer.go
+++ b/br/pkg/restore/restorer.go
@@ -359,7 +359,6 @@ func (p *PipelineRestorerWrapper[T]) WithSplit(ctx context.Context, i iter.TryNe
 			// Check if the accumulated items meet the criteria for splitting.
 			if strategy.ShouldSplit() {
-				log.Info("Trying to start region split with accumulations")
 				startTime := time.Now()
 				// Execute the split operation on the accumulated items.
diff --git a/br/pkg/restore/snap_client/BUILD.bazel b/br/pkg/restore/snap_client/BUILD.bazel
index b9abcd2e99f7d..5df612e4750e6 100644
--- a/br/pkg/restore/snap_client/BUILD.bazel
+++ b/br/pkg/restore/snap_client/BUILD.bazel
@@ -82,7 +82,7 @@ go_test(
     ],
     embed = [":snap_client"],
    flaky = True,
-    shard_count = 18,
+    shard_count = 19,
     deps = [
        "//br/pkg/errors",
        "//br/pkg/glue",
diff --git a/br/pkg/restore/snap_client/import.go b/br/pkg/restore/snap_client/import.go
index d1a81f0836362..3db134fddf1e0 100644
--- a/br/pkg/restore/snap_client/import.go
+++ b/br/pkg/restore/snap_client/import.go
@@ -213,6 +213,9 @@ func NewSnapFileImporter(
 	kvMode KvMode,
 	options *SnapFileImporterOptions,
 ) (*SnapFileImporter, error) {
+	if options.concurrencyPerStore == 0 {
+		return nil, errors.New("concurrencyPerStore must be greater than 0")
+	}
 	fileImporter := &SnapFileImporter{
 		apiVersion: apiVersion,
 		kvMode:     kvMode,
diff --git a/br/pkg/restore/snap_client/import_test.go b/br/pkg/restore/snap_client/import_test.go
index 9d9c79fe1a6f6..23b7fd8fc81a0 100644
--- a/br/pkg/restore/snap_client/import_test.go
+++ b/br/pkg/restore/snap_client/import_test.go
@@ -155,6 +155,13 @@ func (client *fakeImporterClient) MultiIngest(
 	return &import_sstpb.IngestResponse{}, nil
 }
 
+func TestUnproperConfigSnapImporter(t *testing.T) {
+	ctx := context.Background()
+	opt := snapclient.NewSnapFileImporterOptionsForTest(nil, nil, nil, snapclient.RewriteModeKeyspace, 0)
+	_, err := snapclient.NewSnapFileImporter(ctx, kvrpcpb.APIVersion_V1, snapclient.TiDBFull, opt)
+	require.Error(t, err)
+}
+
 func TestSnapImporter(t *testing.T) {
 	ctx := context.Background()
 	splitClient := split.NewFakeSplitClient()
diff --git a/br/tests/br_pitr/run.sh b/br/tests/br_pitr/run.sh
index 4b863a389903a..4ec1936228fe3 100644
--- a/br/tests/br_pitr/run.sh
+++ b/br/tests/br_pitr/run.sh
@@ -107,6 +107,7 @@ fi
 
 # PITR restore
 echo "run pitr"
 run_sql "DROP DATABASE __TiDB_BR_Temporary_Log_Restore_Checkpoint;"
+run_sql "DROP DATABASE __TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint;"
 run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1
 check_result
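
A minimal sketch (not part of the patch) of how the set returned by InitCheckpointMetadataForCompactedSstRestore is meant to be consumed: file names already persisted by the checkpoint runner are skipped when a compacted SST restore resumes. The filterRestoredFiles helper and the plain string file names are assumptions for illustration only; the real restorer tracks files through its own backup file set types.

package main

import "fmt"

// filterRestoredFiles drops every file whose name is already recorded in the
// checkpoint set, so a resumed restore only processes the remaining files.
// This helper is hypothetical; it only illustrates the skip-on-resume idea.
func filterRestoredFiles(fileNames []string, sstCheckpointSets map[string]struct{}) []string {
	remaining := make([]string, 0, len(fileNames))
	for _, name := range fileNames {
		if _, done := sstCheckpointSets[name]; done {
			// restored in a previous run and recorded in the checkpoint table
			continue
		}
		remaining = append(remaining, name)
	}
	return remaining
}

func main() {
	// Pretend a previous run already checkpointed "a.sst".
	checkpointed := map[string]struct{}{"a.sst": {}}
	fmt.Println(filterRestoredFiles([]string{"a.sst", "b.sst"}, checkpointed)) // prints [b.sst]
}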