Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

Commit

Permalink
cherry pick #1606 to release-2.0
Browse files Browse the repository at this point in the history
Signed-off-by: ti-srebot <ti-srebot@pingcap.com>
  • Loading branch information
lichunzhu authored and ti-srebot committed Apr 20, 2021
1 parent 0568756 commit 8368a7b
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 12 deletions.
2 changes: 1 addition & 1 deletion dm/dm-ansible/scripts/dm.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The current state of subtasks in instances.\n\n0: Invalid\n\n1: New\n\n2: Running\n\n3: Paused\n\n4: Stopped\n\n5: Finished",
"description": "The current state of subtasks in instances.\n\n0: Invalid\n\n1: New\n\n2: Running\n\n3: Paused",
"fill": 1,
"gridPos": {
"h": 7,
Expand Down
2 changes: 1 addition & 1 deletion dm/dm-ansible/scripts/dm_instances.json
Original file line number Diff line number Diff line change
Expand Up @@ -1096,7 +1096,7 @@
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"description": "The current state of subtasks in the instance.\n\n0: Invalid\n\n1: New\n\n2: Running\n\n3: Paused\n\n4: Stopped\n\n5: Finished",
"description": "The current state of subtasks in the instance.\n\n0: Invalid\n\n1: New\n\n2: Running\n\n3: Paused",
"fill": 1,
"gridPos": {
"h": 7,
Expand Down
4 changes: 0 additions & 4 deletions dm/worker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,3 @@ func InitStatus(lis net.Listener) {
log.L().Error("status server returned", log.ShortError(err))
}
}

func (st *SubTask) removeLabelValuesWithTaskInMetrics(task string, source string) {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task, "source_id": source})
}
21 changes: 15 additions & 6 deletions dm/worker/subtask.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/failpoint"
"github.com/prometheus/client_golang/prometheus"
"github.com/siddontang/go/sync2"
"go.etcd.io/etcd/clientv3"
"go.uber.org/zap"
Expand Down Expand Up @@ -119,7 +120,7 @@ func NewSubTaskWithStage(cfg *config.SubTaskConfig, stage pb.Stage, etcdClient *
cancel: cancel,
etcdClient: etcdClient,
}
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
updateTaskState(st.cfg.Name, st.cfg.SourceID, st.stage)
return &st
}

Expand Down Expand Up @@ -384,14 +385,14 @@ func (st *SubTask) setStage(stage pb.Stage) {
st.Lock()
defer st.Unlock()
st.stage = stage
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
updateTaskState(st.cfg.Name, st.cfg.SourceID, st.stage)
}

func (st *SubTask) setStageAndResult(stage pb.Stage, result *pb.ProcessResult) {
st.Lock()
defer st.Unlock()
st.stage = stage
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
updateTaskState(st.cfg.Name, st.cfg.SourceID, st.stage)
st.result = result
}

Expand All @@ -402,7 +403,7 @@ func (st *SubTask) stageCAS(oldStage, newStage pb.Stage) bool {

if st.stage == oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
updateTaskState(st.cfg.Name, st.cfg.SourceID, st.stage)
return true
}
return false
Expand All @@ -414,7 +415,7 @@ func (st *SubTask) setStageIfNot(oldStage, newStage pb.Stage) bool {
defer st.Unlock()
if st.stage != oldStage {
st.stage = newStage
taskState.WithLabelValues(st.cfg.Name, st.cfg.SourceID).Set(float64(st.stage))
updateTaskState(st.cfg.Name, st.cfg.SourceID, st.stage)
return true
}
return false
Expand Down Expand Up @@ -450,9 +451,9 @@ func (st *SubTask) Close() {

st.cancel()
st.closeUnits() // close all un-closed units
st.removeLabelValuesWithTaskInMetrics(st.cfg.Name, st.cfg.SourceID)
st.wg.Wait()
st.setStageIfNot(pb.Stage_Finished, pb.Stage_Stopped)
updateTaskState(st.cfg.Name, st.cfg.SourceID, pb.Stage_Stopped)
}

// Pause pauses the running sub task.
Expand Down Expand Up @@ -697,3 +698,11 @@ func (st *SubTask) HandleError(ctx context.Context, req *pb.HandleWorkerErrorReq
}
return err
}

func updateTaskState(task, sourceID string, stage pb.Stage) {
if stage == pb.Stage_Stopped || stage == pb.Stage_Finished {
taskState.DeleteAllAboutLabels(prometheus.Labels{"task": task, "source_id": sourceID})
} else {
taskState.WithLabelValues(task, sourceID).Set(float64(stage))
}
}
26 changes: 26 additions & 0 deletions tests/_utils/check_metric_not_contains
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#!/bin/bash
# parameter 1: port
# parameter 2: metric name
# parameter 3: retry count, if check failed we will wait 1s before next retry, until retry time exceeds retry count

set -eu

port=$1
metric_name=$2
retry_count=$3

shift 3

counter=0
while [ $counter -lt $retry_count ]; do
metric=$(curl -s http://127.0.0.1:$port/metrics | grep $metric_name | wc -l)
if [ $metric -eq 0 ]; then
exit 0
fi
((counter+=1))
echo "wait for valid metric for $counter-th time"
sleep 1
done

echo "metric $metric_name has invalid count $metric"
exit 1
12 changes: 12 additions & 0 deletions tests/all_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ function run() {
# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

# check task has started
check_metric $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"$ILLEGAL_CHAR_NAME\"}" 3 1 3
check_metric $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"$ILLEGAL_CHAR_NAME\"}" 3 1 3

# check default session config
check_log_contain_with_retry '\\"tidb_txn_mode\\":\\"optimistic\\"' $WORK_DIR/worker1/log/dm-worker.log
check_log_contain_with_retry '\\"tidb_txn_mode\\":\\"optimistic\\"' $WORK_DIR/worker2/log/dm-worker.log
Expand Down Expand Up @@ -324,6 +328,14 @@ function run() {
"query-status $ILLEGAL_CHAR_NAME" \
"Error 1049: Unknown database" 1

# stop task, task state should be cleaned
run_dm_ctl $WORK_DIR "127.0.0.1:$MASTER_PORT" \
"stop-task $ILLEGAL_CHAR_NAME"\
"\"result\": true" 3
check_metric_not_contains $WORKER1_PORT "dm_worker_task_state{source_id=\"mysql-replica-01\",task=\"$ILLEGAL_CHAR_NAME\"}" 3
check_metric_not_contains $WORKER2_PORT "dm_worker_task_state{source_id=\"mysql-replica-02\",task=\"$ILLEGAL_CHAR_NAME\"}" 3


export GO_FAILPOINTS=''

run_sql_both_source "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'"
Expand Down
6 changes: 6 additions & 0 deletions tests/full_mode/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -179,13 +179,19 @@ function run() {

# start DM task only
dmctl_start_task "$cur/conf/dm-task.yaml" "--remove-meta"
# Don't check task state here. The task may be finished very soon so that we can't get task state here.
# check_metric $WORKER1_PORT 'dm_worker_task_state{source_id="mysql-replica-01",task="test"}' 3 1 3
# check_metric $WORKER2_PORT 'dm_worker_task_state{source_id="mysql-replica-02",task="test"}' 3 1 3

# use sync_diff_inspector to check full dump loader
check_sync_diff $WORK_DIR $cur/conf/diff_config.toml

echo "check dump files have been cleaned"
ls $WORK_DIR/worker1/dumped_data.test && exit 1 || echo "worker1 auto removed dump files"
ls $WORK_DIR/worker2/dumped_data.test && exit 1 || echo "worker2 auto removed dump files"
# check task finished and metric cleaned
check_metric_not_contains $WORKER1_PORT 'dm_worker_task_state{source_id="mysql-replica-01",task="test"}' 3
check_metric_not_contains $WORKER2_PORT 'dm_worker_task_state{source_id="mysql-replica-02",task="test"}' 3
run_sql_both_source "SET @@GLOBAL.SQL_MODE='ONLY_FULL_GROUP_BY,STRICT_TRANS_TABLES,NO_ZERO_IN_DATE,NO_ZERO_DATE,ERROR_FOR_DIVISION_BY_ZERO,NO_ENGINE_SUBSTITUTION'"
}

Expand Down

0 comments on commit 8368a7b

Please sign in to comment.