Skip to content

Commit

Permalink
dm: fix checkpoint not updated when last event is ddl and skipped (#8193
Browse files Browse the repository at this point in the history
) (#8202)

close #8175
  • Loading branch information
ti-chi-bot authored Feb 20, 2023
1 parent 8923c92 commit d791b17
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 1 deletion.
13 changes: 12 additions & 1 deletion dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -1060,6 +1060,14 @@ func (s *Syncer) handleJob(job *job) (added2Queue bool, err error) {
s.isTransactionEnd = true
return
case skip:
if job.eventHeader.EventType == replication.QUERY_EVENT {
// skipped ddl includes:
// - ddls that dm don't handle, such as analyze table(can be parsed), create function(cannot be parsed)
// - ddls related to db/table which is filtered
// for those ddls we record its location, so checkpoint can match master position if skipped ddl
// is the last binlog in source db
s.saveGlobalPoint(job.location)
}
s.updateReplicationJobTS(job, skipJobIdx)
return
}
Expand Down Expand Up @@ -2829,7 +2837,10 @@ func (s *Syncer) handleQueryEvent(ev *replication.QueryEvent, ec eventContext, o
}

if _, ok := stmt.(ast.DDLNode); !ok {
return nil
qec.tctx.L().Info("ddl that dm doesn't handle, skip it", zap.String("event", "query"),
zap.Stringer("queryEventContext", qec))
*ec.lastLocation = *ec.currentLocation // before record skip location, update lastLocation
return s.recordSkipSQLsLocation(&ec)
}

if qec.shardingReSync != nil {
Expand Down
17 changes: 17 additions & 0 deletions dm/tests/all_mode/conf/dm-task2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
---
name: test
task-mode: all

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
black-white-list: "instance"

black-white-list:
instance:
do-dbs: ["all_mode"]
66 changes: 66 additions & 0 deletions dm/tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -589,11 +589,77 @@ function run() {
run_sql_both_source "SET @@global.time_zone = 'SYSTEM';"
}

function test_last_event_ddl() {
echo "[$(date)] <<<<<< start test_last_event_ddl >>>>>>"
cleanup_process
cleanup_data all_mode

run_sql 'DROP DATABASE if exists all_mode;' $TIDB_PORT $TIDB_PASSWORD
run_sql 'DROP DATABASE if exists all_mode;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'DROP DATABASE if exists xxx;' $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql 'CREATE DATABASE all_mode;' $MYSQL_PORT1 $MYSQL_PASSWORD1

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/dm-master.toml $WORK_DIR/
cp $cur/conf/dm-worker1.toml $WORK_DIR/
cp $cur/conf/dm-task2.yaml $WORK_DIR/
# start DM worker and master
run_dm_master $WORK_DIR/master $MASTER_PORT $WORK_DIR/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $WORK_DIR/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"operate-source create $WORK_DIR/source1.yaml" \
"\"result\": true" 2 \
"\"source\": \"$SOURCE_ID1\"" 1
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"start-task $WORK_DIR/dm-task2.yaml --remove-meta=true" \
"\"result\": true" 2
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
"\"result\": true" 2 \
"\"unit\": \"Sync\"" 1 \
"\"stage\": \"Running\"" 2

# check checkpoint matches master when the last event is a ddl
# 1. ddl that dm will sync
run_sql_source1 "create table all_mode.t2(c int primary key)"
run_sql_tidb_with_retry "show create table all_mode.t2" "CREATE TABLE"
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1
# 2. ddl cannot be parsed and should be skipped
run_sql_source1 "create FUNCTION all_mode.hello (s CHAR(20)) RETURNS CHAR(50) DETERMINISTIC RETURN 'a';"
check_log_contain_with_retry "RETURNS char(50)" $WORK_DIR/worker1/log/dm-worker.log
sleep 30 # we rely on heartbeat event to flush checkpoint here, below too
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1
# 3. ddl can be parsed and dm don't handle
run_sql_source1 "analyze table all_mode.t1"
check_log_contain_with_retry "analyze table" $WORK_DIR/worker1/log/dm-worker.log
sleep 30
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1
# 4. ddl that is filtered
run_sql_source1 "create database xxx"
check_log_contain_with_retry "CREATE DATABASE IF NOT EXISTS" $WORK_DIR/worker1/log/dm-worker.log
sleep 30
run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
'"synced": true' 1

echo "<<<<<< test_last_event_ddl success! >>>>>>"
}

cleanup_data_upstream all_mode
cleanup_data all_mode
# also cleanup dm processes in case of last run failed
cleanup_process $*
run $*
test_last_event_ddl
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"

0 comments on commit d791b17

Please sign in to comment.