Skip to content

Commit

Permalink
dm: fix checkpoint not updated when last event is ddl and skipped (#8193
Browse files Browse the repository at this point in the history
) (#8203)

close #8175
  • Loading branch information
ti-chi-bot committed Feb 21, 2023
1 parent 24f0dc1 commit 3d02946
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 1 deletion.
4 changes: 3 additions & 1 deletion dm/syncer/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,9 @@ func (ddl *DDLWorker) HandleQueryEvent(ev *replication.QueryEvent, ec eventConte
}

if _, ok := stmt.(ast.DDLNode); !ok {
return nil
ddl.logger.Info("ddl that dm doesn't handle, skip it", zap.String("event", "query"),
zap.Stringer("queryEventContext", qec))
return ddl.recordSkipSQLsLocation(qec.eventContext)
}

if qec.shardingReSync != nil {
Expand Down
8 changes: 8 additions & 0 deletions dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,14 @@ func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
s.isTransactionEnd = true
return
case skip:
if job.eventHeader.EventType == replication.QUERY_EVENT {
// skipped ddl includes:
// - ddls that dm don't handle, such as analyze table(can be parsed), create function(cannot be parsed)
// - ddls related to db/table which is filtered
// for those ddls we record its location, so checkpoint can match master position if skipped ddl
// is the last binlog in source db
s.saveGlobalPoint(job.location)
}
s.updateReplicationJobTS(job, skipJobIdx)
return
}
Expand Down
30 changes: 30 additions & 0 deletions dm/tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,7 @@ function run() {
function prepare_test_empty_gtid() {
run_sql 'DROP DATABASE if exists all_mode;' $TIDB_PORT $TIDB_PASSWORD
run_sql 'DROP DATABASE if exists all_mode;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'DROP DATABASE if exists xxx;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'CREATE DATABASE all_mode;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql "CREATE TABLE all_mode.t1(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1

Expand Down Expand Up @@ -708,6 +709,35 @@ function test_source_and_target_with_empty_gtid() {
echo "check data"
check_sync_diff $WORK_DIR $cur/conf/diff_config-1.toml

# check checkpoint matches master when the last event is a ddl
# 1. ddl that dm will sync
run_sql_source1 "create table all_mode.t2(c int primary key)"
run_sql_tidb_with_retry "show create table all_mode.t2" "CREATE TABLE"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1
# 2. ddl cannot be parsed and should be skipped
run_sql_source1 "create FUNCTION all_mode.hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC RETURN 'a';"
check_log_contain_with_retry "RETURNS char(50)" $WORK_DIR/worker1/log/dm-worker.log
sleep 30 # we rely on heartbeat event to flush checkpoint here, below too
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1
# 3. ddl can be parsed and dm don't handle
run_sql_source1 "analyze table all_mode.t1"
check_log_contain_with_retry "analyze table" $WORK_DIR/worker1/log/dm-worker.log
sleep 30
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1
# 4. ddl that is filtered
run_sql_source1 "create database xxx"
check_log_contain_with_retry "CREATE DATABASE IF NOT EXISTS" $WORK_DIR/worker1/log/dm-worker.log
sleep 30
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1

echo "<<<<<< test_source_and_target_with_empty_gtid success! >>>>>>"
}

Expand Down

0 comments on commit 3d02946

Please sign in to comment.