Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test(dm): add async checkpoint flush integration test #4538

Merged
merged 7 commits into from
Mar 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions dm/syncer/checkpoint_flush_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package syncer
import (
"fmt"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-tools/pkg/filter"
"go.uber.org/atomic"
"go.uber.org/zap"
Expand Down Expand Up @@ -88,6 +90,13 @@ func (w *checkpointFlushWorker) Run(ctx *tcontext.Context) {
}

err = w.cp.FlushPointsExcept(ctx, task.snapshotInfo.id, task.exceptTables, task.shardMetaSQLs, task.shardMetaArgs)
failpoint.Inject("AsyncCheckpointFlushThrowError", func() {
if isAsyncFlush {
ctx.L().Warn("async checkpoint flush error triggered", zap.String("failpoint", "AsyncCheckpointFlushThrowError"))
err = errors.New("async checkpoint flush throw error")
}
})

if err != nil {
ctx.L().Warn(fmt.Sprintf("%s checkpoint snapshot failed, ignore this error", flushLogMsg), zap.Any("flushCpTask", task), zap.Error(err))
// async flush errors are skipped here, but a sync flush error will be raised
Expand Down
29 changes: 29 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# diff Configuration.
# sync-diff-inspector config: compares the upstream MySQL data with the
# downstream TiDB after replication to verify row-level consistency.

check-thread-count = 4

export-fix-sql = true

# compare row data, not only table structure
check-struct-only = false

[task]
output-dir = "/tmp/ticdc_dm_test/output"

source-instances = ["mysql1"]

target-instance = "tidb0"

# check every table matching t<suffix> in the test schema
target-check-tables = ["async_checkpoint_flush.t?*"]

[data-sources]
[data-sources.mysql1]
host = "127.0.0.1"
port = 3306
user = "root"
password = "123456"

[data-sources.tidb0]
host = "127.0.0.1"
port = 4000
user = "test"
password = "123456"
4 changes: 4 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Master Configuration.
master-addr = ":8261"
advertise-addr = "127.0.0.1:8261"
# short etcd compaction retention keeps the embedded etcd small for a short-lived test
auto-compaction-retention = "3s"
40 changes: 40 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/dm-task.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
---
name: test
task-mode: all
is-sharding: false
meta-schema: "dm_meta"

target-database:
host: "127.0.0.1"
port: 4000
user: "root"
password: ""

mysql-instances:
- source-id: "mysql-replica-01"
block-allow-list: "instance"
mydumper-config-name: "global"
loader-config-name: "global"
syncer-config-name: "global"

block-allow-list:
instance:
do-dbs: ["async_checkpoint_flush"]

mydumpers:
global:
threads: 4
chunk-filesize: 64
skip-tz-utc: true
extra-args: ""

loaders:
global:
pool-size: 16
dir: "./dumped_data"

syncers:
global:
checkpoint-flush-interval: 10
experimental:
async-checkpoint-flush: true
2 changes: 2 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/dm-worker1.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Worker Configuration: worker1 joins the DM master listening on 127.0.0.1:8261.
name = "worker1"
join = "127.0.0.1:8261"
9 changes: 9 additions & 0 deletions dm/tests/async_checkpoint_flush/conf/source1.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# Upstream MySQL source configuration used by the test.
source-id: mysql-replica-01
# pull binlogs into a local relay log before syncing
enable-relay: true
from:
host: 127.0.0.1
user: root
# dmctl-encrypted password — presumably decrypts to the test MySQL password; TODO confirm
password: /Q7B9DizNLLTTfiZHv9WoEAKamfpIUs=
port: 3306
checker:
# disable precheck; the harness controls the environment
check-enable: false
5 changes: 5 additions & 0 deletions dm/tests/async_checkpoint_flush/data/db1.prepare.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-- Prepare the upstream MySQL: drop any leftover schema, reset the binlog
-- position, then create the table replicated by the async checkpoint flush test.
drop database if exists `async_checkpoint_flush`;
reset master;
create database `async_checkpoint_flush`;
use `async_checkpoint_flush`;
create table t1 (id int, primary key(`id`));
80 changes: 80 additions & 0 deletions dm/tests/async_checkpoint_flush/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/bin/bash

set -eu

cur=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
source $cur/../_utils/test_prepare
WORK_DIR=$TEST_DIR/$TEST_NAME
TASK_NAME="test"
SQL_RESULT_FILE="$TEST_DIR/sql_res.$TEST_NAME.txt"

# run_sql_silent SQL PORT PASSWORD
# Executes SQL against 127.0.0.1:PORT with stdout discarded.
# Connects as the "test" user when PORT is the TiDB port (4000), "root" otherwise.
function run_sql_silent() {
	# scope helpers locally so they don't leak into the caller's environment
	local tidb_port=4000
	local user="root"
	if [[ "$2" = "$tidb_port" ]]; then
		user="test"
	fi
	mysql -u$user -h127.0.0.1 -P$2 -p$3 --default-character-set utf8 -E -e "$1" >>/dev/null
}

# insert_data
# Continuously inserts distinct rows into async_checkpoint_flush.t1 on the
# upstream MySQL until the process is killed; intended to run in the background.
function insert_data() {
	local i=1

	while true; do
		# 2*i+1 yields a strictly increasing odd id, so the primary key never collides
		run_sql_silent "insert into async_checkpoint_flush.t1 values ($((i * 2 + 1)));" $MYSQL_PORT1 $MYSQL_PASSWORD1
		# avoid ((i++)): under `set -e` it exits nonzero when the pre-increment value is 0
		i=$((i + 1))
	done
}

# run: end-to-end scenario — start a DM master/worker pair with the
# AsyncCheckpointFlushThrowError failpoint armed, replicate a steady stream of
# inserts, and verify that async flush errors are ignored while data stays consistent.
function run() {
# inject an error into async checkpoint flushes inside the syncer
export GO_FAILPOINTS="github.com/pingcap/tiflow/dm/syncer/AsyncCheckpointFlushThrowError=return(true)"

run_sql_file $cur/data/db1.prepare.sql $MYSQL_HOST1 $MYSQL_PORT1 $MYSQL_PASSWORD1
check_contains 'Query OK, 1 row affected'

# run dm master
run_dm_master $WORK_DIR/master $MASTER_PORT $cur/conf/dm-master.toml
check_rpc_alive $cur/../bin/check_master_online 127.0.0.1:$MASTER_PORT
check_metric $MASTER_PORT 'start_leader_counter' 3 0 2

# copy config file
cp $cur/conf/source1.yaml $WORK_DIR/source1.yaml
sed -i "/relay-binlog-name/i\relay-dir: $WORK_DIR/worker1/relay_log" $WORK_DIR/source1.yaml

# start worker1 and bind source1 to it
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
dmctl_operate_source create $WORK_DIR/source1.yaml $SOURCE_ID1

# check dm-workers metrics unit: relay file index must be 1.
check_metric $WORKER1_PORT "dm_relay_binlog_file" 3 0 2

# start a task in all mode, and when enter incremental mode, we only execute DML
dmctl_start_task_standalone $cur/conf/dm-task.yaml

# check task has started state=2 running
check_metric $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"$TASK_NAME\",worker=\"worker1\"}" 10 1 3

# baseline: upstream and downstream agree before the write load starts
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# drive continuous inserts so checkpoint flushes keep firing
insert_data &
pid=$!
echo "PID of insert_data is $pid"

# let several checkpoint-flush intervals (10s each) elapse under load
sleep 30

kill $pid
# the failpoint must have made async flushes fail — and be ignored —
check_log_contain_with_retry 'async flush checkpoint snapshot failed, ignore this error' $WORK_DIR/worker1/log/dm-worker.log
# — while sync flushes still succeed
check_log_contain_with_retry 'sync flush checkpoint snapshot successfully' $WORK_DIR/worker1/log/dm-worker.log
# despite the injected async errors, data must remain consistent
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
export GO_FAILPOINTS=""
}

# Entry point: clean state, run the scenario, clean up again.
cleanup_data $TEST_NAME
# also cleanup dm processes in case of last run failed
cleanup_process $*
run $*
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
1 change: 1 addition & 0 deletions dm/tests/others_integration_1.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ lightning_mode
slow_relay_writer
sync_collation
s3_dumpling_lighting
async_checkpoint_flush