From 7b353a013189920eb390b97eedebde449a2272cd Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 26 Nov 2024 15:36:17 +0800 Subject: [PATCH 1/9] lightning: add precheck that PD and TiDB must belongs to same cluster Signed-off-by: lance6716 --- lightning/pkg/importer/check_info.go | 7 +++ lightning/pkg/importer/import.go | 5 +- lightning/pkg/importer/precheck.go | 8 ++- lightning/pkg/importer/precheck_impl.go | 66 +++++++++++++++++++++++++ lightning/pkg/precheck/precheck.go | 2 + pkg/lightning/common/errors.go | 19 +++---- 6 files changed, 96 insertions(+), 11 deletions(-) diff --git a/lightning/pkg/importer/check_info.go b/lightning/pkg/importer/check_info.go index 78ca56010d564..e28eb167ce57d 100644 --- a/lightning/pkg/importer/check_info.go +++ b/lightning/pkg/importer/check_info.go @@ -159,3 +159,10 @@ func (rc *Controller) checkCDCPiTR(ctx context.Context) error { } return rc.doPreCheckOnItem(ctx, precheck.CheckTargetUsingCDCPITR) } + +func (rc *Controller) checkPDTiDBFromSameCluster(ctx context.Context) error { + if rc.cfg.TikvImporter.Backend == config.BackendTiDB { + return nil + } + return rc.doPreCheckOnItem(ctx, precheck.CheckPDTiDBFromSameCluster) +} diff --git a/lightning/pkg/importer/import.go b/lightning/pkg/importer/import.go index 2858f9dca0be1..ad0c55a8069ae 100644 --- a/lightning/pkg/importer/import.go +++ b/lightning/pkg/importer/import.go @@ -479,7 +479,7 @@ func NewImportControllerWithPauser( } preCheckBuilder := NewPrecheckItemBuilder( - cfg, p.DBMetas, preInfoGetter, cpdb, pdHTTPCli, + cfg, p.DBMetas, preInfoGetter, cpdb, pdHTTPCli, db, ) rc := &Controller{ @@ -1908,6 +1908,9 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error { if err := rc.checkClusterRegion(ctx); err != nil { return common.ErrCheckClusterRegion.Wrap(err).GenWithStackByArgs() } + if err := rc.checkPDTiDBFromSameCluster(ctx); err != nil { + return common.ErrCheckPDTiDBFromSameCluster.Wrap(err).GenWithStackByArgs() + } } // even if checkpoint exists, we still need to make sure CDC/PiTR task is not running. if err := rc.checkCDCPiTR(ctx); err != nil { diff --git a/lightning/pkg/importer/precheck.go b/lightning/pkg/importer/precheck.go index 477c80cd17c42..a4494b4b2eed6 100644 --- a/lightning/pkg/importer/precheck.go +++ b/lightning/pkg/importer/precheck.go @@ -16,6 +16,7 @@ package importer import ( "context" + "database/sql" "github.com/pingcap/errors" ropts "github.com/pingcap/tidb/lightning/pkg/importer/opts" @@ -42,6 +43,7 @@ type PrecheckItemBuilder struct { preInfoGetter PreImportInfoGetter checkpointsDB checkpoints.DB pdAddrsGetter func(context.Context) []string + targetDB *sql.DB } // NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config @@ -91,7 +93,7 @@ func NewPrecheckItemBuilderFromConfig( if err != nil { return nil, errors.Trace(err) } - return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdHTTPCli), gerr + return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdHTTPCli, targetDB), gerr } // NewPrecheckItemBuilder creates a new PrecheckItemBuilder @@ -101,6 +103,7 @@ func NewPrecheckItemBuilder( preInfoGetter PreImportInfoGetter, checkpointsDB checkpoints.DB, pdHTTPCli pdhttp.Client, + targetDB *sql.DB, ) *PrecheckItemBuilder { pdAddrsGetter := func(context.Context) []string { return []string{cfg.TiDB.PdAddr} @@ -125,6 +128,7 @@ func NewPrecheckItemBuilder( preInfoGetter: preInfoGetter, checkpointsDB: checkpointsDB, pdAddrsGetter: pdAddrsGetter, + targetDB: targetDB, } } @@ -157,6 +161,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil case precheck.CheckTargetUsingCDCPITR: return NewCDCPITRCheckItem(b.cfg, b.pdAddrsGetter), nil + case precheck.CheckPDTiDBFromSameCluster: + return NewPDTiDBFromSameClusterCheckItem(b.targetDB, b.pdAddrsGetter), nil default: return nil, errors.Errorf("unsupported check item: %v", checkID) } diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index 14567eee5299f..1194808fd1700 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -17,6 +17,7 @@ package importer import ( "cmp" "context" + "database/sql" "fmt" "path/filepath" "reflect" @@ -1433,3 +1434,68 @@ func hasDefault(col *model.ColumnInfo) bool { return col.DefaultIsExpr || col.DefaultValue != nil || !mysql.HasNotNullFlag(col.GetFlag()) || col.IsGenerated() || mysql.HasAutoIncrementFlag(col.GetFlag()) } + +type pdTiDBFromSameClusterCheckItem struct { + db *sql.DB + pdAddrsGetter func(context.Context) []string +} + +// NewPDTiDBFromSameClusterCheckItem creates a new pdTiDBFromSameClusterCheckItem. +func NewPDTiDBFromSameClusterCheckItem( + db *sql.DB, + pdAddrsGetter func(context.Context) []string, +) precheck.Checker { + return &pdTiDBFromSameClusterCheckItem{ + db: db, + pdAddrsGetter: pdAddrsGetter, + } +} + +func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.CheckResult, error) { + theResult := &precheck.CheckResult{ + Item: i.GetCheckItemID(), + Severity: precheck.Critical, + Passed: true, + Message: "PD and TiDB in configuration are from the same cluster", + } + + pdAddrs := i.pdAddrsGetter(ctx) + pdAddrsMap := make(map[string]struct{}, len(pdAddrs)) + for _, addr := range pdAddrs { + pdAddrsMap[addr] = struct{}{} + } + + pdAddrsFromTiDB := make([]string, 0, len(pdAddrs)) + rows, err := i.db.QueryContext(ctx, "SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO") + if err != nil { + return nil, errors.Trace(err) + } + defer rows.Close() + for rows.Next() { + var addr string + err = rows.Scan(&addr) + if err != nil { + return nil, errors.Trace(err) + } + // if intersection is not empty, we can say URLs from TiDB and PD are from the same cluster + if _, ok := pdAddrsMap[addr]; ok { + return theResult, nil + } + pdAddrsFromTiDB = append(pdAddrsFromTiDB, addr) + } + if err = rows.Err(); err != nil { + return nil, errors.Trace(err) + } + + theResult.Passed = false + theResult.Message = fmt.Sprintf( + "PD and TiDB in configuration are not from the same cluster, "+ + "PD addresses read from PD are: %v, PD addresses read from TiDB are %v", + pdAddrs, pdAddrsFromTiDB, + ) + return theResult, nil +} + +func (i *pdTiDBFromSameClusterCheckItem) GetCheckItemID() precheck.CheckItemID { + return precheck.CheckPDTiDBFromSameCluster +} diff --git a/lightning/pkg/precheck/precheck.go b/lightning/pkg/precheck/precheck.go index f785b458bed3e..47c2cc4665450 100644 --- a/lightning/pkg/precheck/precheck.go +++ b/lightning/pkg/precheck/precheck.go @@ -45,6 +45,7 @@ const ( CheckLocalDiskPlacement CheckItemID = "CHECK_LOCAL_DISK_PLACEMENT" CheckLocalTempKVDir CheckItemID = "CHECK_LOCAL_TEMP_KV_DIR" CheckTargetUsingCDCPITR CheckItemID = "CHECK_TARGET_USING_CDC_PITR" + CheckPDTiDBFromSameCluster CheckItemID = "CHECK_PD_TIDB_FROM_SAME_CLUSTER" ) var ( @@ -63,6 +64,7 @@ var ( CheckLocalDiskPlacement: "Local disk placement", CheckLocalTempKVDir: "Local temp KV dir", CheckTargetUsingCDCPITR: "Target using CDC/PITR", + CheckPDTiDBFromSameCluster: "PD and TiDB are from the same cluster", } ) diff --git a/pkg/lightning/common/errors.go b/pkg/lightning/common/errors.go index 2ba7bc95e01f1..44caa0814108f 100644 --- a/pkg/lightning/common/errors.go +++ b/pkg/lightning/common/errors.go @@ -44,15 +44,16 @@ var ( ErrInvalidSchemaFile = errors.Normalize("invalid schema file", errors.RFCCodeText("Lightning:Loader:ErrInvalidSchemaFile")) ErrTooManySourceFiles = errors.Normalize("too many source files", errors.RFCCodeText("Lightning:Loader:ErrTooManySourceFiles")) - ErrSystemRequirementNotMet = errors.Normalize("system requirement not met", errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet")) - ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict")) - ErrPreCheckFailed = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed")) - ErrCheckClusterRegion = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion")) - ErrCheckLocalResource = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource")) - ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty")) - ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader")) - ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource")) - ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR")) + ErrSystemRequirementNotMet = errors.Normalize("system requirement not met", errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet")) + ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict")) + ErrPreCheckFailed = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed")) + ErrCheckClusterRegion = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion")) + ErrCheckLocalResource = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource")) + ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty")) + ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader")) + ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource")) + ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR")) + ErrCheckPDTiDBFromSameCluster = errors.Normalize("check PD and TiDB in the same cluster error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckPDTiDBSameCluster")) ErrOpenCheckpoint = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint")) ErrReadCheckpoint = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint")) From b62c228ba1ffef9de562a98b9b6a95ca77825b72 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 26 Nov 2024 16:21:51 +0800 Subject: [PATCH 2/9] fix CI Signed-off-by: lance6716 --- errors.toml | 5 +++++ lightning/pkg/importer/check_info_test.go | 3 +++ lightning/pkg/importer/import_test.go | 2 +- lightning/pkg/importer/precheck_impl.go | 13 ++++++++++--- lightning/pkg/importer/precheck_test.go | 2 +- lightning/pkg/importer/table_import_test.go | 6 +++--- 6 files changed, 23 insertions(+), 8 deletions(-) diff --git a/errors.toml b/errors.toml index 554ba7cb6f8dc..f8567cca42a7b 100644 --- a/errors.toml +++ b/errors.toml @@ -526,6 +526,11 @@ error = ''' check local storage resource error ''' +["Lightning:PreCheck:ErrCheckPDTiDBSameCluster"] +error = ''' +check PD and TiDB in the same cluster error +''' + ["Lightning:PreCheck:ErrCheckTableEmpty"] error = ''' check table empty error diff --git a/lightning/pkg/importer/check_info_test.go b/lightning/pkg/importer/check_info_test.go index ed02f15f23149..1550ff42cd1c3 100644 --- a/lightning/pkg/importer/check_info_test.go +++ b/lightning/pkg/importer/check_info_test.go @@ -413,6 +413,7 @@ func TestCheckCSVHeader(t *testing.T) { preInfoGetter, nil, nil, + nil, ) preInfoGetter.dbInfosCache = rc.dbInfos err = rc.checkCSVHeader(ctx) @@ -467,6 +468,7 @@ func TestCheckTableEmpty(t *testing.T) { preInfoGetter, nil, nil, + nil, ) rc := &Controller{ @@ -625,6 +627,7 @@ func TestLocalResource(t *testing.T) { preInfoGetter, nil, nil, + nil, ) rc := &Controller{ cfg: cfg, diff --git a/lightning/pkg/importer/import_test.go b/lightning/pkg/importer/import_test.go index 1dbc692a4c13a..f948353cbfe22 100644 --- a/lightning/pkg/importer/import_test.go +++ b/lightning/pkg/importer/import_test.go @@ -221,7 +221,7 @@ func TestPreCheckFailed(t *testing.T) { dbMetas: make([]*mydump.MDDatabaseMeta, 0), } cpdb := panicCheckpointDB{} - theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil, db) ctl := &Controller{ cfg: cfg, saveCpCh: make(chan saveCp), diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index 1194808fd1700..1cd04197cdcf3 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -19,6 +19,7 @@ import ( "context" "database/sql" "fmt" + "net/url" "path/filepath" "reflect" "slices" @@ -1466,7 +1467,7 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C } pdAddrsFromTiDB := make([]string, 0, len(pdAddrs)) - rows, err := i.db.QueryContext(ctx, "SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO") + rows, err := i.db.QueryContext(ctx, "SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'") if err != nil { return nil, errors.Trace(err) } @@ -1477,11 +1478,17 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C if err != nil { return nil, errors.Trace(err) } + u, err2 := url.Parse(addr) + if err2 != nil { + return nil, errors.Trace(err2) + } + pdAddr := u.Host + // if intersection is not empty, we can say URLs from TiDB and PD are from the same cluster - if _, ok := pdAddrsMap[addr]; ok { + if _, ok := pdAddrsMap[pdAddr]; ok { return theResult, nil } - pdAddrsFromTiDB = append(pdAddrsFromTiDB, addr) + pdAddrsFromTiDB = append(pdAddrsFromTiDB, pdAddr) } if err = rows.Err(); err != nil { return nil, errors.Trace(err) diff --git a/lightning/pkg/importer/precheck_test.go b/lightning/pkg/importer/precheck_test.go index 01e67752322ae..59dfef2f9c1ca 100644 --- a/lightning/pkg/importer/precheck_test.go +++ b/lightning/pkg/importer/precheck_test.go @@ -33,7 +33,7 @@ func TestPrecheckBuilderBasic(t *testing.T) { preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil) require.NoError(t, err) - theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil, nil) for _, checkItemID := range []precheck.CheckItemID{ precheck.CheckLargeDataFile, precheck.CheckSourcePermission, diff --git a/lightning/pkg/importer/table_import_test.go b/lightning/pkg/importer/table_import_test.go index 60beb1d206baf..80d1f0206f956 100644 --- a/lightning/pkg/importer/table_import_test.go +++ b/lightning/pkg/importer/table_import_test.go @@ -1195,7 +1195,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() { targetInfoGetter: targetInfoGetter, srcStorage: mockStore, } - theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil, nil) rc := &Controller{ cfg: cfg, store: mockStore, @@ -1332,7 +1332,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() { targetInfoGetter: targetInfoGetter, dbMetas: dbMetas, } - theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil, nil) rc := &Controller{ cfg: cfg, taskMgr: mockTaskMetaMgr{}, @@ -1425,7 +1425,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() { for _, ca := range cases { template := NewSimpleTemplate() cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}} - theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil, nil) rc := &Controller{ cfg: cfg, checkTemplate: template, From f8acc4d89336973459b398333ee57038ad5501e4 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 26 Nov 2024 16:21:51 +0800 Subject: [PATCH 3/9] fix CI Signed-off-by: lance6716 --- errors.toml | 5 +++++ lightning/pkg/importer/check_info_test.go | 3 +++ lightning/pkg/importer/import_test.go | 2 +- lightning/pkg/importer/precheck_impl.go | 13 ++++++++++--- lightning/pkg/importer/precheck_test.go | 2 +- lightning/pkg/importer/table_import_test.go | 6 +++--- 6 files changed, 23 insertions(+), 8 deletions(-) diff --git a/errors.toml b/errors.toml index 554ba7cb6f8dc..f8567cca42a7b 100644 --- a/errors.toml +++ b/errors.toml @@ -526,6 +526,11 @@ error = ''' check local storage resource error ''' +["Lightning:PreCheck:ErrCheckPDTiDBSameCluster"] +error = ''' +check PD and TiDB in the same cluster error +''' + ["Lightning:PreCheck:ErrCheckTableEmpty"] error = ''' check table empty error diff --git a/lightning/pkg/importer/check_info_test.go b/lightning/pkg/importer/check_info_test.go index ed02f15f23149..1550ff42cd1c3 100644 --- a/lightning/pkg/importer/check_info_test.go +++ b/lightning/pkg/importer/check_info_test.go @@ -413,6 +413,7 @@ func TestCheckCSVHeader(t *testing.T) { preInfoGetter, nil, nil, + nil, ) preInfoGetter.dbInfosCache = rc.dbInfos err = rc.checkCSVHeader(ctx) @@ -467,6 +468,7 @@ func TestCheckTableEmpty(t *testing.T) { preInfoGetter, nil, nil, + nil, ) rc := &Controller{ @@ -625,6 +627,7 @@ func TestLocalResource(t *testing.T) { preInfoGetter, nil, nil, + nil, ) rc := &Controller{ cfg: cfg, diff --git a/lightning/pkg/importer/import_test.go b/lightning/pkg/importer/import_test.go index 1dbc692a4c13a..f948353cbfe22 100644 --- a/lightning/pkg/importer/import_test.go +++ b/lightning/pkg/importer/import_test.go @@ -221,7 +221,7 @@ func TestPreCheckFailed(t *testing.T) { dbMetas: make([]*mydump.MDDatabaseMeta, 0), } cpdb := panicCheckpointDB{} - theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil, db) ctl := &Controller{ cfg: cfg, saveCpCh: make(chan saveCp), diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index 1194808fd1700..1cd04197cdcf3 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -19,6 +19,7 @@ import ( "context" "database/sql" "fmt" + "net/url" "path/filepath" "reflect" "slices" @@ -1466,7 +1467,7 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C } pdAddrsFromTiDB := make([]string, 0, len(pdAddrs)) - rows, err := i.db.QueryContext(ctx, "SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO") + rows, err := i.db.QueryContext(ctx, "SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'") if err != nil { return nil, errors.Trace(err) } @@ -1477,11 +1478,17 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C if err != nil { return nil, errors.Trace(err) } + u, err2 := url.Parse(addr) + if err2 != nil { + return nil, errors.Trace(err2) + } + pdAddr := u.Host + // if intersection is not empty, we can say URLs from TiDB and PD are from the same cluster - if _, ok := pdAddrsMap[addr]; ok { + if _, ok := pdAddrsMap[pdAddr]; ok { return theResult, nil } - pdAddrsFromTiDB = append(pdAddrsFromTiDB, addr) + pdAddrsFromTiDB = append(pdAddrsFromTiDB, pdAddr) } if err = rows.Err(); err != nil { return nil, errors.Trace(err) diff --git a/lightning/pkg/importer/precheck_test.go b/lightning/pkg/importer/precheck_test.go index 01e67752322ae..59dfef2f9c1ca 100644 --- a/lightning/pkg/importer/precheck_test.go +++ b/lightning/pkg/importer/precheck_test.go @@ -33,7 +33,7 @@ func TestPrecheckBuilderBasic(t *testing.T) { preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil) require.NoError(t, err) - theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil, nil) for _, checkItemID := range []precheck.CheckItemID{ precheck.CheckLargeDataFile, precheck.CheckSourcePermission, diff --git a/lightning/pkg/importer/table_import_test.go b/lightning/pkg/importer/table_import_test.go index 60beb1d206baf..80d1f0206f956 100644 --- a/lightning/pkg/importer/table_import_test.go +++ b/lightning/pkg/importer/table_import_test.go @@ -1195,7 +1195,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() { targetInfoGetter: targetInfoGetter, srcStorage: mockStore, } - theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil, nil) rc := &Controller{ cfg: cfg, store: mockStore, @@ -1332,7 +1332,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() { targetInfoGetter: targetInfoGetter, dbMetas: dbMetas, } - theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil, nil) rc := &Controller{ cfg: cfg, taskMgr: mockTaskMetaMgr{}, @@ -1425,7 +1425,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() { for _, ca := range cases { template := NewSimpleTemplate() cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}} - theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil) + theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil, nil) rc := &Controller{ cfg: cfg, checkTemplate: template, From 6935ea7518f39e1882979b19920fe53ae9af5702 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Tue, 26 Nov 2024 22:32:19 +0800 Subject: [PATCH 4/9] Trigger Build From 05a10c8a772db18a3ce287f36c0de708e3302c57 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 27 Nov 2024 09:19:19 +0800 Subject: [PATCH 5/9] fix CI Signed-off-by: lance6716 --- lightning/pkg/importer/precheck_impl.go | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index 1cd04197cdcf3..0e5650250a11a 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -1462,8 +1462,12 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C pdAddrs := i.pdAddrsGetter(ctx) pdAddrsMap := make(map[string]struct{}, len(pdAddrs)) - for _, addr := range pdAddrs { - pdAddrsMap[addr] = struct{}{} + for _, addrURL := range pdAddrs { + u, err2 := url.Parse(addrURL) + if err2 != nil { + return nil, errors.Trace(err2) + } + pdAddrsMap[u.Host] = struct{}{} } pdAddrsFromTiDB := make([]string, 0, len(pdAddrs)) @@ -1478,17 +1482,12 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C if err != nil { return nil, errors.Trace(err) } - u, err2 := url.Parse(addr) - if err2 != nil { - return nil, errors.Trace(err2) - } - pdAddr := u.Host // if intersection is not empty, we can say URLs from TiDB and PD are from the same cluster - if _, ok := pdAddrsMap[pdAddr]; ok { + if _, ok := pdAddrsMap[addr]; ok { return theResult, nil } - pdAddrsFromTiDB = append(pdAddrsFromTiDB, pdAddr) + pdAddrsFromTiDB = append(pdAddrsFromTiDB, addr) } if err = rows.Err(); err != nil { return nil, errors.Trace(err) @@ -1503,6 +1502,6 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C return theResult, nil } -func (i *pdTiDBFromSameClusterCheckItem) GetCheckItemID() precheck.CheckItemID { +func (*pdTiDBFromSameClusterCheckItem) GetCheckItemID() precheck.CheckItemID { return precheck.CheckPDTiDBFromSameCluster } From 5b299527112d75da45ab38f389efbcb67a51c494 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 27 Nov 2024 11:41:05 +0800 Subject: [PATCH 6/9] add UT Signed-off-by: lance6716 --- lightning/pkg/importer/precheck_impl_test.go | 47 ++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/lightning/pkg/importer/precheck_impl_test.go b/lightning/pkg/importer/precheck_impl_test.go index e35535ef6be69..37b7c68af2904 100644 --- a/lightning/pkg/importer/precheck_impl_test.go +++ b/lightning/pkg/importer/precheck_impl_test.go @@ -19,6 +19,7 @@ import ( "fmt" "testing" + "github.com/DATA-DOG/go-sqlmock" "github.com/docker/go-units" "github.com/pingcap/tidb/br/pkg/storage" "github.com/pingcap/tidb/br/pkg/streamhelper" @@ -683,3 +684,49 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() { s.Require().True(result.Passed) s.Require().Equal("TiDB Lightning is not using local backend, skip this check", result.Message) } + +func (s *precheckImplSuite) TestPDTiDBFromSameCluster() { + ctx := context.Background() + db, mock, err := sqlmock.New() + s.Require().NoError(err) + pdAddrGetter := func(ctx context.Context) []string { + return []string{"http://1.2.3.4:2379", "https://2.3.4.5:2379"} + } + + // check wrong host and port + mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`). + WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}). + AddRow("1.2.3.4:2380").AddRow("10.20.30.40:2379"), + ) + + checker := NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter) + result, err := checker.Check(ctx) + s.Require().NoError(err) + s.Require().False(result.Passed) + s.Require().Equal( + "PD and TiDB in configuration are not from the same cluster, "+ + "PD addresses read from PD are: [http://1.2.3.4:2379 https://2.3.4.5:2379], "+ + "PD addresses read from TiDB are [1.2.3.4:2380 10.20.30.40:2379]", + result.Message) + + // check partial match is enough + mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`). + WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}). + AddRow("1.2.3.4:2379"), + ) + checker = NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter) + result, err = checker.Check(ctx) + s.Require().NoError(err) + s.Require().True(result.Passed) + + mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`). + WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}). + AddRow("1.2.3.4:2379").AddRow("2.3.4.5:2379").AddRow("3.4.5.6:2379"), + ) + checker = NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter) + result, err = checker.Check(ctx) + s.Require().NoError(err) + s.Require().True(result.Passed) + + s.Require().NoError(mock.ExpectationsWereMet()) +} From 222a2ed70abcc4ef02b55368f9e4dfc4577caf23 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 27 Nov 2024 15:21:29 +0800 Subject: [PATCH 7/9] add comment Signed-off-by: lance6716 --- lightning/pkg/importer/precheck_impl.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index 0e5650250a11a..279d64cee4635 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -1436,6 +1436,21 @@ func hasDefault(col *model.ColumnInfo) bool { col.IsGenerated() || mysql.HasAutoIncrementFlag(col.GetFlag()) } +// pdTiDBFromSameClusterCheckItem checks whether PD and TiDB are from the same +// cluster. The implementation is +// +// - try to get PD leader's client URLs from PD configuration, by pdAddrsGetter. +// - From TiDB configuration, read all PD node information by SQL. For each PD +// node in the TiDB's cluster, read etcd member client URLs and return the first +// one. This is done by query INFORMATION_SCHEMA.CLUSTER_INFO table and the +// executor memtableRetriever.dataForTiDBClusterInfo. +// +// In happy path, we get PD leader's addresses from etcd, there must be an +// intersection with addresses from TiDB if they are in the same cluster, and +// vice versa. If we can't reach PD leader, we will use the PD address set in +// lightning's task configuration, or in TiDB's configuration. Then it may have +// false alert if PD has multiple endpoints and above configuration uses one of +// them, while etcd information uses another one. type pdTiDBFromSameClusterCheckItem struct { db *sql.DB pdAddrsGetter func(context.Context) []string @@ -1483,7 +1498,8 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C return nil, errors.Trace(err) } - // if intersection is not empty, we can say URLs from TiDB and PD are from the same cluster + // if intersection is not empty, we can say URLs from TiDB and PD are from the + // same cluster. See comments above pdTiDBFromSameClusterCheckItem struct. if _, ok := pdAddrsMap[addr]; ok { return theResult, nil } From b7a0c372ded37425c2644ff37f70f984afa2e3aa Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 27 Nov 2024 18:33:50 +0800 Subject: [PATCH 8/9] provide common functions for CDC Signed-off-by: lance6716 --- lightning/pkg/importer/precheck_impl.go | 64 +++++++----------- lightning/pkg/importer/precheck_impl_test.go | 6 +- pkg/util/util.go | 69 ++++++++++++++++++++ 3 files changed, 97 insertions(+), 42 deletions(-) diff --git a/lightning/pkg/importer/precheck_impl.go b/lightning/pkg/importer/precheck_impl.go index 279d64cee4635..1acee2cccc1be 100644 --- a/lightning/pkg/importer/precheck_impl.go +++ b/lightning/pkg/importer/precheck_impl.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/cdcutil" "github.com/pingcap/tidb/pkg/util/engine" "github.com/pingcap/tidb/pkg/util/set" @@ -1436,21 +1437,18 @@ func hasDefault(col *model.ColumnInfo) bool { col.IsGenerated() || mysql.HasAutoIncrementFlag(col.GetFlag()) } -// pdTiDBFromSameClusterCheckItem checks whether PD and TiDB are from the same -// cluster. The implementation is +// pdTiDBFromSameClusterCheckItem provides two sources of PD addresses and use +// util.CheckIfSameCluster to check if they are from the same cluster. // -// - try to get PD leader's client URLs from PD configuration, by pdAddrsGetter. -// - From TiDB configuration, read all PD node information by SQL. For each PD -// node in the TiDB's cluster, read etcd member client URLs and return the first -// one. This is done by query INFORMATION_SCHEMA.CLUSTER_INFO table and the -// executor memtableRetriever.dataForTiDBClusterInfo. +// The first source stands for PD leader's all etcd client URL addresses in most +// time, the second source stands for all PD nodes' first etcd client URL +// addresses. // -// In happy path, we get PD leader's addresses from etcd, there must be an -// intersection with addresses from TiDB if they are in the same cluster, and -// vice versa. If we can't reach PD leader, we will use the PD address set in -// lightning's task configuration, or in TiDB's configuration. Then it may have -// false alert if PD has multiple endpoints and above configuration uses one of -// them, while etcd information uses another one. +// If we can't reach PD leader, the first source will be replaced by the PD +// address set in lightning's task configuration, or in TiDB's configuration. +// Then it may have false alert if PD has multiple endpoints and above +// configuration uses one of them, while etcd information uses another one, and +// there are no common addresses passed to util.CheckIfSameCluster. type pdTiDBFromSameClusterCheckItem struct { db *sql.DB pdAddrsGetter func(context.Context) []string @@ -1475,38 +1473,26 @@ func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.C Message: "PD and TiDB in configuration are from the same cluster", } - pdAddrs := i.pdAddrsGetter(ctx) - pdAddrsMap := make(map[string]struct{}, len(pdAddrs)) - for _, addrURL := range pdAddrs { - u, err2 := url.Parse(addrURL) - if err2 != nil { - return nil, errors.Trace(err2) + pdLeaderAddrsGetter := func(ctx context.Context) ([]string, error) { + addrs := i.pdAddrsGetter(ctx) + for idx, addrURL := range addrs { + u, err2 := url.Parse(addrURL) + if err2 != nil { + return nil, errors.Trace(err2) + } + addrs[idx] = u.Host } - pdAddrsMap[u.Host] = struct{}{} + return addrs, nil } - pdAddrsFromTiDB := make([]string, 0, len(pdAddrs)) - rows, err := i.db.QueryContext(ctx, "SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'") + sameCluster, pdAddrs, pdAddrsFromTiDB, err := util.CheckIfSameCluster( + ctx, pdLeaderAddrsGetter, util.GetPDsAddrWithoutScheme(i.db), + ) if err != nil { return nil, errors.Trace(err) } - defer rows.Close() - for rows.Next() { - var addr string - err = rows.Scan(&addr) - if err != nil { - return nil, errors.Trace(err) - } - - // if intersection is not empty, we can say URLs from TiDB and PD are from the - // same cluster. See comments above pdTiDBFromSameClusterCheckItem struct. - if _, ok := pdAddrsMap[addr]; ok { - return theResult, nil - } - pdAddrsFromTiDB = append(pdAddrsFromTiDB, addr) - } - if err = rows.Err(); err != nil { - return nil, errors.Trace(err) + if sameCluster { + return theResult, nil } theResult.Passed = false diff --git a/lightning/pkg/importer/precheck_impl_test.go b/lightning/pkg/importer/precheck_impl_test.go index 37b7c68af2904..2adfb4ce30f36 100644 --- a/lightning/pkg/importer/precheck_impl_test.go +++ b/lightning/pkg/importer/precheck_impl_test.go @@ -690,7 +690,7 @@ func (s *precheckImplSuite) TestPDTiDBFromSameCluster() { db, mock, err := sqlmock.New() s.Require().NoError(err) pdAddrGetter := func(ctx context.Context) []string { - return []string{"http://1.2.3.4:2379", "https://2.3.4.5:2379"} + return []string{"https://1.2.3.4:2379", "http://127.0.0.1:2379"} } // check wrong host and port @@ -705,7 +705,7 @@ func (s *precheckImplSuite) TestPDTiDBFromSameCluster() { s.Require().False(result.Passed) s.Require().Equal( "PD and TiDB in configuration are not from the same cluster, "+ - "PD addresses read from PD are: [http://1.2.3.4:2379 https://2.3.4.5:2379], "+ + "PD addresses read from PD are: [1.2.3.4:2379 127.0.0.1:2379], "+ "PD addresses read from TiDB are [1.2.3.4:2380 10.20.30.40:2379]", result.Message) @@ -721,7 +721,7 @@ func (s *precheckImplSuite) TestPDTiDBFromSameCluster() { mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`). WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}). - AddRow("1.2.3.4:2379").AddRow("2.3.4.5:2379").AddRow("3.4.5.6:2379"), + AddRow("2.3.4.5:2379").AddRow("3.4.5.6:2379").AddRow("1.2.3.4:2379"), ) checker = NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter) result, err = checker.Check(ctx) diff --git a/pkg/util/util.go b/pkg/util/util.go index ded6218542d59..e8f056b4da0a0 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -17,6 +17,8 @@ package util import ( "bufio" "bytes" + "context" + "database/sql" "encoding/json" "fmt" "io" @@ -289,3 +291,70 @@ func GetRecoverError(r any) error { } return errors.Errorf("%v", r) } + +// CheckIfSameCluster reads PD addresses registered in etcd from two sources, to +// check if there are common addresses in both sources. If there are common +// addresses, the first return value is true which means we have confidence that +// the two sources are in the same cluster. If there are no common addresses, the +// first return value is false, which means 1) the two sources are in different +// clusters, or 2) the two sources may be in the same cluster but the getter +// function does not return the common addresses. +// +// The getters should keep the same format of the returned addresses, like both +// have URL scheme or not. +// +// The second and third return values are the PD addresses from the first and +// second getters respectively. The fourth return value is the error occurred. +func CheckIfSameCluster( + ctx context.Context, + pdAddrsGetter, pdAddrsGetter2 func(context.Context) ([]string, error), +) (bool, []string, []string, error) { + addrs, err := pdAddrsGetter(ctx) + if err != nil { + return false, nil, nil, errors.Trace(err) + } + addrsMap := make(map[string]struct{}, len(addrs)) + for _, a := range addrs { + addrsMap[a] = struct{}{} + } + + addrs2, err := pdAddrsGetter2(ctx) + if err != nil { + return false, nil, nil, errors.Trace(err) + } + for _, a := range addrs2 { + if _, ok := addrsMap[a]; ok { + return true, addrs, addrs2, nil + } + } + return false, addrs, addrs2, nil +} + +// GetPDsAddrWithoutScheme returns a function that read all PD nodes' first etcd +// client URL by SQL query. This is done by query INFORMATION_SCHEMA.CLUSTER_INFO +// table and its executor memtableRetriever.dataForTiDBClusterInfo. +func GetPDsAddrWithoutScheme(db *sql.DB) func(context.Context) ([]string, error) { + return func(ctx context.Context) ([]string, error) { + rows, err := db.QueryContext(ctx, "SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'") + if err != nil { + return nil, errors.Trace(err) + } + defer rows.Close() + var ret []string + for rows.Next() { + var addr string + err = rows.Scan(&addr) + if err != nil { + return nil, errors.Trace(err) + } + + // if intersection is not empty, we can say URLs from TiDB and PD are from the + // same cluster. See comments above pdTiDBFromSameClusterCheckItem struct. + ret = append(ret, addr) + } + if err = rows.Err(); err != nil { + return nil, errors.Trace(err) + } + return ret, nil + } +} From d7a8dfb78c08855b920a573aae130ad8680aab68 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Wed, 27 Nov 2024 20:31:54 +0800 Subject: [PATCH 9/9] fix Signed-off-by: lance6716 --- lightning/pkg/importer/BUILD.bazel | 1 + 1 file changed, 1 insertion(+) diff --git a/lightning/pkg/importer/BUILD.bazel b/lightning/pkg/importer/BUILD.bazel index dc25bdc18b42d..4f2413b0832a7 100644 --- a/lightning/pkg/importer/BUILD.bazel +++ b/lightning/pkg/importer/BUILD.bazel @@ -66,6 +66,7 @@ go_library( "//pkg/table/tables", "//pkg/tablecodec", "//pkg/types", + "//pkg/util", "//pkg/util/cdcutil", "//pkg/util/codec", "//pkg/util/collate",