Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: check block-allow list before online DDL components (#1775) #1867

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 47 additions & 27 deletions syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package syncer

import (
"time"

"github.com/go-mysql-org/go-mysql/replication"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -113,26 +115,55 @@ func (s *Syncer) parseDDLSQL(sql string, p *parser.Parser, schema string) (resul
}
}

// resolveDDLSQL do two things
// * it splits multiple operations in one DDL statement into multiple DDL statements
// * try to apply online ddl by given online
// splitAndFilterDDL will split a multi-schema change DDL into multiple one-schema change DDLs due to TiDB's limitation.
// The returned DDLs are either split from `stmt`, or DDLs that the online DDL component has saved before (which means
// the input `stmt` is a RENAME statement, often generated by online DDL tools).
// return @split sqls, @online ddl table names, @error.
func (s *Syncer) resolveDDLSQL(tctx *tcontext.Context, p *parser.Parser, stmt ast.StmtNode, schema string) (sqls []string, tables map[string]*filter.Table, err error) {
func (s *Syncer) splitAndFilterDDL(
ec eventContext,
p *parser.Parser,
stmt ast.StmtNode,
schema string,
) (sqls []string, tables map[string]*filter.Table, err error) {
sqls, err = parserpkg.SplitDDL(stmt, schema)
if err != nil {
return nil, nil, err
}
if s.onlineDDL == nil {
return sqls, nil, nil
}

statements := make([]string, 0, len(sqls))
tables = make(map[string]*filter.Table)
for _, sql := range sqls {
stmt2, err2 := p.ParseOneStmt(sql, "", "")
if err2 != nil {
return nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err2.Error()), "ddl %s", sql)
}

tableNames, err2 := parserpkg.FetchDDLTableNames(schema, stmt2)
if err2 != nil {
return nil, nil, err2
}

// get real tableNames before applying the block-allow list
if s.onlineDDL != nil {
for _, names := range tableNames {
names.Name = s.onlineDDL.RealName(names.Name)
}
}

shouldSkip, err2 := s.skipQuery(tableNames, stmt2, sql)
if err2 != nil {
return nil, nil, err2
}
if shouldSkip {
skipBinlogDurationHistogram.WithLabelValues("query", s.cfg.Name, s.cfg.SourceID).Observe(time.Since(ec.startTime).Seconds())
ec.tctx.L().Warn("skip event", zap.String("event", "query"), zap.String("statement", sql), zap.String("schema", schema))
continue
}

// filter and store ghost table ddl, transform online ddl
ss, tableName, err := s.handleOnlineDDL(tctx, p, schema, sql)
if err != nil {
return statements, tables, err
ss, tableName, err2 := s.handleOnlineDDL(ec.tctx, p, schema, sql, stmt2)
if err2 != nil {
return nil, nil, err2
}

if tableName != nil {
Expand All @@ -144,7 +175,8 @@ func (s *Syncer) resolveDDLSQL(tctx *tcontext.Context, p *parser.Parser, stmt as
return statements, tables, nil
}

func (s *Syncer) handleDDL(p *parser.Parser, schema, sql string) (string, [][]*filter.Table, ast.StmtNode, error) {
// routeDDL will rename table names in DDL.
func (s *Syncer) routeDDL(p *parser.Parser, schema, sql string) (string, [][]*filter.Table, ast.StmtNode, error) {
stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
return "", nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "ddl %s", sql)
Expand All @@ -155,14 +187,6 @@ func (s *Syncer) handleDDL(p *parser.Parser, schema, sql string) (string, [][]*f
return "", nil, nil, err
}

ignore, err := s.skipQuery(tableNames, stmt, sql)
if err != nil {
return "", nil, nil, err
}
if ignore {
return "", nil, stmt, nil
}

targetTableNames := make([]*filter.Table, 0, len(tableNames))
for i := range tableNames {
schema, table := s.renameShardingSchema(tableNames[i].Schema, tableNames[i].Name)
Expand All @@ -177,18 +201,14 @@ func (s *Syncer) handleDDL(p *parser.Parser, schema, sql string) (string, [][]*f
return ddl, [][]*filter.Table{tableNames, targetTableNames}, stmt, err
}

// handle online ddls
// if sql is online ddls, we would find it's ghost table, and ghost ddls, then replay its table name by real table name.
func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schema, sql string) ([]string, *filter.Table, error) {
// handleOnlineDDL checks whether the input `sql` came from online DDL tools.
// If so, it will save the actual DDL or return the actual DDL, depending on the online DDL type of `sql`.
// If not, it returns the original SQL and no table names.
func (s *Syncer) handleOnlineDDL(tctx *tcontext.Context, p *parser.Parser, schema, sql string, stmt ast.StmtNode) ([]string, *filter.Table, error) {
if s.onlineDDL == nil {
return []string{sql}, nil, nil
}

stmt, err := p.ParseOneStmt(sql, "", "")
if err != nil {
return nil, nil, terror.Annotatef(terror.ErrSyncerUnitParseStmt.New(err.Error()), "ddl %s", sql)
}

tableNames, err := parserpkg.FetchDDLTableNames(schema, stmt)
if err != nil {
return nil, nil, err
Expand Down
104 changes: 64 additions & 40 deletions syncer/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/pingcap/dm/dm/config"
tcontext "github.com/pingcap/dm/pkg/context"
"github.com/pingcap/dm/pkg/log"
parserpkg "github.com/pingcap/dm/pkg/parser"
"github.com/pingcap/dm/pkg/utils"

Expand All @@ -29,6 +30,7 @@ import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb-tools/pkg/filter"
router "github.com/pingcap/tidb-tools/pkg/table-router"
"go.uber.org/zap"
)

func (s *testSyncerSuite) TestAnsiQuotes(c *C) {
Expand Down Expand Up @@ -86,7 +88,11 @@ func (s *testSyncerSuite) TestCommentQuote(c *C) {
c.Assert(err, IsNil)

syncer := &Syncer{}
sqls, _, err := syncer.resolveDDLSQL(tcontext.Background(), parser, stmt, "schemadb")
tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestCommentQuote")))
ec := eventContext{
tctx: tctx,
}
sqls, _, err := syncer.splitAndFilterDDL(ec, parser, stmt, "schemadb")
c.Assert(err, IsNil)
c.Assert(len(sqls), Equals, 1)
c.Assert(sqls[0], Equals, expectedSQL)
Expand All @@ -106,11 +112,12 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
"create table `t1` (id int)",
"create table `t1` like `t2`",
"create table `s1`.`t1` like `t2`",
"create table `s1`.`t1` like `s1`.`t2`",
"create table `t1` like `xx`.`t2`",
"truncate table `t1`",
"truncate table `s1`.`t1`",
"rename table `s1`.`t1` to `s2`.`t2`",
"rename table `t1` to `t2`, `s1`.`t1` to `t2`",
"rename table `t1` to `t2`, `s1`.`t1` to `s1`.`t2`",
"drop index i1 on `s1`.`t1`",
"drop index i1 on `t1`",
"create index i1 on `t1`(`c1`)",
Expand All @@ -126,24 +133,25 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
{"DROP DATABASE IF EXISTS `s1`"},
{"DROP DATABASE IF EXISTS `s1`"},
{"DROP TABLE IF EXISTS `s1`.`t1`"},
{"DROP TABLE IF EXISTS `s1`.`t1`", "DROP TABLE IF EXISTS `s2`.`t2`"},
{"DROP TABLE IF EXISTS `s1`.`t1`", "DROP TABLE IF EXISTS `s2`.`t2`", "DROP TABLE IF EXISTS `test`.`xx`"},
{"DROP TABLE IF EXISTS `s1`.`t1`"},
{"DROP TABLE IF EXISTS `s1`.`t1`"},
{"CREATE TABLE IF NOT EXISTS `s1`.`t1` (`id` INT)"},
{"CREATE TABLE IF NOT EXISTS `test`.`t1` (`id` INT)"},
{"CREATE TABLE IF NOT EXISTS `test`.`t1` LIKE `test`.`t2`"},
{"CREATE TABLE IF NOT EXISTS `s1`.`t1` LIKE `test`.`t2`"},
{"CREATE TABLE IF NOT EXISTS `test`.`t1` LIKE `xx`.`t2`"},
{"TRUNCATE TABLE `test`.`t1`"},
{},
{},
{},
{"CREATE TABLE IF NOT EXISTS `s1`.`t1` LIKE `s1`.`t2`"},
{},
{},
{"TRUNCATE TABLE `s1`.`t1`"},
{"RENAME TABLE `s1`.`t1` TO `s2`.`t2`"},
{"RENAME TABLE `test`.`t1` TO `test`.`t2`", "RENAME TABLE `s1`.`t1` TO `test`.`t2`"},
{},
{"RENAME TABLE `s1`.`t1` TO `s1`.`t2`"},
{"DROP INDEX IF EXISTS `i1` ON `s1`.`t1`"},
{"DROP INDEX IF EXISTS `i1` ON `test`.`t1`"},
{"CREATE INDEX `i1` ON `test`.`t1` (`c1`)"},
{},
{},
{"CREATE INDEX `i1` ON `s1`.`t1` (`c1`)"},
{"ALTER TABLE `test`.`t1` ADD COLUMN `c1` INT", "ALTER TABLE `test`.`t1` DROP COLUMN `c2`"},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT", "ALTER TABLE `s1`.`t1` RENAME AS `test`.`t2`", "ALTER TABLE `test`.`t2` DROP COLUMN `c2`"},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT", "ALTER TABLE `s1`.`t1` RENAME AS `xx`.`t2`", "ALTER TABLE `xx`.`t2` DROP COLUMN `c2`"},
{},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT"},
{"ALTER TABLE `s1`.`t1` ADD COLUMN `c1` INT"},
}

targetSQLs := [][]string{
Expand All @@ -152,24 +160,25 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
{"DROP DATABASE IF EXISTS `xs1`"},
{"DROP DATABASE IF EXISTS `xs1`"},
{"DROP TABLE IF EXISTS `xs1`.`t1`"},
{"DROP TABLE IF EXISTS `xs1`.`t1`", ""},
{"DROP TABLE IF EXISTS `xs1`.`t1`", "", ""},
{"DROP TABLE IF EXISTS `xs1`.`t1`"},
{"DROP TABLE IF EXISTS `xs1`.`t1`"},
{"CREATE TABLE IF NOT EXISTS `xs1`.`t1` (`id` INT)"},
{""},
{""},
{""},
{""},
{""},
{},
{},
{},
{"CREATE TABLE IF NOT EXISTS `xs1`.`t1` LIKE `xs1`.`t2`"},
{},
{},
{"TRUNCATE TABLE `xs1`.`t1`"},
{""},
{"", ""},
{},
{"RENAME TABLE `xs1`.`t1` TO `xs1`.`t2`"},
{"DROP INDEX IF EXISTS `i1` ON `xs1`.`t1`"},
{""},
{""},
{},
{},
{"CREATE INDEX `i1` ON `xs1`.`t1` (`c1`)"},
{"", ""},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT", "", ""},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT", "", ""},
{},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT"},
{"ALTER TABLE `xs1`.`t1` ADD COLUMN `c1` INT"},
}

p := parser.New()
Expand All @@ -191,18 +200,24 @@ func (s *testSyncerSuite) TestResolveDDLSQL(c *C) {
})
c.Assert(err, IsNil)

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestResolveDDLSQL")))
ec := eventContext{
tctx: tctx,
}

for i, sql := range sqls {
result, err := syncer.parseDDLSQL(sql, p, "test")
c.Assert(err, IsNil)
c.Assert(result.ignore, IsFalse)
c.Assert(result.isDDL, IsTrue)

statements, _, err := syncer.resolveDDLSQL(tcontext.Background(), p, result.stmt, "test")
statements, _, err := syncer.splitAndFilterDDL(ec, p, result.stmt, "test")
c.Assert(err, IsNil)
c.Assert(statements, DeepEquals, expectedSQLs[i])
c.Assert(targetSQLs[i], HasLen, len(statements))

for j, statement := range statements {
s, _, _, err := syncer.handleDDL(p, "test", statement)
s, _, _, err := syncer.routeDDL(p, "test", statement)
c.Assert(err, IsNil)
c.Assert(s, Equals, targetSQLs[i][j])
}
Expand Down Expand Up @@ -358,11 +373,16 @@ func (s *testSyncerSuite) TestResolveGeneratedColumnSQL(c *C) {
parser, err := s.mockParser(db, mock)
c.Assert(err, IsNil)

tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestResolveGeneratedColumnSQL")))
ec := eventContext{
tctx: tctx,
}

for _, tc := range testCases {
ast1, err := parser.ParseOneStmt(tc.sql, "", "")
c.Assert(err, IsNil)

sqls, _, err := syncer.resolveDDLSQL(tcontext.Background(), parser, ast1, "test")
sqls, _, err := syncer.splitAndFilterDDL(ec, parser, ast1, "test")
c.Assert(err, IsNil)

c.Assert(len(sqls), Equals, 1)
Expand All @@ -388,9 +408,13 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
"_t1_new",
},
}
tctx := tcontext.Background()
tctx := tcontext.Background().WithLogger(log.With(zap.String("test", "TestResolveOnlineDDL")))
p := parser.New()

ec := eventContext{
tctx: tctx,
}

for _, ca := range cases {
fn := OnlineDDLSchemes[ca.onlineType]
plugin, err := fn(tctx, s.cfg)
Expand All @@ -403,7 +427,7 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
sql := "ALTER TABLE `test`.`t1` ADD COLUMN `n` INT"
stmt, err := p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err := syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err := syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 1)
c.Assert(sqls[0], Equals, sql)
Expand All @@ -413,7 +437,7 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
sql = fmt.Sprintf("CREATE TABLE IF NOT EXISTS `test`.`%s` (`n` INT)", ca.trashName)
stmt, err = p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err = syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err = syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 0)
c.Assert(tables, HasLen, 0)
Expand All @@ -423,14 +447,14 @@ func (s *testSyncerSuite) TestResolveOnlineDDL(c *C) {
newSQL := "ALTER TABLE `test`.`t1` ADD COLUMN `n` INT"
stmt, err = p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err = syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err = syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 0)
c.Assert(tables, HasLen, 0)
sql = fmt.Sprintf("RENAME TABLE `test`.`%s` TO `test`.`t1`", ca.ghostname)
stmt, err = p.ParseOneStmt(sql, "", "")
c.Assert(err, IsNil)
sqls, tables, err = syncer.resolveDDLSQL(tctx, p, stmt, "test")
sqls, tables, err = syncer.splitAndFilterDDL(ec, p, stmt, "test")
c.Assert(err, IsNil)
c.Assert(sqls, HasLen, 1)
c.Assert(sqls[0], Equals, newSQL)
Expand Down Expand Up @@ -483,8 +507,8 @@ func (m mockOnlinePlugin) TableType(table string) TableType {
return ""
}

func (m mockOnlinePlugin) RealName(schema, table string) (string, string) {
return "", ""
func (m mockOnlinePlugin) RealName(table string) string {
return ""
}

func (m mockOnlinePlugin) ResetConn(tctx *tcontext.Context) error {
Expand Down
2 changes: 1 addition & 1 deletion syncer/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func isDropColumnWithIndexError(err error) bool {

// handleSpecialDDLError handles special errors for DDL execution.
func (s *Syncer) handleSpecialDDLError(tctx *tcontext.Context, err error, ddls []string, index int, conn *DBConn) error {
// We use default parser because ddls are came from *Syncer.handleDDL, which is StringSingleQuotes, KeyWordUppercase and NameBackQuotes
// We use the default parser because the ddls came from *Syncer.routeDDL, whose style is StringSingleQuotes, KeyWordUppercase and NameBackQuotes
parser2 := parser.New()

// it only ignore `invalid connection` error (timeout or other causes) for `ADD INDEX`.
Expand Down
5 changes: 5 additions & 0 deletions syncer/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ import (
"github.com/pingcap/dm/pkg/utils"
)

// skipQuery will return true when
// - the given `sql` matches a built-in skip pattern.
// - the schema of any table name is a system schema.
// - any table name doesn't pass the block-allow list.
// - the type of the SQL doesn't pass the binlog event filter.
func (s *Syncer) skipQuery(tables []*filter.Table, stmt ast.StmtNode, sql string) (bool, error) {
if utils.IsBuildInSkipDDL(sql) {
return true, nil
Expand Down
Loading