Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

syncer: update active relay log at fake rotate event #1448

Merged
merged 8 commits into from
Mar 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/binlog/common/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ var (
// MaxBinlogSyncerReconnect is the max reconnection times for binlog syncer in go-mysql
MaxBinlogSyncerReconnect = 60
// SlaveReadTimeout is slave read binlog data timeout, ref: https://dev.mysql.com/doc/refman/8.0/en/replication-options-slave.html#sysvar_slave_net_timeout
SlaveReadTimeout = 1 * time.Minute
masterHeartbeatPeriod = 30 * time.Second // master server send heartbeat period: ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html
SlaveReadTimeout = 1 * time.Minute
// MasterHeartbeatPeriod is the master server send heartbeat period, ref: `MASTER_HEARTBEAT_PERIOD` in https://dev.mysql.com/doc/refman/8.0/en/change-master-to.html
MasterHeartbeatPeriod = 30 * time.Second
)

// SetDefaultReplicationCfg sets some default value for BinlogSyncerConfig
Expand All @@ -37,5 +38,5 @@ func SetDefaultReplicationCfg(cfg *replication.BinlogSyncerConfig, retryCount in
cfg.DisableRetrySync = true
}
cfg.ReadTimeout = SlaveReadTimeout
cfg.HeartbeatPeriod = masterHeartbeatPeriod
cfg.HeartbeatPeriod = MasterHeartbeatPeriod
}
3 changes: 2 additions & 1 deletion pkg/streamer/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"time"

"github.com/pingcap/dm/pkg/binlog/common"
"github.com/pingcap/dm/pkg/binlog/event"
"github.com/pingcap/dm/pkg/log"
"github.com/pingcap/dm/pkg/terror"
Expand All @@ -27,7 +28,7 @@ import (
)

var (
heartbeatInterval = 30 * time.Second
heartbeatInterval = common.MasterHeartbeatPeriod
)

// TODO: maybe one day we can make a pull request to go-mysql to support LocalStreamer.
Expand Down
1 change: 1 addition & 0 deletions syncer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func (s *Syncer) setInitActiveRelayLog(ctx context.Context) error {
return nil
}
err = s.readerHub.UpdateActiveRelayLog(s.cfg.Name, activeUUID, pos.Name)
s.recordedActiveRelayLog = true
s.tctx.L().Info("current earliest active relay log", log.WrapStringerField("active relay log", s.readerHub.EarliestActiveRelayLog()))
return err
}
Expand Down
19 changes: 18 additions & 1 deletion syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ type Syncer struct {
heartbeat *Heartbeat

readerHub *streamer.ReaderHub
// when the user starts a new task with GTID and no binlog file name, we can't know the active relay log at init time.
// in this case, we update the active relay log when we receive the fake rotate event.
recordedActiveRelayLog bool

errOperatorHolder *operator.Holder

Expand Down Expand Up @@ -216,6 +219,7 @@ func NewSyncer(cfg *config.SubTaskConfig, etcdClient *clientv3.Client) *Syncer {
// only need to sync DDL in sharding mode
syncer.sgk = NewShardingGroupKeeper(syncer.tctx, cfg)
}
syncer.recordedActiveRelayLog = false

return syncer
}
Expand Down Expand Up @@ -1510,6 +1514,19 @@ func (s *Syncer) handleRotateEvent(ev *replication.RotateEvent, ec eventContext)
if string(ev.NextLogName) <= ec.lastLocation.Position.Name {
return nil // not rotate to the next binlog file, ignore it
}
if !s.recordedActiveRelayLog {
if err := s.updateActiveRelayLog(mysql.Position{
Name: string(ev.NextLogName),
Pos: uint32(ev.Position),
}); err != nil {
ec.tctx.L().Warn("failed to update active relay log, will try to update when flush checkpoint",
zap.ByteString("NextLogName", ev.NextLogName),
zap.Uint64("Position", ev.Position),
zap.Error(err))
} else {
s.recordedActiveRelayLog = true
}
}
}

*ec.currentLocation = binlog.InitLocation(
Expand Down Expand Up @@ -2729,7 +2746,7 @@ func (s *Syncer) setSyncCfg() error {
}

syncCfg := replication.BinlogSyncerConfig{
ServerID: uint32(s.cfg.ServerID),
ServerID: s.cfg.ServerID,
Flavor: s.cfg.Flavor,
Host: s.cfg.From.Host,
Port: uint16(s.cfg.From.Port),
Expand Down
14 changes: 11 additions & 3 deletions tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,6 @@ function run() {
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

# start a task in `incremental` mode
# using account with limited privileges
Expand Down Expand Up @@ -135,7 +133,17 @@ function run() {
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
dmctl_start_task $WORK_DIR/dm-task.yaml
dmctl_start_task $WORK_DIR/dm-task.yaml --remove-meta

# check that the active relay log is also updated when no binlog name is specified
if [ $worker1_run_source_1 -gt 0 ]; then
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker1/log/dm-worker.log
else
grep -E ".*current earliest active relay log.*$name1" $WORK_DIR/worker2/log/dm-worker.log
fi

run_sql_file $cur/data/db1.increment.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
run_sql_file $cur/data/db2.increment.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"query-status test" \
Expand Down
56 changes: 56 additions & 0 deletions tests/only_dml/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# diff Configuration.

log-level = "info"

chunk-size = 1000

check-thread-count = 4

sample-percent = 100

use-checksum = true

fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "only_dml"
tables = ["~t.*"]

[[table-config]]
schema = "only_dml"
table = "t1"

[[table-config.source-tables]]
instance-id = "source-1"
schema = "only_dml"
table = "t1"

[[table-config]]
schema = "only_dml"
table = "t2"

[[table-config.source-tables]]
instance-id = "source-2"
schema = "only_dml"
table = "t2"

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = "123456"
instance-id = "source-2"

[target-db]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
3 changes: 3 additions & 0 deletions tests/only_dml/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
50 changes: 50 additions & 0 deletions tests/only_dml/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"
# enable-heartbeat: true
heartbeat-update-interval: 1
heartbeat-report-interval: 1
timezone: "Asia/Shanghai"

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

- source-id: "mysql-replica-02"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["only_dml"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
worker-count: 16
batch: 100
checkpoint-flush-interval: 3
2 changes: 2 additions & 0 deletions tests/only_dml/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker1"
join = "127.0.0.1:8261"
2 changes: 2 additions & 0 deletions tests/only_dml/conf/dm-worker2.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
name = "worker2"
join = "127.0.0.1:8261"
9 changes: 9 additions & 0 deletions tests/only_dml/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
source-id: mysql-replica-01
enable-relay: true
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
check-enable: false
10 changes: 10 additions & 0 deletions tests/only_dml/conf/source2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
source-id: mysql-replica-02
enable-gtid: true
enable-relay: true
from:
host: 127.0.0.1
user: root
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3307
checker:
check-enable: false
6 changes: 6 additions & 0 deletions tests/only_dml/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
drop database if exists `only_dml`;
reset master;
create database `only_dml`;
use `only_dml`;
create table t1 (id int, primary key(`id`));
insert into t1 values (1);
6 changes: 6 additions & 0 deletions tests/only_dml/data/db2.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
drop database if exists `only_dml`;
reset master;
create database `only_dml`;
use `only_dml`;
create table t2 (id int, primary key (`id`));
insert into t2 values (2);
107 changes: 107 additions & 0 deletions tests/only_dml/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#!/bin/bash

# Abort on the first failing command and on any reference to an unset variable.
set -eu

# Absolute path of the directory that contains this script.
cur=$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )
# Shared test helpers. Presumably this also provides TEST_DIR, TEST_NAME and the
# port/credential variables used below -- TODO confirm against _utils/test_prepare.
source $cur/../_utils/test_prepare
# Per-test working directory; worker/master data and patched configs live under it.
WORK_DIR=$TEST_DIR/$TEST_NAME
# NOTE(review): TASK_NAME is not referenced anywhere else in this script.
TASK_NAME="test"
# File that run() greps for `Log_name` after issuing `show binary logs`.
SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt"

# Asks dm-master (via dmctl) to purge relay logs of a source and expects success.
#   $1 - binlog file name handed to `purge-relay --filename`
#   $2 - source ID handed to `purge-relay -s`
# The expected-output pattern `"result": true` must be matched 2 times.
function purge_relay_success() {
    binlog_file=$1
    source_id=$2
    local purge_cmd="purge-relay --filename $binlog_file -s $source_id"
    run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" "$purge_cmd" '"result": true' 2
}

# Executes one SQL statement with the mysql client and discards its stdout.
#   $1 - SQL text
#   $2 - port of the server to connect to on 127.0.0.1
#   $3 - password
# Connects as `test` when the port is the TiDB port (4000), otherwise as `root`.
function run_sql_silent() {
    TIDB_PORT=4000
    case "$2" in
        $TIDB_PORT) user="test" ;;
        *) user="root" ;;
    esac
    mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" > /dev/null
}

# Endless DML generator, meant to be run as a background job.
# Each round: sleep one second, insert two new rows into only_dml.t1 (odd ids,
# on source 1) and two into only_dml.t2 (even ids, on source 2), then rotate the
# binlog on both sources with `flush logs`.
function insert_data() {
    local step
    i=1

    while true; do
        sleep 1
        for step in 1 2; do
            run_sql_silent "insert into only_dml.t1 values ($((i * 2 + 1)));" $MYSQL_PORT1 $MYSQL_PASSWORD1
            run_sql_silent "insert into only_dml.t2 values ($((i * 2 + 2)));" $MYSQL_PORT2 $MYSQL_PASSWORD2
            i=$((i + 1))
        done
        run_sql_silent "flush logs;" $MYSQL_PORT1 $MYSQL_PASSWORD1
        run_sql_silent "flush logs;" $MYSQL_PORT2 $MYSQL_PASSWORD2
    done
}

# Purges the relay logs of one source up to the upstream's newest binlog file
# and asserts that the earliest relay log file on disk changed as a result.
#   $1 - relay directory of the worker bound to the source
#   $2 - helper that runs SQL against the source (run_sql_source1 / run_sql_source2);
#        presumably it writes its output to $SQL_RESULT_FILE -- TODO confirm
#   $3 - source ID passed to `purge-relay`
function check_purge_moves_earliest_relay_log() {
    local relay_dir=$1
    local run_sql_on_source=$2
    local purge_source_id=$3
    # Declared apart from the assignments so that a failing command substitution
    # still aborts the script under `set -e` (`local v=$(cmd)` would mask it).
    local server_uuid max_binlog_name earliest_before earliest_after

    # The last line of server-uuid.index names the relay sub-directory to inspect.
    server_uuid=$(tail -n 1 $relay_dir/server-uuid.index)
    $run_sql_on_source "show binary logs\G"
    max_binlog_name=$(grep Log_name "$SQL_RESULT_FILE"| tail -n 1 | awk -F":" '{print $NF}')
    earliest_before=`ls $relay_dir/$server_uuid | grep -v 'relay.meta' | sort | head -n 1`
    # $max_binlog_name stays unquoted on purpose: word splitting drops any
    # whitespace that surrounds the awk field.
    purge_relay_success $max_binlog_name $purge_source_id
    earliest_after=`ls $relay_dir/$server_uuid | grep -v 'relay.meta' | sort | head -n 1`
    echo "earliest_relay_log1: $earliest_before earliest_relay_log2: $earliest_after"
    # A failing test here aborts the whole script because of `set -e`.
    [ "$earliest_before" != "$earliest_after" ]
}

# Test body: start a task in `all` mode on two relay-enabled sources, keep
# writing DML-only traffic with binlog rotations in the background, and verify
# that purging relay logs keeps working while the task is in incremental mode.
function run() {

    run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
    check_contains 'Query OK, 1 row affected'
    run_sql_file $cur/data/db2.prepare.sql $MYSQL_HOST2 $MYSQL_PORT2 $MYSQL_PASSWORD2
    check_contains 'Query OK, 1 row affected'

    # bind source1 to worker1 and source2 to worker2: each source is created
    # right after the worker that should pick it up has come online
    run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
    check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
    run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT

    cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
    cp $cur/conf/source2.yaml $WORK_DIR/source2.yaml
    # NOTE(review): these inserts are anchored on a `relay-binlog-name` line. If the
    # source YAML files have no such line, sed changes nothing, and the checks below
    # read `$WORK_DIR/workerN/relay-dir`, not `relay_log` -- confirm this is intended.
    sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml
    sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker2/relay_log" $WORK_DIR/source2.yaml
    dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

    run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
    check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
    dmctl_operate_source create $WORK_DIR/source2.yaml $SOURCE_ID2

    # start a task in all mode; once it enters incremental mode, only DML is executed upstream
    dmctl_start_task $cur/conf/dm-task.yaml
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

    insert_data &
    pid=$!
    echo "PID of insert_data is $pid"

    # check twice, to make sure updating the active relay log works the first time and afterwards
    for i in {1..2}; do
        # give insert_data time to write rows and rotate binlogs before purging
        sleep 6
        check_purge_moves_earliest_relay_log $WORK_DIR/worker1/relay-dir run_sql_source1 $SOURCE_ID1
        check_purge_moves_earliest_relay_log $WORK_DIR/worker2/relay-dir run_sql_source2 $SOURCE_ID2
    done

    # NOTE(review): if a check above fails, `set -e` aborts the script before this
    # line, so the background insert_data job is not killed here in that case.
    kill $pid
    check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

# Script entry point: clean up, run the test body, then clean up again.
cleanup_data $TEST_NAME
# also clean up DM processes in case the previous run failed
cleanup_process $*
run $*
cleanup_process $*

# Reached only when every step above succeeded (the script runs under `set -eu`).
echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
1 change: 1 addition & 0 deletions tests/others_integration.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ sequence_sharding_optimistic
sequence_sharding_removemeta
drop_column_with_index
gtid
only_dml