compacted restore: fix the wrong initial configurations (#58050)
ref #56522
3pointer authored Dec 10, 2024
1 parent d6b313f commit b416484
Showing 8 changed files with 55 additions and 19 deletions.
5 changes: 4 additions & 1 deletion br/pkg/checkpoint/checkpoint_test.go
@@ -628,14 +628,17 @@ func TestCheckpointCompactedRestoreRunner(t *testing.T) {
respCount++
}

+ exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+ require.True(t, exists)
+
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CustomSSTRestoreCheckpointDatabaseName, checker)
require.NoError(t, err)
require.Equal(t, 3, respCount)

err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
require.NoError(t, err)

- exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+ exists = checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
require.False(t, exists)
exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CustomSSTRestoreCheckpointDatabaseName))
require.False(t, exists)
4 changes: 3 additions & 1 deletion br/pkg/checkpoint/restore.go
@@ -170,8 +170,10 @@ func ExistsSstRestoreCheckpoint(
dom *domain.Domain,
dbName string,
) bool {
+ // we only check the existence of the checkpoint data table
+ // because the checkpoint metadata is not used for restore
return dom.InfoSchema().
- TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointMetaTableName))
+ TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointDataTableName))
}

func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error {
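This commit initializes the SST checkpoint metadata with a nil payload (see SaveCheckpointMetadataForSstRestore(..., nil) in the client.go hunk below), which suggests the data table, not the meta table, is the dependable existence signal; the corrected line checks exactly that. For context, a condensed sketch of the consuming resume-or-initialize logic added further down, using only identifiers that appear in this diff (a sketch, not a drop-in function):

    func initSstCheckpointSets(ctx context.Context, rc *LogClient) (map[string]struct{}, error) {
        sets := make(map[string]struct{})
        if checkpoint.ExistsSstRestoreCheckpoint(ctx, rc.dom, checkpoint.CustomSSTRestoreCheckpointDatabaseName) {
            // resume: collect the names of files already restored so they can be skipped
            execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()
            _, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx,
                checkpoint.CustomSSTRestoreCheckpointDatabaseName,
                func(tableID int64, v checkpoint.RestoreValueType) {
                    sets[v.Name] = struct{}{}
                })
            return sets, errors.Trace(err)
        }
        // first run: create the checkpoint storage with a nil metadata payload
        err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.unsafeSession,
            checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil)
        return sets, errors.Trace(err)
    }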
51 changes: 36 additions & 15 deletions br/pkg/restore/log_client/client.go
@@ -152,6 +152,7 @@ func NewSstRestoreManager(
storeCount uint,
createCheckpointSessionFn func() (glue.Session, error),
) (*SstRestoreManager, error) {
+ var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]
// This poolSize is similar to full restore, as both workflows are comparable.
// The poolSize should be greater than concurrencyPerStore multiplied by the number of stores.
poolSize := concurrencyPerStore * 32 * storeCount
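As a concrete sizing check with illustrative numbers (not values from this commit): concurrencyPerStore = 128 and storeCount = 3 give poolSize = 128 * 32 * 3 = 12288, comfortably above the stated lower bound of concurrencyPerStore * storeCount = 384.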
@@ -166,14 +167,12 @@
return nil, errors.Trace(err)
}
if se != nil {
- checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+ checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
if err != nil {
return nil, errors.Trace(err)
}
- s.checkpointRunner = checkpointRunner
}
- // TODO implement checkpoint
- s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, nil)
+ s.restorer = restore.NewSimpleSstRestorer(ctx, snapFileImporter, sstWorkerPool, checkpointRunner)
return s, nil
}
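The hoisting of checkpointRunner above is what lets the runner reach the restorer: with the old := inside the if se != nil block, the variable was scoped to that block, so the later NewSimpleSstRestorer call could only receive nil. A minimal, self-contained illustration of the scoping pitfall:

    package main

    import "fmt"

    func main() {
        var runner *string // declared outside the block, so it survives it
        if se := "session"; se != "" {
            r := "checkpoint runner"
            runner = &r // plain assignment; `runner := &r` here would create a block-local instead
        }
        fmt.Println(runner != nil) // true: the assignment is visible after the block
    }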

@@ -182,14 +181,13 @@ type LogClient struct {
logRestoreManager *LogRestoreManager
sstRestoreManager *SstRestoreManager

- cipher *backuppb.CipherInfo
- pdClient pd.Client
- pdHTTPClient pdhttp.Client
- clusterID uint64
- dom *domain.Domain
- tlsConf *tls.Config
- keepaliveConf keepalive.ClientParameters
- concurrencyPerStore uint
+ cipher *backuppb.CipherInfo
+ pdClient pd.Client
+ pdHTTPClient pdhttp.Client
+ clusterID uint64
+ dom *domain.Domain
+ tlsConf *tls.Config
+ keepaliveConf keepalive.ClientParameters

rawKVClient *rawkv.RawKVBatchClient
storage storage.ExternalStorage
@@ -263,6 +261,7 @@ func (rc *LogClient) RestoreCompactedSstFiles(
// Collect all items from the iterator in advance to avoid blocking during restoration.
// This approach ensures that we have all necessary data ready for processing,
// preventing any potential delays caused by waiting for the iterator to yield more items.
+ start := time.Now()
for r := compactionsIter.TryNext(ctx); !r.Finished; r = compactionsIter.TryNext(ctx) {
if r.Err != nil {
return r.Err
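The loop above drains the compactions iterator completely before any restore work begins. A standalone sketch of the general pattern, with the result shape copied locally to keep it self-contained (BR's real iterator utilities live in br/pkg/utils/iter):

    package sketch

    import "context"

    // IterResult mirrors the result shape the loop above consumes
    // (a local copy for illustration only).
    type IterResult[T any] struct {
        Item     T
        Err      error
        Finished bool
    }

    // TryNextor is the producer side: each call yields one result.
    type TryNextor[T any] interface {
        TryNext(ctx context.Context) IterResult[T]
    }

    // drainAll collects every remaining item up front so that later, heavier
    // stages (here, the SST restore itself) never block on the producer.
    func drainAll[T any](ctx context.Context, it TryNextor[T]) ([]T, error) {
        var items []T
        for r := it.TryNext(ctx); !r.Finished; r = it.TryNext(ctx) {
            if r.Err != nil {
                return nil, r.Err
            }
            items = append(items, r.Item)
        }
        return items, nil
    }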
@@ -295,6 +294,13 @@
}
}()

log.Info("[Compacted SST Restore] Start to restore SST files",
zap.Int("sst-file-count", len(backupFileSets)), zap.Duration("iterate-take", time.Since(start)))
start = time.Now()
defer func() {
log.Info("[Compacted SST Restore] Restore SST files finished", zap.Duration("restore-take", time.Since(start)))
}()

// To optimize performance and minimize cross-region downloads,
// we are currently opting for a single restore approach instead of batch restoration.
// This decision is similar to the handling of raw and txn restores,
@@ -422,7 +428,7 @@ func (rc *LogClient) InitClients(

opt := snapclient.NewSnapFileImporterOptions(
rc.cipher, metaClient, importCli, backend,
- snapclient.RewriteModeKeyspace, stores, rc.concurrencyPerStore, createCallBacks, closeCallBacks,
+ snapclient.RewriteModeKeyspace, stores, concurrencyPerStore, createCallBacks, closeCallBacks,
)
snapFileImporter, err := snapclient.NewSnapFileImporter(
ctx, rc.dom.Store().GetCodec().GetAPIVersion(), snapclient.TiDBCompcated, opt)
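The one-word change above appears to be the core of the "wrong initial configurations" in the commit title: rc.concurrencyPerStore read a LogClient field that this same commit deletes from the struct, so the importer was seemingly configured with the field's zero value instead of the concurrencyPerStore argument. A minimal standalone illustration of that failure mode, paired with the guard added to import.go further down:

    package main

    import "fmt"

    // logClient imitates the old LogClient field that this commit removes.
    type logClient struct {
        concurrencyPerStore uint // never assigned anywhere: stays at its zero value
    }

    // newImporter stands in for NewSnapFileImporter's new zero-value guard.
    func newImporter(concurrency uint) error {
        if concurrency == 0 {
            return fmt.Errorf("concurrencyPerStore must be greater than 0")
        }
        return nil
    }

    func main() {
        rc := logClient{}
        // Reading the stale field instead of the function argument trips the guard.
        fmt.Println(newImporter(rc.concurrencyPerStore))
    }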
@@ -442,9 +448,24 @@
func (rc *LogClient) InitCheckpointMetadataForCompactedSstRestore(
ctx context.Context,
) (map[string]struct{}, error) {
+ // get sst checkpoint to skip repeated files
sstCheckpointSets := make(map[string]struct{})
- // TODO initial checkpoint

+ if checkpoint.ExistsSstRestoreCheckpoint(ctx, rc.dom, checkpoint.CustomSSTRestoreCheckpointDatabaseName) {
+ // we need to load the checkpoint data for the following restore
+ execCtx := rc.unsafeSession.GetSessionCtx().GetRestrictedSQLExecutor()
+ _, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx, checkpoint.CustomSSTRestoreCheckpointDatabaseName, func(tableID int64, v checkpoint.RestoreValueType) {
+ sstCheckpointSets[v.Name] = struct{}{}
+ })
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
+ } else {
+ // initialize the checkpoint metadata since it is the first time to restore.
+ err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.unsafeSession, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil)
+ if err != nil {
+ return nil, errors.Trace(err)
+ }
+ }
return sstCheckpointSets, nil
}

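The returned set is keyed by restored file name (v.Name above). A sketch of how a caller can use it to skip finished work; the FileSet type here is a hypothetical stand-in, not BR's real file-set type:

    package sketch

    // FileSet is a hypothetical stand-in for a batch of SST files to restore.
    type FileSet struct {
        FileNames []string
    }

    // skipRestored drops file sets whose every file is already recorded in the
    // checkpoint set returned by InitCheckpointMetadataForCompactedSstRestore;
    // empty sets are kept untouched.
    func skipRestored(sets []FileSet, done map[string]struct{}) []FileSet {
        remain := make([]FileSet, 0, len(sets))
        for _, fs := range sets {
            allDone := len(fs.FileNames) > 0
            for _, name := range fs.FileNames {
                if _, ok := done[name]; !ok {
                    allDone = false
                    break
                }
            }
            if !allDone {
                remain = append(remain, fs)
            }
        }
        return remain
    }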
1 change: 0 additions & 1 deletion br/pkg/restore/restorer.go
@@ -359,7 +359,6 @@ func (p *PipelineRestorerWrapper[T]) WithSplit(ctx context.Context, i iter.TryNe

// Check if the accumulated items meet the criteria for splitting.
if strategy.ShouldSplit() {
log.Info("Trying to start region split with accumulations")
startTime := time.Now()

// Execute the split operation on the accumulated items.
2 changes: 1 addition & 1 deletion br/pkg/restore/snap_client/BUILD.bazel
@@ -82,7 +82,7 @@ go_test(
],
embed = [":snap_client"],
flaky = True,
- shard_count = 18,
+ shard_count = 19,
deps = [
"//br/pkg/errors",
"//br/pkg/glue",
3 changes: 3 additions & 0 deletions br/pkg/restore/snap_client/import.go
@@ -213,6 +213,9 @@ func NewSnapFileImporter(
kvMode KvMode,
options *SnapFileImporterOptions,
) (*SnapFileImporter, error) {
+ if options.concurrencyPerStore == 0 {
+ return nil, errors.New("concurrencyPerStore must be greater than 0")
+ }
fileImporter := &SnapFileImporter{
apiVersion: apiVersion,
kvMode: kvMode,
7 changes: 7 additions & 0 deletions br/pkg/restore/snap_client/import_test.go
@@ -155,6 +155,13 @@ func (client *fakeImporterClient) MultiIngest(
return &import_sstpb.IngestResponse{}, nil
}

+ func TestUnproperConfigSnapImporter(t *testing.T) {
+ ctx := context.Background()
+ opt := snapclient.NewSnapFileImporterOptionsForTest(nil, nil, nil, snapclient.RewriteModeKeyspace, 0)
+ _, err := snapclient.NewSnapFileImporter(ctx, kvrpcpb.APIVersion_V1, snapclient.TiDBFull, opt)
+ require.Error(t, err)
+ }
+
func TestSnapImporter(t *testing.T) {
ctx := context.Background()
splitClient := split.NewFakeSplitClient()
1 change: 1 addition & 0 deletions br/tests/br_pitr/run.sh
@@ -107,6 +107,7 @@ fi
# PITR restore
echo "run pitr"
run_sql "DROP DATABASE __TiDB_BR_Temporary_Log_Restore_Checkpoint;"
run_sql "DROP DATABASE __TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint;"
run_br --pd $PD_ADDR restore point -s "local://$TEST_DIR/$PREFIX/log" --full-backup-storage "local://$TEST_DIR/$PREFIX/full" > $res_file 2>&1

check_result
