Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: always get latest PD leader when access PD after initialized #46726

Merged
merged 3 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions br/pkg/lightning/importer/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,6 +412,7 @@ func TestCheckCSVHeader(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)
preInfoGetter.dbInfosCache = rc.dbInfos
err = rc.checkCSVHeader(ctx)
Expand Down Expand Up @@ -465,6 +466,7 @@ func TestCheckTableEmpty(t *testing.T) {
dbMetas,
preInfoGetter,
nil,
nil,
)

rc := &Controller{
Expand Down Expand Up @@ -622,6 +624,7 @@ func TestLocalResource(t *testing.T) {
nil,
preInfoGetter,
nil,
nil,
)
rc := &Controller{
cfg: cfg,
Expand Down
17 changes: 12 additions & 5 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func NewImportControllerWithPauser(
}

preCheckBuilder := NewPrecheckItemBuilder(
cfg, p.DBMetas, preInfoGetter, cpdb,
cfg, p.DBMetas, preInfoGetter, cpdb, pdCli,
)

rc := &Controller{
Expand Down Expand Up @@ -525,6 +525,8 @@ func (rc *Controller) Close() {

// Run starts the restore task.
func (rc *Controller) Run(ctx context.Context) error {
failpoint.Inject("beforeRun", func() {})

opts := []func(context.Context) error{
rc.setGlobalVariables,
rc.restoreSchema,
Expand Down Expand Up @@ -1433,7 +1435,7 @@ const (

func (rc *Controller) keepPauseGCForDupeRes(ctx context.Context) (<-chan struct{}, error) {
tlsOpt := rc.tls.ToPDSecurityOption()
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.cfg.TiDB.PdAddr}, tlsOpt)
pdCli, err := pd.NewClientWithContext(ctx, []string{rc.pdCli.GetLeaderAddr()}, tlsOpt)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -1594,8 +1596,13 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

// Disable GC because TiDB enables GC already.

currentLeaderAddr := rc.pdCli.GetLeaderAddr()
// remove URL scheme
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "http://")
currentLeaderAddr = strings.TrimPrefix(currentLeaderAddr, "https://")
kvStore, err = driver.TiKVDriver{}.OpenWithOptions(
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", rc.cfg.TiDB.PdAddr, rc.keyspaceName),
fmt.Sprintf("tikv://%s?disableGC=true&keyspaceName=%s", currentLeaderAddr, rc.keyspaceName),
driver.WithSecurity(rc.tls.ToTiKVSecurityConfig()),
)
if err != nil {
Expand Down Expand Up @@ -1800,7 +1807,7 @@ func (rc *Controller) importTables(ctx context.Context) (finalErr error) {
}

func (rc *Controller) registerTaskToPD(ctx context.Context) (undo func(), _ error) {
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg)
etcdCli, err := dialEtcdWithCfg(ctx, rc.cfg, rc.pdCli.GetLeaderAddr())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -2102,7 +2109,7 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
rc.status.TotalFileSize.Store(estimatedSizeResult.SizeWithoutIndex)
}
if isLocalBackend(rc.cfg) {
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
pdController, err := pdutil.NewPdController(ctx, rc.pdCli.GetLeaderAddr(),
rc.tls.TLSConfig(), rc.tls.ToPDSecurityOption())
if err != nil {
return common.NormalizeOrWrapErr(common.ErrCreatePDClient, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestPreCheckFailed(t *testing.T) {
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
}
cpdb := panicCheckpointDB{}
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb)
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
ctl := &Controller{
cfg: cfg,
saveCpCh: make(chan saveCp),
Expand Down
37 changes: 26 additions & 11 deletions br/pkg/lightning/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,21 @@

// PrecheckItemBuilder is used to build precheck items
type PrecheckItemBuilder struct {
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
pdLeaderAddrGetter func() string
}

// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
// pdCli **must not** be nil for local backend
func NewPrecheckItemBuilderFromConfig(ctx context.Context, cfg *config.Config, pdCli pd.Client, opts ...ropts.PrecheckItemBuilderOption) (*PrecheckItemBuilder, error) {
func NewPrecheckItemBuilderFromConfig(
ctx context.Context,
cfg *config.Config,
pdCli pd.Client,
opts ...ropts.PrecheckItemBuilderOption,
) (*PrecheckItemBuilder, error) {

Check warning on line 40 in br/pkg/lightning/importer/precheck.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/precheck.go#L40

Added line #L40 was not covered by tests
var gerr error
builderCfg := new(ropts.PrecheckItemBuilderConfig)
for _, o := range opts {
Expand Down Expand Up @@ -71,7 +77,7 @@
if err != nil {
return nil, errors.Trace(err)
}
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb), gerr
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdCli), gerr

Check warning on line 80 in br/pkg/lightning/importer/precheck.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/precheck.go#L80

Added line #L80 was not covered by tests
}

// NewPrecheckItemBuilder creates a new PrecheckItemBuilder
Expand All @@ -80,12 +86,21 @@
dbMetas []*mydump.MDDatabaseMeta,
preInfoGetter PreImportInfoGetter,
checkpointsDB checkpoints.DB,
pdCli pd.Client,
) *PrecheckItemBuilder {
leaderAddrGetter := func() string {
return cfg.TiDB.PdAddr
}

Check warning on line 93 in br/pkg/lightning/importer/precheck.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/lightning/importer/precheck.go#L92-L93

Added lines #L92 - L93 were not covered by tests
// in tests we may not have a pdCli
if pdCli != nil {
leaderAddrGetter = pdCli.GetLeaderAddr
}
return &PrecheckItemBuilder{
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
cfg: cfg,
dbMetas: dbMetas,
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
pdLeaderAddrGetter: leaderAddrGetter,
}
}

Expand Down Expand Up @@ -117,7 +132,7 @@
case precheck.CheckLocalTempKVDir:
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case precheck.CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg), nil
return NewCDCPITRCheckItem(b.cfg, b.pdLeaderAddrGetter), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
22 changes: 14 additions & 8 deletions br/pkg/lightning/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,17 +747,19 @@ func (ci *checkpointCheckItem) checkpointIsValid(ctx context.Context, tableInfo
// CDCPITRCheckItem check downstream has enabled CDC or PiTR. It's exposed to let
// caller override the Instruction message.
type CDCPITRCheckItem struct {
cfg *config.Config
Instruction string
cfg *config.Config
Instruction string
leaderAddrGetter func() string
// used in test
etcdCli *clientv3.Client
}

// NewCDCPITRCheckItem creates a checker to check downstream has enabled CDC or PiTR.
func NewCDCPITRCheckItem(cfg *config.Config) precheck.Checker {
func NewCDCPITRCheckItem(cfg *config.Config, leaderAddrGetter func() string) precheck.Checker {
return &CDCPITRCheckItem{
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
cfg: cfg,
Instruction: "local backend is not compatible with them. Please switch to tidb backend then try again.",
leaderAddrGetter: leaderAddrGetter,
}
}

Expand All @@ -766,7 +768,11 @@ func (*CDCPITRCheckItem) GetCheckItemID() precheck.CheckItemID {
return precheck.CheckTargetUsingCDCPITR
}

func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client, error) {
func dialEtcdWithCfg(
ctx context.Context,
cfg *config.Config,
leaderAddr string,
) (*clientv3.Client, error) {
cfg2, err := cfg.ToTLS()
if err != nil {
return nil, err
Expand All @@ -775,7 +781,7 @@ func dialEtcdWithCfg(ctx context.Context, cfg *config.Config) (*clientv3.Client,

return clientv3.New(clientv3.Config{
TLS: tlsConfig,
Endpoints: []string{cfg.TiDB.PdAddr},
Endpoints: []string{leaderAddr},
AutoSyncInterval: 30 * time.Second,
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{
Expand All @@ -802,7 +808,7 @@ func (ci *CDCPITRCheckItem) Check(ctx context.Context) (*precheck.CheckResult, e

if ci.etcdCli == nil {
var err error
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg)
ci.etcdCli, err = dialEtcdWithCfg(ctx, ci.cfg, ci.leaderAddrGetter())
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,7 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
Backend: config.BackendLocal,
},
}
ci := NewCDCPITRCheckItem(cfg)
ci := NewCDCPITRCheckItem(cfg, nil)
checker := ci.(*CDCPITRCheckItem)
checker.etcdCli = testEtcdCluster.RandClient()
result, err := ci.Check(ctx)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {

preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
require.NoError(t, err)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
for _, checkItemID := range []precheck.CheckItemID{
precheck.CheckLargeDataFile,
precheck.CheckSourcePermission,
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1192,7 +1192,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
targetInfoGetter: targetInfoGetter,
srcStorage: mockStore,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1351,7 +1351,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
targetInfoGetter: targetInfoGetter,
dbMetas: dbMetas,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB())
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
rc := &Controller{
cfg: cfg,
tls: tls,
Expand Down Expand Up @@ -1447,7 +1447,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
for _, ca := range cases {
template := NewSimpleTemplate()
cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
rc := &Controller{
cfg: cfg,
checkTemplate: template,
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/log/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func InitLogger(cfg *Config, _ string) error {
// Disable annoying TiDB Log.
// TODO: some error logs outputs randomly, we need to fix them in TiDB.
// this LEVEL only affects SlowQueryLogger, later ReplaceGlobals will overwrite it.
tidbLogCfg.Level = "fatal"
tidbLogCfg.Level = "debug"
// this also init GRPCLogger, controlled by GRPC_DEBUG env.
err := logutil.InitLogger(&tidbLogCfg)
if err != nil {
Expand Down
Loading
Loading