Skip to content

Commit

Permalink
Merge branch 'release-5.4' into release-5.4-fcdbf826ea8a
Browse files Browse the repository at this point in the history
  • Loading branch information
LittleFall authored Apr 21, 2022
2 parents b7a12e3 + 52c636c commit 49d0952
Show file tree
Hide file tree
Showing 33 changed files with 786 additions and 410 deletions.
50 changes: 41 additions & 9 deletions br/pkg/lightning/mydump/loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"context"
"path/filepath"
"sort"
"strings"

"github.com/pingcap/errors"
filter "github.com/pingcap/tidb-tools/pkg/table-filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"github.com/pingcap/tidb/br/pkg/lightning/common"
"github.com/pingcap/tidb/br/pkg/lightning/config"
"github.com/pingcap/tidb/br/pkg/lightning/log"
"github.com/pingcap/tidb/br/pkg/storage"
Expand All @@ -30,12 +32,30 @@ import (

type MDDatabaseMeta struct {
Name string
SchemaFile string
SchemaFile FileInfo
Tables []*MDTableMeta
Views []*MDTableMeta
charSet string
}

func (m *MDDatabaseMeta) GetSchema(ctx context.Context, store storage.ExternalStorage) (string, error) {
schema, err := ExportStatement(ctx, store, m.SchemaFile, m.charSet)
if err != nil {
log.L().Warn("failed to extract table schema",
zap.String("Path", m.SchemaFile.FileMeta.Path),
log.ShortError(err),
)
schema = nil
}
schemaStr := strings.TrimSpace(string(schema))
// set default if schema sql is empty
if len(schemaStr) == 0 {
schemaStr = "CREATE DATABASE IF NOT EXISTS " + common.EscapeIdentifier(m.Name)
}

return schemaStr, nil
}

type MDTableMeta struct {
DB string
Name string
Expand Down Expand Up @@ -219,7 +239,7 @@ func (s *mdLoaderSetup) setup(ctx context.Context, store storage.ExternalStorage
// setup database schema
if len(s.dbSchemas) != 0 {
for _, fileInfo := range s.dbSchemas {
if _, dbExists := s.insertDB(fileInfo.TableName.Schema, fileInfo.FileMeta.Path); dbExists && s.loader.router == nil {
if _, dbExists := s.insertDB(fileInfo); dbExists && s.loader.router == nil {
return errors.Errorf("invalid database schema file, duplicated item - %s", fileInfo.FileMeta.Path)
}
}
Expand Down Expand Up @@ -406,23 +426,29 @@ func (s *mdLoaderSetup) route() error {
return nil
}

func (s *mdLoaderSetup) insertDB(dbName string, path string) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[dbName]
func (s *mdLoaderSetup) insertDB(f FileInfo) (*MDDatabaseMeta, bool) {
dbIndex, ok := s.dbIndexMap[f.TableName.Schema]
if ok {
return s.loader.dbs[dbIndex], true
}
s.dbIndexMap[dbName] = len(s.loader.dbs)
s.dbIndexMap[f.TableName.Schema] = len(s.loader.dbs)
ptr := &MDDatabaseMeta{
Name: dbName,
SchemaFile: path,
Name: f.TableName.Schema,
SchemaFile: f,
charSet: s.loader.charSet,
}
s.loader.dbs = append(s.loader.dbs, ptr)
return ptr, false
}

func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
tableIndex, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
return dbMeta.Tables[tableIndex], dbExists, true
Expand All @@ -442,7 +468,13 @@ func (s *mdLoaderSetup) insertTable(fileInfo FileInfo) (*MDTableMeta, bool, bool
}

func (s *mdLoaderSetup) insertView(fileInfo FileInfo) (bool, bool) {
dbMeta, dbExists := s.insertDB(fileInfo.TableName.Schema, "")
dbFileInfo := FileInfo{
TableName: filter.Table{
Schema: fileInfo.TableName.Schema,
},
FileMeta: SourceFileMeta{Type: SourceTypeSchemaSchema},
}
dbMeta, dbExists := s.insertDB(dbFileInfo)
_, ok := s.tableIndexMap[fileInfo.TableName]
if ok {
meta := &MDTableMeta{
Expand Down
29 changes: 19 additions & 10 deletions br/pkg/lightning/mydump/loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ func (s *testMydumpLoaderSuite) TestTableInfoNotFound(c *C) {
loader, err := md.NewMyDumpLoader(ctx, s.cfg)
c.Assert(err, IsNil)
for _, dbMeta := range loader.GetDatabases() {
dbSQL, err := dbMeta.GetSchema(ctx, store)
c.Assert(err, IsNil)
c.Assert(dbSQL, Equals, "CREATE DATABASE IF NOT EXISTS `db`")
for _, tblMeta := range dbMeta.Tables {
sql, err := tblMeta.GetSchema(ctx, store)
c.Assert(sql, Equals, "")
Expand Down Expand Up @@ -272,8 +275,14 @@ func (s *testMydumpLoaderSuite) TestDataWithoutSchema(c *C) {
mdl, err := md.NewMyDumpLoader(context.Background(), s.cfg)
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "",
Name: "db",
SchemaFile: md.FileInfo{
TableName: filter.Table{
Schema: "db",
Name: "",
},
FileMeta: md.SourceFileMeta{Type: md.SourceTypeSchemaSchema},
},
Tables: []*md.MDTableMeta{{
DB: "db",
Name: "tbl",
Expand Down Expand Up @@ -302,7 +311,7 @@ func (s *testMydumpLoaderSuite) TestTablesWithDots(c *C) {
c.Assert(err, IsNil)
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{{
Name: "db",
SchemaFile: "db-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "db", Name: ""}, FileMeta: md.SourceFileMeta{Path: "db-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "db",
Expand Down Expand Up @@ -396,7 +405,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "a1",
SchemaFile: "a1-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "a1", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a1-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "a1",
Expand Down Expand Up @@ -427,11 +436,11 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "d0",
SchemaFile: "d0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d0", Name: ""}, FileMeta: md.SourceFileMeta{Path: "d0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
},
{
Name: "b",
SchemaFile: "a0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "b", Name: ""}, FileMeta: md.SourceFileMeta{Path: "a0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "b",
Expand All @@ -449,7 +458,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "c",
SchemaFile: "c0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "c", Name: ""}, FileMeta: md.SourceFileMeta{Path: "c0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "c",
Expand All @@ -463,7 +472,7 @@ func (s *testMydumpLoaderSuite) TestRouter(c *C) {
},
{
Name: "v",
SchemaFile: "e0-schema-create.sql",
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "v", Name: ""}, FileMeta: md.SourceFileMeta{Path: "e0-schema-create.sql", Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "v",
Expand Down Expand Up @@ -552,7 +561,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
c.Assert(mdl.GetDatabases(), DeepEquals, []*md.MDDatabaseMeta{
{
Name: "d1",
SchemaFile: filepath.FromSlash("d1/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d1", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d1/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d1",
Expand Down Expand Up @@ -605,7 +614,7 @@ func (s *testMydumpLoaderSuite) TestFileRouting(c *C) {
},
{
Name: "d2",
SchemaFile: filepath.FromSlash("d2/schema.sql"),
SchemaFile: md.FileInfo{TableName: filter.Table{Schema: "d2", Name: ""}, FileMeta: md.SourceFileMeta{Path: filepath.FromSlash("d2/schema.sql"), Type: md.SourceTypeSchemaSchema}},
Tables: []*md.MDTableMeta{
{
DB: "d2",
Expand Down
24 changes: 19 additions & 5 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,7 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
concurrency := utils.MinInt(tableCount, rc.cfg.App.RegionConcurrency)
ch := make(chan string, concurrency)
eg, gCtx := errgroup.WithContext(ctx)

for i := 0; i < concurrency; i++ {
eg.Go(func() error {
for tblName := range ch {
Expand Down Expand Up @@ -1125,17 +1126,23 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
})
}
loop:
for _, db := range rc.dbMetas {
for _, tbl := range db.Tables {
ch <- common.UniqueTable(tbl.DB, tbl.Name)
select {
case ch <- common.UniqueTable(tbl.DB, tbl.Name):
case <-gCtx.Done():
break loop
}

}
}
close(ch)
if err := eg.Wait(); err != nil {
if common.IsContextCanceledError(err) {
return nil
}
return errors.Trace(err)
return errors.Annotate(err, "check table contains data failed")
}

if len(tableNames) > 0 {
Expand All @@ -1147,13 +1154,20 @@ func (rc *Controller) checkTableEmpty(ctx context.Context) error {
return nil
}

func tableContainsData(ctx context.Context, db utils.QueryExecutor, tableName string) (bool, error) {
func tableContainsData(ctx context.Context, db utils.DBExecutor, tableName string) (bool, error) {
failpoint.Inject("CheckTableEmptyFailed", func() {
failpoint.Return(false, errors.New("mock error"))
})
query := "select 1 from " + tableName + " limit 1"
exec := common.SQLWithRetry{
DB: db,
Logger: log.L(),
}
var dump int
err := db.QueryRowContext(ctx, query).Scan(&dump)
err := exec.QueryRow(ctx, "check table empty", query, &dump)

switch {
case err == sql.ErrNoRows:
case errors.ErrorEqual(err, sql.ErrNoRows):
return false, nil
case err != nil:
return false, errors.Trace(err)
Expand Down
16 changes: 16 additions & 0 deletions br/pkg/lightning/restore/check_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/worker"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/model"
Expand Down Expand Up @@ -476,6 +477,9 @@ func (s *checkInfoSuite) TestCheckTableEmpty(c *C) {
c.Assert(err, IsNil)
rc.tidbGlue = glue.NewExternalTiDBGlue(db, mysql.ModeNone)
mock.MatchExpectationsInOrder(false)
// test auto retry retryable error
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnError(mysql.NewErr(errno.ErrPDServerTimeout))
mock.ExpectQuery("select 1 from `test1`.`tbl1` limit 1").
WillReturnRows(sqlmock.NewRows([]string{""}).RowError(0, sql.ErrNoRows))
mock.ExpectQuery("select 1 from `test1`.`tbl2` limit 1").
Expand Down Expand Up @@ -541,6 +545,18 @@ func (s *checkInfoSuite) TestCheckTableEmpty(c *C) {
err = rc.checkTableEmpty(ctx)
c.Assert(err, IsNil)
c.Assert(mock.ExpectationsWereMet(), IsNil)

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed", `return`)
c.Assert(err, IsNil)
defer func() {
_ = failpoint.Disable("github.com/pingcap/tidb/br/pkg/lightning/restore/CheckTableEmptyFailed")
}()

// restrict the concurrency to ensure there are more tables than workers
rc.cfg.App.RegionConcurrency = 1
// test check tables not stuck but return the right error
err = rc.checkTableEmpty(ctx)
c.Assert(err, ErrorMatches, ".*check table contains data failed: mock error.*")
}

func (s *checkInfoSuite) TestLocalResource(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/restore/meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (m *dbTableMetaMgr) AllocTableRowIDs(ctx context.Context, rawRowIDMax int64
}

if status == metaStatusChecksuming {
return errors.New("target table is calculating checksum, please wait unit the checksum is finished and try again.")
return errors.New("Target table is calculating checksum. Please wait until the checksum is finished and try again.")
}

if metaTaskID == m.taskID {
Expand Down
Loading

0 comments on commit 49d0952

Please sign in to comment.