From 4dc30f3f7adb8f4e9f4a802d400d1e7a6175f827 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Thu, 4 Feb 2021 17:42:18 +0800 Subject: [PATCH 01/11] relay: use `gtid_purged` to handle gap in Previous_gtids event --- pkg/gtid/gtid.go | 1 + relay/relay.go | 1 - tests/gtid/conf/diff_config.toml | 58 +++++++++++++++++++++ tests/gtid/conf/dm-master.toml | 5 ++ tests/gtid/conf/dm-task.yaml | 23 +++++++++ tests/gtid/conf/dm-worker1.toml | 2 + tests/gtid/conf/dm-worker2.toml | 2 + tests/gtid/conf/source1.yaml | 11 ++++ tests/gtid/conf/source2.yaml | 10 ++++ tests/gtid/data/db1.increment.sql | 2 + tests/gtid/data/db1.prepare.sql | 5 ++ tests/gtid/data/db2.increment.sql | 2 + tests/gtid/data/db2.prepare.sql | 5 ++ tests/gtid/run.sh | 85 +++++++++++++++++++++++++++++++ tests/others_integration.txt | 1 + 15 files changed, 212 insertions(+), 1 deletion(-) create mode 100644 tests/gtid/conf/diff_config.toml create mode 100644 tests/gtid/conf/dm-master.toml create mode 100644 tests/gtid/conf/dm-task.yaml create mode 100644 tests/gtid/conf/dm-worker1.toml create mode 100644 tests/gtid/conf/dm-worker2.toml create mode 100644 tests/gtid/conf/source1.yaml create mode 100644 tests/gtid/conf/source2.yaml create mode 100644 tests/gtid/data/db1.increment.sql create mode 100644 tests/gtid/data/db1.prepare.sql create mode 100644 tests/gtid/data/db2.increment.sql create mode 100644 tests/gtid/data/db2.prepare.sql create mode 100755 tests/gtid/run.sh diff --git a/pkg/gtid/gtid.go b/pkg/gtid/gtid.go index cf4a2b91ae..1465eb3400 100644 --- a/pkg/gtid/gtid.go +++ b/pkg/gtid/gtid.go @@ -40,6 +40,7 @@ type Set interface { // NOTE: the original GTID sets should contain the end GTID sets, otherwise it's invalid. // like truncating `00c04543-f584-11e9-a765-0242ac120002:1-100` with `00c04543-f584-11e9-a765-0242ac120002:40-60` // should become `00c04543-f584-11e9-a765-0242ac120002:1-60`. + // TODO: support gap Truncate(end Set) error // ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed diff --git a/relay/relay.go b/relay/relay.go index 7c7675af14..65f5e5fdf1 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -990,7 +990,6 @@ func (r *Relay) setSyncConfig() error { // AdjustGTID implements Relay.AdjustGTID // starting sync at returned gset will wholly fetch a binlog from beginning of the file. -// TODO: check if starting fetch at the middle of binlog is also acceptable func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) { // setup a TCP binlog reader (because no relay can be used when upgrading). syncCfg := r.syncerCfg diff --git a/tests/gtid/conf/diff_config.toml b/tests/gtid/conf/diff_config.toml new file mode 100644 index 0000000000..ca623a38af --- /dev/null +++ b/tests/gtid/conf/diff_config.toml @@ -0,0 +1,58 @@ +# diff Configuration. + +log-level = "info" + +chunk-size = 1000 + +check-thread-count = 4 + +sample-percent = 100 + +use-rowid = false + +use-checksum = true + +fix-sql-file = "fix.sql" + +# tables need to check. +[[check-tables]] +schema = "gtid" +tables = ["~t.*"] + +[[table-config]] +schema = "gtid" +table = "t1" + +[[table-config.source-tables]] +instance-id = "source-1" +schema = "gtid" +table = "t1" + +[[table-config]] +schema = "gtid" +table = "t2" + +[[table-config.source-tables]] +instance-id = "source-2" +schema = "gtid" +table = "t2" + +[[source-db]] +host = "127.0.0.1" +port = 3306 +user = "root" +password = "123456" +instance-id = "source-1" + +[[source-db]] +host = "127.0.0.1" +port = 3307 +user = "root" +password = "123456" +instance-id = "source-2" + +[target-db] +host = "127.0.0.1" +port = 4000 +user = "test" +password = "123456" diff --git a/tests/gtid/conf/dm-master.toml b/tests/gtid/conf/dm-master.toml new file mode 100644 index 0000000000..9b360834d3 --- /dev/null +++ b/tests/gtid/conf/dm-master.toml @@ -0,0 +1,5 @@ +# Master Configuration. +master-addr = ":8261" +advertise-addr = "127.0.0.1:8261" + +rpc-timeout = "30s" diff --git a/tests/gtid/conf/dm-task.yaml b/tests/gtid/conf/dm-task.yaml new file mode 100644 index 0000000000..8e96b67f10 --- /dev/null +++ b/tests/gtid/conf/dm-task.yaml @@ -0,0 +1,23 @@ +--- +name: test +task-mode: all +is-sharding: false +clean-dump-file: false + +target-database: + host: "127.0.0.1" + port: 4000 + user: "root" + password: "" + +mysql-instances: + - source-id: "mysql-replica-01" + block-allow-list: "instance" + + - source-id: "mysql-replica-02" + block-allow-list: "instance" + +black-white-list: # compatible with deprecated config + instance: + do-dbs: ["gtid"] + diff --git a/tests/gtid/conf/dm-worker1.toml b/tests/gtid/conf/dm-worker1.toml new file mode 100644 index 0000000000..7a72ea72bf --- /dev/null +++ b/tests/gtid/conf/dm-worker1.toml @@ -0,0 +1,2 @@ +name = "worker1" +join = "127.0.0.1:8261" diff --git a/tests/gtid/conf/dm-worker2.toml b/tests/gtid/conf/dm-worker2.toml new file mode 100644 index 0000000000..010e21c73e --- /dev/null +++ b/tests/gtid/conf/dm-worker2.toml @@ -0,0 +1,2 @@ +name = "worker2" +join = "127.0.0.1:8261" diff --git a/tests/gtid/conf/source1.yaml b/tests/gtid/conf/source1.yaml new file mode 100644 index 0000000000..8c4ca2f94e --- /dev/null +++ b/tests/gtid/conf/source1.yaml @@ -0,0 +1,11 @@ +# MySQL Configuration. + +source-id: mysql-replica-01 +enable-gtid: true +enable-relay: true +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3306 + diff --git a/tests/gtid/conf/source2.yaml b/tests/gtid/conf/source2.yaml new file mode 100644 index 0000000000..3ecf14ac98 --- /dev/null +++ b/tests/gtid/conf/source2.yaml @@ -0,0 +1,10 @@ +# MySQL Configuration. + +source-id: mysql-replica-02 +enable-gtid: true +enable-relay: false +from: + host: 127.0.0.1 + user: root + password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= + port: 3307 diff --git a/tests/gtid/data/db1.increment.sql b/tests/gtid/data/db1.increment.sql new file mode 100644 index 0000000000..69a3f9740c --- /dev/null +++ b/tests/gtid/data/db1.increment.sql @@ -0,0 +1,2 @@ +use gtid; +insert into t1 values (2); diff --git a/tests/gtid/data/db1.prepare.sql b/tests/gtid/data/db1.prepare.sql new file mode 100644 index 0000000000..51f43839b8 --- /dev/null +++ b/tests/gtid/data/db1.prepare.sql @@ -0,0 +1,5 @@ +drop database if exists `gtid`; +create database `gtid`; +use `gtid`; +create table t1 (id int PRIMARY KEY); +insert into t1 values (1); \ No newline at end of file diff --git a/tests/gtid/data/db2.increment.sql b/tests/gtid/data/db2.increment.sql new file mode 100644 index 0000000000..e2415de822 --- /dev/null +++ b/tests/gtid/data/db2.increment.sql @@ -0,0 +1,2 @@ +use gtid; +insert into t2 values (2); \ No newline at end of file diff --git a/tests/gtid/data/db2.prepare.sql b/tests/gtid/data/db2.prepare.sql new file mode 100644 index 0000000000..a0a7a5da36 --- /dev/null +++ b/tests/gtid/data/db2.prepare.sql @@ -0,0 +1,5 @@ +drop database if exists `gtid`; +create database `gtid`; +use `gtid`; +create table t2 (id int primary key); +insert into t2 values (1); \ No newline at end of file diff --git a/tests/gtid/run.sh b/tests/gtid/run.sh new file mode 100755 index 0000000000..ea6b2286f6 --- /dev/null +++ b/tests/gtid/run.sh @@ -0,0 +1,85 @@ +#!/bin/bash + +set -eu + +cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd ) +source $cur/../_utils/test_prepare +WORK_DIR=$TEST_DIR/$TEST_NAME +API_VERSION="v1alpha1" +TASK_NAME="test" + +function run() { + run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 + + # start DM worker and master + run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml + check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + # operate mysql config to worker + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/from:/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/from:/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 + + dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # test if upstream purged binlog, DM should report error + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test"\ + "\"result\": true" 3 + + # TODO: when there's a purged gap, if starting gtid set covers gap, there should be no data lost + # if starting gtid set not fully covers gap, behaviour should be same whether we enable relay + # hard to reproduce in CI + + # when there's a not purged gap, there should be no data lost + # we manually `set gtid_next = 'uuid:gtid'`to reproduce + gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2,":",$3}'|tr -d ' ') + gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata|awk -F: '{print $2,":",$3}'|tr -d ' ') + uuid1=$(echo $gtid1 | awk -F: '{print $1}') + uuid2=$(echo $gtid2 | awk -F: '{print $1}') + end_gtid_num1=$(echo $gtid1 | awk -F: '{print $2}' | awk -F- '{print $2}') + end_gtid_num2=$(echo $gtid2 | awk -F: '{print $2}' | awk -F- '{print $2}') + new_gtid1=${uuid1}:$((end_gtid_num1 + 3)) + new_gtid2=${uuid2}:$((end_gtid_num2 + 3)) + echo "new_gtid1 $new_gtid1 new_gtid2 $new_gtid2" + + run_sql "SET gtid_next='$new_gtid1';insert into gtid.t1 values (3);SET gtid_next='AUTOMATIC';" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "SET gtid_next='$new_gtid2';insert into gtid.t2 values (3);SET gtid_next='AUTOMATIC'" $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql "flush logs" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "flush logs" $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql "insert into gtid.t1 values (4)" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "insert into gtid.t2 values (4)" $MYSQL_PORT2 $MYSQL_PASSWORD2 + # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:7 + + # remove relay-dir, to check if relay correctly handle gap + pkill -hup dm-worker.test 2>/dev/null || true + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + rm -rf $WORK_DIR/worker1/relay_log || true + rm -rf $WORK_DIR/worker2/relay_log || true + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task.yaml"\ + "\"result\": true" 3 + check_sync_diff $WORK_DIR $cur/conf/diff_config.toml +} + +cleanup_data gtid +# also cleanup dm processes in case of last run failed +cleanup_process $* +run $* +cleanup_process $* + +echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>" diff --git a/tests/others_integration.txt b/tests/others_integration.txt index 4631ecf8b5..eff8b3c429 100644 --- a/tests/others_integration.txt +++ b/tests/others_integration.txt @@ -7,3 +7,4 @@ dm_syncer sequence_sharding_optimistic sequence_sharding_removemeta drop_column_with_index +gtid From 1493950bb889bccaadee35799a0622f567b3dc5f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 5 Feb 2021 15:43:27 +0800 Subject: [PATCH 02/11] update test --- tests/gtid/data/db1.prepare.sql | 1 + tests/gtid/data/db2.prepare.sql | 1 + tests/gtid/run.sh | 36 ++++++++++++++++++++------------- 3 files changed, 24 insertions(+), 14 deletions(-) diff --git a/tests/gtid/data/db1.prepare.sql b/tests/gtid/data/db1.prepare.sql index 51f43839b8..e86aeda508 100644 --- a/tests/gtid/data/db1.prepare.sql +++ b/tests/gtid/data/db1.prepare.sql @@ -1,4 +1,5 @@ drop database if exists `gtid`; +reset master; create database `gtid`; use `gtid`; create table t1 (id int PRIMARY KEY); diff --git a/tests/gtid/data/db2.prepare.sql b/tests/gtid/data/db2.prepare.sql index a0a7a5da36..02385f8817 100644 --- a/tests/gtid/data/db2.prepare.sql +++ b/tests/gtid/data/db2.prepare.sql @@ -1,4 +1,5 @@ drop database if exists `gtid`; +reset master; create database `gtid`; use `gtid`; create table t2 (id int primary key); diff --git a/tests/gtid/run.sh b/tests/gtid/run.sh index ea6b2286f6..99d5637689 100755 --- a/tests/gtid/run.sh +++ b/tests/gtid/run.sh @@ -12,29 +12,25 @@ function run() { run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2 - # start DM worker and master + cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml + cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml + sed -i "/from:/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml + sed -i "/from:/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml + + # start DM worker and source one-by-one, make sure the source1 bound to worker1 run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT - # operate mysql config to worker - cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml - cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml - sed -i "/from:/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml - sed -i "/from:/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml - dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1 dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2 dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta" check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - # test if upstream purged binlog, DM should report error - run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ - "stop-task test"\ - "\"result\": true" 3 - # TODO: when there's a purged gap, if starting gtid set covers gap, there should be no data lost # if starting gtid set not fully covers gap, behaviour should be same whether we enable relay # hard to reproduce in CI @@ -57,9 +53,19 @@ function run() { run_sql "flush logs" $MYSQL_PORT2 $MYSQL_PASSWORD2 run_sql "insert into gtid.t1 values (4)" $MYSQL_PORT1 $MYSQL_PASSWORD1 run_sql "insert into gtid.t2 values (4)" $MYSQL_PORT2 $MYSQL_PASSWORD2 - # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:7 + # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:6 - # remove relay-dir, to check if relay correctly handle gap + sleep 1 + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test"\ + "\"result\": true" 3 + + run_sql "insert into gtid.t1 values (5)" $MYSQL_PORT1 $MYSQL_PASSWORD1 + run_sql "insert into gtid.t2 values (5)" $MYSQL_PORT2 $MYSQL_PASSWORD2 + # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-6 + + # remove relay-dir, now relay starting point(syncer checkpoint) should be 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:6 + # check if relay correctly handle gap pkill -hup dm-worker.test 2>/dev/null || true check_port_offline $WORKER1_PORT 20 check_port_offline $WORKER2_PORT 20 @@ -74,6 +80,8 @@ function run() { "start-task $cur/conf/dm-task.yaml"\ "\"result\": true" 3 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml + + # TODO: test if should error, error indeed } cleanup_data gtid From b80492cc69869274ab47fe8caadf566febd8bfa5 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 5 Feb 2021 15:44:06 +0800 Subject: [PATCH 03/11] add code change --- pkg/gtid/gtid.go | 30 ------------------------ pkg/gtid/gtid_test.go | 39 ------------------------------- pkg/utils/db.go | 43 +++++++++++++++++++++++++++++++++++ pkg/v1dbschema/schema.go | 19 ++++------------ pkg/v1dbschema/schema_test.go | 6 +++-- relay/relay.go | 38 +++++++++++-------------------- 6 files changed, 64 insertions(+), 111 deletions(-) diff --git a/pkg/gtid/gtid.go b/pkg/gtid/gtid.go index 1465eb3400..58ef252828 100644 --- a/pkg/gtid/gtid.go +++ b/pkg/gtid/gtid.go @@ -43,9 +43,6 @@ type Set interface { // TODO: support gap Truncate(end Set) error - // ResetStart reset the start of interval to 1 (only meaningful for MySQLGTIDSet), returns true if Set is changed - ResetStart() bool - String() string } @@ -261,28 +258,6 @@ func (g *MySQLGTIDSet) Truncate(end Set) error { return nil } -// ResetStart resets the start part of GTID sets, -// like `00c04543-f584-11e9-a765-0242ac120002:40-60` will be reset to `00c04543-f584-11e9-a765-0242ac120002:1-60`. -// return `true` if reset real happen. -func (g *MySQLGTIDSet) ResetStart() bool { - if g == nil || g.set == nil { - return false - } - - reset := false - for _, set := range g.set.Sets { - for i, inter := range set.Intervals { - if inter.Start > 1 { - inter.Start = 1 - set.Intervals[i] = inter // re-assign - reset = true - } - } - } - - return reset -} - func (g *MySQLGTIDSet) String() string { if g.set == nil { return "" @@ -445,11 +420,6 @@ func (m *MariadbGTIDSet) Truncate(end Set) error { return nil } -// ResetStart does nothing because for MariaDB its GTID set format is `domainID-serverID-SeqNum`. -func (m *MariadbGTIDSet) ResetStart() bool { - return false -} - func (m *MariadbGTIDSet) String() string { if m.set == nil { return "" diff --git a/pkg/gtid/gtid_test.go b/pkg/gtid/gtid_test.go index 1f0084d0e2..e1d8b4086a 100644 --- a/pkg/gtid/gtid_test.go +++ b/pkg/gtid/gtid_test.go @@ -401,42 +401,3 @@ func (s *testGTIDSuite) TestMariaDBGTIDTruncate(c *C) { } } } - -func (s *testGTIDSuite) TestGTIDSetResetStart(c *C) { - var ( - gMaria, _ = ParserGTID("", "1-2-3") - flavor = "mysql" - gNil *MySQLGTIDSet - gEmpty, _ = ParserGTID(flavor, "") - g1, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100") - g2, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:100") - g3, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:50-100") - g4, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100") - g5, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100") - g6, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:40-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:50-100") - g7, _ = ParserGTID(flavor, "00c04543-f584-11e9-a765-0242ac120002:10-20:30-100") - ) - - c.Assert(gMaria.ResetStart(), IsFalse) - c.Assert(gNil.ResetStart(), IsFalse) - c.Assert(gEmpty.ResetStart(), IsFalse) - - c.Assert(g1.ResetStart(), IsFalse) - - c.Assert(g2.ResetStart(), IsTrue) - c.Assert(g2.Equal(g1), IsTrue) - - c.Assert(g3.ResetStart(), IsTrue) - c.Assert(g3.Equal(g1), IsTrue) - - c.Assert(g4.ResetStart(), IsFalse) - - c.Assert(g5.ResetStart(), IsTrue) - c.Assert(g5.Equal(g4), IsTrue) - - c.Assert(g6.ResetStart(), IsTrue) - c.Assert(g6.Equal(g4), IsTrue) - - c.Assert(g7.ResetStart(), IsTrue) - // TODO: currently g7 will become "00c04543-f584-11e9-a765-0242ac120002:1-20:1-100", will fix soon -} diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 25fb57921a..290a97badf 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -473,3 +473,46 @@ func ExtractTiDBVersion(version string) (*semver.Version, error) { rawVersion = strings.TrimPrefix(rawVersion, "v") return semver.NewVersion(rawVersion) } + +// AddGSetWithPurged is used to handle this case: https://github.com/pingcap/dm/issues/1418 +// we might get a gtid set from Previous_gtids event in binlog, but that gtid set can't be used to start a gtid sync +// because it doesn't cover all gtid_purged. The error of using it will be +// ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires. +// so we add gtid_purged to it. +func AddGSetWithPurged(ctx context.Context, gset gtid.Set, conn *sql.Conn) (gtid.Set, error) { + if _, ok := gset.(*gtid.MariadbGTIDSet); ok { + // TODO: check MariaDB really don't need this (MariaDB doesn't have a gtid_purged though) + return gset, nil + } + + var ( + gtidStr string + row *sql.Row + err error + ) + + failpoint.Inject("GetGTIDPurged", func(val failpoint.Value) { + str := val.(string) + gtidStr = str + failpoint.Goto("bypass") + }) + row = conn.QueryRowContext(ctx, "select @@GLOBAL.gtid_purged") + err = row.Scan(>idStr) + if err != nil { + log.L().Error("can't get @@GLOBAL.gtid_purged when try to add it to gtid set", zap.Error(err)) + return gset, terror.DBErrorAdapt(err, terror.ErrDBDriverError) + } + failpoint.Label("bypass") + if gtidStr == "" { + return gset, nil + } + + newGset := gset.Origin() + err = newGset.Update(gtidStr) + if err != nil { + return nil, err + } + ret := >id.MySQLGTIDSet{} + _ = ret.Set(newGset) + return ret, nil +} diff --git a/pkg/v1dbschema/schema.go b/pkg/v1dbschema/schema.go index a6d362680a..6c9eb10572 100644 --- a/pkg/v1dbschema/schema.go +++ b/pkg/v1dbschema/schema.go @@ -105,6 +105,10 @@ func updateSyncerCheckpoint(tctx *tcontext.Context, dbConn *conn.BaseConn, taskN if err != nil { return terror.Annotatef(err, "get GTID sets for position %s", pos) } + gs, err = utils.AddGSetWithPurged(tctx.Context(), gs, dbConn.DBConn) + if err != nil { + return terror.Annotatef(err, "get GTID sets for position %s", pos) + } logger.Info("got global checkpoint GTID sets", log.WrapStringerField("GTID sets", gs)) } } @@ -176,21 +180,6 @@ func getGlobalPos(tctx *tcontext.Context, dbConn *conn.BaseConn, tableName, sour // getGTIDsForPos gets the GTID sets for the position. func getGTIDsForPos(tctx *tcontext.Context, pos gmysql.Position, tcpReader reader.Reader) (gs gtid.Set, err error) { - // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, - // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, - // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. - // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like - // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` - // may occur, so we force to reset the START part of any GTID set. - defer func() { - if err == nil && gs != nil { - oldGs := gs.Clone() - if gs.ResetStart() { - tctx.L().Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", gs)) - } - } - }() - // NOTE: because we have multiple unit test cases updating/clearing binlog in the upstream, // we may encounter errors when reading binlog event but cleared by another test case. failpoint.Inject("MockGetGTIDsForPos", func(val failpoint.Value) { diff --git a/pkg/v1dbschema/schema_test.go b/pkg/v1dbschema/schema_test.go index 014fbd5446..a5ced3cafd 100644 --- a/pkg/v1dbschema/schema_test.go +++ b/pkg/v1dbschema/schema_test.go @@ -113,13 +113,15 @@ func (t *testSchema) TestSchemaV106ToV20x(c *C) { endGS, _ = gtid.ParserGTID(gmysql.MySQLFlavor, "ccb992ad-a557-11ea-ba6a-0242ac140002:1-16") ) - c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil) // need `ResetStart`. + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:10-16")`), IsNil) //nolint:errcheck defer failpoint.Disable("github.com/pingcap/dm/pkg/v1dbschema/MockGetGTIDsForPos") + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("ccb992ad-a557-11ea-ba6a-0242ac140002:1-9")`), IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged") dbConn, err := t.db.GetBaseConn(tctx.Ctx) c.Assert(err, IsNil) - defer func() { _, err = dbConn.ExecuteSQL(tctx, nil, cfg.Name, []string{ `DROP DATABASE ` + cfg.MetaSchema, diff --git a/relay/relay.go b/relay/relay.go index 65f5e5fdf1..b8147f3df3 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -379,20 +379,13 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser zap.Stringer("from position", latestPos), zap.Stringer("to position", result.LatestPos), log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs)) if result.LatestGTIDs != nil { - gs := result.LatestGTIDs - oldGs1 := gs.Clone() - // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, - // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, - // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. - // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like - // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` - // may occur, so we force to reset the START part of any GTID set. - if gs.ResetStart() { - r.logger.Warn("force to reset the start part of recovered GTID sets", zap.Stringer("from GTID set", oldGs1), zap.Stringer("to GTID set", gs)) - oldGs2 := latestGTID.Clone() - if latestGTID.ResetStart() { - r.logger.Warn("force to reset the start part of latest GTID sets", zap.Stringer("from GTID set", oldGs2), zap.Stringer("to GTID set", latestGTID)) - } + dbConn, err2 := r.db.Conn(ctx) + if err2 != nil { + return err2 + } + result.LatestGTIDs, err2 = utils.AddGSetWithPurged(ctx, result.LatestGTIDs, dbConn) + if err2 != nil { + return err2 } } @@ -400,6 +393,7 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser r.logger.Warn("some GTIDs are missing in the meta data, this is usually due to the process was interrupted while writing the meta data. force to update GTIDs", log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs)) latestGTID = result.LatestGTIDs.Clone() + // TODO: check this Truncate support gap } else if err = latestGTID.Truncate(result.LatestGTIDs); err != nil { return err } @@ -965,7 +959,7 @@ func (r *Relay) setSyncConfig() error { } syncerCfg := replication.BinlogSyncerConfig{ - ServerID: uint32(r.cfg.ServerID), + ServerID: r.cfg.ServerID, Flavor: r.cfg.Flavor, Host: r.cfg.From.Host, Port: uint16(r.cfg.From.Port), @@ -1005,15 +999,9 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) return nil, err } - // in MySQL, we expect `PreviousGTIDsEvent` contains ALL previous GTID sets, but in fact it may lack a part of them sometimes, - // e.g we expect `00c04543-f584-11e9-a765-0242ac120002:1-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:1-100`, - // but may be `00c04543-f584-11e9-a765-0242ac120002:50-100,03fc0263-28c7-11e7-a653-6c0b84d59f30:60-100`. - // and when DM requesting MySQL to send binlog events with this EXCLUDED GTID sets, some errors like - // `ERROR 1236 (HY000): The slave is connecting using CHANGE MASTER TO MASTER_AUTO_POSITION = 1, but the master has purged binary logs containing GTIDs that the slave requires.` - // may occur, so we force to reset the START part of any GTID set. - oldGs := resultGs.Clone() - if resultGs.ResetStart() { - r.logger.Warn("force to reset the start part of GTID sets", zap.Stringer("from GTID set", oldGs), zap.Stringer("to GTID set", resultGs)) + dbConn, err2 := r.db.Conn(ctx) + if err2 != nil { + return nil, err2 } - return resultGs, nil + return utils.AddGSetWithPurged(ctx, resultGs, dbConn) } From 58f46bf3739dcc9f962a8ccaaa9bab9f16e6adc9 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 5 Feb 2021 16:52:34 +0800 Subject: [PATCH 04/11] fix CI --- pkg/gtid/gtid.go | 1 - relay/relay_test.go | 13 ++++++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/pkg/gtid/gtid.go b/pkg/gtid/gtid.go index 58ef252828..6381d1df93 100644 --- a/pkg/gtid/gtid.go +++ b/pkg/gtid/gtid.go @@ -40,7 +40,6 @@ type Set interface { // NOTE: the original GTID sets should contain the end GTID sets, otherwise it's invalid. // like truncating `00c04543-f584-11e9-a765-0242ac120002:1-100` with `00c04543-f584-11e9-a765-0242ac120002:40-60` // should become `00c04543-f584-11e9-a765-0242ac120002:1-60`. - // TODO: support gap Truncate(end Set) error String() string diff --git a/relay/relay_test.go b/relay/relay_test.go index ed06e41621..7ae4f46562 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -28,6 +28,7 @@ import ( . "github.com/pingcap/check" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/parser" "github.com/siddontang/go-mysql/mysql" gmysql "github.com/siddontang/go-mysql/mysql" @@ -178,6 +179,9 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { relayCfg = newRelayCfg(c, mysql.MySQLFlavor) r = NewRelay(relayCfg).(*Relay) ) + c.Assert(failpoint.Enable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged", `return("406a3f61-690d-11e7-87c5-6c92bf46f384:1-122")`), IsNil) + //nolint:errcheck + defer failpoint.Disable("github.com/pingcap/dm/pkg/utils/GetGTIDPurged") c.Assert(r.Init(context.Background()), IsNil) // purge old relay dir f, err := os.Create(filepath.Join(r.cfg.RelayDir, "old_relay_log")) @@ -256,9 +260,10 @@ func (t *testRelaySuite) TestTryRecoverMeta(c *C) { previousGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-14,53bfca22-690d-11e7-8a62-18ded7a37b78:1-495,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495" - recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" // 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 --> 406a3f61-690d-11e7-87c5-6c92bf46f384:1-456 - filename = "mysql-bin.000001" - startPos = gmysql.Position{Name: filename, Pos: 123} + // if no @@gtid_purged, 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 should be not changed + recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" + filename = "mysql-bin.000001" + startPos = gmysql.Position{Name: filename, Pos: 123} parser2 = parser.New() relayCfg = newRelayCfg(c, mysql.MySQLFlavor) @@ -549,6 +554,8 @@ func (t *testRelaySuite) verifyMetadata(c *C, r *Relay, uuidExpected string, c.Assert(err, IsNil) c.Assert(uuid, Equals, uuidExpected) c.Assert(pos, DeepEquals, posExpected) + fmt.Println(gs) + fmt.Println(gsExpected) c.Assert(gs.Equal(gsExpected), IsTrue) indexFile := filepath.Join(r.cfg.RelayDir, utils.UUIDIndexFilename) From ecc47000c628e8c6608b2a2dd83e6dd3dae5a36e Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 5 Feb 2021 17:32:08 +0800 Subject: [PATCH 05/11] try fix CI --- relay/relay.go | 1 - relay/relay_test.go | 2 +- tests/gtid/conf/source1.yaml | 3 ++- tests/gtid/conf/source2.yaml | 2 ++ tests/gtid/run.sh | 49 +++++++++++++++++++++++++++++------- 5 files changed, 45 insertions(+), 12 deletions(-) diff --git a/relay/relay.go b/relay/relay.go index b8147f3df3..a426e9b170 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -393,7 +393,6 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser r.logger.Warn("some GTIDs are missing in the meta data, this is usually due to the process was interrupted while writing the meta data. force to update GTIDs", log.WrapStringerField("from GTID set", latestGTID), log.WrapStringerField("to GTID set", result.LatestGTIDs)) latestGTID = result.LatestGTIDs.Clone() - // TODO: check this Truncate support gap } else if err = latestGTID.Truncate(result.LatestGTIDs); err != nil { return err } diff --git a/relay/relay_test.go b/relay/relay_test.go index 7ae4f46562..0c43ee456f 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -171,7 +171,7 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495" recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" // 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 --> 406a3f61-690d-11e7-87c5-6c92bf46f384:1-456 - greaterGITDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-20,53bfca22-690d-11e7-8a62-18ded7a37b78:1-510,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" + greaterGITDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-20,53bfca22-690d-11e7-8a62-18ded7a37b78:1-510,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" filename = "mysql-bin.000001" startPos = gmysql.Position{Name: filename, Pos: 123} diff --git a/tests/gtid/conf/source1.yaml b/tests/gtid/conf/source1.yaml index 8c4ca2f94e..3c702b3fc2 100644 --- a/tests/gtid/conf/source1.yaml +++ b/tests/gtid/conf/source1.yaml @@ -8,4 +8,5 @@ from: user: root password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3306 - +checker: + check-enable: false diff --git a/tests/gtid/conf/source2.yaml b/tests/gtid/conf/source2.yaml index 3ecf14ac98..c37ddc7ded 100644 --- a/tests/gtid/conf/source2.yaml +++ b/tests/gtid/conf/source2.yaml @@ -8,3 +8,5 @@ from: user: root password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs= port: 3307 +checker: + check-enable: false diff --git a/tests/gtid/run.sh b/tests/gtid/run.sh index 99d5637689..b96ef0dfde 100755 --- a/tests/gtid/run.sh +++ b/tests/gtid/run.sh @@ -47,12 +47,11 @@ function run() { new_gtid2=${uuid2}:$((end_gtid_num2 + 3)) echo "new_gtid1 $new_gtid1 new_gtid2 $new_gtid2" - run_sql "SET gtid_next='$new_gtid1';insert into gtid.t1 values (3);SET gtid_next='AUTOMATIC';" $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql "SET gtid_next='$new_gtid2';insert into gtid.t2 values (3);SET gtid_next='AUTOMATIC'" $MYSQL_PORT2 $MYSQL_PASSWORD2 - run_sql "flush logs" $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql "flush logs" $MYSQL_PORT2 $MYSQL_PASSWORD2 - run_sql "insert into gtid.t1 values (4)" $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql "insert into gtid.t2 values (4)" $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_source1 "SET gtid_next='$new_gtid1';insert into gtid.t1 values (3);SET gtid_next='AUTOMATIC';" + run_sql_source2 "SET gtid_next='$new_gtid2';insert into gtid.t2 values (3);SET gtid_next='AUTOMATIC'" + run_sql_both_source "flush logs" + run_sql_source1 "insert into gtid.t1 values (4)" + run_sql_source2 "insert into gtid.t2 values (4)" # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:6 sleep 1 @@ -60,8 +59,8 @@ function run() { "stop-task test"\ "\"result\": true" 3 - run_sql "insert into gtid.t1 values (5)" $MYSQL_PORT1 $MYSQL_PASSWORD1 - run_sql "insert into gtid.t2 values (5)" $MYSQL_PORT2 $MYSQL_PASSWORD2 + run_sql_source1 "insert into gtid.t1 values (5)" + run_sql_source2 "insert into gtid.t2 values (5)" # now Previous_gtids event is 09bec856-ba95-11ea-850a-58f2b4af5188:1-6 # remove relay-dir, now relay starting point(syncer checkpoint) should be 09bec856-ba95-11ea-850a-58f2b4af5188:1-4:6 @@ -76,12 +75,44 @@ function run() { check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + # we didn't lost 09bec856-ba95-11ea-850a-58f2b4af5188:5, which is insert into gtid.tx values (5) run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "start-task $cur/conf/dm-task.yaml"\ "\"result\": true" 3 check_sync_diff $WORK_DIR $cur/conf/diff_config.toml - # TODO: test if should error, error indeed + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "stop-task test"\ + "\"result\": true" 3 + + run_sql_source1 "insert into gtid.t1 values (7)" + run_sql_source2 "insert into gtid.t2 values (7)" + run_sql_both_source "flush logs" + run_sql_source1 "insert into gtid.t1 values (8)" + run_sql_source2 "insert into gtid.t2 values (8)" + sleep 2 + run_sql_both_source "purge binary logs before '`date '+%Y-%m-%d %H:%M:%S'`'" + + # remove relay-dir, now relay starting point(syncer checkpoint) should be 09bec856-ba95-11ea-850a-58f2b4af5188:1-6 + # which is already purged + pkill -hup dm-worker.test 2>/dev/null || true + check_port_offline $WORKER1_PORT 20 + check_port_offline $WORKER2_PORT 20 + rm -rf $WORK_DIR/worker1/relay_log || true + rm -rf $WORK_DIR/worker2/relay_log || true + run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml + run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT + check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT + + run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "start-task $cur/conf/dm-task.yaml" + + # both with and without relay should error + run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ + "query-status $TASK_NAME" \ + "no relay pos match gtid" 1 \ + "but the master has purged binary logs containing GTIDs that the slave requires" 1 } cleanup_data gtid From 112f1d8bb4f360ef066aad247b16d5f458828430 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 5 Feb 2021 17:42:26 +0800 Subject: [PATCH 06/11] keep old behaviour --- relay/relay.go | 4 ++++ relay/relay_test.go | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/relay/relay.go b/relay/relay.go index a426e9b170..f2ff88e4a1 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -387,6 +387,10 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser if err2 != nil { return err2 } + latestGTID, err2 = utils.AddGSetWithPurged(ctx, latestGTID, dbConn) + if err2 != nil { + return err2 + } } if result.LatestGTIDs != nil && !result.LatestGTIDs.Equal(latestGTID) && result.LatestGTIDs.Contain(latestGTID) { diff --git a/relay/relay_test.go b/relay/relay_test.go index 0c43ee456f..7ae4f46562 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -171,7 +171,7 @@ func (t *testRelaySuite) TestTryRecoverLatestFile(c *C) { latestGTIDStr1 = "3ccc475b-2343-11e7-be21-6c0b84d59f30:14" latestGTIDStr2 = "53bfca22-690d-11e7-8a62-18ded7a37b78:495" recoverGTIDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-17,53bfca22-690d-11e7-8a62-18ded7a37b78:1-505,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" // 406a3f61-690d-11e7-87c5-6c92bf46f384:123-456 --> 406a3f61-690d-11e7-87c5-6c92bf46f384:1-456 - greaterGITDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-20,53bfca22-690d-11e7-8a62-18ded7a37b78:1-510,406a3f61-690d-11e7-87c5-6c92bf46f384:1-456" + greaterGITDSetStr = "3ccc475b-2343-11e7-be21-6c0b84d59f30:1-20,53bfca22-690d-11e7-8a62-18ded7a37b78:1-510,406a3f61-690d-11e7-87c5-6c92bf46f384:123-456" filename = "mysql-bin.000001" startPos = gmysql.Position{Name: filename, Pos: 123} From 2718ca95122ac89e397fd6e40c38e70a678001f7 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 5 Feb 2021 18:02:51 +0800 Subject: [PATCH 07/11] fix CI --- tests/gtid/run.sh | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/gtid/run.sh b/tests/gtid/run.sh index b96ef0dfde..fe4b52cefa 100755 --- a/tests/gtid/run.sh +++ b/tests/gtid/run.sh @@ -109,10 +109,11 @@ function run() { "start-task $cur/conf/dm-task.yaml" # both with and without relay should error + # (different version of MySQL has different error message, only compare error code here) run_dm_ctl_with_retry $WORK_DIR "127.0.0.1:$MASTER_PORT" \ "query-status $TASK_NAME" \ "no relay pos match gtid" 1 \ - "but the master has purged binary logs containing GTIDs that the slave requires" 1 + "ERROR 1236 (HY000)" 1 } cleanup_data gtid From cc02004d82c8c59703b91d8f305224cd1fbd7875 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 5 Feb 2021 18:21:30 +0800 Subject: [PATCH 08/11] remove debug printing --- relay/relay_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/relay/relay_test.go b/relay/relay_test.go index 7ae4f46562..18c801fab2 100644 --- a/relay/relay_test.go +++ b/relay/relay_test.go @@ -554,8 +554,6 @@ func (t *testRelaySuite) verifyMetadata(c *C, r *Relay, uuidExpected string, c.Assert(err, IsNil) c.Assert(uuid, Equals, uuidExpected) c.Assert(pos, DeepEquals, posExpected) - fmt.Println(gs) - fmt.Println(gsExpected) c.Assert(gs.Equal(gsExpected), IsTrue) indexFile := filepath.Join(r.cfg.RelayDir, utils.UUIDIndexFilename) From ad6c30ea514a655a1bf6f7ad8287435fa4b93a3f Mon Sep 17 00:00:00 2001 From: lance6716 Date: Sun, 7 Feb 2021 17:01:23 +0800 Subject: [PATCH 09/11] fix CI --- .github/workflows/check-and-build.yml | 2 +- pkg/utils/db.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/check-and-build.yml b/.github/workflows/check-and-build.yml index 0d102cc5eb..e0271d3d0d 100644 --- a/.github/workflows/check-and-build.yml +++ b/.github/workflows/check-and-build.yml @@ -30,7 +30,7 @@ jobs: uses: actions/checkout@v2 - name: Cache go modules - uses: actions/cache@v2 + uses: actions/cache@v2.1.3 # latest v2 can't work in macOS https://github.com/actions/cache/issues/527 with: path: ~/go/pkg/mod key: ${{ runner.os }}-dm-${{ hashFiles('**/go.sum') }} diff --git a/pkg/utils/db.go b/pkg/utils/db.go index 290a97badf..600d694832 100644 --- a/pkg/utils/db.go +++ b/pkg/utils/db.go @@ -481,7 +481,6 @@ func ExtractTiDBVersion(version string) (*semver.Version, error) { // so we add gtid_purged to it. func AddGSetWithPurged(ctx context.Context, gset gtid.Set, conn *sql.Conn) (gtid.Set, error) { if _, ok := gset.(*gtid.MariadbGTIDSet); ok { - // TODO: check MariaDB really don't need this (MariaDB doesn't have a gtid_purged though) return gset, nil } From 0a89bb888d3d86ba56a60a5fef6b3e8c2ed360ce Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 19 Feb 2021 13:24:24 +0800 Subject: [PATCH 10/11] fix connection not closed --- relay/relay.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/relay/relay.go b/relay/relay.go index f2ff88e4a1..9d37f9ac53 100755 --- a/relay/relay.go +++ b/relay/relay.go @@ -383,6 +383,7 @@ func (r *Relay) tryRecoverLatestFile(ctx context.Context, parser2 *parser.Parser if err2 != nil { return err2 } + defer dbConn.Close() result.LatestGTIDs, err2 = utils.AddGSetWithPurged(ctx, result.LatestGTIDs, dbConn) if err2 != nil { return err2 @@ -1006,5 +1007,6 @@ func (r *Relay) adjustGTID(ctx context.Context, gset gtid.Set) (gtid.Set, error) if err2 != nil { return nil, err2 } + defer dbConn.Close() return utils.AddGSetWithPurged(ctx, resultGs, dbConn) } From bec582d6a6857360e9a9d053c2069e2fd5bb0f30 Mon Sep 17 00:00:00 2001 From: lance6716 Date: Fri, 19 Feb 2021 14:13:34 +0800 Subject: [PATCH 11/11] fix CI --- tests/gtid/conf/diff_config.toml | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/gtid/conf/diff_config.toml b/tests/gtid/conf/diff_config.toml index ca623a38af..82786b1628 100644 --- a/tests/gtid/conf/diff_config.toml +++ b/tests/gtid/conf/diff_config.toml @@ -8,8 +8,6 @@ check-thread-count = 4 sample-percent = 100 -use-rowid = false - use-checksum = true fix-sql-file = "fix.sql"