Skip to content

Commit

Permalink
cherry pick pingcap#1448 to release-2.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
lance6716 authored and ti-srebot committed Mar 1, 2021
1 parent 39a804b commit 1b9c016
Show file tree
Hide file tree
Showing 16 changed files with 288 additions and 8 deletions.
7 changes: 4 additions & 3 deletions pkg/binlog/common/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ var (
// MaxBinlogSyncerReconnect is the max reconnection times for binlog syncer in go-mysql
MaxBinlogSyncerReconnect = 60
// SlaveReadTimeout is slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-slave.html#sysvar_slave_net_timeout
SlaveReadTimeout = 1 * time.Minute
masterHeartbeatPeriod = 30 * time.Second // master server send heartbeat period: ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html
SlaveReadTimeout = 1 * time.Minute
// MasterHeartbeatPeriod is the master server send heartbeat period, ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html
MasterHeartbeatPeriod = 30 * time.Second
)

// SetDefaultReplicationCfg sets some default value for BinlogSyncerConfig
Expand All @@ -37,5 +38,5 @@ func SetDefaultReplicationCfg(cfg *replication.BinlogSyncerConfig, retryCount in
cfg.DisableRetrySync = true
}
cfg.ReadTimeout = SlaveReadTimeout
cfg.HeartbeatPeriod = masterHeartbeatPeriod
cfg.HeartbeatPeriod = MasterHeartbeatPeriod
}
3 changes: 2 additions & 1 deletion pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -27,7 +28,7 @@ import (
)

var (
heartbeatInterval = 30 * time.Second
heartbeatInterval = common.MasterHeartbeatPeriod
)

// TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer.
Expand Down
1 change: 1 addition & 0 deletions syncer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (s *Syncer) setInitActiveRelayLog(ctx context.Context) error {
return nil
}
err = s.readerHub.UpdateActiveRelayLog(s.cfg.Name, activeUUID, pos.Name)
s.recordedActiveRelayLog = true
s.tctx.L().Info("current earliest active relay log", log.WrapStringerField("active relay log", s.readerHub.EarliestActiveRelayLog()))
return err
}
Expand Down
19 changes: 18 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ type Syncer struct {
heartbeat *Heartbeat

readerHub *streamer.ReaderHub
// when user starts a new task with GTID and no binlog file name, we can't know active relay log at init time
// at this case, we update active relay log when receive fake rotate event
recordedActiveRelayLog bool

errOperatorHolder *operator.Holder

Expand Down Expand Up @@ -216,6 +219,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer {
// only need to sync DDL in sharding mode
syncer.sgk = NewShardingGroupKeeper(syncer.tctx, cfg)
}
syncer.recordedActiveRelayLog = false

return syncer
}
Expand Down Expand Up @@ -1510,6 +1514,19 @@ func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext)
if string(ev.NextLogName) <= ec.lastLocation.Position.Name {
return nil // not rotate to the next binlog file, ignore it
}
if !s.recordedActiveRelayLog {
if err := s.updateActiveRelayLog(mysql.Position{
Name: string(ev.NextLogName),
Pos: uint32(ev.Position),
}); err != nil {
ec.tctx.L().Warn("failed to update active relay log, will try to update when flush checkpoint",
zap.ByteString("NextLogName", ev.NextLogName),
zap.Uint64("Position", ev.Position),
zap.Error(err))
} else {
s.recordedActiveRelayLog = true
}
}
}

*ec.currentLocation = binlog.InitLocation(
Expand Down Expand Up @@ -2729,7 +2746,7 @@ func (s *Syncer) setSyncCfg() error {
}

syncCfg := replication.BinlogSyncerConfig{
ServerID: uint32(s.cfg.ServerID),
ServerID: s.cfg.ServerID,
Flavor: s.cfg.Flavor,
Host: s.cfg.From.Host,
Port: uint16(s.cfg.From.Port),
Expand Down
14 changes: 11 additions & 3 deletions tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ function run() {
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

# start a task in `incremental` mode
# using account with limited privileges
Expand Down Expand Up @@ -135,7 +133,17 @@ function run() {
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
dmctl_start_task $WORK_DIR/dm-task.yaml
dmctl_start_task $WORK_DIR/dm-task.yaml --remove-meta

# check not specify binlog name could also update active relay log
if [ $worker1_run_source_1 -gt 0 ]; then
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker1/log/dm-worker.log
else
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker2/log/dm-worker.log
fi

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
Expand Down
56 changes: 56 additions & 0 deletions tests/only_dml/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# diff Configuration.

log-level = "info"

chunk-size = 1000

check-thread-count = 4

sample-percent = 100

use-checksum = true

fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "only_dml"
tables = ["~t.*"]

[[table-config]]
schema = "only_dml"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "only_dml"
table = "t1"

[[table-config]]
schema = "only_dml"
table = "t2"

[[table-config.source-tables]]
instance-id = "source-2"
schema = "only_dml"
table = "t2"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = "123456"
instance-id = "source-2"

[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
3 changes: 3 additions & 0 deletions tests/only_dml/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
50 changes: 50 additions & 0 deletions tests/only_dml/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
timezone: "Asia/Shanghai"

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["only_dml"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
checkpoint-flush-interval: 3
2 changes: 2 additions & 0 deletions tests/only_dml/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
2 changes: 2 additions & 0 deletions tests/only_dml/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261"
9 changes: 9 additions & 0 deletions tests/only_dml/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
source-id: mysql-replica-01
enable-relay: true
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
check-enable: false
10 changes: 10 additions & 0 deletions tests/only_dml/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
source-id: mysql-replica-02
enable-gtid: true
enable-relay: true
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3307
checker:
check-enable: false
6 changes: 6 additions & 0 deletions tests/only_dml/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
drop database if exists `only_dml`;
reset master;
create database `only_dml`;
use `only_dml`;
create table t1 (id int, primary key(`id`));
insert into t1 values (1);
6 changes: 6 additions & 0 deletions tests/only_dml/data/db2.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
drop database if exists `only_dml`;
reset master;
create database `only_dml`;
use `only_dml`;
create table t2 (id int, primary key (`id`));
insert into t2 values (2);
107 changes: 107 additions & 0 deletions tests/only_dml/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/bin/bash

set -eu

cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME
TASK_NAME="test"
SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt"

function purge_relay_success() {
binlog_file=$1
source_id=$2
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"purge-relay --filename $binlog_file -s $source_id" \
"\"result\": true" 2
}

function run_sql_silent() {
TIDB_PORT=4000
user="root"
if [[ "$2" = $TIDB_PORT ]]; then
user="test"
fi
mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >> /dev/null
}

function insert_data() {
i=1

while true; do
sleep 1
run_sql_silent "insert into only_dml.t1 values ($(($i*2+1)));" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_silent "insert into only_dml.t2 values ($(($i*2+2)));" $MYSQL_PORT2 $MYSQL_PASSWORD2
((i++))
run_sql_silent "insert into only_dml.t1 values ($(($i*2+1)));" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_silent "insert into only_dml.t2 values ($(($i*2+2)));" $MYSQL_PORT2 $MYSQL_PASSWORD2
((i++))
run_sql_silent "flush logs;" $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_silent "flush logs;" $MYSQL_PORT2 $MYSQL_PASSWORD2
done
}

function run() {

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 1 row affected'
run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
check_contains 'Query OK, 1 row affected'

# bound source1 to worker1, source2 to worker2
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

# start a task in all mode, and when enter incremental mode, we only execute DML
dmctl_start_task $cur/conf/dm-task.yaml
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

insert_data &
pid=$!
echo "PID of insert_data is $pid"

# check twice, make sure update active relay log could work for first time and later
for i in {1..2}; do
sleep 6
server_uuid1=$(tail -n 1 $WORK_DIR/worker1/relay-dir/server-uuid.index)
run_sql_source1 "show binary logs\G"
max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}')
earliest_relay_log1=`ls $WORK_DIR/worker1/relay-dir/$server_uuid1 | grep -v 'relay.meta' | sort | head -n 1`
purge_relay_success $max_binlog_name $SOURCE_ID1
earliest_relay_log2=`ls $WORK_DIR/worker1/relay-dir/$server_uuid1 | grep -v 'relay.meta' | sort | head -n 1`
echo "earliest_relay_log1: $earliest_relay_log1 earliest_relay_log2: $earliest_relay_log2"
[ "$earliest_relay_log1" != "$earliest_relay_log2" ]

server_uuid1=$(tail -n 1 $WORK_DIR/worker2/relay-dir/server-uuid.index)
run_sql_source2 "show binary logs\G"
max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}')
earliest_relay_log1=`ls $WORK_DIR/worker2/relay-dir/$server_uuid1 | grep -v 'relay.meta' | sort | head -n 1`
purge_relay_success $max_binlog_name $SOURCE_ID2
earliest_relay_log2=`ls $WORK_DIR/worker2/relay-dir/$server_uuid1 | grep -v 'relay.meta' | sort | head -n 1`
echo "earliest_relay_log1: $earliest_relay_log1 earliest_relay_log2: $earliest_relay_log2"
[ "$earliest_relay_log1" != "$earliest_relay_log2" ]
done

kill $pid
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

cleanup_data $TEST_NAME
# also cleanup dm processes in case of last run failed
cleanup_process $*
run $*
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
1 change: 1 addition & 0 deletions tests/others_integration.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ sequence_sharding_optimistic
sequence_sharding_removemeta
drop_column_with_index
gtid
only_dml

0 comments on commit 1b9c016

Please sign in to comment.