Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: add precheck that PD and TiDB must belong to the same cluster #57709

Merged
merged 10 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,11 @@ error = '''
check local storage resource error
'''

["Lightning:PreCheck:ErrCheckPDTiDBSameCluster"]
error = '''
check PD and TiDB in the same cluster error
'''

["Lightning:PreCheck:ErrCheckTableEmpty"]
error = '''
check table empty error
Expand Down
7 changes: 7 additions & 0 deletions lightning/pkg/importer/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,10 @@ func (rc *Controller) checkCDCPiTR(ctx context.Context) error {
}
return rc.doPreCheckOnItem(ctx, precheck.CheckTargetUsingCDCPITR)
}

// checkPDTiDBFromSameCluster runs the precheck.CheckPDTiDBFromSameCluster
// item through doPreCheckOnItem. When the configured backend is
// config.BackendTiDB the check is skipped and nil is returned.
func (rc *Controller) checkPDTiDBFromSameCluster(ctx context.Context) error {
	usesTiDBBackend := rc.cfg.TikvImporter.Backend == config.BackendTiDB
	if !usesTiDBBackend {
		return rc.doPreCheckOnItem(ctx, precheck.CheckPDTiDBFromSameCluster)
	}
	return nil
}
3 changes: 3 additions & 0 deletions lightning/pkg/importer/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,7 @@ func TestCheckCSVHeader(t *testing.T) {
preInfoGetter,
nil,
nil,
nil,
)
preInfoGetter.dbInfosCache = rc.dbInfos
err = rc.checkCSVHeader(ctx)
Expand Down Expand Up @@ -467,6 +468,7 @@ func TestCheckTableEmpty(t *testing.T) {
preInfoGetter,
nil,
nil,
nil,
)

rc := &Controller{
Expand Down Expand Up @@ -625,6 +627,7 @@ func TestLocalResource(t *testing.T) {
preInfoGetter,
nil,
nil,
nil,
)
rc := &Controller{
cfg: cfg,
Expand Down
5 changes: 4 additions & 1 deletion lightning/pkg/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func NewImportControllerWithPauser(
}

preCheckBuilder := NewPrecheckItemBuilder(
cfg, p.DBMetas, preInfoGetter, cpdb, pdHTTPCli,
cfg, p.DBMetas, preInfoGetter, cpdb, pdHTTPCli, db,
)

rc := &Controller{
Expand Down Expand Up @@ -1908,6 +1908,9 @@ func (rc *Controller) preCheckRequirements(ctx context.Context) error {
if err := rc.checkClusterRegion(ctx); err != nil {
return common.ErrCheckClusterRegion.Wrap(err).GenWithStackByArgs()
}
if err := rc.checkPDTiDBFromSameCluster(ctx); err != nil {
return common.ErrCheckPDTiDBFromSameCluster.Wrap(err).GenWithStackByArgs()
}
}
// even if checkpoint exists, we still need to make sure CDC/PiTR task is not running.
if err := rc.checkCDCPiTR(ctx); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion lightning/pkg/importer/import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestPreCheckFailed(t *testing.T) {
dbMetas: make([]*mydump.MDDatabaseMeta, 0),
}
cpdb := panicCheckpointDB{}
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, make([]*mydump.MDDatabaseMeta, 0), preInfoGetter, cpdb, nil, db)
ctl := &Controller{
cfg: cfg,
saveCpCh: make(chan saveCp),
Expand Down
8 changes: 7 additions & 1 deletion lightning/pkg/importer/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package importer

import (
"context"
"database/sql"

"github.com/pingcap/errors"
ropts "github.com/pingcap/tidb/lightning/pkg/importer/opts"
Expand All @@ -42,6 +43,7 @@ type PrecheckItemBuilder struct {
preInfoGetter PreImportInfoGetter
checkpointsDB checkpoints.DB
pdAddrsGetter func(context.Context) []string
targetDB *sql.DB
}

// NewPrecheckItemBuilderFromConfig creates a new PrecheckItemBuilder from config
Expand Down Expand Up @@ -91,7 +93,7 @@ func NewPrecheckItemBuilderFromConfig(
if err != nil {
return nil, errors.Trace(err)
}
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdHTTPCli), gerr
return NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, cpdb, pdHTTPCli, targetDB), gerr
}

// NewPrecheckItemBuilder creates a new PrecheckItemBuilder
Expand All @@ -101,6 +103,7 @@ func NewPrecheckItemBuilder(
preInfoGetter PreImportInfoGetter,
checkpointsDB checkpoints.DB,
pdHTTPCli pdhttp.Client,
targetDB *sql.DB,
) *PrecheckItemBuilder {
pdAddrsGetter := func(context.Context) []string {
return []string{cfg.TiDB.PdAddr}
Expand All @@ -125,6 +128,7 @@ func NewPrecheckItemBuilder(
preInfoGetter: preInfoGetter,
checkpointsDB: checkpointsDB,
pdAddrsGetter: pdAddrsGetter,
targetDB: targetDB,
}
}

Expand Down Expand Up @@ -157,6 +161,8 @@ func (b *PrecheckItemBuilder) BuildPrecheckItem(checkID precheck.CheckItemID) (p
return NewLocalTempKVDirCheckItem(b.cfg, b.preInfoGetter, b.dbMetas), nil
case precheck.CheckTargetUsingCDCPITR:
return NewCDCPITRCheckItem(b.cfg, b.pdAddrsGetter), nil
case precheck.CheckPDTiDBFromSameCluster:
return NewPDTiDBFromSameClusterCheckItem(b.targetDB, b.pdAddrsGetter), nil
default:
return nil, errors.Errorf("unsupported check item: %v", checkID)
}
Expand Down
74 changes: 74 additions & 0 deletions lightning/pkg/importer/precheck_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ package importer
import (
"cmp"
"context"
"database/sql"
"fmt"
"net/url"
"path/filepath"
"reflect"
"slices"
Expand All @@ -43,6 +45,7 @@ import (
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/table"
"github.com/pingcap/tidb/pkg/types"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/cdcutil"
"github.com/pingcap/tidb/pkg/util/engine"
"github.com/pingcap/tidb/pkg/util/set"
Expand Down Expand Up @@ -1433,3 +1436,74 @@ func hasDefault(col *model.ColumnInfo) bool {
return col.DefaultIsExpr || col.DefaultValue != nil || !mysql.HasNotNullFlag(col.GetFlag()) ||
col.IsGenerated() || mysql.HasAutoIncrementFlag(col.GetFlag())
}

// pdTiDBFromSameClusterCheckItem provides two sources of PD addresses and uses
// util.CheckIfSameCluster to check if they are from the same cluster.
//
// Most of the time, the first source is all of the PD leader's etcd client URL
// addresses, and the second source is the first etcd client URL address of
// every PD node.
//
// If we can't reach the PD leader, the first source is replaced by the PD
// address set in lightning's task configuration, or in TiDB's configuration.
// In that case the check may raise a false alert: if PD has multiple endpoints
// and the configuration uses one of them while the etcd information uses
// another, no common address is passed to util.CheckIfSameCluster.
type pdTiDBFromSameClusterCheckItem struct {
db *sql.DB
pdAddrsGetter func(context.Context) []string
Comment on lines +1453 to +1454
Copy link

@wlwilliamx wlwilliamx Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend replacing these fields in the struct like:

getPDAddrsFromSQL func(context.Context) []string
getPDAddrsFromPD func(context.Context) []string

This change improves flexibility and provides a clearer abstraction by distinguishing between the two PD address getters (which also makes it easier for CDC to use with upstream and downstream clusters). Additionally, it simplifies unit testing, as you can now mock and test the two getters independently.

Copy link
Contributor Author

@lance6716 lance6716 Nov 27, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

distinguishing between upstream and downstream PD address

There are no upstream PD addresses. However, I understand that you mean checking the two ways to get PD addresses.

  1. I don't know how CDC can reuse this struct pdTiDBFromSameClusterCheckItem, because the intersection assertion cannot be configured, and CDC wants to assert non-intersection.

}

// NewPDTiDBFromSameClusterCheckItem returns a precheck.Checker backed by a
// pdTiDBFromSameClusterCheckItem. db and pdAddrsGetter are stored as-is and
// become the two PD address sources compared by the item's Check method.
func NewPDTiDBFromSameClusterCheckItem(
	db *sql.DB,
	pdAddrsGetter func(context.Context) []string,
) precheck.Checker {
	item := new(pdTiDBFromSameClusterCheckItem)
	item.db = db
	item.pdAddrsGetter = pdAddrsGetter
	return item
}

// Check compares the PD addresses returned by pdAddrsGetter with the PD
// addresses read through the TiDB connection, using util.CheckIfSameCluster.
//
// It returns a passed, Critical-severity result when CheckIfSameCluster
// reports the same cluster. Otherwise it returns a failed result whose message
// lists both address sets. A non-nil error is returned only when an address
// cannot be parsed as a URL or when util.CheckIfSameCluster itself fails.
func (i *pdTiDBFromSameClusterCheckItem) Check(ctx context.Context) (*precheck.CheckResult, error) {
	theResult := &precheck.CheckResult{
		Item:     i.GetCheckItemID(),
		Severity: precheck.Critical,
		Passed:   true,
		Message:  "PD and TiDB in configuration are from the same cluster",
	}

	// pdLeaderAddrsGetter strips the scheme from every address returned by
	// pdAddrsGetter so that only "host:port" is handed to CheckIfSameCluster.
	pdLeaderAddrsGetter := func(ctx context.Context) ([]string, error) {
		addrs := i.pdAddrsGetter(ctx)
		// Write the hosts into a fresh slice: the slice returned by the getter
		// is not owned by this function and must not be rewritten in place
		// (it could be shared or cached by the getter).
		hosts := make([]string, len(addrs))
		for idx, addrURL := range addrs {
			// NOTE(review): url.Parse does not yield a usable Host for
			// addresses without a scheme (e.g. "127.0.0.1:2379"); confirm
			// that every address returned by pdAddrsGetter carries a scheme.
			u, err2 := url.Parse(addrURL)
			if err2 != nil {
				return nil, errors.Trace(err2)
			}
			hosts[idx] = u.Host
		}
		return hosts, nil
	}

	sameCluster, pdAddrs, pdAddrsFromTiDB, err := util.CheckIfSameCluster(
		ctx, pdLeaderAddrsGetter, util.GetPDsAddrWithoutScheme(i.db),
	)
	if err != nil {
		return nil, errors.Trace(err)
	}
	if sameCluster {
		return theResult, nil
	}

	theResult.Passed = false
	theResult.Message = fmt.Sprintf(
		"PD and TiDB in configuration are not from the same cluster, "+
			"PD addresses read from PD are: %v, PD addresses read from TiDB are %v",
		pdAddrs, pdAddrsFromTiDB,
	)
	return theResult, nil
}

// GetCheckItemID returns precheck.CheckPDTiDBFromSameCluster, the identifier
// of this precheck item.
func (*pdTiDBFromSameClusterCheckItem) GetCheckItemID() precheck.CheckItemID {
return precheck.CheckPDTiDBFromSameCluster
}
47 changes: 47 additions & 0 deletions lightning/pkg/importer/precheck_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/docker/go-units"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/streamhelper"
Expand Down Expand Up @@ -683,3 +684,49 @@ func (s *precheckImplSuite) TestCDCPITRCheckItem() {
s.Require().True(result.Passed)
s.Require().Equal("TiDB Lightning is not using local backend, skip this check", result.Message)
}

// TestPDTiDBFromSameCluster exercises the PD/TiDB same-cluster check item with
// a mocked SQL connection. The PD-side getter always returns two URLs with
// schemes; each case changes only the rows returned by the CLUSTER_INFO query.
func (s *precheckImplSuite) TestPDTiDBFromSameCluster() {
ctx := context.Background()
db, mock, err := sqlmock.New()
s.Require().NoError(err)
pdAddrGetter := func(ctx context.Context) []string {
return []string{"https://1.2.3.4:2379", "http://127.0.0.1:2379"}
}

// check wrong host and port
mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`).
WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}).
AddRow("1.2.3.4:2380").AddRow("10.20.30.40:2379"),
)

checker := NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter)
result, err := checker.Check(ctx)
s.Require().NoError(err)
s.Require().False(result.Passed)
// The failure message lists the PD-side addresses with their schemes removed.
s.Require().Equal(
"PD and TiDB in configuration are not from the same cluster, "+
"PD addresses read from PD are: [1.2.3.4:2379 127.0.0.1:2379], "+
"PD addresses read from TiDB are [1.2.3.4:2380 10.20.30.40:2379]",
result.Message)

// check partial match is enough
mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`).
WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}).
AddRow("1.2.3.4:2379"),
)
checker = NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter)
result, err = checker.Check(ctx)
s.Require().NoError(err)
s.Require().True(result.Passed)

// check that one matching address among several non-matching ones still passes
mock.ExpectQuery(`SELECT STATUS_ADDRESS FROM INFORMATION_SCHEMA.CLUSTER_INFO WHERE TYPE = 'pd'`).
WillReturnRows(sqlmock.NewRows([]string{"STATUS_ADDRESS"}).
AddRow("2.3.4.5:2379").AddRow("3.4.5.6:2379").AddRow("1.2.3.4:2379"),
)
checker = NewPDTiDBFromSameClusterCheckItem(db, pdAddrGetter)
result, err = checker.Check(ctx)
s.Require().NoError(err)
s.Require().True(result.Passed)

// every expected query above must have been issued
s.Require().NoError(mock.ExpectationsWereMet())
}
2 changes: 1 addition & 1 deletion lightning/pkg/importer/precheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestPrecheckBuilderBasic(t *testing.T) {

preInfoGetter, err := NewPreImportInfoGetter(cfg, mockSrc.GetAllDBFileMetas(), mockSrc.GetStorage(), mockTarget, nil, nil)
require.NoError(t, err)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, mockSrc.GetAllDBFileMetas(), preInfoGetter, nil, nil, nil)
for _, checkItemID := range []precheck.CheckItemID{
precheck.CheckLargeDataFile,
precheck.CheckSourcePermission,
Expand Down
6 changes: 3 additions & 3 deletions lightning/pkg/importer/table_import_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1195,7 +1195,7 @@ func (s *tableRestoreSuite) TestCheckClusterResource() {
targetInfoGetter: targetInfoGetter,
srcStorage: mockStore,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, []*mydump.MDDatabaseMeta{}, preInfoGetter, nil, nil, nil)
rc := &Controller{
cfg: cfg,
store: mockStore,
Expand Down Expand Up @@ -1332,7 +1332,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion() {
targetInfoGetter: targetInfoGetter,
dbMetas: dbMetas,
}
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, dbMetas, preInfoGetter, checkpoints.NewNullCheckpointsDB(), nil, nil)
rc := &Controller{
cfg: cfg,
taskMgr: mockTaskMetaMgr{},
Expand Down Expand Up @@ -1425,7 +1425,7 @@ func (s *tableRestoreSuite) TestCheckHasLargeCSV() {
for _, ca := range cases {
template := NewSimpleTemplate()
cfg := &config.Config{Mydumper: config.MydumperRuntime{StrictFormat: ca.strictFormat}}
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil)
theCheckBuilder := NewPrecheckItemBuilder(cfg, ca.dbMetas, nil, nil, nil, nil)
rc := &Controller{
cfg: cfg,
checkTemplate: template,
Expand Down
2 changes: 2 additions & 0 deletions lightning/pkg/precheck/precheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const (
CheckLocalDiskPlacement CheckItemID = "CHECK_LOCAL_DISK_PLACEMENT"
CheckLocalTempKVDir CheckItemID = "CHECK_LOCAL_TEMP_KV_DIR"
CheckTargetUsingCDCPITR CheckItemID = "CHECK_TARGET_USING_CDC_PITR"
CheckPDTiDBFromSameCluster CheckItemID = "CHECK_PD_TIDB_FROM_SAME_CLUSTER"
)

var (
Expand All @@ -63,6 +64,7 @@ var (
CheckLocalDiskPlacement: "Local disk placement",
CheckLocalTempKVDir: "Local temp KV dir",
CheckTargetUsingCDCPITR: "Target using CDC/PITR",
CheckPDTiDBFromSameCluster: "PD and TiDB are from the same cluster",
}
)

Expand Down
19 changes: 10 additions & 9 deletions pkg/lightning/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,16 @@ var (
ErrInvalidSchemaFile = errors.Normalize("invalid schema file", errors.RFCCodeText("Lightning:Loader:ErrInvalidSchemaFile"))
ErrTooManySourceFiles = errors.Normalize("too many source files", errors.RFCCodeText("Lightning:Loader:ErrTooManySourceFiles"))

ErrSystemRequirementNotMet = errors.Normalize("system requirement not met", errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet"))
ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict"))
ErrPreCheckFailed = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed"))
ErrCheckClusterRegion = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion"))
ErrCheckLocalResource = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource"))
ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty"))
ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader"))
ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource"))
ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR"))
ErrSystemRequirementNotMet = errors.Normalize("system requirement not met", errors.RFCCodeText("Lightning:PreCheck:ErrSystemRequirementNotMet"))
ErrCheckpointSchemaConflict = errors.Normalize("checkpoint schema conflict", errors.RFCCodeText("Lightning:PreCheck:ErrCheckpointSchemaConflict"))
ErrPreCheckFailed = errors.Normalize("tidb-lightning pre-check failed: %s", errors.RFCCodeText("Lightning:PreCheck:ErrPreCheckFailed"))
ErrCheckClusterRegion = errors.Normalize("check tikv cluster region error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckClusterRegion"))
ErrCheckLocalResource = errors.Normalize("check local storage resource error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckLocalResource"))
ErrCheckTableEmpty = errors.Normalize("check table empty error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckTableEmpty"))
ErrCheckCSVHeader = errors.Normalize("check csv header error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCSVHeader"))
ErrCheckDataSource = errors.Normalize("check data source error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckDataSource"))
ErrCheckCDCPiTR = errors.Normalize("check TiCDC/PiTR task error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckCDCPiTR"))
ErrCheckPDTiDBFromSameCluster = errors.Normalize("check PD and TiDB in the same cluster error", errors.RFCCodeText("Lightning:PreCheck:ErrCheckPDTiDBSameCluster"))

ErrOpenCheckpoint = errors.Normalize("open checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrOpenCheckpoint"))
ErrReadCheckpoint = errors.Normalize("read checkpoint error", errors.RFCCodeText("Lightning:Checkpoint:ErrReadCheckpoint"))
Expand Down
Loading