Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
syncer: fetch downstream table first for IF NOT EXISTS
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed May 17, 2021
1 parent 48dacb1 commit cdf95c6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 7 deletions.
26 changes: 19 additions & 7 deletions syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -632,10 +632,9 @@ func (s *Syncer) getTable(tctx *tcontext.Context, origSchema, origTable, renamed
return ti, nil
}

// trackTableInfoFromDownstream tries to track the table info from the downstream.
// trackTableInfoFromDownstream tries to track the table info from the downstream. It will not overwrite existing table.
func (s *Syncer) trackTableInfoFromDownstream(tctx *tcontext.Context, origSchema, origTable, renamedSchema, renamedTable string) error {
// TODO: Switch to use the HTTP interface to retrieve the TableInfo directly
// (and get rid of ddlDBConn).
// TODO: Switch to use the HTTP interface to retrieve the TableInfo directly if HTTP port is available
// use parser for downstream.
parser2, err := utils.GetParserForConn(tctx.Ctx, s.ddlDBConn.baseConn.DBConn)
if err != nil {
Expand Down Expand Up @@ -2176,13 +2175,15 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
srcTables, targetTables := tableNames[0], tableNames[1]
srcTable := srcTables[0]

// Make sure the tables are all loaded into the schema tracker.
// Make sure the needed tables are all loaded into the schema tracker.
var (
shouldExecDDLOnSchemaTracker bool
shouldSchemaExist bool
shouldTableExistNum int // tableNames[:shouldTableExistNum] should exist
shouldRefTableExistNum int // tableNames[1:shouldTableExistNum] should exist, since first one is "caller table"
tryFetchDownstreamTable bool // to make sure if not exists will execute correctly
)

switch node := stmt.(type) {
case *ast.CreateDatabaseStmt:
shouldExecDDLOnSchemaTracker = true
Expand All @@ -2199,10 +2200,16 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
case *ast.RecoverTableStmt:
shouldExecDDLOnSchemaTracker = true
shouldSchemaExist = true
case *ast.CreateTableStmt, *ast.CreateViewStmt:
case *ast.CreateTableStmt:
shouldExecDDLOnSchemaTracker = true
shouldSchemaExist = true
// for CREATE TABLE LIKE/AS, there should be reference tables which should exist
// for CREATE TABLE LIKE/AS, the reference tables should exist
shouldRefTableExistNum = len(srcTables)
tryFetchDownstreamTable = node.IfNotExists
case *ast.CreateViewStmt:
shouldExecDDLOnSchemaTracker = true
shouldSchemaExist = true
// for CREATE VIEW LIKE/AS, the reference tables should exist
shouldRefTableExistNum = len(srcTables)
case *ast.DropTableStmt:
shouldExecDDLOnSchemaTracker = true
Expand Down Expand Up @@ -2244,7 +2251,6 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
}
}
// skip getTable before in above loop
// nolint:ifshort
start := 1
if shouldTableExistNum > start {
start = shouldTableExistNum
Expand All @@ -2258,6 +2264,12 @@ func (s *Syncer) trackDDL(usedSchema string, sql string, tableNames [][]*filter.
}
}

if tryFetchDownstreamTable {
if err := s.trackTableInfoFromDownstream(ec.tctx, srcTable.Schema, srcTable.Name, targetTables[0].Schema, targetTables[0].Name); err != nil {
return err
}
}

if shouldExecDDLOnSchemaTracker {
if err := s.schemaTracker.Exec(ec.tctx.Ctx, usedSchema, sql); err != nil {
ec.tctx.L().Error("cannot track DDL", zap.String("schema", usedSchema), zap.String("statement", sql), log.WrapStringerField("location", ec.currentLocation), log.ShortError(err))
Expand Down
36 changes: 36 additions & 0 deletions syncer/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,3 +1691,39 @@ func (s *Syncer) setupMockCheckpoint(c *C, checkPointDBConn *sql.Conn, checkPoin
s.checkpoint.(*RemoteCheckPoint).dbConn = &DBConn{cfg: s.cfg, baseConn: conn.NewBaseConn(checkPointDBConn, &retry.FiniteRetryStrategy{})}
c.Assert(s.checkpoint.(*RemoteCheckPoint).prepare(tcontext.Background()), IsNil)
}

func (s *testSyncerSuite) TestTrackDownstreamTableWontOverwrite(c *C) {
syncer := Syncer{}
ctx := context.Background()
tctx := tcontext.Background()

db, mock, err := sqlmock.New()
c.Assert(err, IsNil)
dbConn, err := db.Conn(ctx)
c.Assert(err, IsNil)
baseConn := conn.NewBaseConn(dbConn, &retry.FiniteRetryStrategy{})
syncer.ddlDBConn = &DBConn{cfg: s.cfg, baseConn: baseConn}
syncer.schemaTracker, err = schema.NewTracker(ctx, s.cfg.Name, defaultTestSessionCfg, baseConn)
c.Assert(err, IsNil)

upTable, downTable := "up", "down"
schema := "test"
createTableSQL := "CREATE TABLE up (c1 int, c2 int);"

mock.ExpectQuery("SHOW VARIABLES LIKE 'sql_mode'").WillReturnRows(
sqlmock.NewRows([]string{"Variable_name", "Value"}).AddRow("sql_mode", ""))
mock.ExpectQuery("SHOW CREATE TABLE.*").WillReturnRows(
sqlmock.NewRows([]string{"Table", "Create Table"}).
AddRow(downTable, " CREATE TABLE `"+downTable+"` (\n `c` int(11) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))

c.Assert(syncer.schemaTracker.CreateSchemaIfNotExists(schema), IsNil)
c.Assert(syncer.schemaTracker.Exec(ctx, "test", createTableSQL), IsNil)
ti, err := syncer.getTable(tctx, schema, upTable, schema, downTable)
c.Assert(err, IsNil)
c.Assert(ti.Columns, HasLen, 2)
c.Assert(syncer.trackTableInfoFromDownstream(tctx, schema, upTable, schema, downTable), IsNil)
newTi, err := syncer.getTable(tctx, schema, upTable, schema, downTable)
c.Assert(err, IsNil)
c.Assert(newTi, DeepEquals, ti)
c.Assert(mock.ExpectationsWereMet(), IsNil)
}
1 change: 1 addition & 0 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function test_session_config() {

sed -i 's/tidb_retry_limit: "fjs"/tidb_retry_limit: "10"/g' $WORK_DIR/dm-task.yaml
dmctl_start_task "$WORK_DIR/dm-task.yaml" "--remove-meta"
run_sql_source1 "create table if not exists all_mode.t1 (c int); insert into all_mode.t1 (id, name) values (9, 'haha');"

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down

0 comments on commit cdf95c6

Please sign in to comment.