diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index d080d1bad16cf..cac695801a64a 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -527,6 +527,7 @@ type TikvImporter struct { DiskQuota ByteSize `toml:"disk-quota" json:"disk-quota"` RangeConcurrency int `toml:"range-concurrency" json:"range-concurrency"` DuplicateResolution DuplicateResolutionAlgorithm `toml:"duplicate-resolution" json:"duplicate-resolution"` + IncrementalImport bool `toml:"incremental-import" json:"incremental-import"` EngineMemCacheSize ByteSize `toml:"engine-mem-cache-size" json:"engine-mem-cache-size"` LocalWriterMemCacheSize ByteSize `toml:"local-writer-mem-cache-size" json:"local-writer-mem-cache-size"` diff --git a/br/pkg/lightning/restore/check_info.go b/br/pkg/lightning/restore/check_info.go index f97afc33b7cd0..4da674e1cd40a 100644 --- a/br/pkg/lightning/restore/check_info.go +++ b/br/pkg/lightning/restore/check_info.go @@ -17,6 +17,7 @@ package restore import ( "bytes" "context" + "database/sql" "fmt" "io" "path/filepath" @@ -24,10 +25,15 @@ import ( "sort" "strconv" "strings" + "sync" "github.com/docker/go-units" "github.com/pingcap/errors" "github.com/pingcap/failpoint" + "go.uber.org/zap" + "golang.org/x/sync/errgroup" + "modernc.org/mathutil" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/br/pkg/lightning/backend" "github.com/pingcap/tidb/br/pkg/lightning/backend/kv" @@ -38,6 +44,7 @@ import ( "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/verification" "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tidb/br/pkg/utils" "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/table" @@ -45,9 +52,6 @@ import ( "github.com/pingcap/tidb/types" "github.com/tikv/pd/server/api" pdconfig "github.com/tikv/pd/server/config" - - "go.uber.org/zap" - "modernc.org/mathutil" ) const ( @@ -1053,3 +1057,84 @@ outloop: log.L().Info("Sample source data", zap.String("table", tableMeta.Name), zap.Float64("IndexRatio", tableMeta.IndexRatio), zap.Bool("IsSourceOrder", tableMeta.IsRowOrdered)) return nil } + +func (rc *Controller) checkTableEmpty(ctx context.Context) error { + if rc.cfg.TikvImporter.Backend == config.BackendTiDB || rc.cfg.TikvImporter.IncrementalImport { + return nil + } + db, _ := rc.tidbGlue.GetDB() + + tableCount := 0 + for _, db := range rc.dbMetas { + tableCount += len(db.Tables) + } + + var lock sync.Mutex + tableNames := make([]string, 0) + concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency) + ch := make(chan string, concurrency) + eg, gCtx := errgroup.WithContext(ctx) + for i := 0; i < concurrency; i++ { + eg.Go(func() error { + for tblName := range ch { + // skip tables that have checkpoint + if rc.cfg.Checkpoint.Enable { + _, err := rc.checkpointsDB.Get(gCtx, tblName) + switch { + case err == nil: + continue + case errors.IsNotFound(err): + default: + return errors.Trace(err) + } + } + + hasData, err1 := tableContainsData(gCtx, db, tblName) + if err1 != nil { + return err1 + } + if hasData { + lock.Lock() + tableNames = append(tableNames, tblName) + lock.Unlock() + } + } + return nil + }) + } + for _, db := range rc.dbMetas { + for _, tbl := range db.Tables { + ch <- common.UniqueTable(tbl.DB, tbl.Name) + } + } + close(ch) + if err := eg.Wait(); err != nil { + if common.IsContextCanceledError(err) { + return nil + } + return errors.Trace(err) + } + + if len(tableNames) > 0 { + // sort the failed names + sort.Strings(tableNames) + msg := fmt.Sprintf("table(s) [%s] are not empty", strings.Join(tableNames, ", ")) + rc.checkTemplate.Collect(Critical, false, msg) + } + return nil +} + +func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) { + query := "select 1 from " + tableName + " limit 1" + var dump int + err := db.QueryRowContext(ctx, query).Scan(&dump) + + switch { + case err == sql.ErrNoRows: + return false, nil + case err != nil: + return false, errors.Trace(err) + default: + return true, nil + } +} diff --git a/br/pkg/lightning/restore/check_info_test.go b/br/pkg/lightning/restore/check_info_test.go index ccc4aa74c0c28..c679298f6a612 100644 --- a/br/pkg/lightning/restore/check_info_test.go +++ b/br/pkg/lightning/restore/check_info_test.go @@ -16,15 +16,18 @@ package restore import ( "context" + "database/sql" "fmt" "os" "path/filepath" + "github.com/DATA-DOG/go-sqlmock" . "github.com/pingcap/check" "github.com/pingcap/failpoint" "github.com/pingcap/tidb/br/pkg/lightning/checkpoints" "github.com/pingcap/tidb/br/pkg/lightning/config" + "github.com/pingcap/tidb/br/pkg/lightning/glue" "github.com/pingcap/tidb/br/pkg/lightning/mydump" "github.com/pingcap/tidb/br/pkg/lightning/worker" "github.com/pingcap/tidb/br/pkg/storage" @@ -404,6 +407,142 @@ func (s *checkInfoSuite) TestCheckCSVHeader(c *C) { } } +func (s *checkInfoSuite) TestCheckTableEmpty(c *C) { + dir := c.MkDir() + cfg := config.NewConfig() + cfg.Checkpoint.Enable = false + dbMetas := []*mydump.MDDatabaseMeta{ + { + Name: "test1", + Tables: []*mydump.MDTableMeta{ + { + DB: "test1", + Name: "tbl1", + }, + { + DB: "test1", + Name: "tbl2", + }, + }, + }, + { + Name: "test2", + Tables: []*mydump.MDTableMeta{ + { + DB: "test2", + Name: "tbl1", + }, + }, + }, + } + + rc := &Controller{ + cfg: cfg, + dbMetas: dbMetas, + checkpointsDB: checkpoints.NewNullCheckpointsDB(), + } + + ctx := context.Background() + + // test tidb will do nothing + rc.cfg.TikvImporter.Backend = config.BackendTiDB + err := rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + + // test incremental mode + rc.cfg.TikvImporter.Backend = config.BackendLocal + rc.cfg.TikvImporter.IncrementalImport = true + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + + rc.cfg.TikvImporter.IncrementalImport = false + db, mock, err := sqlmock.New() + c.Assert(err, IsNil) + mock.MatchExpectationsInOrder(false) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + // not error, need not to init check template + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + + // single table contains data + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.MatchExpectationsInOrder(false) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + rc.checkTemplate = NewSimpleTemplate() + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + tmpl := rc.checkTemplate.(*SimpleTemplate) + c.Assert(len(tmpl.criticalMsgs), Equals, 1) + c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test2`.`tbl1`\\] are not empty") + + // multi tables contains data + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + mock.MatchExpectationsInOrder(false) + mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + mock.ExpectQuery("select 1 from `test2`.`tbl1` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).AddRow(1)) + rc.checkTemplate = NewSimpleTemplate() + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) + tmpl = rc.checkTemplate.(*SimpleTemplate) + c.Assert(len(tmpl.criticalMsgs), Equals, 1) + c.Assert(tmpl.criticalMsgs[0], Matches, "table\\(s\\) \\[`test1`.`tbl1`, `test2`.`tbl1`\\] are not empty") + + // init checkpoint with only two of the three tables + dbInfos := map[string]*checkpoints.TidbDBInfo{ + "test1": { + Name: "test1", + Tables: map[string]*checkpoints.TidbTableInfo{ + "tbl1": { + Name: "tbl1", + }, + }, + }, + "test2": { + Name: "test2", + Tables: map[string]*checkpoints.TidbTableInfo{ + "tbl1": { + Name: "tbl1", + }, + }, + }, + } + rc.cfg.Checkpoint.Enable = true + rc.checkpointsDB = checkpoints.NewFileCheckpointsDB(filepath.Join(dir, "cp.pb")) + err = rc.checkpointsDB.Initialize(ctx, cfg, dbInfos) + c.Check(err, IsNil) + db, mock, err = sqlmock.New() + c.Assert(err, IsNil) + rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone) + // only need to check the one that is not in checkpoint + mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1"). + WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows)) + err = rc.checkTableEmpty(ctx) + c.Assert(err, IsNil) + c.Assert(mock.ExpectationsWereMet(), IsNil) +} + func (s *checkInfoSuite) TestLocalResource(c *C) { dir := c.MkDir() mockStore, err := storage.NewLocalStorage(dir) diff --git a/br/pkg/lightning/restore/meta_manager.go b/br/pkg/lightning/restore/meta_manager.go index 544b91c0b5f90..49358a9aee102 100644 --- a/br/pkg/lightning/restore/meta_manager.go +++ b/br/pkg/lightning/restore/meta_manager.go @@ -1027,9 +1027,65 @@ func (m noopTableMetaMgr) UpdateTableBaseChecksum(ctx context.Context, checksum } func (m noopTableMetaMgr) CheckAndUpdateLocalChecksum(ctx context.Context, checksum *verify.KVChecksum, hasLocalDupes bool) (bool, bool, *verify.KVChecksum, error) { - return false, false, nil, nil + return true, true, &verify.KVChecksum{}, nil } func (m noopTableMetaMgr) FinishTable(ctx context.Context) error { return nil } + +type singleMgrBuilder struct{} + +func (b singleMgrBuilder) Init(context.Context) error { + return nil +} + +func (b singleMgrBuilder) TaskMetaMgr(pd *pdutil.PdController) taskMetaMgr { + return &singleTaskMetaMgr{ + pd: pd, + } +} + +func (b singleMgrBuilder) TableMetaMgr(tr *TableRestore) tableMetaMgr { + return noopTableMetaMgr{} +} + +type singleTaskMetaMgr struct { + pd *pdutil.PdController +} + +func (m *singleTaskMetaMgr) InitTask(ctx context.Context, source int64) error { + return nil +} + +func (m *singleTaskMetaMgr) CheckTasksExclusively(ctx context.Context, action func(tasks []taskMeta) ([]taskMeta, error)) error { + _, err := action(nil) + return err +} + +func (m *singleTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error) { + return m.pd.RemoveSchedulers(ctx) +} + +func (m *singleTaskMetaMgr) CheckTaskExist(ctx context.Context) (bool, error) { + return true, nil +} + +func (m *singleTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error) { + return true, true, nil +} + +func (m *singleTaskMetaMgr) Cleanup(ctx context.Context) error { + return nil +} + +func (m *singleTaskMetaMgr) CleanupTask(ctx context.Context) error { + return nil +} + +func (m *singleTaskMetaMgr) CleanupAllMetas(ctx context.Context) error { + return nil +} + +func (m *singleTaskMetaMgr) Close() { +} diff --git a/br/pkg/lightning/restore/restore.go b/br/pkg/lightning/restore/restore.go index 82a8465eb8181..79f132b1cf5f6 100644 --- a/br/pkg/lightning/restore/restore.go +++ b/br/pkg/lightning/restore/restore.go @@ -379,14 +379,17 @@ func NewRestoreControllerWithPauser( } var metaBuilder metaMgrBuilder - switch cfg.TikvImporter.Backend { - case config.BackendLocal, config.BackendImporter: + isSSTImport := cfg.TikvImporter.Backend == config.BackendLocal || cfg.TikvImporter.Backend == config.BackendImporter + switch { + case isSSTImport && cfg.TikvImporter.IncrementalImport: metaBuilder = &dbMetaMgrBuilder{ db: db, taskID: cfg.TaskID, schema: cfg.App.MetaSchemaName, needChecksum: cfg.PostRestore.Checksum != config.OpLevelOff, } + case isSSTImport: + metaBuilder = singleMgrBuilder{} default: metaBuilder = noopMetaMgrBuilder{} } @@ -1967,11 +1970,6 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } } } - err = rc.checkCSVHeader(ctx, rc.dbMetas) - if err != nil { - return err - } - if len(checkPointCriticalMsgs) != 0 { rc.checkTemplate.Collect(Critical, false, strings.Join(checkPointCriticalMsgs, "\n")) } else { @@ -1982,6 +1980,14 @@ func (rc *Controller) DataCheck(ctx context.Context) error { } else { rc.checkTemplate.Collect(Critical, true, "table schemas are valid") } + + if err := rc.checkTableEmpty(ctx); err != nil { + return errors.Trace(err) + } + if err = rc.checkCSVHeader(ctx, rc.dbMetas); err != nil { + return err + } + return nil } diff --git a/br/tests/lightning_distributed_import/config.toml b/br/tests/lightning_distributed_import/config.toml index 200af8e45dfdc..947b16037dd5d 100644 --- a/br/tests/lightning_distributed_import/config.toml +++ b/br/tests/lightning_distributed_import/config.toml @@ -1,6 +1,7 @@ [tikv-importer] backend = 'local' duplicate-resolution = 'none' +incremental-import = true [post-restore] checksum = "required" diff --git a/br/tests/lightning_duplicate_detection/config1.toml b/br/tests/lightning_duplicate_detection/config1.toml index 0b2b6df2a70e8..6497e9e30949b 100644 --- a/br/tests/lightning_duplicate_detection/config1.toml +++ b/br/tests/lightning_duplicate_detection/config1.toml @@ -6,6 +6,7 @@ table-concurrency = 10 [tikv-importer] backend = "local" duplicate-resolution = 'record' +incremental-import = true [checkpoint] enable = true diff --git a/br/tests/lightning_duplicate_detection/config2.toml b/br/tests/lightning_duplicate_detection/config2.toml index e978ffb9cd8b5..760f50168508a 100644 --- a/br/tests/lightning_duplicate_detection/config2.toml +++ b/br/tests/lightning_duplicate_detection/config2.toml @@ -6,6 +6,7 @@ table-concurrency = 10 [tikv-importer] backend = "local" duplicate-resolution = 'record' +incremental-import = true [checkpoint] enable = true diff --git a/br/tests/lightning_incremental/config.toml b/br/tests/lightning_incremental/config.toml index e69de29bb2d1d..761e60b91b804 100644 --- a/br/tests/lightning_incremental/config.toml +++ b/br/tests/lightning_incremental/config.toml @@ -0,0 +1,2 @@ +[tikv-importer] +incremental-import = true diff --git a/br/tests/lightning_local_backend/run.sh b/br/tests/lightning_local_backend/run.sh index 6d0e7e9864145..5843210fea738 100755 --- a/br/tests/lightning_local_backend/run.sh +++ b/br/tests/lightning_local_backend/run.sh @@ -20,12 +20,23 @@ check_cluster_version 4 0 0 'local backend' || exit 0 ENGINE_COUNT=6 -# First, verify that inject with not leader error is fine. -rm -f "$TEST_DIR/lightning-local.log" +# Test check table contains data rm -f "/tmp/tidb_lightning_checkpoint_local_backend_test.pb" +rm -rf $TEST_DIR/lightning.log run_sql 'DROP DATABASE IF EXISTS cpeng;' -export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")' +run_sql 'CREATE DATABASE cpeng;' +run_sql 'CREATE TABLE cpeng.a (c int);' +run_sql 'CREATE TABLE cpeng.b (c int);' +run_sql "INSERT INTO cpeng.a values (1), (2);" +run_sql "INSERT INTO cpeng.b values (3);" +! run_lightning --backend local --enable-checkpoint=0 +grep -Fq 'table(s) [`cpeng`.`a`, `cpeng`.`b`] are not empty' $TEST_DIR/lightning.log + +# First, verify that inject with not leader error is fine. +export GO_FAILPOINTS='github.com/pingcap/tidb/br/pkg/lightning/backend/local/FailIngestMeta=1*return("notleader")' +rm -f "$TEST_DIR/lightning-local.log" +run_sql 'DROP DATABASE IF EXISTS cpeng;' run_lightning --backend local --enable-checkpoint=1 --log-file "$TEST_DIR/lightning-local.log" --config "tests/$TEST_NAME/config.toml" # Check that everything is correctly imported diff --git a/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql b/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql index 887540be58110..1738b64457de6 100644 --- a/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql +++ b/br/tests/lightning_tidb_rowid/data/rowid.pre_rebase-schema.sql @@ -1 +1 @@ -create table pre_rebase (pk varchar(6) primary key) auto_increment=70000; +create table pre_rebase (pk varchar(6) primary key /*T![clustered_index] NONCLUSTERED */) auto_increment=70000; diff --git a/br/tests/lightning_tidb_rowid/run.sh b/br/tests/lightning_tidb_rowid/run.sh index e877f420cf43f..ae762c514d93c 100755 --- a/br/tests/lightning_tidb_rowid/run.sh +++ b/br/tests/lightning_tidb_rowid/run.sh @@ -58,8 +58,13 @@ for BACKEND in local importer tidb; do run_sql 'SELECT count(*), min(_tidb_rowid), max(_tidb_rowid) FROM rowid.pre_rebase' check_contains 'count(*): 1' - check_contains 'min(_tidb_rowid): 70000' - check_contains 'max(_tidb_rowid): 70000' + if [ "$BACKEND" == 'tidb' ]; then + check_contains 'min(_tidb_rowid): 70000' + check_contains 'max(_tidb_rowid): 70000' + else + check_contains 'min(_tidb_rowid): 1' + check_contains 'max(_tidb_rowid): 1' + fi run_sql 'INSERT INTO rowid.pre_rebase VALUES ("?")' run_sql 'SELECT _tidb_rowid > 70000 FROM rowid.pre_rebase WHERE pk = "?"' check_contains '_tidb_rowid > 70000: 1'