From 3d02946835106933260f7f73e5e356634406213c Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 21 Feb 2023 11:23:05 +0800 Subject: [PATCH] dm: fix checkpoint not updated when last event is ddl and skipped (#8193) (#8203) close pingcap/tiflow#8175 --- dm/syncer/ddl.go | 4 +++- dm/syncer/syncer.go | 8 ++++++++ dm/tests/all_mode/run.sh | 30 ++++++++++++++++++++++++++++++ 3 files changed, 41 insertions(+), 1 deletion(-) diff --git a/dm/syncer/ddl.go b/dm/syncer/ddl.go index ee524895d40..305d239df4d 100644 --- a/dm/syncer/ddl.go +++ b/dm/syncer/ddl.go @@ -272,7 +272,9 @@ func (ddl *DDLWorker) HandleQueryEvent(ev *replication.QueryEvent, ec eventConte } if _, ok := stmt.(ast.DDLNode); !ok { - return nil + ddl.logger.Info("ddl that dm doesn't handle, skip it", zap.String("event", "query"), + zap.Stringer("queryEventContext", qec)) + return ddl.recordSkipSQLsLocation(qec.eventContext) } if qec.shardingReSync != nil { diff --git a/dm/syncer/syncer.go b/dm/syncer/syncer.go index aad973ba17c..23ecb70c16c 100644 --- a/dm/syncer/syncer.go +++ b/dm/syncer/syncer.go @@ -1078,6 +1078,14 @@ func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) { s.isTransactionEnd = true return case skip: + if job.eventHeader.EventType == replication.QUERY_EVENT { + // skipped ddl includes: + // - ddls that dm doesn't handle, such as analyze table(can be parsed), create function(cannot be parsed) + // - ddls related to db/table which is filtered + // for those ddls we record their location, so checkpoint can match master position if skipped ddl + // is the last binlog in source db + s.saveGlobalPoint(job.location) + } s.updateReplicationJobTS(job, skipJobIdx) return } diff --git a/dm/tests/all_mode/run.sh b/dm/tests/all_mode/run.sh index f6d109ca97e..9dcd52dbd7f 100755 --- a/dm/tests/all_mode/run.sh +++ b/dm/tests/all_mode/run.sh @@ -649,6 +649,7 @@ function run() { function prepare_test_empty_gtid() { run_sql 'DROP DATABASE if exists all_mode;' $TIDB_PORT $TIDB_PASSWORD run_sql
'DROP DATABASE if exists all_mode;' $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql 'DROP DATABASE if exists xxx;' $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql 'CREATE DATABASE all_mode;' $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql "CREATE TABLE all_mode.t1(i TINYINT, j INT UNIQUE KEY);" $MYSQL_PORT1 $MYSQL_PASSWORD1 @@ -708,6 +709,35 @@ function test_source_and_target_with_empty_gtid() { echo "check data" check_sync_diff $WORK_DIR $cur/conf/diff_config-1.toml + # check checkpoint matches master when the last event is a ddl + # 1. ddl that dm will sync + run_sql_source1 "create table all_mode.t2(c int primary key)" + run_sql_tidb_with_retry "show create table all_mode.t2" "CREATE TABLE" + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"synced": true' 1 + # 2. ddl that cannot be parsed and should be skipped + run_sql_source1 "create FUNCTION all_mode.hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC RETURN 'a';" + check_log_contain_with_retry "RETURNS char(50)" $WORK_DIR/worker1/log/dm-worker.log + sleep 30 # we rely on heartbeat event to flush checkpoint here, below too + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"synced": true' 1 + # 3. ddl that can be parsed but that dm doesn't handle + run_sql_source1 "analyze table all_mode.t1" + check_log_contain_with_retry "analyze table" $WORK_DIR/worker1/log/dm-worker.log + sleep 30 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"synced": true' 1 + # 4. ddl that is filtered + run_sql_source1 "create database xxx" + check_log_contain_with_retry "CREATE DATABASE IF NOT EXISTS" $WORK_DIR/worker1/log/dm-worker.log + sleep 30 + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status test" \ + '"synced": true' 1 + echo "<<<<<< test_source_and_target_with_empty_gtid success! >>>>>>" }