From 5ee6815ea42d84cba28af35f15a3b51e65076730 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 6 May 2022 13:28:25 +0800 Subject: [PATCH 1/3] fix empty gtid config --- dm/pkg/gtid/gtid.go | 27 ++++++++ dm/tests/incremental_mode/conf/dm-task.yaml | 5 +- dm/tests/incremental_mode/conf/source1.yaml | 2 +- .../incremental_mode/data/db1.prepare.sql | 1 + .../incremental_mode/data/db2.prepare.sql | 1 + dm/tests/incremental_mode/run.sh | 66 ++++++------------- 6 files changed, 51 insertions(+), 51 deletions(-) diff --git a/dm/pkg/gtid/gtid.go b/dm/pkg/gtid/gtid.go index 19a1a22300b..4290608a047 100644 --- a/dm/pkg/gtid/gtid.go +++ b/dm/pkg/gtid/gtid.go @@ -14,9 +14,13 @@ package gtid import ( + "strings" + "github.com/go-mysql-org/go-mysql/mysql" "github.com/pingcap/errors" + "go.uber.org/zap" + "github.com/pingcap/tiflow/dm/pkg/log" "github.com/pingcap/tiflow/dm/pkg/terror" ) @@ -80,14 +84,37 @@ func ParserGTID(flavor, gtidStr string) (Set, error) { case mysql.MariaDBFlavor: m = &MariadbGTIDSet{} case mysql.MySQLFlavor: + // check for xxx:0 + if IsNilMySQLGTIDSet(gtidStr) { + log.L().Warn("get empty gtid set end with `0`", zap.String("gtid", gtidStr)) + return MinGTIDSet(mysql.MySQLFlavor), nil + } m = &MySQLGTIDSet{} default: return nil, terror.ErrNotSupportedFlavor.Generate(flavor) } + err = m.Set(gtid) return m, err } +// check whether a gtid set is nil(start sync from start) +// mysql: uuid:0 +// mariadb: x-y-0(no need to handle) +func IsNilMySQLGTIDSet(gStr string) bool { + sp := strings.Split(gStr, ",") + if len(sp) != 1 { + return false + } + + sep := strings.Split(sp[0], ":") + if len(sep) != 2 { + return false + } + interval := strings.TrimSpace(sep[1]) + return interval == "0" +} + // MinGTIDSet returns the min GTID set. func MinGTIDSet(flavor string) Set { // use mysql as default diff --git a/dm/tests/incremental_mode/conf/dm-task.yaml b/dm/tests/incremental_mode/conf/dm-task.yaml index a9464ea5241..cc77f61faed 100644 --- a/dm/tests/incremental_mode/conf/dm-task.yaml +++ b/dm/tests/incremental_mode/conf/dm-task.yaml @@ -1,6 +1,6 @@ --- name: test -task-mode: task-mode-placeholder +task-mode: incremental is-sharding: false meta-schema: "dm_meta" # enable-heartbeat: true @@ -17,8 +17,6 @@ target-database: mysql-instances: - source-id: "mysql-replica-01" meta: - binlog-name: binlog-name-placeholder-1 - binlog-pos: binlog-pos-placeholder-1 binlog-gtid: binlog-gtid-placeholder-1 block-allow-list: "instance" mydumper-config-name: "global" @@ -29,7 +27,6 @@ mysql-instances: meta: binlog-name: binlog-name-placeholder-2 binlog-pos: binlog-pos-placeholder-2 - binlog-gtid: binlog-gtid-placeholder-2 block-allow-list: "instance" mydumper-config-name: "global" loader-config-name: "global" diff --git a/dm/tests/incremental_mode/conf/source1.yaml b/dm/tests/incremental_mode/conf/source1.yaml index 679a2f4db7c..ebed995e45f 100644 --- a/dm/tests/incremental_mode/conf/source1.yaml +++ b/dm/tests/incremental_mode/conf/source1.yaml @@ -2,7 +2,7 @@ source-id: mysql-replica-01 flavor: 'mysql' enable-gtid: true relay-binlog-name: '' -relay-binlog-gtid: '' +relay-binlog-gtid: binlog-gtid-placeholder enable-relay: false from: host: 127.0.0.1 diff --git a/dm/tests/incremental_mode/data/db1.prepare.sql b/dm/tests/incremental_mode/data/db1.prepare.sql index e337422e448..6d422d3d87a 100644 --- a/dm/tests/incremental_mode/data/db1.prepare.sql +++ b/dm/tests/incremental_mode/data/db1.prepare.sql @@ -1,4 +1,5 @@ drop database if exists `incremental_mode`; +reset master; create database `incremental_mode`; use `incremental_mode`; create table t1 (id int, name varchar(20), primary key(`id`)); diff --git a/dm/tests/incremental_mode/data/db2.prepare.sql b/dm/tests/incremental_mode/data/db2.prepare.sql index feff2661656..e02768fc6f4 100644 --- a/dm/tests/incremental_mode/data/db2.prepare.sql +++ b/dm/tests/incremental_mode/data/db2.prepare.sql @@ -1,4 +1,5 @@ drop database if exists `incremental_mode`; +reset master; create database `incremental_mode`; use `incremental_mode`; create table t2 (id int auto_increment, name varchar(20), primary key (`id`)); diff --git a/dm/tests/incremental_mode/run.sh b/dm/tests/incremental_mode/run.sh index e86ace8800b..1955ab305e4 100755 --- a/dm/tests/incremental_mode/run.sh +++ b/dm/tests/incremental_mode/run.sh @@ -9,11 +9,22 @@ TASK_NAME="test" API_VERSION="v1alpha1" +function get_uuid() { + uuid=$(echo "show variables like '%server_uuid%';" | MYSQL_PWD=123456 mysql -uroot -h$1 -P$2 | awk 'FNR == 2 {print $2}') + echo $uuid +} + +function get_binlog_name() { + binlog_name=$(echo "SHOW BINARY LOGS;" | MYSQL_PWD=123456 mysql -uroot -h127.0.0.1 -P3307 | awk 'FNR == 2 {print $1}') + echo $binlog_name +} + function run() { run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_contains 'Query OK, 2 rows affected' run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 check_contains 'Query OK, 3 rows affected' + uuid=($(get_uuid $MYSQL_HOST1 $MYSQL_PORT1)) export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/dm/worker/defaultKeepAliveTTL=return(1)" @@ -38,6 +49,7 @@ function run() { # operate mysql config to worker cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "s/binlog-gtid-placeholder/$uuid:0/g" $WORK_DIR/source1.yaml sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 @@ -67,43 +79,10 @@ function run() { run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT - # start a task in `full` mode - echo "start task in full mode" - cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml - sed -i "s/task-mode-placeholder/full/g" $WORK_DIR/dm-task.yaml - # avoid cannot unmarshal !!str `binlog-...` into uint32 error - sed -i "s/binlog-pos-placeholder-1/4/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml - dmctl_start_task $WORK_DIR/dm-task.yaml - - check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - - dmctl_stop_task $TASK_NAME - - # $worker1_run_source_1 > 0 means source1 is operated to worker1 - worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true - if [ $worker1_run_source_1 -gt 0 ]; then - name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - else - name2=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos2=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid2=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ') - gtid1=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ') - fi - # kill worker1 and worker2 kill_dm_worker check_port_offline $WORKER1_PORT 20 check_port_offline $WORKER2_PORT 20 - # start a task in `incremental` mode - # using account with limited privileges run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 check_count 'Query OK, 0 rows affected' 7 @@ -112,7 +91,6 @@ function run() { # update mysql config sed -i "s/root/dm_incremental/g" $WORK_DIR/source1.yaml - sed -i "s/relay-binlog-gtid: ''/relay-binlog-gtid: '$gtid1'/g" $WORK_DIR/source1.yaml sed -i "s/root/dm_incremental/g" $WORK_DIR/source2.yaml run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ @@ -142,16 +120,12 @@ function run() { "start-relay -s $worker2bound worker2" \ "\"result\": true" 2 - echo "start task in incremental mode" - cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml - sed -i "s/task-mode-placeholder/incremental/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-name-placeholder-1//g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-pos-placeholder-1//g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-gtid-placeholder-1/$gtid1/g" $WORK_DIR/dm-task.yaml - - sed -i "s/binlog-name-placeholder-2/$name2/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml - sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml + worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true + echo "start task in incremental mode with nil gtid/pos" + binlog_name=($(get_binlog_name $MYSQL_HOST1 $MYSQL_PORT1)) + sed "s/binlog-gtid-placeholder-1/$uuid:0/g" $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml + sed -i "s/binlog-name-placeholder-2/$binlog_name/g" $WORK_DIR/dm-task.yaml + sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml # test graceful display error export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/GetEventError=return' @@ -179,9 +153,9 @@ function run() { sleep 3 # check not specify binlog name could also update active relay log if [ $worker1_run_source_1 -gt 0 ]; then - grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker1/log/dm-worker.log + grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker1/log/dm-worker.log else - grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker2/log/dm-worker.log + grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker2/log/dm-worker.log fi run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 From 18a79623523f357bf1e0a8fb7bc733766c7a28aa Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Fri, 6 May 2022 17:20:59 +0800 Subject: [PATCH 2/3] fix ut --- dm/relay/local_reader_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dm/relay/local_reader_test.go b/dm/relay/local_reader_test.go index a5889ccb979..11ee5374d72 100644 --- a/dm/relay/local_reader_test.go +++ b/dm/relay/local_reader_test.go @@ -55,7 +55,7 @@ type testReaderSuite struct { func (t *testReaderSuite) SetUpSuite(c *C) { var err error t.lastPos = 0 - t.lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0") + t.lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:1") c.Assert(err, IsNil) c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/relay/SetHeartbeatInterval", "return(10000)"), IsNil) } @@ -613,7 +613,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) { { "ba8f633f-1f15-11eb-b1c7-0242ac110002", "ba8f633f-1f15-11eb-b1c7-0242ac110002.000001", - "ba8f633f-1f15-11eb-b1c7-0242ac110002:0", + "ba8f633f-1f15-11eb-b1c7-0242ac110002:1", []FileEventResult{ { "mysql.000001", From 73806a43b579401d54896c59e067db98e221bf40 Mon Sep 17 00:00:00 2001 From: gmhdbjd Date: Tue, 10 May 2022 09:49:37 +0800 Subject: [PATCH 3/3] address comment --- dm/pkg/gtid/gtid.go | 2 +- dm/pkg/gtid/gtid_test.go | 11 +++++++++++ dm/tests/incremental_mode/run.sh | 8 ++++---- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/dm/pkg/gtid/gtid.go b/dm/pkg/gtid/gtid.go index 4290608a047..249c161d105 100644 --- a/dm/pkg/gtid/gtid.go +++ b/dm/pkg/gtid/gtid.go @@ -100,7 +100,7 @@ func ParserGTID(flavor, gtidStr string) (Set, error) { // check whether a gtid set is nil(start sync from start) // mysql: uuid:0 -// mariadb: x-y-0(no need to handle) +// mariadb: 0-0-0(no need to handle) func IsNilMySQLGTIDSet(gStr string) bool { sp := strings.Split(gStr, ",") if len(sp) != 1 { diff --git a/dm/pkg/gtid/gtid_test.go b/dm/pkg/gtid/gtid_test.go index fbe731bdaa8..1d126adeb18 100644 --- a/dm/pkg/gtid/gtid_test.go +++ b/dm/pkg/gtid/gtid_test.go @@ -19,6 +19,7 @@ import ( "github.com/go-mysql-org/go-mysql/mysql" . "github.com/pingcap/check" + "github.com/stretchr/testify/require" "github.com/pingcap/tiflow/dm/pkg/terror" ) @@ -96,6 +97,16 @@ func (s *testGTIDSuite) TestSortingGTIDSet(c *C) { c.Assert(gSet.String(), Equals, sortedGTIDSet) } +func TestIsNilGTIDSet(t *testing.T) { + require.False(t, IsNilMySQLGTIDSet("")) + require.False(t, IsNilMySQLGTIDSet("xxxxx")) + require.False(t, IsNilMySQLGTIDSet("xxxxx:0,yyyy:0")) + require.False(t, IsNilMySQLGTIDSet("xxxxx:1-2")) + require.False(t, IsNilMySQLGTIDSet("xxxxx:0-0")) + require.True(t, IsNilMySQLGTIDSet("xxxxx:0")) + require.True(t, IsNilMySQLGTIDSet(" xxxxx:0 ")) +} + func (s *testGTIDSuite) TestMinGTIDSet(c *C) { gset := MinGTIDSet(mysql.MySQLFlavor) _, ok := gset.(*MySQLGTIDSet) diff --git a/dm/tests/incremental_mode/run.sh b/dm/tests/incremental_mode/run.sh index 1955ab305e4..53988c66821 100755 --- a/dm/tests/incremental_mode/run.sh +++ b/dm/tests/incremental_mode/run.sh @@ -121,8 +121,8 @@ function run() { "\"result\": true" 2 worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true - echo "start task in incremental mode with nil gtid/pos" - binlog_name=($(get_binlog_name $MYSQL_HOST1 $MYSQL_PORT1)) + echo "start task in incremental mode with zero gtid/pos" + binlog_name=($(get_binlog_name $MYSQL_HOST2 $MYSQL_PORT2)) sed "s/binlog-gtid-placeholder-1/$uuid:0/g" $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml sed -i "s/binlog-name-placeholder-2/$binlog_name/g" $WORK_DIR/dm-task.yaml sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml @@ -153,9 +153,9 @@ function run() { sleep 3 # check not specify binlog name could also update active relay log if [ $worker1_run_source_1 -gt 0 ]; then - grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker1/log/dm-worker.log - else grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker2/log/dm-worker.log + else + grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker1/log/dm-worker.log fi run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1