Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

syncer: support start from empty gtid config (#5340) #5376

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions dm/pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
package gtid

import (
"strings"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
"go.uber.org/zap"

"github.com/pingcap/tiflow/dm/pkg/log"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand Down Expand Up @@ -80,14 +84,37 @@ func ParserGTID(flavor, gtidStr string) (Set, error) {
case mysql.MariaDBFlavor:
m = &MariadbGTIDSet{}
case mysql.MySQLFlavor:
// check for xxx:0
if IsNilMySQLGTIDSet(gtidStr) {
log.L().Warn("get empty gtid set end with `0`", zap.String("gtid", gtidStr))
return MinGTIDSet(mysql.MySQLFlavor), nil
}
m = &MySQLGTIDSet{}
default:
return nil, terror.ErrNotSupportedFlavor.Generate(flavor)
}

err = m.Set(gtid)
return m, err
}

// check whether a gtid set is nil(start sync from start)
// mysql: uuid:0
// mariadb: 0-0-0(no need to handle)
func IsNilMySQLGTIDSet(gStr string) bool {
sp := strings.Split(gStr, ",")
if len(sp) != 1 {
return false
}

sep := strings.Split(sp[0], ":")
if len(sep) != 2 {
return false
}
interval := strings.TrimSpace(sep[1])
return interval == "0"
}

// MinGTIDSet returns the min GTID set.
func MinGTIDSet(flavor string) Set {
// use mysql as default
Expand Down
11 changes: 11 additions & 0 deletions dm/pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/go-mysql-org/go-mysql/mysql"
. "github.com/pingcap/check"
"github.com/stretchr/testify/require"

"github.com/pingcap/tiflow/dm/pkg/terror"
)
Expand Down Expand Up @@ -96,6 +97,16 @@ func (s *testGTIDSuite) TestSortingGTIDSet(c *C) {
c.Assert(gSet.String(), Equals, sortedGTIDSet)
}

func TestIsNilGTIDSet(t *testing.T) {
require.False(t, IsNilMySQLGTIDSet(""))
require.False(t, IsNilMySQLGTIDSet("xxxxx"))
require.False(t, IsNilMySQLGTIDSet("xxxxx:0,yyyy:0"))
require.False(t, IsNilMySQLGTIDSet("xxxxx:1-2"))
require.False(t, IsNilMySQLGTIDSet("xxxxx:0-0"))
require.True(t, IsNilMySQLGTIDSet("xxxxx:0"))
require.True(t, IsNilMySQLGTIDSet(" xxxxx:0 "))
}

func (s *testGTIDSuite) TestMinGTIDSet(c *C) {
gset := MinGTIDSet(mysql.MySQLFlavor)
_, ok := gset.(*MySQLGTIDSet)
Expand Down
4 changes: 2 additions & 2 deletions dm/relay/local_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type testReaderSuite struct {
func (t *testReaderSuite) SetUpSuite(c *C) {
var err error
t.lastPos = 0
t.lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:0")
t.lastGTID, err = gtid.ParserGTID(gmysql.MySQLFlavor, "ba8f633f-1f15-11eb-b1c7-0242ac110002:1")
c.Assert(err, IsNil)
c.Assert(failpoint.Enable("github.com/pingcap/tiflow/dm/relay/SetHeartbeatInterval", "return(10000)"), IsNil)
}
Expand Down Expand Up @@ -613,7 +613,7 @@ func (t *testReaderSuite) TestStartSyncByGTID(c *C) {
{
"ba8f633f-1f15-11eb-b1c7-0242ac110002",
"ba8f633f-1f15-11eb-b1c7-0242ac110002.000001",
"ba8f633f-1f15-11eb-b1c7-0242ac110002:0",
"ba8f633f-1f15-11eb-b1c7-0242ac110002:1",
[]FileEventResult{
{
"mysql.000001",
Expand Down
5 changes: 1 addition & 4 deletions dm/tests/incremental_mode/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
name: test
task-mode: task-mode-placeholder
task-mode: incremental
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
Expand All @@ -17,8 +17,6 @@ target-database:
mysql-instances:
- source-id: "mysql-replica-01"
meta:
binlog-name: binlog-name-placeholder-1
binlog-pos: binlog-pos-placeholder-1
binlog-gtid: binlog-gtid-placeholder-1
block-allow-list: "instance"
mydumper-config-name: "global"
Expand All @@ -29,7 +27,6 @@ mysql-instances:
meta:
binlog-name: binlog-name-placeholder-2
binlog-pos: binlog-pos-placeholder-2
binlog-gtid: binlog-gtid-placeholder-2
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
Expand Down
2 changes: 1 addition & 1 deletion dm/tests/incremental_mode/conf/source1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ source-id: mysql-replica-01
flavor: 'mysql'
enable-gtid: true
relay-binlog-name: ''
relay-binlog-gtid: ''
relay-binlog-gtid: binlog-gtid-placeholder
enable-relay: false
from:
host: 127.0.0.1
Expand Down
1 change: 1 addition & 0 deletions dm/tests/incremental_mode/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
drop database if exists `incremental_mode`;
reset master;
create database `incremental_mode`;
use `incremental_mode`;
create table t1 (id int, name varchar(20), primary key(`id`));
Expand Down
1 change: 1 addition & 0 deletions dm/tests/incremental_mode/data/db2.prepare.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
drop database if exists `incremental_mode`;
reset master;
create database `incremental_mode`;
use `incremental_mode`;
create table t2 (id int auto_increment, name varchar(20), primary key (`id`));
Expand Down
66 changes: 20 additions & 46 deletions dm/tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,22 @@ TASK_NAME="test"

API_VERSION="v1alpha1"

function get_uuid() {
uuid=$(echo "show variables like '%server_uuid%';" | MYSQL_PWD=123456 mysql -uroot -h$1 -P$2 | awk 'FNR == 2 {print $2}')
echo $uuid
}

function get_binlog_name() {
binlog_name=$(echo "SHOW BINARY LOGS;" | MYSQL_PWD=123456 mysql -uroot -h127.0.0.1 -P3307 | awk 'FNR == 2 {print $1}')
echo $binlog_name
}

function run() {
run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 2 rows affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 3 rows affected'
uuid=($(get_uuid $MYSQL_HOST1 $MYSQL_PORT1))

export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/dm/worker/defaultKeepAliveTTL=return(1)"

Expand All @@ -38,6 +49,7 @@ function run() {
# operate mysql config to worker
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "s/binlog-gtid-placeholder/$uuid:0/g" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1
Expand Down Expand Up @@ -67,43 +79,10 @@ function run() {
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

# start a task in `full` mode
echo "start task in full mode"
cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml
sed -i "s/task-mode-placeholder/full/g" $WORK_DIR/dm-task.yaml
# avoid cannot unmarshal !!str `binlog-...` into uint32 error
sed -i "s/binlog-pos-placeholder-1/4/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml
dmctl_start_task $WORK_DIR/dm-task.yaml

check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

dmctl_stop_task $TASK_NAME

# $worker1_run_source_1 > 0 means source1 is operated to worker1
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true
if [ $worker1_run_source_1 -gt 0 ]; then
name1=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos1=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid1=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
name2=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos2=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid2=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
else
name2=$(grep "Log: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos2=$(grep "Pos: " $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid2=$(grep "GTID:" $WORK_DIR/worker1/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
name1=$(grep "Log: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
pos1=$(grep "Pos: " $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2}' | tr -d ' ')
gtid1=$(grep "GTID:" $WORK_DIR/worker2/dumped_data.$TASK_NAME/metadata | awk -F: '{print $2,":",$3}' | tr -d ' ')
fi
# kill worker1 and worker2
kill_dm_worker
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20

# start a task in `incremental` mode

# using account with limited privileges
run_sql_file $cur/data/db1.prepare.user.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_count 'Query OK, 0 rows affected' 7
Expand All @@ -112,7 +91,6 @@ function run() {

# update mysql config
sed -i "s/root/dm_incremental/g" $WORK_DIR/source1.yaml
sed -i "s/relay-binlog-gtid: ''/relay-binlog-gtid: '$gtid1'/g" $WORK_DIR/source1.yaml
sed -i "s/root/dm_incremental/g" $WORK_DIR/source2.yaml

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
Expand Down Expand Up @@ -142,16 +120,12 @@ function run() {
"start-relay -s $worker2bound worker2" \
"\"result\": true" 2

echo "start task in incremental mode"
cat $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml
sed -i "s/task-mode-placeholder/incremental/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-name-placeholder-1//g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-1//g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-gtid-placeholder-1/$gtid1/g" $WORK_DIR/dm-task.yaml

sed -i "s/binlog-name-placeholder-2/$name2/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-2/$pos2/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-gtid-placeholder-2/$gtid2/g" $WORK_DIR/dm-task.yaml
worker1_run_source_1=$(sed "s/$SOURCE_ID1/$SOURCE_ID1\n/g" $WORK_DIR/worker1/log/dm-worker.log | grep -c "$SOURCE_ID1") || true
echo "start task in incremental mode with zero gtid/pos"
binlog_name=($(get_binlog_name $MYSQL_HOST2 $MYSQL_PORT2))
sed "s/binlog-gtid-placeholder-1/$uuid:0/g" $cur/conf/dm-task.yaml >$WORK_DIR/dm-task.yaml
sed -i "s/binlog-name-placeholder-2/$binlog_name/g" $WORK_DIR/dm-task.yaml
sed -i "s/binlog-pos-placeholder-2/4/g" $WORK_DIR/dm-task.yaml

# test graceful display error
export GO_FAILPOINTS='github.com/pingcap/tiflow/dm/syncer/GetEventError=return'
Expand Down Expand Up @@ -179,9 +153,9 @@ function run() {
sleep 3
# check not specify binlog name could also update active relay log
if [ $worker1_run_source_1 -gt 0 ]; then
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker1/log/dm-worker.log
grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker2/log/dm-worker.log
else
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker2/log/dm-worker.log
grep -E ".*current earliest active relay log.*$binlog_name" $WORK_DIR/worker1/log/dm-worker.log
fi

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
Expand Down