Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#45639
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
Leavrth authored and ti-chi-bot committed Aug 15, 2023
1 parent 4e6af00 commit 88212df
Show file tree
Hide file tree
Showing 10 changed files with 95 additions and 84 deletions.
4 changes: 3 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1768,11 +1768,13 @@ func (rc *Client) PreCheckTableClusterIndex(
return nil
}

func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64) error {
func (rc *Client) InstallLogFileManager(ctx context.Context, startTS, restoreTS uint64, metadataDownloadBatchSize uint) error {
init := LogFileManagerInit{
StartTS: startTS,
RestoreTS: restoreTS,
Storage: rc.storage,

MetadataDownloadBatchSize: metadataDownloadBatchSize,
}
var err error
rc.logFileManager, err = CreateLogFileManager(ctx, init)
Expand Down
17 changes: 10 additions & 7 deletions br/pkg/restore/log_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@ import (
"go.uber.org/zap"
)

const (
readMetaConcurrency = 128
readMetaBatchSize = 512
)

// MetaIter is the type of iterator of metadata files' content.
type MetaIter = iter.TryNextor[*backuppb.Metadata]

Expand Down Expand Up @@ -56,13 +51,17 @@ type logFileManager struct {

storage storage.ExternalStorage
helper *stream.MetadataHelper

metadataDownloadBatchSize uint
}

// LogFileManagerInit is the config needed for initializing the log file manager.
type LogFileManagerInit struct {
StartTS uint64
RestoreTS uint64
Storage storage.ExternalStorage

MetadataDownloadBatchSize uint
}

type DDLMetaGroup struct {
Expand All @@ -78,6 +77,8 @@ func CreateLogFileManager(ctx context.Context, init LogFileManagerInit) (*logFil
restoreTS: init.RestoreTS,
storage: init.Storage,
helper: stream.NewMetadataHelper(),

metadataDownloadBatchSize: init.MetadataDownloadBatchSize,
}
err := fm.loadShiftTS(ctx)
if err != nil {
Expand All @@ -96,7 +97,7 @@ func (rc *logFileManager) loadShiftTS(ctx context.Context) error {
value uint64
exists bool
}{}
err := stream.FastUnmarshalMetaData(ctx, rc.storage, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, rc.storage, rc.metadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := rc.helper.ParseToMetadata(raw)
if err != nil {
return err
Expand Down Expand Up @@ -165,8 +166,10 @@ func (rc *logFileManager) createMetaIterOver(ctx context.Context, s storage.Exte
}
return meta, nil
}
// TODO: maybe we need to be able to adjust the concurrency to download files,
// which currently is the same as the chunk size
reader := iter.Transform(namesIter, readMeta,
iter.WithChunkSize(readMetaBatchSize), iter.WithConcurrency(readMetaConcurrency))
iter.WithChunkSize(rc.metadataDownloadBatchSize), iter.WithConcurrency(rc.metadataDownloadBatchSize))
return reader, nil
}

Expand Down
5 changes: 5 additions & 0 deletions br/pkg/restore/log_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ func testReadMetaBetweenTSWithVersion(t *testing.T, m metaMaker) {
StartTS: c.startTS,
RestoreTS: c.endTS,
Storage: loc,

MetadataDownloadBatchSize: 32,
}
cli, err := CreateLogFileManager(ctx, init)
req.Equal(cli.ShiftTS(), c.expectedShiftTS)
Expand Down Expand Up @@ -300,6 +302,7 @@ func testReadFromMetadataWithVersion(t *testing.T, m metaMaker) {

meta := new(StreamMetadataSet)
meta.Helper = stream.NewMetadataHelper()
meta.MetadataDownloadBatchSize = 128
meta.LoadUntilAndCalculateShiftTS(ctx, loc, c.untilTS)

var metas []*backuppb.Metadata
Expand Down Expand Up @@ -459,6 +462,8 @@ func testFileManagerWithMeta(t *testing.T, m metaMaker) {
StartTS: start,
RestoreTS: end,
Storage: loc,

MetadataDownloadBatchSize: 32,
})
req.NoError(err)

Expand Down
7 changes: 4 additions & 3 deletions br/pkg/restore/stream_metas.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ type StreamMetadataSet struct {

// keeps the meta-information of metadata as little as possible
// to save the memory
metadataInfos map[string]*MetadataInfo
metadataInfos map[string]*MetadataInfo
MetadataDownloadBatchSize uint

// a parser of metadata
Helper *stream.MetadataHelper
Expand Down Expand Up @@ -62,7 +63,7 @@ func (ms *StreamMetadataSet) LoadUntilAndCalculateShiftTS(ctx context.Context, s
metadataMap.metas = make(map[string]*MetadataInfo)
// `shiftUntilTS` must be less than `until`
metadataMap.shiftUntilTS = until
err := stream.FastUnmarshalMetaData(ctx, s, func(path string, raw []byte) error {
err := stream.FastUnmarshalMetaData(ctx, s, ms.MetadataDownloadBatchSize, func(path string, raw []byte) error {
m, err := ms.Helper.ParseToMetadataHard(raw)
if err != nil {
return err
Expand Down Expand Up @@ -154,7 +155,7 @@ func (ms *StreamMetadataSet) RemoveDataFilesAndUpdateMetadataInBatch(ctx context
item []string
sync.Mutex
}
worker := utils.NewWorkerPool(128, "delete files")
worker := utils.NewWorkerPool(ms.MetadataDownloadBatchSize, "delete files")
eg, cx := errgroup.WithContext(ctx)
for path, metaInfo := range ms.metadataInfos {
path := path
Expand Down
18 changes: 12 additions & 6 deletions br/pkg/restore/stream_metas_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ func TestTruncateLog(t *testing.T) {
require.NoError(t, fakeStreamBackup(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -221,7 +222,8 @@ func TestTruncateLogV2(t *testing.T) {
require.NoError(t, fakeStreamBackupV2(l))

s := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
require.NoError(t, s.LoadFrom(ctx, l))

Expand Down Expand Up @@ -1188,7 +1190,8 @@ func TestTruncate1(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -1703,7 +1706,8 @@ func TestTruncate2(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2086,7 +2090,8 @@ func TestTruncate3(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down Expand Up @@ -2298,7 +2303,8 @@ func TestCalculateShiftTS(t *testing.T) {
for _, until := range ts.until {
t.Logf("case %d, param %d, until %d", i, j, until)
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
Helper: stream.NewMetadataHelper(),
MetadataDownloadBatchSize: 128,
}
err := generateFiles(ctx, s, cs.metas, tmpDir)
require.NoError(t, err)
Expand Down
5 changes: 2 additions & 3 deletions br/pkg/stream/stream_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,6 @@ const (
streamBackupMetaPrefix = "v1/backupmeta"

streamBackupGlobalCheckpointPrefix = "v1/global_checkpoint"

metaDataWorkerPoolSize = 128
)

func GetStreamBackupMetaPrefix() string {
Expand Down Expand Up @@ -300,9 +298,10 @@ func (*MetadataHelper) Marshal(meta *backuppb.Metadata) ([]byte, error) {
func FastUnmarshalMetaData(
ctx context.Context,
s storage.ExternalStorage,
metaDataWorkerPoolSize uint,
fn func(path string, rawMetaData []byte) error,
) error {
log.Info("use workers to speed up reading metadata files", zap.Int("workers", metaDataWorkerPoolSize))
log.Info("use workers to speed up reading metadata files", zap.Uint("workers", metaDataWorkerPoolSize))
pool := utils.NewWorkerPool(metaDataWorkerPoolSize, "metadata")
eg, ectx := errgroup.WithContext(ctx)
opt := &storage.WalkOption{SubDir: GetStreamBackupMetaPrefix()}
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,10 @@ go_test(
],
embed = [":task"],
flaky = True,
<<<<<<< HEAD
=======
shard_count = 18,
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
Expand Down
24 changes: 24 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ const (
flagCipherKey = "crypter.key"
flagCipherKeyFile = "crypter.key-file"

flagMetadataDownloadBatchSize = "metadata-download-batch-size"
defaultMetadataDownloadBatchSize = 128

unlimited = 0
crypterAES128KeyLen = 16
crypterAES192KeyLen = 24
Expand Down Expand Up @@ -242,6 +245,15 @@ type Config struct {

// whether there's explicit filter
ExplicitFilter bool `json:"-" toml:"-"`
<<<<<<< HEAD
=======

// KeyspaceName is the name of the keyspace of the task
KeyspaceName string `json:"keyspace-name" toml:"keyspace-name"`

// Metadata download batch size, such as metadata for log restore
MetadataDownloadBatchSize uint `json:"metadata-download-batch-size" toml:"metadata-download-batch-size"`
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
}

// DefineCommonFlags defines the flags common to all BRIE commands.
Expand Down Expand Up @@ -297,6 +309,11 @@ func DefineCommonFlags(flags *pflag.FlagSet) {
"by the hexadecimal string, eg: \"0123456789abcdef0123456789abcdef\"")
flags.String(flagCipherKeyFile, "", "FilePath, its content is used as the cipher-key")

flags.Uint(flagMetadataDownloadBatchSize, defaultMetadataDownloadBatchSize,
"the batch size of downloading metadata, such as log restore metadata for truncate or restore")

_ = flags.MarkHidden(flagMetadataDownloadBatchSize)

storage.DefineFlags(flags)
}

Expand Down Expand Up @@ -587,6 +604,10 @@ func (cfg *Config) ParseFromFlags(flags *pflag.FlagSet) error {
return errors.Trace(err)
}

if cfg.MetadataDownloadBatchSize, err = flags.GetUint(flagMetadataDownloadBatchSize); err != nil {
return errors.Trace(err)
}

return cfg.normalizePDURLs()
}

Expand Down Expand Up @@ -748,6 +769,9 @@ func (cfg *Config) adjust() {
if cfg.ChecksumConcurrency == 0 {
cfg.ChecksumConcurrency = variable.DefChecksumTableConcurrency
}
if cfg.MetadataDownloadBatchSize == 0 {
cfg.MetadataDownloadBatchSize = defaultMetadataDownloadBatchSize
}
}

func normalizePDURL(pd string, useTLS bool) (string, error) {
Expand Down
33 changes: 31 additions & 2 deletions br/pkg/task/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,8 +962,9 @@ func RunStreamTruncate(c context.Context, g glue.Glue, cmdName string, cfg *Stre

readMetaDone := console.ShowTask("Reading Metadata... ", glue.WithTimeCost())
metas := restore.StreamMetadataSet{
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
MetadataDownloadBatchSize: cfg.MetadataDownloadBatchSize,
Helper: stream.NewMetadataHelper(),
DryRun: cfg.DryRun,
}
shiftUntilTS, err := metas.LoadUntilAndCalculateShiftTS(ctx, storage, cfg.Until)
if err != nil {
Expand Down Expand Up @@ -1213,7 +1214,31 @@ func restoreStream(
}
}()

<<<<<<< HEAD
err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS)
=======
var taskName string
var checkpointRunner *checkpoint.CheckpointRunner[checkpoint.LogRestoreKeyType, checkpoint.LogRestoreValueType]
if cfg.UseCheckpoint {
taskName = cfg.generateLogRestoreTaskName(client.GetClusterID(ctx), cfg.StartTS, cfg.RestoreTS)
oldRatioFromCheckpoint, err := client.InitCheckpointMetadataForLogRestore(ctx, taskName, oldRatio)
if err != nil {
return errors.Trace(err)
}
oldRatio = oldRatioFromCheckpoint

checkpointRunner, err = client.StartCheckpointRunnerForLogRestore(ctx, taskName)
if err != nil {
return errors.Trace(err)
}
defer func() {
log.Info("wait for flush checkpoint...")
checkpointRunner.WaitForFinish(ctx, !gcDisabledRestorable)
}()
}

err = client.InstallLogFileManager(ctx, cfg.StartTS, cfg.RestoreTS, cfg.MetadataDownloadBatchSize)
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
if err != nil {
return err
}
Expand Down Expand Up @@ -1483,6 +1508,7 @@ func getFullBackupTS(
return backupmeta.GetEndVersion(), backupmeta.GetClusterId(), nil
}

<<<<<<< HEAD
func getGlobalResolvedTS(
ctx context.Context,
s storage.ExternalStorage,
Expand Down Expand Up @@ -1521,6 +1547,9 @@ func getGlobalResolvedTS(
}

func initFullBackupTables(
=======
func parseFullBackupTablesStorage(
>>>>>>> 6ad49e79b17 (br: make download metadata concurrency adjustable (#45639))
ctx context.Context,
cfg *RestoreConfig,
) (map[int64]*metautil.Table, error) {
Expand Down
Loading

0 comments on commit 88212df

Please sign in to comment.