Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

schemaTracker: lazy init table info in schema tracker (#1271) #1274

Merged
merged 1 commit into from
Nov 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/worker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,7 @@ func getMinPosForSubTask(ctx context.Context, subTaskCfg *config.SubTaskConfig)
}
defer checkpoint.Close()

err = checkpoint.Load(tctx, nil)
err = checkpoint.Load(tctx)
if err != nil {
return nil, errors.Annotate(err, "get min position from checkpoint")
}
Expand Down
33 changes: 20 additions & 13 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ type CheckPoint interface {
Clear(tctx *tcontext.Context) error

// Load loads all checkpoints saved by CheckPoint
Load(tctx *tcontext.Context, schemaTracker *schema.Tracker) error
Load(tctx *tcontext.Context) error

// LoadMeta loads checkpoints from meta config item or file
LoadMeta() error
Expand Down Expand Up @@ -233,6 +233,10 @@ type CheckPoint interface {
// corresponding to Meta.Check
CheckGlobalPoint() bool

// GetFlushedTableInfo gets flushed table info
// use for lazy create table in schemaTracker
GetFlushedTableInfo(schema string, table string) *model.TableInfo

// Rollback rolls global checkpoint and all table checkpoints back to flushed checkpoints
Rollback(schemaTracker *schema.Tracker)

Expand Down Expand Up @@ -682,7 +686,7 @@ func (cp *RemoteCheckPoint) createTable(tctx *tcontext.Context) error {
}

// Load implements CheckPoint.Load
func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.Tracker) error {
func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context) error {
cp.Lock()
defer cp.Unlock()

Expand Down Expand Up @@ -771,29 +775,20 @@ func (cp *RemoteCheckPoint) Load(tctx *tcontext.Context, schemaTracker *schema.T
continue // skip global checkpoint
}

var ti model.TableInfo
var ti *model.TableInfo
if !bytes.Equal(tiBytes, []byte("null")) {
// only create table if `table_info` is not `null`.
if err = json.Unmarshal(tiBytes, &ti); err != nil {
return terror.ErrSchemaTrackerInvalidJSON.Delegate(err, cpSchema, cpTable)
}

if schemaTracker != nil {
if err = schemaTracker.CreateSchemaIfNotExists(cpSchema); err != nil {
return terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, cpSchema)
}
if err = schemaTracker.CreateTableIfNotExists(cpSchema, cpTable, &ti); err != nil {
return terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, cpSchema, cpTable)
}
}
}

mSchema, ok := cp.points[cpSchema]
if !ok {
mSchema = make(map[string]*binlogPoint)
cp.points[cpSchema] = mSchema
}
mSchema[cpTable] = newBinlogPoint(location, location, &ti, &ti, cp.cfg.EnableGTID)
mSchema[cpTable] = newBinlogPoint(location, location, ti, ti, cp.cfg.EnableGTID)
}

return terror.WithScope(terror.DBErrorAdapt(rows.Err(), terror.ErrDBDriverError), terror.ScopeDownstream)
Expand Down Expand Up @@ -914,3 +909,15 @@ func (cp *RemoteCheckPoint) parseMetaData() (*binlog.Location, *binlog.Location,

return loc, loc2, err
}

// GetFlushedTableInfo implements CheckPoint.GetFlushedTableInfo
func (cp *RemoteCheckPoint) GetFlushedTableInfo(schema string, table string) *model.TableInfo {
cp.Lock()
defer cp.Unlock()
if tables, ok := cp.points[schema]; ok {
if point, ok2 := tables[table]; ok2 {
return point.flushedTI
}
}
return nil
}
14 changes: 7 additions & 7 deletions syncer/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {

// try load, but should load nothing
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err := cp.Load(tctx, s.tracker)
err := cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, binlog.MinPosition)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)
Expand All @@ -165,7 +165,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
}

s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
cp.SaveGlobalPoint(binlog.Location{Position: pos1})

Expand Down Expand Up @@ -225,7 +225,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
cp.SaveGlobalPoint(binlog.Location{Position: pos3})
columns := []string{"cp_schema", "cp_table", "binlog_name", "binlog_pos", "binlog_gtid", "exit_safe_binlog_name", "exit_safe_binlog_pos", "exit_safe_binlog_gtid", "table_info", "is_global"}
s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(sqlmock.NewRows(columns).AddRow("", "", pos2.Name, pos2.Pos, "", "", 0, "", "null", true))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos2)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos2)
Expand All @@ -251,7 +251,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)

s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, binlog.MinPosition)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, binlog.MinPosition)
Expand All @@ -277,7 +277,7 @@ func (s *testCheckpointSuite) testGlobalCheckPoint(c *C, cp CheckPoint) {
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos1)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1)
Expand Down Expand Up @@ -316,7 +316,7 @@ SHOW MASTER STATUS: /* AFTER CONNECTION POOL ESTABLISHED */
err = cp.FlushPointsExcept(tctx, nil, nil, nil)
c.Assert(err, IsNil)
s.mock.ExpectQuery(loadCheckPointSQL).WillReturnRows(sqlmock.NewRows(nil))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint().Position, Equals, pos1)
c.Assert(cp.FlushedGlobalPoint().Position, Equals, pos1)
Expand Down Expand Up @@ -468,7 +468,7 @@ func (s *testCheckpointSuite) testTableCheckPoint(c *C, cp CheckPoint) {
s.mock.ExpectQuery(loadCheckPointSQL).WithArgs(cpid).WillReturnRows(
sqlmock.NewRows(columns).AddRow("", "", pos2.Name, pos2.Pos, gs.String(), pos2.Name, pos2.Pos, gs.String(), "null", true).
AddRow(schema, table, pos2.Name, pos2.Pos, gs.String(), "", 0, "", tiBytes, false))
err = cp.Load(tctx, s.tracker)
err = cp.Load(tctx)
c.Assert(err, IsNil)
c.Assert(cp.GlobalPoint(), DeepEquals, binlog.InitLocation(pos2, gs))
rcp = cp.(*RemoteCheckPoint)
Expand Down
11 changes: 10 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (s *Syncer) Init(ctx context.Context) (err error) {
}
rollbackHolder.Add(fr.FuncRollback{Name: "close-checkpoint", Fn: s.checkpoint.Close})

err = s.checkpoint.Load(tctx, s.schemaTracker)
err = s.checkpoint.Load(tctx)
if err != nil {
return err
}
Expand Down Expand Up @@ -588,6 +588,15 @@ func (s *Syncer) getTable(origSchema, origTable, renamedSchema, renamedTable str
return nil, terror.ErrSchemaTrackerCannotCreateSchema.Delegate(err, origSchema)
}

// if table already exists in checkpoint, create it in schema tracker
if ti = s.checkpoint.GetFlushedTableInfo(origSchema, origTable); ti != nil {
if err = s.schemaTracker.CreateTableIfNotExists(origSchema, origTable, ti); err != nil {
return nil, terror.ErrSchemaTrackerCannotCreateTable.Delegate(err, origSchema, origTable)
}
s.tctx.L().Debug("lazy init table info in schema tracker", zap.String("schema", origSchema), zap.String("table", origTable))
return ti, nil
}

// in optimistic shard mode, we should try to get the init schema (the one before modified by other tables) first.
if s.cfg.ShardMode == config.ShardOptimistic {
ti, err = s.trackInitTableInfoOptimistic(origSchema, origTable, renamedSchema, renamedTable)
Expand Down
47 changes: 47 additions & 0 deletions tests/start_task/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,54 @@ function prepare_data() {
done
}

function lazy_init_tracker() {
run_sql 'DROP DATABASE if exists start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'CREATE DATABASE start_task;' $MYSQL_PORT1 $MYSQL_PASSWORD1
for j in $(seq 100); do
run_sql "CREATE TABLE start_task.t$j(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "INSERT INTO start_task.t$j VALUES (1,10001),(1,10011);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done

run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

dmctl_start_task_standalone "$cur/conf/dm-task.yaml" "--remove-meta"

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# only table 1-50 flush checkpoint
for j in $(seq 50); do
run_sql "INSERT INTO start_task.t$j VALUES (2,20002),(2,20022);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task test" \
"\"result\": true" 2
dmctl_start_task_standalone "$cur/conf/dm-task.yaml"

for j in $(seq 100); do
run_sql "INSERT INTO start_task.t$j VALUES (3,30003),(3,30033);" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "INSERT INTO start_task.t$j VALUES (4,40004),(4,40044);" $MYSQL_PORT1 $MYSQL_PASSWORD1
done
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 20

check_log_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t50' 1
check_log_not_contains $WORK_DIR/worker1/log/dm-worker.log 'lazy init table info.*t51'

cleanup_data start_task
cleanup_process $*
}

function run() {
lazy_init_tracker
failpoints=(
# 1152 is ErrAbortingConnection
"github.com/pingcap/dm/pkg/utils/FetchTargetDoTablesFailed=return(1152)"
Expand Down