Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
add failpoint test for db operation (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
WangXiangUSTC authored and IANTHEREAL committed Aug 1, 2019
1 parent e8246e5 commit ffebdd3
Show file tree
Hide file tree
Showing 35 changed files with 723 additions and 73 deletions.
2 changes: 1 addition & 1 deletion dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (st *SubTask) Init() error {
u.Close()
}

st.initialized.Set(true)
st.initialized.Set(len(needCloseUnits) == 0)
}()

// every unit does base initialization in `Init`, and this must pass before start running the sub task
Expand Down
22 changes: 22 additions & 0 deletions loader/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package loader
import (
"database/sql"
"fmt"
"strconv"
"strings"
"time"

"github.com/pingcap/dm/dm/config"
Expand All @@ -24,6 +26,7 @@ import (

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -131,6 +134,25 @@ func (conn *Conn) executeSQLCustomRetry(ctx *tcontext.Context, sqls []string, en

startTime := time.Now()
err = executeSQLImp(ctx, conn.db, sqls)

failpoint.Inject("LoadExecCreateTableFailed", func(val failpoint.Value) {
items := strings.Split(val.(string), ",")
if len(items) != 2 {
ctx.L().Fatal("failpoint LoadExecCreateTableFailed's value is invalid", zap.String("val", val.(string)))
}

errCode, err1 := strconv.ParseUint(items[0], 10, 16)
errNum, err2 := strconv.ParseInt(items[1], 10, 16)
if err1 != nil || err2 != nil {
ctx.L().Fatal("failpoint LoadExecCreateTableFailed's value is invalid", zap.String("val", val.(string)), zap.Strings("items", items), zap.Error(err1), zap.Error(err2))
}

if i < int(errNum) && len(sqls) == 1 && strings.Contains(sqls[0], "CREATE TABLE") {
err = tmysql.NewErr(uint16(errCode))
ctx.L().Warn("executeSQLCustomRetry failed", zap.String("failpoint", "LoadExecCreateTableFailed"), zap.Error(err))
}
})

if err != nil {
tidbExecutionErrorCounter.WithLabelValues(conn.cfg.Name).Inc()
if isRetryableFn(err) {
Expand Down
14 changes: 14 additions & 0 deletions pkg/utils/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"github.com/pingcap/dm/pkg/log"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-tools/pkg/dbutil"
"github.com/pingcap/tidb-tools/pkg/filter"
"github.com/pingcap/tidb-tools/pkg/table-router"
Expand Down Expand Up @@ -61,6 +63,12 @@ func TrimCtrlChars(s string) string {
// FetchAllDoTables returns all need to do tables after filtered (fetches from upstream MySQL)
func FetchAllDoTables(db *sql.DB, bw *filter.Filter) (map[string][]string, error) {
schemas, err := getSchemas(db, maxRetryCount)

failpoint.Inject("FetchAllDoTablesFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
log.L().Warn("FetchAllDoTables failed", zap.String("failpoint", "FetchAllDoTablesFailed"), zap.Error(err))
})

if err != nil {
return nil, errors.Trace(err)
}
Expand Down Expand Up @@ -115,6 +123,12 @@ func FetchAllDoTables(db *sql.DB, bw *filter.Filter) (map[string][]string, error
func FetchTargetDoTables(db *sql.DB, bw *filter.Filter, router *router.Table) (map[string][]*filter.Table, error) {
// fetch tables from source and filter them
sourceTables, err := FetchAllDoTables(db, bw)

failpoint.Inject("FetchTargetDoTablesFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
log.L().Warn("FetchTargetDoTables failed", zap.String("failpoint", "FetchTargetDoTablesFailed"), zap.Error(err))
})

if err != nil {
return nil, errors.Trace(err)
}
Expand Down
28 changes: 28 additions & 0 deletions pkg/utils/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@ import (
"database/sql"
"fmt"
"strconv"
"strings"

"github.com/pingcap/dm/pkg/gtid"
"github.com/pingcap/dm/pkg/log"

"github.com/go-sql-driver/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser"
tmysql "github.com/pingcap/parser/mysql"
gmysql "github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"
)

var (
Expand All @@ -40,6 +44,12 @@ func GetMasterStatus(db *sql.DB, flavor string) (gmysql.Position, gtid.Set, erro
)

rows, err := db.Query(`SHOW MASTER STATUS`)

failpoint.Inject("GetMasterStatusFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
log.L().Warn("GetMasterStatus failed", zap.String("failpoint", "GetMasterStatusFailed"), zap.Error(err))
})

if err != nil {
return binlogPos, gs, errors.Trace(err)
}
Expand Down Expand Up @@ -117,6 +127,24 @@ func GetMariaDBGTID(db *sql.DB) (gtid.Set, error) {
func GetGlobalVariable(db *sql.DB, variable string) (value string, err error) {
query := fmt.Sprintf("SHOW GLOBAL VARIABLES LIKE '%s'", variable)
rows, err := db.Query(query)

failpoint.Inject("GetGlobalVariableFailed", func(val failpoint.Value) {
items := strings.Split(val.(string), ",")
if len(items) != 2 {
log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string)))
}
variableName := items[0]
errCode, err1 := strconv.ParseUint(items[1], 10, 16)
if err1 != nil {
log.L().Fatal("failpoint GetGlobalVariableFailed's value is invalid", zap.String("val", val.(string)))
}

if variable == variableName {
err = tmysql.NewErr(uint16(errCode))
log.L().Warn("GetGlobalVariable failed", zap.String("variable", variable), zap.String("failpoint", "GetGlobalVariableFailed"), zap.Error(err))
}
})

if err != nil {
return "", errors.Trace(err)
}
Expand Down
8 changes: 8 additions & 0 deletions syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
tmysql "github.com/pingcap/parser/mysql"
"github.com/siddontang/go-mysql/mysql"
"go.uber.org/zap"

Expand Down Expand Up @@ -489,6 +491,12 @@ func (cp *RemoteCheckPoint) createTable() error {
func (cp *RemoteCheckPoint) Load() error {
query := fmt.Sprintf("SELECT `cp_schema`, `cp_table`, `binlog_name`, `binlog_pos`, `is_global` FROM `%s`.`%s` WHERE `id`='%s'", cp.schema, cp.table, cp.id)
rows, err := cp.db.querySQL(cp.tctx, query, maxRetryCount)

failpoint.Inject("LoadCheckpointFailed", func(val failpoint.Value) {
err = tmysql.NewErr(uint16(val.(int)))
log.L().Warn("Load failed", zap.String("failpoint", "LoadCheckpointFailed"), zap.Error(err))
})

if err != nil {
return errors.Trace(err)
}
Expand Down
13 changes: 13 additions & 0 deletions tests/_utils/kill_dm_master
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh
#
# Kill every running dm-master.test process and wait until none is left.
#
# Each iteration counts the matching lines in `ps aux`, prints the count, and,
# while the count is non-zero, runs `killall dm-master.test` and retries after
# one second.

while :
do
    # The "[d]" bracket expression keeps grep's own command line from matching
    # the pattern, so the process list can be piped straight into grep instead
    # of being staged in a `temp` file in the current directory (such a file is
    # left behind on interruption and can be clobbered by a concurrently
    # running script that uses the same file name).
    dm_master_num=$(ps aux | grep "[d]m-master.test" | wc -l)
    echo "$dm_master_num dm-master alive"
    # Left unquoted on purpose: word splitting strips any padding wc -l emits.
    if [ $dm_master_num -ne 0 ]; then
        # A non-zero exit status from killall is deliberately ignored.
        killall dm-master.test || true
        sleep 1
    else
        break
    fi
done
13 changes: 13 additions & 0 deletions tests/_utils/kill_dm_worker
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#!/bin/sh
#
# Kill every running dm-worker.test process and wait until none is left.
#
# Each iteration counts the matching lines in `ps aux`, prints the count, and,
# while the count is non-zero, runs `killall dm-worker.test` and retries after
# one second.

while :
do
    # The "[d]" bracket expression keeps grep's own command line from matching
    # the pattern, so the process list can be piped straight into grep instead
    # of being staged in a `temp` file in the current directory (such a file is
    # left behind on interruption and can be clobbered by a concurrently
    # running script that uses the same file name).
    dm_worker_num=$(ps aux | grep "[d]m-worker.test" | wc -l)
    echo "$dm_worker_num dm-worker alive"
    # Left unquoted on purpose: word splitting strips any padding wc -l emits.
    if [ $dm_worker_num -ne 0 ]; then
        # A non-zero exit status from killall is deliberately ignored.
        killall dm-worker.test || true
        sleep 1
    else
        break
    fi
done
29 changes: 26 additions & 3 deletions tests/_utils/test_prepare
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ TRACER_PORT=8264
RESET_MASTER=${RESET_MASTER:-true}

# we do the cleanup at the beginning of each run, so we can keep the logs of the latest run
function cleanup1() {
function cleanup_data() {
rm -rf $WORK_DIR
mkdir $WORK_DIR
for target_db in "$@"; do
Expand All @@ -19,10 +19,14 @@ function cleanup1() {
run_sql "drop database if exists dm_meta" $TIDB_PORT
}

function cleanup2() {
function cleanup_process() {
pkill -hup dm-worker.test 2>/dev/null || true
pkill -hup dm-master.test 2>/dev/null || true
pkill -hup dm-tracer.test 2>/dev/null || true

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
wait_process_exit dm-tracer.test
}

if [ "$RESET_MASTER" = true ]; then
Expand Down Expand Up @@ -64,8 +68,27 @@ function dmctl_start_task() {
# shortcut for stop task on two DM-workers
# Arguments:
#   $1 - name of the task to stop
# Forwards the task name to dmctl_operate_task with the `stop-task` command.
# Note: task_name is assigned without `local`, so it is set as a global variable.
function dmctl_stop_task() {
task_name=$1
dmctl_operate_task $task_name stop-task
}

# shortcut for pause task on two DM-workers
# Arguments:
#   $1 - name of the task to pause
# Forwards the task name to dmctl_operate_task with the `pause-task` command.
# Note: task_name is assigned without `local`, so it is set as a global variable.
function dmctl_pause_task() {
task_name=$1
dmctl_operate_task $task_name pause-task
}

# shortcut for resume task on two DM-workers
# Arguments:
#   $1 - name of the task to resume
# Forwards the task name to dmctl_operate_task with the `resume-task` command.
# Note: task_name is assigned without `local`, so it is set as a global variable.
function dmctl_resume_task() {
task_name=$1
dmctl_operate_task $task_name resume-task
}

function dmctl_operate_task() {
task_name=$1
operate=$2

run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task $task_name" \
"$operate $task_name" \
"\"result\": true" 3 \
"\"worker\": \"127.0.0.1:$WORKER1_PORT\"" 1 \
"\"worker\": \"127.0.0.1:$WORKER2_PORT\"" 1
Expand Down
9 changes: 3 additions & 6 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,10 @@ function run() {
check_metric $WORKER2_PORT 'dm_syncer_replication_lag{task="test"}' 3 0 1
}

cleanup1 all_mode
cleanup_data all_mode
# also clean up dm processes in case the last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
9 changes: 3 additions & 6 deletions tests/dmctl_advance/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,10 @@ function run() {
usage_and_arg_test
}

cleanup1 dmctl_advance
cleanup_data dmctl_advance
# also clean up dm processes in case the last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
9 changes: 3 additions & 6 deletions tests/dmctl_basic/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,10 @@ function run() {
[ "$new_relay_log_count" -eq 1 ]
}

cleanup1 dmctl
cleanup_data dmctl
# also clean up dm processes in case the last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
9 changes: 3 additions & 6 deletions tests/incremental_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,10 @@ function run() {
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml
}

cleanup1 $TEST_NAME
cleanup_data $TEST_NAME
# also clean up dm processes in case the last run failed
cleanup2 $*
cleanup_process $*
run $*
cleanup2 $*

wait_process_exit dm-master.test
wait_process_exit dm-worker.test
cleanup_process $*

echo "[$(date)] <<<<<< test case $TEST_NAME success! >>>>>>"
40 changes: 40 additions & 0 deletions tests/initial_unit/conf/diff_config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# diff Configuration.

log-level = "info"

chunk-size = 10

check-thread-count = 4

sample-percent = 100

use-rowid = false

use-checksum = true

fix-sql-file = "fix.sql"

# tables that need to be checked.
[[check-tables]]
schema = "initial_unit"
tables = ["~.*"]

[[source-db]]
host = "127.0.0.1"
port = 3306
user = "root"
password = ""
instance-id = "source-1"

[[source-db]]
host = "127.0.0.1"
port = 3307
user = "root"
password = ""
instance-id = "source-2"

[target-db]
host = "127.0.0.1"
port = 4000
user = "root"
password = ""
5 changes: 5 additions & 0 deletions tests/initial_unit/conf/dm-master.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Master Configuration.

[[deploy]]
source-id = "mysql-replica-01"
dm-worker = "127.0.0.1:8262"
Loading

0 comments on commit ffebdd3

Please sign in to comment.