dm/syncer: add integration test for dml using downstream schema #3434

Merged
merged 21 commits on Nov 19, 2021

Changes from 12 commits
Commits (21)
775e639
commit-message: add integration test for dml using downstream schema
WizardXiao Nov 12, 2021
7e04555
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
WizardXiao Nov 12, 2021
f5ba0df
Merge branch 'master' of https://github.com/WizardXiao/ticdc into add…
WizardXiao Nov 15, 2021
021f778
fix DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE
WizardXiao Nov 15, 2021
2c98dec
Merge branch 'master' of https://github.com/WizardXiao/ticdc into add…
WizardXiao Nov 15, 2021
ca7de23
Merge branch 'add_integration_test_for_dml_using_downstream_schema' o…
WizardXiao Nov 15, 2021
d313e04
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
lance6716 Nov 15, 2021
0045e4f
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
WizardXiao Nov 16, 2021
f3e1650
commit-message: add comment
WizardXiao Nov 16, 2021
661e0eb
Merge branch 'master' of https://github.com/WizardXiao/ticdc into add…
WizardXiao Nov 16, 2021
257e095
Merge branch 'add_integration_test_for_dml_using_downstream_schema' o…
WizardXiao Nov 16, 2021
3ab0f34
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
WizardXiao Nov 16, 2021
8fbe603
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
WizardXiao Nov 17, 2021
31fc4e8
Merge branch 'master' of https://github.com/WizardXiao/ticdc into add…
WizardXiao Nov 17, 2021
6a59319
commit-message: add comments
WizardXiao Nov 17, 2021
75b87ef
Merge branch 'add_integration_test_for_dml_using_downstream_schema' o…
WizardXiao Nov 17, 2021
6582f9d
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
ti-chi-bot Nov 18, 2021
1389d27
Merge branch 'master' of https://github.com/WizardXiao/ticdc into add…
WizardXiao Nov 18, 2021
4b6098f
commit-message: use panic in failpoint
WizardXiao Nov 18, 2021
7cabac5
Merge branch 'add_integration_test_for_dml_using_downstream_schema' o…
WizardXiao Nov 18, 2021
ccfbdf3
Merge branch 'master' into add_integration_test_for_dml_using_downstr…
ti-chi-bot Nov 19, 2021
12 changes: 12 additions & 0 deletions dm/syncer/compactor.go
@@ -14,6 +14,7 @@
package syncer

import (
"strconv"
"time"

"github.com/pingcap/failpoint"
@@ -165,6 +166,17 @@ func (c *compactor) compactJob(j *job) {
}

key := j.dml.identifyKey()

// The injected check passes only when the identify key parses as an integer
// no greater than the injected upper bound, i.e. it was built from the
// downstream schema rather than the upstream one.
failpoint.Inject("DownstreamIdentifyKeyCheckInCompact", func(v failpoint.Value) {
value, err := strconv.Atoi(key)
upper := v.(int)
if err != nil || value > upper {
c.logger.Debug("downstream identifyKey check failed.", zap.Error(err), zap.String("identifyKey", key))
} else {
c.logger.Debug("downstream identifyKey check success.")
}
})

prevPos, ok := tableKeyMap[key]
// if no such key in the buffer, add it
if !ok {
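For reference, a minimal standalone sketch of the injected check (the helper
name checkDownstreamKey is hypothetical). With the shardddl1 schemas used
below, a key built from the downstream unique column b stays below the
injected bound of 20, while one built from the upstream primary key a is 100
or more:

package main

import (
	"fmt"
	"strconv"
)

// checkDownstreamKey mirrors the injected logic: the identify key must parse
// as an integer no greater than upper.
func checkDownstreamKey(key string, upper int) bool {
	value, err := strconv.Atoi(key)
	return err == nil && value <= upper
}

func main() {
	fmt.Println(checkDownstreamKey("5", 20))   // true: plausible downstream key b
	fmt.Println(checkDownstreamKey("105", 20)) // false: plausible upstream key a
}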
21 changes: 21 additions & 0 deletions dm/tests/_utils/test_prepare
@@ -236,6 +236,27 @@ function run_sql_tidb_with_retry() {
fi
}

# shortcut to run a TiDB SQL statement and check the result, with a caller-supplied retry count
function run_sql_tidb_with_retry_times() {
rc=0
for ((k=1; k<=$3; k++)); do
run_sql_tidb "$1"
if grep -Fq "$2" "$TEST_DIR/sql_res.$TEST_NAME.txt"; then
rc=1
break
fi
echo "run tidb sql failed $k-th time, retry later"
sleep 2
done
if [[ $rc = 0 ]]; then
echo "TEST FAILED: OUTPUT DOES NOT CONTAIN '$2'"
echo "____________________________________"
cat "$TEST_DIR/sql_res.$TEST_NAME.txt"
echo "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"
exit 1
fi
}
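
# Example (as used in DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE below): retry the
# row-count query up to 30 times, sleeping 2s between attempts, until the
# output contains the expected string:
#   run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb};" "count(1): 11" 30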

# shortcut for check log contain with retry
function check_log_contain_with_retry() {
text=$1
95 changes: 95 additions & 0 deletions dm/tests/shardddl1/run.sh
@@ -611,6 +611,60 @@ function DM_COMPACT() {
"clean_table" ""
}

function DM_COMPACT_USE_DOWNSTREAM_SCHEMA_CASE() {
END=10
# Avoid no-op statements such as "update tb1 set c=1 where a=100" issued right
# after "insert into tb1(a,b,c) values(100,1,1)": they change nothing, generate
# no row-change binlog event, and would make the expected DML count unreliable.
for i in $(seq 0 $END); do
run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
run_sql_source1 "update ${shardddl1}.${tb1} set c=20 where a=$((i + 100))"
run_sql_source1 "update ${shardddl1}.${tb1} set c=c+1 where a=$((i + 100))"
run_sql_source1 "update ${shardddl1}.${tb1} set b=b+1 where a=$((i + 100))"
run_sql_source1 "update ${shardddl1}.${tb1} set a=a+100 where a=$((i + 100))"
run_sql_source1 "delete from ${shardddl1}.${tb1} where a=$((i + 200))"
run_sql_source1 "insert into ${shardddl1}.${tb1}(a,b,c) values($((i + 100)),$i,$i)"
done
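# Each iteration above issues 7 statements; under the downstream schema only
# "update ... set b=b+1" changes an identify key (the unique column b), so that
# statement is split in two, giving 8 compactor jobs per iteration.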
run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb};" "count(1): 11" 30
# Rebuild the downstream table with the upstream column layout (dropping the
# extra auto-increment primary key d) so that sync_diff can compare the two sides.
run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key auto_increment, b int unique not null, c int) auto_increment = 100;
insert into ${shardddl}.${tb}_temp (a, b, c) select a, b, c from ${shardddl}.${tb};
drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml 30
# DownstreamIdentifyKeyCheckInCompact=return(20) checks that every key value seen in compact is no greater than 20.
# The goal is to verify that the compactor uses the downstream schema:
# with the downstream schema the key is 'b', whose values stay below 20;
# with the upstream schema the key would be 'a', whose values are 100 or more.
downstreamSuccess=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check success" | wc -l)
downstreamFail=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "downstream identifyKey check failed" | wc -l)
# An update that changes a pk/uk is split into a delete plus an insert, so each
# loop iteration produces 8 jobs; over 11 iterations that is 11*8 = 88.
if [[ "$downstreamSuccess" -ne 88 || "$downstreamFail" -ne 0 ]]; then
echo "compact used wrong identify key: $downstreamSuccess success (expected 88), $downstreamFail failed (expected 0)."
exit 1
fi
compactCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "finish to compact" | wc -l)
# Compaction is interrupted whenever a flush job arrives (j.tp == flush), so
# require only that more than 50 dmls were compacted instead of an exact count.
if [[ "$compactCnt" -le 50 ]]; then
echo "compacted $compactCnt dmls, which is not more than 50"
exit 1
fi
}

function DM_COMPACT_USE_DOWNSTREAM_SCHEMA() {
# The downstream pk/uk/columns differ from the upstream ones, so compact should use the downstream schema.
ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20
export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/SkipFlushCompactor=return();github.com/pingcap/ticdc/dm/syncer/DownstreamIdentifyKeyCheckInCompact=return(20)'
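# SkipFlushCompactor=return() presumably keeps the compactor from flushing its
# buffer early so that compaction is observable, while
# DownstreamIdentifyKeyCheckInCompact=return(20) arms the key check in
# dm/syncer/compactor.go with an upper bound of 20.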
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

run_case COMPACT_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique not null, c int);\";
run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique not null, c int, d int primary key auto_increment) auto_increment = 100;\"" \
"clean_table" ""
}

function DM_MULTIPLE_ROWS_CASE() {
END=100
for i in $(seq 1 10 $END); do
@@ -646,6 +700,16 @@ function DM_MULTIPLE_ROWS_CASE() {
}

function DM_MULTIPLE_ROWS() {

ps aux | grep dm-worker | awk '{print $2}' | xargs kill || true
check_port_offline $WORKER1_PORT 20
check_port_offline $WORKER2_PORT 20
export GO_FAILPOINTS='github.com/pingcap/ticdc/dm/syncer/BlockExecuteSQLs=return(1)'
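# BlockExecuteSQLs=return(1) presumably delays each downstream execution by one
# second so that several upstream rows accumulate into a single multi-row job.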
run_dm_worker $WORK_DIR/worker1 $WORKER1_PORT $cur/conf/dm-worker1.toml
run_dm_worker $WORK_DIR/worker2 $WORKER2_PORT $cur/conf/dm-worker2.toml
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER1_PORT
check_rpc_alive $cur/../bin/check_worker_online 127.0.0.1:$WORKER2_PORT

run_case MULTIPLE_ROWS "single-source-no-sharding" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique, c int);\"" \
"clean_table" ""
@@ -677,13 +741,43 @@ function DM_CAUSALITY() {
"clean_table" ""
}

function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA_CASE() {
run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,2)"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(2,3)"
run_sql_source1 "update ${shardddl1}.${tb1} set a=3, b=4 where b=3"
run_sql_source1 "delete from ${shardddl1}.${tb1} where a=1"
run_sql_source1 "insert into ${shardddl1}.${tb1} values(1,3)"

run_sql_tidb_with_retry_times "select count(1) from ${shardddl}.${tb} where a=1 and b=3;" "count(1): 1" 30
# Rebuild the downstream table with the upstream column layout (dropping the
# extra auto-increment primary key c) so that sync_diff can compare the two sides.
run_sql_tidb "create table ${shardddl}.${tb}_temp (a int primary key, b int unique);
insert into ${shardddl}.${tb}_temp (a, b) select a, b from ${shardddl}.${tb};
drop table ${shardddl}.${tb}; rename table ${shardddl}.${tb}_temp to ${shardddl}.${tb};"
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

causalityCnt=$(cat $WORK_DIR/worker1/log/dm-worker.log $WORK_DIR/worker2/log/dm-worker.log | grep "meet causality key, will generate a conflict job to flush all sqls" | wc -l)
if [[ "$causalityCnt" -ne 0 ]]; then
echo "causalityCnt is $causalityCnt, but it should be 0"
exit 1
fi
}
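
Why zero conflicts are expected: with the downstream schema only the unique
column b contributes causality keys, while with the upstream schema the final
"insert values(1,3)" would carry both a=1 and b=3, which belong to different
key groups. A minimal, self-contained sketch of the grouping idea (hypothetical
helper, not DM's actual implementation), replaying the statements above:

package main

import "fmt"

func main() {
	group := map[string]int{} // causality key -> group id
	next := 0

	// assign reports whether a job's keys span two existing groups, which
	// would force a conflict flush.
	assign := func(keys ...string) (conflict bool) {
		target := -1
		for _, k := range keys {
			if g, ok := group[k]; ok {
				if target != -1 && g != target {
					return true
				}
				target = g
			}
		}
		if target == -1 {
			target = next
			next++
		}
		for _, k := range keys {
			group[k] = target
		}
		return false
	}

	// Downstream schema: only b is a key; no statement ever spans two groups.
	fmt.Println(assign("b=2"))        // insert (1,2)
	fmt.Println(assign("b=3"))        // insert (2,3)
	fmt.Println(assign("b=3", "b=4")) // update (2,3) -> (3,4)
	fmt.Println(assign("b=2"))        // delete (1,2)
	fmt.Println(assign("b=3"))        // insert (1,3): false, no conflict

	// Upstream schema: a and b are both keys; the final insert carries a=1
	// (first group) and b=3 (second group) and would conflict.
	group, next = map[string]int{}, 0
	fmt.Println(assign("a=1", "b=2"))
	fmt.Println(assign("a=2", "b=3"))
	fmt.Println(assign("a=2", "b=3", "a=3", "b=4"))
	fmt.Println(assign("a=1", "b=2"))
	fmt.Println(assign("a=1", "b=3")) // true: conflict
}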

function DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA() {
# The downstream pk/uk/columns differ from the upstream ones, so causality should use the downstream schema.
run_case CAUSALITY_USE_DOWNSTREAM_SCHEMA "single-source-no-sharding" \
"run_sql_source1 \"create table ${shardddl1}.${tb1} (a int primary key, b int unique);\";
run_sql_tidb \"drop database if exists ${shardddl}; create database ${shardddl}; create table ${shardddl}.${tb} (a int, b int unique, c int primary key auto_increment) auto_increment = 100;\"" \
"clean_table" ""
}

function run() {
init_cluster
init_database

DM_COMPACT
DM_COMPACT_USE_DOWNSTREAM_SCHEMA
DM_MULTIPLE_ROWS
DM_CAUSALITY
DM_CAUSALITY_USE_DOWNSTREAM_SCHEMA
DM_UpdateBARule
DM_RENAME_TABLE
DM_RENAME_COLUMN_OPTIMISTIC
@@ -697,6 +791,7 @@ function run() {
DM_${i}
sleep 1
done

}

cleanup_data $shardddl