From 775e63942b989b46d12e5ad65b12e54956030a84 Mon Sep 17 00:00:00 2001
From: WizardXiao
Date: Fri, 12 Nov 2021 16:51:07 +0800
Subject: [PATCH 1/5] commit-message: add integration test for dml using
 downstream schema

---
 dm/syncer/compactor.go       | 12 +++++
 dm/tests/_utils/test_prepare | 21 +++++++++
 dm/tests/shardddl1/run.sh    | 86 ++++++++++++++++++++++++++++++++++++
 3 files changed, 119 insertions(+)

diff --git a/dm/syncer/compactor.go b/dm/syncer/compactor.go
index f9875e63dac..0e71fb2652c 100644
--- a/dm/syncer/compactor.go
+++ b/dm/syncer/compactor.go
@@ -14,6 +14,7 @@ package syncer
 
 import (
+	"strconv"
 	"time"
 
 	"github.com/pingcap/failpoint"
@@ -165,6 +166,17 @@ func (c *compactor) compactJob(j *job) {
 	}
 
 	key := j.dml.identifyKey()
+
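+	// test-only failpoint: the injected value is an upper bound for the identify key;
+	// a numeric key above it means the key was derived from the upstream schema rather than the downstream one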
+	failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) {
+		value, err := strconv.Atoi(key)
+		upper := v.(int)
+		if err != nil || value > upper {
+			c.logger.Debug("downstream identifyKey check failed.", zap.Error(err), zap.String("identifyKey", key))
+		} else {
+			c.logger.Debug("downstream identifyKey check success.")
+		}
+	})
+
 	prevPos, ok := tableKeyMap[key]
 	// if no such key in the buffer, add it
 	if !ok {
diff --git a/dm/tests/_utils/test_prepare b/dm/tests/_utils/test_prepare
index de838513889..d188065a442 100644
--- a/dm/tests/_utils/test_prepare
+++ b/dm/tests/_utils/test_prepare
@@ -229,6 +229,27 @@ function run_sql_tidb_with_retry() {
 	fi
 }
 
+# shortcut for running a TiDB SQL statement and checking its result with a bounded number of retries
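+# usage: run_sql_tidb_with_retry_times <sql> <expected substring in output> <retry times>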
+function run_sql_tidb_with_retry_times() {
+	rc=0
+	for ((k=1; k<$3; k++)); do
+		run_sql_tidb "$1"
+		if grep -Fq "$2" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then
+			rc=1
+			break
+		fi
+		echo "run tidb sql failed $k-th time, retry later"
+		sleep 2
+	done
+	if [[ $rc = 0 ]]; then
+		echo "TEST FAILED: OUTPUT DOES NOT CONTAIN '$2'"
+		echo "____________________________________"
+		cat "$TEST_DIR/sql_res.$TEST_NAME.txt"
+		echo "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
+		exit 1
+	fi
+}
+
 # shortcut for check log contain with retry
 function check_log_contain_with_retry() {
 	text=$1
diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index 4a7583d0976..543c41bf734 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -611,6 +611,61 @@ function DM_COMPACT() {
 		"clean_table" ""
 }
 
+function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
+	END=10
+	for i in $(seq 0 $END); do
+		run_sql_source1 "insert into ${shardddl1}.${tb1}(b,c) values($i,$i)"
+		run_sql_source1 "update ${shardddl1}.${tb1} set c=1 where a=$((i * 2 + 100))"
+		run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i * 2 + 100))"
+		run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$((i * 2 + 100))"
+		run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$((i * 2 + 100))"
+		run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i * 2 + 200))"
+		run_sql_source1 "insert into ${shardddl1}.${tb1}(b,c) values($i,$i)"
+	done
+	run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb};" "count(1): 11" 30
+	run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key auto_increment, b int unique not null, c int) auto_increment = 100;
+		insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb};
+		drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
+	downstreamSuccess=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check success" | wc -l)
+	downstreamFail=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check failed" | wc -l)
+	if [[ "$downstreamSuccess" -le 50 || "$downstreamFail" -ne 0 ]]; then
+		echo "compactor used wrong downstream identify key"
+		exit 1
+	fi
+	compactCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "finish to compact" | wc -l)
+	if [[ "$compactCnt" -le 50 ]]; then
+		echo "compacted $compactCnt dmls, which is not more than 50"
+		exit 1
+	fi
+}
+
+function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() {
+	# mock a downstream whose pk/uk/columns differ from the upstream; the compactor should use the downstream schema.
+	ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
+	check_port_offline $WORKER1_PORT 20
+	check_port_offline $WORKER2_PORT 20
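+	# SkipFlushCompactor presumably keeps the compactor buffer from being flushed early,
+	# so the DMLs above can actually be compacted together (assumption based on the failpoint name)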
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/SkipFlushCompactor=return();github.com/pingcap/ticdc/dm/syncer/DownstreamIdentifyKeyCheckInCompact=return(20)'
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
+	run_case COMPACT_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
+		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key auto_increment, b int unique not null, c int) auto_increment = 100;\";
+		run_sql_tidb \"create table ${shardddl}.${tb} (a int, b int unique not null, c int, d int primary key auto_increment) auto_increment = 100;\"" \
+		"clean_table" ""
+
+	ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
+	check_port_offline $WORKER1_PORT 20
+	check_port_offline $WORKER2_PORT 20
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/BlockExecuteSQLs=return(1)'
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+}
+
 function DM_MULTIPLE_ROWS_CASE() {
 	END=100
 	for i in $(seq 1 10 $END); do
@@ -677,13 +732,43 @@ function DM_CAUSALITY() {
 		"clean_table" ""
 }
 
+function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA_CASE() {
+	run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,2)"
+	run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,3)"
+	run_sql_source1 "update ${shardddl1}.${tb1} set a=3, b=4 where b=3"
+	run_sql_source1 "delete from ${shardddl1}.${tb1} where a=1"
+	run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,3)"
+
+	run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb} where a=1 and b=3;" "count(1): 1" 30
+	run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key, b int unique);
+		insert into ${shardddl}.${tb}_temp (a, b) select a, b from ${shardddl}.${tb};
+		drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
+	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
+
+	causalityCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "meet causality key, will generate a conflict job to flush all sqls" | wc -l)
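+	# with the downstream schema, causality keys come from uk 'b' rather than the upstream pk 'a',
+	# so none of the DMLs above should be detected as conflicting and no conflict job is expected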
+	if [[ "$causalityCnt" -ne 0 ]]; then
+		echo "causalityCnt is $causalityCnt, but it should be 0"
+		exit 1
+	fi
+}
+
+function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA() {
+	# mock a downstream whose pk/uk/columns differ from the upstream; causality should use the downstream schema.
+	run_case CAUSALITY_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
+		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique);\";
+		run_sql_tidb \"create table ${shardddl}.${tb} (a int, b int unique, c int primary key auto_increment) auto_increment = 100;\"" \
+		"clean_table" ""
+}
+
 function run() {
 	init_cluster
 	init_database
 	DM_COMPACT
+	DM_COMPACT_USE_DOWNSTREAM_SCHEMA
 	DM_MULTIPLE_ROWS
 	DM_CAUSALITY
+	DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA
 	DM_UpdateBARule
 	DM_RENAME_TABLE
 	DM_RENAME_COLUMN_OPTIMISTIC
@@ -697,6 +782,7 @@ function run() {
 		DM_${i}
 		sleep 1
 	done
+
 }
 
 cleanup_data $shardddl

From 021f7786c19041466a280dd81a5ed05db276306e Mon Sep 17 00:00:00 2001
From: WizardXiao
Date: Mon, 15 Nov 2021 17:04:19 +0800
Subject: [PATCH 2/5] fix DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE

---
 dm/tests/shardddl1/run.sh | 55 +++++++++++++++++++++------------------
 1 file changed, 30 insertions(+), 25 deletions(-)

diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index 543c41bf734..eb616ca1982 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -614,26 +614,30 @@ function DM_COMPACT() {
 function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 	END=10
 	for i in $(seq 0 $END); do
-		run_sql_source1 "insert into ${shardddl1}.${tb1}(b,c) values($i,$i)"
-		run_sql_source1 "update ${shardddl1}.${tb1} set c=1 where a=$((i * 2 + 100))"
-		run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i * 2 + 100))"
-		run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$((i * 2 + 100))"
-		run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$((i * 2 + 100))"
-		run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i * 2 + 200))"
-		run_sql_source1 "insert into ${shardddl1}.${tb1}(b,c) values($i,$i)"
+		run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
+		run_sql_source1 "update ${shardddl1}.${tb1} set c=20 where a=$((i + 100))"
+		run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i + 100))"
+		run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$((i + 100))"
+		run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$((i + 100))"
+		run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i + 200))"
+		run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
 	done
 	run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb};" "count(1): 11" 30
 	run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key auto_increment, b int unique not null, c int) auto_increment = 100;
-		insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb};
-		drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
+	insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb};
+	drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
 	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
+	# DownstreamIdentifyKeyCheckInCompact=return(20) checks that every key value seen in compact is less than 20
+	# a sql like "update tb1 set c=1 where a=100" right behind "insert into tb1(a,b,c) values(100,1,1)" changes no row and generates no dml,
+	# so we avoid such sqls to keep the expected value of $downstreamSuccess stable
 	downstreamSuccess=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check success" | wc -l)
 	downstreamFail=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check failed" | wc -l)
-	if [[ "$downstreamSuccess" -le 50 || "$downstreamFail" -ne 0 ]]; then
-		echo "compactor used wrong downstream identify key"
+	if [[ "$downstreamSuccess" -ne 88 || "$downstreamFail" -ne 0 ]]; then
+		echo "compactor used wrong downstream identify key: $downstreamSuccess successes (expected 88), $downstreamFail failures (expected 0)"
 		exit 1
 	fi
 	compactCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "finish to compact" | wc -l)
+	# flush jobs ("j.tp == flush") interrupt compaction, so the exact count varies and we only require more than 50 compacted dmls
 	if [[ "$compactCnt" -le 50 ]]; then
 		echo "compacted $compactCnt dmls, which is not more than 50"
 		exit 1
@@ -652,18 +656,9 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() {
 	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
 
 	run_case COMPACT_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
-		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key auto_increment, b int unique not null, c int) auto_increment = 100;\";
-		run_sql_tidb \"create table ${shardddl}.${tb} (a int, b int unique not null, c int, d int primary key auto_increment) auto_increment = 100;\"" \
+		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique not null, c int);\";
+		run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique not null, c int, d int primary key auto_increment) auto_increment = 100;\"" \
 		"clean_table" ""
-
-	ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
-	check_port_offline $WORKER1_PORT 20
-	check_port_offline $WORKER2_PORT 20
-	export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/BlockExecuteSQLs=return(1)'
-	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
-	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
-	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
-	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
 }
 
@@ -701,6 +696,16 @@ function DM_MULTIPLE_ROWS_CASE() {
 }
 
 function DM_MULTIPLE_ROWS() {
+
+	ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
+	check_port_offline $WORKER1_PORT 20
+	check_port_offline $WORKER2_PORT 20
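+	# BlockExecuteSQLs=return(1) presumably delays each statement execution briefly,
+	# giving the syncer time to batch multiple rows together (assumption based on the failpoint name)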
+	export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/BlockExecuteSQLs=return(1)'
+	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
+	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
+	check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT
+
 	run_case MULTIPLE_ROWS "single-source-no-sharding" \
 		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique, c int);\"" \
 		"clean_table" ""
@@ -741,8 +746,8 @@ function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA_CASE() {
 
 	run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb} where a=1 and b=3;" "count(1): 1" 30
 	run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key, b int unique);
-		insert into ${shardddl}.${tb}_temp (a, b) select a, b from ${shardddl}.${tb};
-		drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
+	insert into ${shardddl}.${tb}_temp (a, b) select a, b from ${shardddl}.${tb};
+	drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
 	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
 
 	causalityCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "meet causality key, will generate a conflict job to flush all sqls" | wc -l)
@@ -756,7 +761,7 @@ function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA() {
 	# mock a downstream whose pk/uk/columns differ from the upstream; causality should use the downstream schema.
 	run_case CAUSALITY_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
 		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique);\";
-		run_sql_tidb \"create table ${shardddl}.${tb} (a int, b int unique, c int primary key auto_increment) auto_increment = 100;\"" \
+		run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique, c int primary key auto_increment) auto_increment = 100;\"" \
 		"clean_table" ""

From f3e16506250f0b2da0c54449376eb7f7f2aa1069 Mon Sep 17 00:00:00 2001
From: WizardXiao
Date: Tue, 16 Nov 2021 18:21:03 +0800
Subject: [PATCH 3/5] commit-message: add comment

---
 dm/tests/shardddl1/run.sh | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index eb616ca1982..ee515063df4 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -613,6 +613,8 @@ function DM_COMPACT() {
 
 function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 	END=10
+	# a sql like "update tb1 set c=1 where a=100" right behind "insert into tb1(a,b,c) values(100,1,1)" changes no row and generates no dml,
+	# so we avoid such sqls to keep the expected dml counts stable
 	for i in $(seq 0 $END); do
 		run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
 		run_sql_source1 "update ${shardddl1}.${tb1} set c=20 where a=$((i + 100))"
 		run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i + 100))"
@@ -627,11 +629,13 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 	insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb};
 	drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
 	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
-	# DownstreamIdentifyKeyCheckInCompact=return(20) checks that every key value seen in compact is less than 20
-	# a sql like "update tb1 set c=1 where a=100" right behind "insert into tb1(a,b,c) values(100,1,1)" changes no row and generates no dml,
-	# so we avoid such sqls to keep the expected value of $downstreamSuccess stable
+	# DownstreamIdentifyKeyCheckInCompact=return(20) checks that every key value seen in compact is less than 20.
+	# The goal is to verify that the compactor uses the downstream schema:
+	# with the downstream schema the key is 'b', whose values stay below 20;
+	# with the upstream schema the key would be 'a', whose values are greater than 100.
 	downstreamSuccess=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check success" | wc -l)
 	downstreamFail=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check failed" | wc -l)
+	# an update that changes the pk/uk is divided into a delete and an insert, so each loop produces 8 dmls; the total is 11*8 = 88
 	if [[ "$downstreamSuccess" -ne 88 || "$downstreamFail" -ne 0 ]]; then
 		echo "compactor used wrong downstream identify key: $downstreamSuccess successes (expected 88), $downstreamFail failures (expected 0)"
 		exit 1
@@ -645,7 +649,7 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 }
 
 function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() {
-	# mock a downstream whose pk/uk/columns differ from the upstream; the compactor should use the downstream schema.
+	# downstream pk/uk/columns differ from the upstream; the compactor should use the downstream schema.
 	ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
 	check_port_offline $WORKER1_PORT 20
 	check_port_offline $WORKER2_PORT 20
@@ -758,7 +762,7 @@ function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA_CASE() {
 }
 
 function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA() {
-	# mock a downstream whose pk/uk/columns differ from the upstream; causality should use the downstream schema.
+	# downstream pk/uk/columns differ from the upstream; causality should use the downstream schema.
 	run_case CAUSALITY_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
 		"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique);\";
 		run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique, c int primary key auto_increment) auto_increment = 100;\"" \
 		"clean_table" ""

From 6a593197cf5dce11ff6915d5448ee12cf51d571c Mon Sep 17 00:00:00 2001
From: WizardXiao
Date: Wed, 17 Nov 2021 10:40:33 +0800
Subject: [PATCH 4/5] commit-message: add comments

---
 dm/tests/shardddl1/run.sh | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index f11ff0685e9..ace428feb19 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -619,6 +619,7 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 		run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
 		run_sql_source1 "update ${shardddl1}.${tb1} set c=20 where a=$((i + 100))"
 		run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i + 100))"
+		# the downstream uk 'b' is used as the identify key, so this sql, which modifies 'b', is split into two jobs (delete+insert)
 		run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$((i + 100))"
 		run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$((i + 100))"
 		run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i + 200))"
@@ -635,7 +636,7 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 	downstreamSuccess=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check success" | wc -l)
 	downstreamFail=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check failed" | wc -l)
-	# an update that changes the pk/uk is divided into a delete and an insert, so each loop produces 8 dmls; the total is 11*8 = 88
+	# an update that changes the pk/uk is split into a delete and an insert, so each loop produces 8 dmls; the total is 11*8 = 88
 	if [[ "$downstreamSuccess" -ne 88 || "$downstreamFail" -ne 0 ]]; then
 		echo "compactor used wrong downstream identify key: $downstreamSuccess successes (expected 88), $downstreamFail failures (expected 0)"
 		exit 1

From 4b6098f901e08aa2ec8f715c558389fc1dcb944b Mon Sep 17 00:00:00 2001
From: WizardXiao
Date: Thu, 18 Nov 2021 12:11:47 +0800
Subject: [PATCH 5/5] commit-message: use panic in failpoint

---
 dm/syncer/compactor.go    |  5 ++---
 dm/tests/shardddl1/run.sh | 15 ++++-----------
 2 files changed, 6 insertions(+), 14 deletions(-)

diff --git a/dm/syncer/compactor.go b/dm/syncer/compactor.go
index 0e71fb2652c..73084374b89 100644
--- a/dm/syncer/compactor.go
+++ b/dm/syncer/compactor.go
@@ -14,6 +14,7 @@ package syncer
 
 import (
+	"fmt"
 	"strconv"
 	"time"
 
@@ -171,9 +172,7 @@ func (c *compactor) compactJob(j *job) {
 		value, err := strconv.Atoi(key)
 		upper := v.(int)
 		if err != nil || value > upper {
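+			// fail fast in the integration test instead of relying on log grepping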
-			c.logger.Debug("downstream identifyKey check failed.", zap.Error(err), zap.String("identifyKey", key))
-		} else {
-			c.logger.Debug("downstream identifyKey check success.")
+			panic(fmt.Sprintf("downstream identifyKey check failed. key value %v should be less than %v", value, upper))
 		}
 	})
 
diff --git a/dm/tests/shardddl1/run.sh b/dm/tests/shardddl1/run.sh
index ace428feb19..0a1c9ac57f9 100644
--- a/dm/tests/shardddl1/run.sh
+++ b/dm/tests/shardddl1/run.sh
@@ -630,17 +630,6 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
 	insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb};
 	drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
 	check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
-	# DownstreamIdentifyKeyCheckInCompact=return(20) checks that every key value seen in compact is less than 20.
-	# The goal is to verify that the compactor uses the downstream schema:
-	# with the downstream schema the key is 'b', whose values stay below 20;
-	# with the upstream schema the key would be 'a', whose values are greater than 100.
-	downstreamSuccess=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check success" | wc -l)
-	downstreamFail=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check failed" | wc -l)
-	# an update that changes the pk/uk is split into a delete and an insert, so each loop produces 8 dmls; the total is 11*8 = 88
-	if [[ "$downstreamSuccess" -ne 88 || "$downstreamFail" -ne 0 ]]; then
-		echo "compactor used wrong downstream identify key: $downstreamSuccess successes (expected 88), $downstreamFail failures (expected 0)"
-		exit 1
-	fi
 	compactCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "finish to compact" | wc -l)
 	# flush jobs ("j.tp == flush") interrupt compaction, so the exact count varies and we only require more than 50 compacted dmls
 	if [[ "$compactCnt" -le 50 ]]; then
@@ -654,6 +643,10 @@ function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() {
 	ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
 	check_port_offline $WORKER1_PORT 20
 	check_port_offline $WORKER2_PORT 20
+	# DownstreamIdentifyKeyCheckInCompact=return(20) checks that every key value seen in compact is less than 20 and panics otherwise.
+	# The goal is to verify that the compactor uses the downstream schema:
+	# with the downstream schema the key is 'b', whose values stay below 20;
+	# with the upstream schema the key would be 'a', whose values are greater than 100.
 	export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/SkipFlushCompactor=return();github.com/pingcap/ticdc/dm/syncer/DownstreamIdentifyKeyCheckInCompact=return(20)'
 	run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
 	run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml