Skip to content

Commit

Permalink
executor/brie: use the default value from flags (#48025) (#48442)
Browse files Browse the repository at this point in the history
close #48000
  • Loading branch information
ti-chi-bot authored Dec 8, 2023
1 parent 6b72ccd commit 1725235
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 17 deletions.
3 changes: 2 additions & 1 deletion br/pkg/task/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ go_test(
],
embed = [":task"],
flaky = True,
shard_count = 18,
shard_count = 21,
deps = [
"//br/pkg/conn",
"//br/pkg/errors",
Expand All @@ -111,6 +111,7 @@ go_test(
"//parser/model",
"//statistics/handle",
"//tablecodec",
"//util/table-filter",
"@com_github_golang_protobuf//proto",
"@com_github_pingcap_errors//:errors",
"@com_github_pingcap_kvproto//pkg/brpb",
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/task/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"github.com/pingcap/tidb/util/mathutil"
"github.com/spf13/pflag"
"github.com/tikv/client-go/v2/oracle"
"go.uber.org/multierr"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -773,6 +774,21 @@ func ParseTSString(ts string, tzCheck bool) (uint64, error) {
return oracle.GoTimeToTS(t1), nil
}

func DefaultBackupConfig() BackupConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
DefineBackupFlags(fs)
cfg := BackupConfig{}
err := multierr.Combine(
cfg.ParseFromFlags(fs),
cfg.Config.ParseFromFlags(fs),
)
if err != nil {
log.Panic("infallible operation failed.", zap.Error(err))
}
return cfg
}

func parseCompressionType(s string) (backuppb.CompressionType, error) {
var ct backuppb.CompressionType
switch s {
Expand Down
10 changes: 10 additions & 0 deletions br/pkg/task/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,16 @@ func HiddenFlagsForStream(flags *pflag.FlagSet) {
storage.HiddenFlagsForStream(flags)
}

func DefaultConfig() Config {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
cfg := Config{}
if err := cfg.ParseFromFlags(fs); err != nil {
log.Panic("infallible operation failed.", zap.Error(err))
}
return cfg
}

// DefineDatabaseFlags defines the required --db flag for `db` subcommand.
func DefineDatabaseFlags(command *cobra.Command) {
command.Flags().String(flagDatabase, "", "database name")
Expand Down
80 changes: 80 additions & 0 deletions br/pkg/task/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import (

backup "github.com/pingcap/kvproto/pkg/brpb"
"github.com/pingcap/kvproto/pkg/encryptionpb"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/utils"
"github.com/pingcap/tidb/config"
filter "github.com/pingcap/tidb/util/table-filter"
"github.com/spf13/pflag"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -157,3 +160,80 @@ func TestCheckCipherKey(t *testing.T) {
}
}
}

func must[T any](t T, err error) T {
if err != nil {
panic(err)
}
return t
}

func expectedDefaultConfig() Config {
return Config{
BackendOptions: storage.BackendOptions{S3: storage.S3BackendOptions{ForcePathStyle: true}},
PD: []string{"127.0.0.1:2379"},
ChecksumConcurrency: 4,
Checksum: true,
SendCreds: true,
CheckRequirements: true,
FilterStr: []string(nil),
TableFilter: filter.CaseInsensitive(must(filter.Parse([]string{"*.*"}))),
Schemas: map[string]struct{}{},
Tables: map[string]struct{}{},
SwitchModeInterval: 300000000000,
GRPCKeepaliveTime: 10000000000,
GRPCKeepaliveTimeout: 3000000000,
CipherInfo: backup.CipherInfo{CipherType: 1},
MetadataDownloadBatchSize: 0x80,
}
}

func expectedDefaultBackupConfig() BackupConfig {
return BackupConfig{
Config: expectedDefaultConfig(),
GCTTL: utils.DefaultBRGCSafePointTTL,
CompressionConfig: CompressionConfig{
CompressionType: backup.CompressionType_ZSTD,
},
IgnoreStats: true,
UseBackupMetaV2: true,
UseCheckpoint: true,
}
}

func expectedDefaultRestoreConfig() RestoreConfig {
defaultConfig := expectedDefaultConfig()
defaultConfig.Concurrency = defaultRestoreConcurrency
return RestoreConfig{
Config: defaultConfig,
RestoreCommonConfig: RestoreCommonConfig{Online: false,
MergeSmallRegionSizeBytes: 0x6000000,
MergeSmallRegionKeyCount: 0xea600,
WithSysTable: false,
ResetSysUsers: []string{"cloud_admin", "root"}},
NoSchema: false,
PDConcurrency: 0x1,
BatchFlushInterval: 16000000000,
DdlBatchSize: 0x80,
WithPlacementPolicy: "STRICT",
UseCheckpoint: true,
}
}

func TestDefault(t *testing.T) {
def := DefaultConfig()
defaultConfig := expectedDefaultConfig()
require.Equal(t, defaultConfig, def)
}

func TestDefaultBackup(t *testing.T) {
def := DefaultBackupConfig()
defaultConfig := expectedDefaultBackupConfig()
require.Equal(t, defaultConfig, def)
}

func TestDefaultRestore(t *testing.T) {
def := DefaultRestoreConfig()
defaultConfig := expectedDefaultRestoreConfig()
require.Equal(t, defaultConfig, def)
}
17 changes: 17 additions & 0 deletions br/pkg/task/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,6 +569,23 @@ func removeCheckpointDataForLogRestore(ctx context.Context, storageName string,
return errors.Trace(checkpoint.RemoveCheckpointDataForLogRestore(ctx, s, taskName, clusterID))
}

func DefaultRestoreConfig() RestoreConfig {
fs := pflag.NewFlagSet("dummy", pflag.ContinueOnError)
DefineCommonFlags(fs)
DefineRestoreFlags(fs)
cfg := RestoreConfig{}
err := multierr.Combine(
cfg.ParseFromFlags(fs),
cfg.RestoreCommonConfig.ParseFromFlags(fs),
cfg.Config.ParseFromFlags(fs),
)
if err != nil {
log.Panic("infallible failed.", zap.Error(err))
}

return cfg
}

// RunRestore starts a restore task inside the current goroutine.
func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConfig) error {
etcdCLI, err := dialEtcdWithCfg(c, cfg.Config)
Expand Down
30 changes: 14 additions & 16 deletions executor/brie.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,21 +228,15 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
}

tidbCfg := config.GetGlobalConfig()
cfg := task.Config{
TLS: task.TLSConfig{
CA: tidbCfg.Security.ClusterSSLCA,
Cert: tidbCfg.Security.ClusterSSLCert,
Key: tidbCfg.Security.ClusterSSLKey,
},
PD: strings.Split(tidbCfg.Path, ","),
Concurrency: 4,
Checksum: true,
SendCreds: true,
LogProgress: true,
CipherInfo: backuppb.CipherInfo{
CipherType: encryptionpb.EncryptionMethod_PLAINTEXT,
},
tlsCfg := task.TLSConfig{
CA: tidbCfg.Security.ClusterSSLCA,
Cert: tidbCfg.Security.ClusterSSLCert,
Key: tidbCfg.Security.ClusterSSLKey,
}
pds := strings.Split(tidbCfg.Path, ",")
cfg := task.DefaultConfig()
cfg.PD = pds
cfg.TLS = tlsCfg

storageURL, err := storage.ParseRawURL(s.Storage)
if err != nil {
Expand Down Expand Up @@ -310,7 +304,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)

switch s.Kind {
case ast.BRIEKindBackup:
e.backupCfg = &task.BackupConfig{Config: cfg}
bcfg := task.DefaultBackupConfig()
bcfg.Config = cfg
e.backupCfg = &bcfg

for _, opt := range s.Options {
switch opt.Tp {
Expand Down Expand Up @@ -338,7 +334,9 @@ func (b *executorBuilder) buildBRIE(s *ast.BRIEStmt, schema *expression.Schema)
}

case ast.BRIEKindRestore:
e.restoreCfg = &task.RestoreConfig{Config: cfg}
rcfg := task.DefaultRestoreConfig()
rcfg.Config = cfg
e.restoreCfg = &rcfg
for _, opt := range s.Options {
switch opt.Tp {
case ast.BRIEOptionOnline:
Expand Down

0 comments on commit 1725235

Please sign in to comment.