Skip to content

Commit

Permalink
syncer: support start from empty gtid config (#5340)
Browse files Browse the repository at this point in the history
close #3731
  • Loading branch information
GMHDBJD authored May 10, 2022
1 parent 57c19c8 commit 0c4c019
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 53 deletions.
27 changes: 27 additions & 0 deletions dm/pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
package gtid

import (
"strings"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand Down Expand Up @@ -80,14 +84,37 @@ func ParserGTID(flavor, gtidStr string) (Set, error) {
case mysql.MariaDBFlavor:
m = &MariadbGTIDSet{}
case mysql.MySQLFlavor:
// check for xxx:0
if IsNilMySQLGTIDSet(gtidStr) {
log.L().Warn("get empty gtid set end with `0`", zap.String("gtid", gtidStr))
return MinGTIDSet(mysql.MySQLFlavor), nil
}
m = &MySQLGTIDSet{}
default:
return nil, terror.ErrNotSupportedFlavor.Generate(flavor)
}

err = m.Set(gtid)
return m, err
}

// check whether a gtid set is nil(start sync from start)
// mysql: uuid:0
// mariadb: 0-0-0(no need to handle)
func IsNilMySQLGTIDSet(gStr string) bool {
sp := strings.Split(gStr, ",")
if len(sp) != 1 {
return false
}

sep := strings.Split(sp[0], ":")
if len(sep) != 2 {
return false
}
interval := strings.TrimSpace(sep[1])
return interval == "0"
}

// MinGTIDSet returns the min GTID set.
func MinGTIDSet(flavor string) Set {
// use mysql as default
Expand Down
11 changes: 11 additions & 0 deletions dm/pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/go-mysql-org/go-mysql/mysql"
. "github.com/pingcap/check"
"github.com/stretchr/testify/require"

"github.com/pingcap/tiflow/dm/pkg/terror"
)
Expand Down Expand Up @@ -96,6 +97,16 @@ func (s *testGTIDSuite) TestSortingGTIDSet(c *C) {
c.Assert(gSet.String(), Equals, sortedGTIDSet)
}

func TestIsNilGTIDSet(t *testing.T) {
require.False(t, IsNilMySQLGTIDSet(""))
require.False(t, IsNilMySQLGTIDSet("xxxxx"))
require.False(t, IsNilMySQLGTIDSet("xxxxx:0,yyyy:0"))
require.False(t, IsNilMySQLGTIDSet("xxxxx:1-2"))
require.False(t, IsNilMySQLGTIDSet("xxxxx:0-0"))
require.True(t, IsNilMySQLGTIDSet("xxxxx:0"))
require.True(t, IsNilMySQLGTIDSet(" xxxxx:0 "))
}

func (s *testGTIDSuite) TestMinGTIDSet(c *C) {
gset := MinGTIDSet(mysql.MySQLFlavor)
_, ok := gset.(*MySQLGTIDSet)
Expand Down
4 changes: 2 additions & 2 deletions dm/relay/local_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type testReaderSuite struct {
func (t *testReaderSuite) SetUpSuite(c *C) {
var err error
t.lastPos = 0
t.lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0")
t.lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:1")
c.Assert(err, IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/relay/SetHeartbeatInterval", "return(10000)"), IsNil)
}
Expand Down Expand Up @@ -613,7 +613,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {
{
"ba8f633f-1f15-11eb-b1c7-0242ac110002",
"ba8f633f-1f15-11eb-b1c7-0242ac110002.000001",
"ba8f633f-1f15-11eb-b1c7-0242ac110002:0",
"ba8f633f-1f15-11eb-b1c7-0242ac110002:1",
[]FileEventResult{
{
"mysql.000001",
Expand Down
5 changes: 1 addition & 4 deletions dm/tests/incremental_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: test
task-mode: task-mode-placeholder
task-mode: incremental
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
Expand All @@ -17,8 +17,6 @@ target-database:
mysql-instances:
- source-id: "mysql-replica-01"
meta:
binlog-name: binlog-name-placeholder-1
binlog-pos: binlog-pos-placeholder-1
binlog-gtid: binlog-gtid-placeholder-1
block-allow-list: "instance"
mydumper-config-name: "global"
Expand All @@ -29,7 +27,6 @@ mysql-instances:
meta:
binlog-name: binlog-name-placeholder-2
binlog-pos: binlog-pos-placeholder-2
binlog-gtid: binlog-gtid-placeholder-2
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/incremental_mode/conf/source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ source-id: mysql-replica-01
flavor: 'mysql'
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
relay-binlog-gtid: binlog-gtid-placeholder
enable-relay: false
from:
host: 127.0.0.1
Expand Down
1 change: 1 addition & 0 deletions dm/tests/incremental_mode/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
drop database if exists `incremental_mode`;
reset master;
create database `incremental_mode`;
use `incremental_mode`;
create table t1 (id int, name varchar(20), primary key(`id`));
Expand Down
1 change: 1 addition & 0 deletions dm/tests/incremental_mode/data/db2.prepare.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
drop database if exists `incremental_mode`;
reset master;
create database `incremental_mode`;
use `incremental_mode`;
create table t2 (id int auto_increment, name varchar(20), primary key (`id`));
Expand Down
66 changes: 20 additions & 46 deletions dm/tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@ TASK_NAME="test"

API_VERSION="v1alpha1"

function get_uuid() {
uuid=$(echo "show variables like '%server_uuid%';" | MYSQL_PWD=123456 mysql -uroot -h$1 -P$2 | awk 'FNR == 2 {print $2}')
echo $uuid
}

function get_binlog_name() {
binlog_name=$(echo "SHOW BINARY LOGS;" | MYSQL_PWD=123456 mysql -uroot -h127.0.0.1 -P3307 | awk 'FNR == 2 {print $1}')
echo $binlog_name
}

function run() {
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'
uuid=($(get_uuid $MYSQL_HOST1 $MYSQL_PORT1))

export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/dm/worker/defaultKeepAliveTTL=return(1)"

Expand All @@ -38,6 +49,7 @@ function run() {
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "s/binlog-gtid-placeholder/$uuid:0/g" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
Expand Down Expand Up @@ -67,43 +79,10 @@ function run() {
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# start a task in `full` mode
echo "start task in full mode"
cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml
sed -i "s/task-mode-placeholder/full/g" $WORK_DIR/dm-task.yaml
# avoid cannot unmarshal !!str `binlog-...` into uint32 error
sed -i "s/binlog-pos-placeholder-1/4/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml
dmctl_start_task $WORK_DIR/dm-task.yaml

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

dmctl_stop_task $TASK_NAME

# $worker1_run_source_1 > 0 means source1 is operated to worker1
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true
if [ $worker1_run_source_1 -gt 0 ]; then
name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
else
name2=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos2=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid2=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid1=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
fi
# kill worker1 and worker2
kill_dm_worker
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20

# start a task in `incremental` mode

# using account with limited privileges
run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 7
Expand All @@ -112,7 +91,6 @@ function run() {

# update mysql config
sed -i "s/root/dm_incremental/g" $WORK_DIR/source1.yaml
sed -i "s/relay-binlog-gtid: ''/relay-binlog-gtid: '$gtid1'/g" $WORK_DIR/source1.yaml
sed -i "s/root/dm_incremental/g" $WORK_DIR/source2.yaml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down Expand Up @@ -142,16 +120,12 @@ function run() {
"start-relay -s $worker2bound worker2" \
"\"result\": true" 2

echo "start task in incremental mode"
cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml
sed -i "s/task-mode-placeholder/incremental/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-name-placeholder-1//g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-1//g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-gtid-placeholder-1/$gtid1/g" $WORK_DIR/dm-task.yaml

sed -i "s/binlog-name-placeholder-2/$name2/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true
echo "start task in incremental mode with zero gtid/pos"
binlog_name=($(get_binlog_name $MYSQL_HOST2 $MYSQL_PORT2))
sed "s/binlog-gtid-placeholder-1/$uuid:0/g" $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml
sed -i "s/binlog-name-placeholder-2/$binlog_name/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml

# test graceful display error
export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/GetEventError=return'
Expand Down Expand Up @@ -179,9 +153,9 @@ function run() {
sleep 3
# check not specify binlog name could also update active relay log
if [ $worker1_run_source_1 -gt 0 ]; then
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker1/log/dm-worker.log
grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker2/log/dm-worker.log
else
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker2/log/dm-worker.log
grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker1/log/dm-worker.log
fi

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
Expand Down

0 comments on commit 0c4c019

Please sign in to comment.