Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

add failpoint test for db operation #206

Merged
merged 27 commits into from
Aug 1, 2019
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
09dd4bd
add failpoint for loader when execute create table
WangXiangUSTC Jul 17, 2019
0eaceba
enable failpoint in test
WangXiangUSTC Jul 17, 2019
e6e2a2d
minor update
WangXiangUSTC Jul 18, 2019
9a606fc
fix check
WangXiangUSTC Jul 19, 2019
580ee08
add failpoint
WangXiangUSTC Jul 22, 2019
0e673c6
add test case
WangXiangUSTC Jul 24, 2019
c9b0b77
add test case for relay failed
WangXiangUSTC Jul 24, 2019
155710f
refine test
WangXiangUSTC Jul 24, 2019
fbf8a16
clean code
WangXiangUSTC Jul 24, 2019
6d65cf3
Merge branch 'master' into xiang/db_test
WangXiangUSTC Jul 24, 2019
738bcd5
update others_integration.txt
WangXiangUSTC Jul 24, 2019
cd9d793
Merge branch 'xiang/db_test' of https://github.com/pingcap/dm into xi…
WangXiangUSTC Jul 24, 2019
b688cb2
fix test
WangXiangUSTC Jul 24, 2019
74c4f97
fix test
WangXiangUSTC Jul 24, 2019
5165e14
address commnet
WangXiangUSTC Jul 25, 2019
fcf20ba
minor fix
WangXiangUSTC Jul 25, 2019
cc04334
Merge branch 'master' into xiang/db_test
IANTHEREAL Jul 29, 2019
3b35795
address comment
WangXiangUSTC Jul 25, 2019
0eb7170
Merge branch 'xiang/db_test' of https://github.com/pingcap/dm into xi…
WangXiangUSTC Jul 30, 2019
44d5641
fix
WangXiangUSTC Jul 30, 2019
3169dab
Merge branch 'master' into xiang/db_test
WangXiangUSTC Jul 30, 2019
f405bd1
Update pkg/utils/db.go
WangXiangUSTC Jul 31, 2019
dfaf6cc
Update loader/db.go
WangXiangUSTC Jul 31, 2019
9d7936a
change cleanup1 to cleanup_data, change cleanup2 to cleanup_process
WangXiangUSTC Jul 31, 2019
84d41a0
minor fix
WangXiangUSTC Jul 31, 2019
9e3900d
Merge branch 'master' into xiang/db_test
IANTHEREAL Aug 1, 2019
05ac82e
Merge branch 'master' into xiang/db_test
IANTHEREAL Aug 1, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (st *SubTask) Init() error {
u.Close()
}

st.initialized.Set(true)
st.initialized.Set(len(needCloseUnits) == 0)
}()

// every unit does base initialization in `Init`, and this must pass before start running the sub task
Expand Down
22 changes: 22 additions & 0 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package loader
import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand All @@ -24,6 +26,7 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -131,6 +134,25 @@ func (conn *Conn) executeSQLCustomRetry(ctx *tcontext.Context, sqls []string, en

startTime := time.Now()
err = executeSQLImp(ctx, conn.db, sqls)

failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) {
items := strings.Split(val.(string), ",")
if len(items) != 2 {
ctx.L().Fatal("failpoint LoadExecCreateTableFailed's value is invalid", zap.String("val", val.(string)))
}

errCode, err1 := strconv.ParseUint(items[0], 10, 16)
errNum, err2 := strconv.ParseInt(items[1], 10, 16)
if err1 != nil || err2 != nil {
ctx.L().Fatal("failpoint LoadExecCreateTableFailed's value is invalid", zap.String("val", val.(string)), zap.Strings("items", items), zap.Error(err1), zap.Error(err2))
}

if i < int(errNum) && len(sqls) == 1 && strings.Contains(sqls[0], "CREATE TABLE") {
err = tmysql.NewErr(uint16(errCode))
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
ctx.L().Warn("executeSQLCustomRetry failed", zap.String("failpoint", "LoadExecCreateTableFailed"), zap.Error(err))
}
})

if err != nil {
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name).Inc()
if isRetryableFn(err) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pingcap/dm/pkg/log"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
Expand Down Expand Up @@ -61,6 +63,12 @@ func TrimCtrlChars(s string) string {
// FetchAllDoTables returns all need to do tables after filtered (fetches from upstream MySQL)
func FetchAllDoTables(db *sql.DB, bw *filter.Filter) (map[string][]string, error) {
schemas, err := getSchemas(db, maxRetryCount)

failpoint.Inject("FetchAllDoTablesFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
log.L().Warn("FetchAllDoTables failed", zap.String("failpoint", "FetchAllDoTablesFailed"), zap.Error(err))
})

if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -115,6 +123,12 @@ func FetchAllDoTables(db *sql.DB, bw *filter.Filter) (map[string][]string, error
func FetchTargetDoTables(db *sql.DB, bw *filter.Filter, router *router.Table) (map[string][]*filter.Table, error) {
// fetch tables from source and filter them
sourceTables, err := FetchAllDoTables(db, bw)

failpoint.Inject("FetchTargetDoTablesFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
log.L().Warn("FetchTargetDoTables failed", zap.String("failpoint", "FetchTargetDoTablesFailed"), zap.Error(err))
})

if err != nil {
return nil, errors.Trace(err)
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
tmysql "github.com/pingcap/parser/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"
)

var (
Expand All @@ -40,6 +44,12 @@ func GetMasterStatus(db *sql.DB, flavor string) (gmysql.Position, gtid.Set, erro
)

rows, err := db.Query(`SHOW MASTER STATUS`)

failpoint.Inject("GetMasterStatusFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
amyangfei marked this conversation as resolved.
Show resolved Hide resolved
log.L().Warn("GetMasterStatus failed", zap.String("failpoint", "GetMasterStatusFailed"), zap.Error(err))
})

if err != nil {
return binlogPos, gs, errors.Trace(err)
}
Expand Down Expand Up @@ -117,6 +127,24 @@ func GetMariaDBGTID(db *sql.DB) (gtid.Set, error) {
func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) {
query := fmt.Sprintf("SHOW GLOBAL VARIABLES LIKE '%s'", variable)
rows, err := db.Query(query)

failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) {
items := strings.Split(val.(string), ",")
if len(items) != 2 {
log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string)))
}
variableName := items[0]
errCode, err1 := strconv.ParseUint(items[1], 10, 16)
if err1 != nil {
log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string)))
}

if variable == variableName {
err = tmysql.NewErr(uint16(errCode))
log.L().Warn("GetGlobalVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetGlobalVariableFailed"), zap.Error(err))
}
})

if err != nil {
return "", errors.Trace(err)
}
Expand Down
8 changes: 8 additions & 0 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

Expand Down Expand Up @@ -489,6 +491,12 @@ func (cp *RemoteCheckPoint) createTable() error {
func (cp *RemoteCheckPoint) Load() error {
query := fmt.Sprintf("SELECT `cp_schema`, `cp_table`, `binlog_name`, `binlog_pos`, `is_global` FROM `%s`.`%s` WHERE `id`='%s'", cp.schema, cp.table, cp.id)
rows, err := cp.db.querySQL(cp.tctx, query, maxRetryCount)

failpoint.Inject("LoadCheckpointFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
log.L().Warn("Load failed", zap.String("failpoint", "LoadCheckpointFailed"), zap.Error(err))
})

if err != nil {
return errors.Trace(err)
}
Expand Down
13 changes: 13 additions & 0 deletions tests/_utils/kill_dm_master
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh

while :
do
dm_master_num=`ps aux > temp && grep "dm-master.test" temp | wc -l && rm temp`
echo "$dm_master_num dm-master alive"
if [ $dm_master_num -ne 0 ]; then
killall dm-master.test || true
sleep 1
else
break
fi
done
13 changes: 13 additions & 0 deletions tests/_utils/kill_dm_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh

while :
do
dm_worker_num=`ps aux > temp && grep "dm-worker.test" temp | wc -l && rm temp`
echo "$dm_worker_num dm-worker alive"
if [ $dm_worker_num -ne 0 ]; then
killall dm-worker.test || true
sleep 1
else
break
fi
done
29 changes: 26 additions & 3 deletions tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ TRACER_PORT=8264
RESET_MASTER=${RESET_MASTER:-true}

# we do clean staff at beginning of each run, so we can keep logs of the latset run
function cleanup1() {
function cleanup_data() {
rm -rf $WORK_DIR
mkdir $WORK_DIR
for target_db in "$@"; do
Expand All @@ -19,10 +19,14 @@ function cleanup1() {
run_sql "drop database if exists dm_meta" $TIDB_PORT
}

function cleanup2() {
function cleanup_process() {
pkill -hup dm-worker.test 2>/dev/null || true
pkill -hup dm-master.test 2>/dev/null || true
pkill -hup dm-tracer.test 2>/dev/null || true

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
wait_process_exit dm-tracer.test
}

if [ "$RESET_MASTER" = true ]; then
Expand Down Expand Up @@ -64,8 +68,27 @@ function dmctl_start_task() {
# shortcut for stop task on two DM-workers
function dmctl_stop_task() {
task_name=$1
dmctl_operate_task $task_name stop-task
}

# shortcut for pause task on two DM-workers
function dmctl_pause_task() {
task_name=$1
dmctl_operate_task $task_name pause-task
}

# shortcut for stop task on two DM-workers
function dmctl_resume_task() {
task_name=$1
dmctl_operate_task $task_name resume-task
}

function dmctl_operate_task() {
task_name=$1
operate=$2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task $task_name" \
"$operate $task_name" \
"\"result\": true" 3 \
"\"worker\": \"127.0.0.1:$WORKER1_PORT\"" 1 \
"\"worker\": \"127.0.0.1:$WORKER2_PORT\"" 1
Expand Down
9 changes: 3 additions & 6 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@ function run() {
check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
}

cleanup1 all_mode
cleanup_data all_mode
# also cleanup dm processes in case of last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
9 changes: 3 additions & 6 deletions tests/dmctl_advance/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,10 @@ function run() {
usage_and_arg_test
}

cleanup1 dmctl_advance
cleanup_data dmctl_advance
# also cleanup dm processes in case of last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
9 changes: 3 additions & 6 deletions tests/dmctl_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,10 @@ function run() {
[ "$new_relay_log_count" -eq 1 ]
}

cleanup1 dmctl
cleanup_data dmctl
# also cleanup dm processes in case of last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
9 changes: 3 additions & 6 deletions tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,10 @@ function run() {
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

cleanup1 $TEST_NAME
cleanup_data $TEST_NAME
# also cleanup dm processes in case of last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
40 changes: 40 additions & 0 deletions tests/initial_unit/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# diff Configuration.

log-level = "info"

chunk-size = 10

check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"

# tables need to check.
[[check-tables]]
schema = "initial_unit"
tables = ["~.*"]

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = ""
instance-id = "source-2"

[target-db]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
5 changes: 5 additions & 0 deletions tests/initial_unit/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Master Configuration.

[[deploy]]
source-id = "mysql-replica-01"
dm-worker = "127.0.0.1:8262"
Loading