Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45639
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Leavrth authored and ti-chi-bot committed Aug 15, 2023
1 parent cf36a9c commit 578ba8f
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 84 deletions.
4 changes: 3 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1734,11 +1734,13 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error {
init := LogFileManagerInit{
StartTS: startTS,
RestoreTS: restoreTS,
Storage: rc.storage,

MetadataDownloadBatchSize: metadataDownloadBatchSize,
}
var err error
rc.logFileManager, err = CreateLogFileManager(ctx, init)
Expand Down
17 changes: 10 additions & 7 deletions br/pkg/restore/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ import (
"go.uber.org/zap"
)

// Fixed tuning values used when reading metadata files through the
// iter.Transform call in createMetaIterOver.
const (
// readMetaConcurrency is the value handed to iter.WithConcurrency.
readMetaConcurrency = 128
// readMetaBatchSize is the value handed to iter.WithChunkSize.
readMetaBatchSize = 512
)

// MetaIter is an alias for the iterator type (iter.TryNextor) over
// *backuppb.Metadata values, i.e. the content of metadata files.
type MetaIter = iter.TryNextor[*backuppb.Metadata]

Expand Down Expand Up @@ -56,13 +51,17 @@ type logFileManager struct {

storage storage.ExternalStorage
helper *stream.MetadataHelper

metadataDownloadBatchSize uint
}

// LogFileManagerInit is the config needed for initializing the log file manager.
// It is consumed by CreateLogFileManager, which copies the fields into the manager.
type LogFileManagerInit struct {
// StartTS is the start timestamp supplied by the caller
// (presumably the lower bound of the restore range — confirm against CreateLogFileManager).
StartTS uint64
// RestoreTS is the restore timestamp; it is stored as the manager's restoreTS.
RestoreTS uint64
// Storage is the external storage the manager keeps and passes to
// stream.FastUnmarshalMetaData when loading the shift TS.
Storage storage.ExternalStorage

// MetadataDownloadBatchSize is stored on the manager and used both as the
// worker count passed to stream.FastUnmarshalMetaData and as the chunk size
// and concurrency of the metadata reader built in createMetaIterOver.
MetadataDownloadBatchSize uint
}

type DDLMetaGroup struct {
Expand All @@ -78,6 +77,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(),

metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
}
err := fm.loadShiftTS(ctx)
if err != nil {
Expand All @@ -96,7 +97,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error {
value uint64
exists bool
}{}
err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := rc.helper.ParseToMetadata(raw)
if err != nil {
return err
Expand Down Expand Up @@ -162,8 +163,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
}
return meta, nil
}
// TODO: maybe we need to be able to adjust the concurrency to download files,
// which currently is the same as the chunk size
reader := iter.Transform(namesIter, readMeta,
iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency))
iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize))
return reader, nil
}

Expand Down
5 changes: 5 additions & 0 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,

MetadataDownloadBatchSize: 32,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
Expand Down Expand Up @@ -300,6 +302,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.MetadataDownloadBatchSize = 128
meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
Expand Down Expand Up @@ -459,6 +462,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) {
StartTS: start,
RestoreTS: end,
Storage: loc,

MetadataDownloadBatchSize: 32,
})
req.NoError(err)

Expand Down
7 changes: 4 additions & 3 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type StreamMetadataSet struct {

// keeps the meta-information of metadata as little as possible
// to save the memory
metadataInfos map[string]*MetadataInfo
metadataInfos map[string]*MetadataInfo
MetadataDownloadBatchSize uint

// a parser of metadata
Helper *stream.MetadataHelper
Expand Down Expand Up @@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s
metadataMap.metas = make(map[string]*MetadataInfo)
// `shiftUntilTS` must be less than `until`
metadataMap.shiftUntilTS = until
err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := ms.Helper.ParseToMetadataHard(raw)
if err != nil {
return err
Expand Down Expand Up @@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context
item []string
sync.Mutex
}
worker := utils.NewWorkerPool(128, "delete files")
worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files")
eg, cx := errgroup.WithContext(ctx)
for path, metaInfo := range ms.metadataInfos {
path := path
Expand Down
18 changes: 12 additions & 6 deletions br/pkg/restore/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) {
require.NoError(t, fakeStreamBackup(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) {
require.NoError(t, fakeStreamBackupV2(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/stream/stream_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const (
streamBackupMetaPrefix = "v1/backupmeta"

streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint"

metaDataWorkerPoolSize = 128
)

func GetStreamBackupMetaPrefix() string {
Expand Down Expand Up @@ -300,9 +298,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) {
func FastUnmarshalMetaData(
ctx context.Context,
s storage.ExternalStorage,
metaDataWorkerPoolSize uint,
fn func(path string, rawMetaData []byte) error,
) error {
log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize))
log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize))
pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata")
eg, ectx := errgroup.WithContext(ctx)
opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()}
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@ go_test(
],
embed = [":task"],
flaky = True,
<<<<<<< HEAD
=======
shard_count = 18,
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,9 @@ const (
flagCipherKey = "crypter.key"
flagCipherKeyFile = "crypter.key-file"

flagMetadataDownloadBatchSize = "metadata-download-batch-size"
defaultMetadataDownloadBatchSize = 128

unlimited = 0
crypterAES128KeyLen = 16
crypterAES192KeyLen = 24
Expand Down Expand Up @@ -234,6 +237,15 @@ type Config struct {

// whether there's explicit filter
ExplicitFilter bool `json:"-" toml:"-"`
<<<<<<< HEAD
=======

// KeyspaceName is the name of the keyspace of the task
KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"`

// MetadataDownloadBatchSize is the batch size for downloading metadata, such as the metadata for log restore
MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"`
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
}

// DefineCommonFlags defines the flags common to all BRIE commands.
Expand Down Expand Up @@ -289,6 +301,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
"by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"")
flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key")

flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
"the batch size of downloading metadata, such as log restore metadata for truncate or restore")

_ = flags.MarkHidden(flagMetadataDownloadBatchSize)

storage.DefineFlags(flags)
}

Expand Down Expand Up @@ -579,6 +596,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil {
return errors.Trace(err)
}

return cfg.normalizePDURLs()
}

Expand Down Expand Up @@ -740,6 +761,9 @@ func (cfg *Config) adjust() {
if cfg.ChecksumConcurrency == 0 {
cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency
}
if cfg.MetadataDownloadBatchSize == 0 {
cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize
}
}

func normalizePDURL(pd string, useTLS bool) (string, error) {
Expand Down
54 changes: 52 additions & 2 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,8 +933,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre

readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize,
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
}
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
if err != nil {
Expand Down Expand Up @@ -1146,7 +1147,52 @@ func restoreStream(
// mode or emptied schedulers
defer restorePostWork(ctx, client, restoreSchedulers)

<<<<<<< HEAD
err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS)
=======
// GC needs to be disabled in TiKV during PiTR,
// because the PiTR process is concurrent and kv events aren't sorted by TSO.
restoreGc, oldRatio, err := KeepGcDisabled(g, mgr.GetStorage())
if err != nil {
return errors.Trace(err)
}
gcDisabledRestorable := false
defer func() {
// don't restore the gc-ratio-threshold if checkpoint mode is used and the restore has not finished
if cfg.UseCheckpoint && !gcDisabledRestorable {
log.Info("skip restore the gc-ratio-threshold for next retry")
return
}

log.Info("start to restore gc", zap.String("ratio", oldRatio))
if err := restoreGc(oldRatio); err != nil {
log.Error("failed to set gc enabled", zap.Error(err))
}
log.Info("finish restoring gc")
}()

var taskName string
var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType]
if cfg.UseCheckpoint {
taskName = cfg.generateLogRestoreTaskName(client.GetClusterID(ctx), cfg.StartTS, cfg.RestoreTS)
oldRatioFromCheckpoint, err := client.InitCheckpointMetadataForLogRestore(ctx, taskName, oldRatio)
if err != nil {
return errors.Trace(err)
}
oldRatio = oldRatioFromCheckpoint

checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx, taskName)
if err != nil {
return errors.Trace(err)
}
defer func() {
log.Info("wait for flush checkpoint...")
checkpointRunner.WaitForFinish(ctx, !gcDisabledRestorable)
}()
}

err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize)
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
if err != nil {
return err
}
Expand Down Expand Up @@ -1418,6 +1464,7 @@ func getFullBackupTS(
return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
}

<<<<<<< HEAD
func getGlobalResolvedTS(
ctx context.Context,
s storage.ExternalStorage,
Expand Down Expand Up @@ -1456,6 +1503,9 @@ func getGlobalResolvedTS(
}

func initFullBackupTables(
=======
func parseFullBackupTablesStorage(
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
ctx context.Context,
cfg *RestoreConfig,
) (map[int64]*metautil.Table, error) {
Expand Down
Loading

0 comments on commit 578ba8f

Please sign in to comment.