Skip to content

Commit

Permalink
revert the CI test for the new owner and fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored and ti-chi-bot committed Jun 23, 2021
1 parent c807e89 commit 4f76708
Show file tree
Hide file tree
Showing 23 changed files with 102 additions and 77 deletions.
14 changes: 14 additions & 0 deletions cdc/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,20 @@ func (s *Server) handleChangefeedQuery(w http.ResponseWriter, req *http.Request)
tm := oracle.GetTimeFromTS(cfStatus.CheckpointTs)
resp.Checkpoint = tm.Format("2006-01-02 15:04:05.000")
}
if !config.NewReplicaImpl && cfStatus != nil {
switch cfStatus.AdminJobType {
case model.AdminNone, model.AdminResume:
if cfInfo != nil && cfInfo.Error != nil {
resp.FeedState = string(model.StateFailed)
}
case model.AdminStop:
resp.FeedState = string(model.StateStopped)
case model.AdminRemove:
resp.FeedState = string(model.StateRemoved)
case model.AdminFinish:
resp.FeedState = string(model.StateFinished)
}
}
writeData(w, resp)
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
// ErrorHistoryThreshold represents failure upper limit in time window.
// Before a changefeed is initialized, check the the failure count of this
// changefeed, if it is less than ErrorHistoryThreshold, then initialize it.
ErrorHistoryThreshold = 3
ErrorHistoryThreshold = 5
)

// ChangeFeedInfo describes the detail of a ChangeFeed
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func (s *changefeedSuite) TestSyncPoint(c *check.C) {
mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
mockAsyncSink := cf.sink.(*mockAsyncSink)
// add 5s to resolvedTs
mockDDLPuller.resolvedTs = oracle.GoTimeToTS(oracle.GetTimeFromTS(mockDDLPuller.resolvedTs).Add(5 * time.Second))
mockDDLPuller.resolvedTs = oracle.ComposeTS(oracle.GetPhysical(oracle.GetTimeFromTS(mockDDLPuller.resolvedTs).Add(5*time.Second)), 0)
// tick 20 times
for i := 0; i <= 20; i++ {
cf.Tick(ctx, state, captures)
Expand Down
2 changes: 1 addition & 1 deletion cdc/owner/gc_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (s *gcManagerSuite) TestCheckStaleCheckpointTs(c *check.C) {
err := gcManager.CheckStaleCheckpointTs(ctx, 10)
c.Assert(cerror.ErrGCTTLExceeded.Equal(errors.Cause(err)), check.IsTrue)

err = gcManager.CheckStaleCheckpointTs(ctx, oracle.GoTimeToTS(time.Now()))
err = gcManager.CheckStaleCheckpointTs(ctx, oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0))
c.Assert(err, check.IsNil)

gcManager.isTiCDCBlockGC = false
Expand Down
6 changes: 3 additions & 3 deletions cdc/owner/owner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) {
owner, state, tester := createOwner4Test(ctx, c)
changefeedID := "test-changefeed"
changefeedInfo := &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0),
Config: config.GetDefaultReplicaConfig(),
}
changefeedStr, err := changefeedInfo.Marshal()
Expand Down Expand Up @@ -98,7 +98,7 @@ func (s *ownerSuite) TestStopChangefeed(c *check.C) {
owner, state, tester := createOwner4Test(ctx, c)
changefeedID := "test-changefeed"
changefeedInfo := &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0),
Config: config.GetDefaultReplicaConfig(),
}
changefeedStr, err := changefeedInfo.Marshal()
Expand Down Expand Up @@ -145,7 +145,7 @@ func (s *ownerSuite) TestCheckClusterVersion(c *check.C) {

changefeedID := "test-changefeed"
changefeedInfo := &model.ChangeFeedInfo{
StartTs: oracle.GoTimeToTS(time.Now()),
StartTs: oracle.ComposeTS(oracle.GetPhysical(time.Now()), 0),
Config: config.GetDefaultReplicaConfig(),
}
changefeedStr, err := changefeedInfo.Marshal()
Expand Down
9 changes: 4 additions & 5 deletions cdc/owner/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/util/testleak"
"github.com/pingcap/tidb/store/tikv/oracle"
)

var _ = check.Suite(&schemaSuite{})
Expand All @@ -35,7 +34,7 @@ func (s *schemaSuite) TestAllPhysicalTables(c *check.C) {
defer testleak.AfterTest(c)()
helper := entry.NewSchemaTestHelper(c)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
ver, err := helper.Storage().CurrentVersion()
c.Assert(err, check.IsNil)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -82,7 +81,7 @@ func (s *schemaSuite) TestIsIneligibleTableID(c *check.C) {
defer testleak.AfterTest(c)()
helper := entry.NewSchemaTestHelper(c)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
ver, err := helper.Storage().CurrentVersion()
c.Assert(err, check.IsNil)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
c.Assert(err, check.IsNil)
Expand All @@ -102,7 +101,7 @@ func (s *schemaSuite) TestBuildDDLEvent(c *check.C) {
defer testleak.AfterTest(c)()
helper := entry.NewSchemaTestHelper(c)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
ver, err := helper.Storage().CurrentVersion()
c.Assert(err, check.IsNil)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -151,7 +150,7 @@ func (s *schemaSuite) TestSinkTableInfos(c *check.C) {
defer testleak.AfterTest(c)()
helper := entry.NewSchemaTestHelper(c)
defer helper.Close()
ver, err := helper.Storage().CurrentVersion(oracle.GlobalTxnScope)
ver, err := helper.Storage().CurrentVersion()
c.Assert(err, check.IsNil)
schema, err := newSchemaWrap4Owner(helper.Storage(), ver.Ver, config.GetDefaultReplicaConfig())
c.Assert(err, check.IsNil)
Expand Down
2 changes: 1 addition & 1 deletion errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ unified sorter backend is terminating

["CDC:ErrUnifiedSorterIOError"]
error = '''
unified sorter IO error. Make sure your sort-dir is configured correctly by passing a valid argument or toml file to `cdc server`, or if you use TiUP, review the settings in `tiup cluster edit-config`. Details: %s
unified sorter IO error
'''

["CDC:ErrUnknownKVEventType"]
Expand Down
11 changes: 5 additions & 6 deletions tests/availability/owner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ function test_owner_ha() {
test_hang_up_owner
test_expire_owner
test_owner_cleanup_stale_tasks
# FIXME: this test case should be owner crashed during task cleanup
# test_owner_cleanup_stale_tasks
test_owner_retryable_error
test_gap_between_watch_capture
}
Expand Down Expand Up @@ -159,8 +161,7 @@ function test_owner_cleanup_stale_tasks() {
function test_owner_retryable_error() {
echo "run test case test_owner_retryable_error"

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture-campaign-compacted-error=1*return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture/capture-campaign-compacted-error=1*return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/capture-campaign-compacted-error=1*return(true)'

# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server1
Expand All @@ -172,8 +173,7 @@ function test_owner_retryable_error() {
echo "owner pid:" $owner_pid
echo "owner id" $owner_id

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture-resign-failed=1*return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture/capture-resign-failed=1*return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner-run-with-error=1*return(true);github.com/pingcap/ticdc/cdc/capture-resign-failed=1*return(true)'

# run another server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_owner_retryable_error.server2 --addr "127.0.0.1:8301"
Expand All @@ -198,8 +198,7 @@ function test_owner_retryable_error() {
function test_gap_between_watch_capture() {
echo "run test case test_gap_between_watch_capture"

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sleep-before-watch-capture=1*sleep(6000)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/sleep-in-owner-tick=1*sleep(6000)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/sleep-before-watch-capture=1*sleep(6000)'

# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_gap_between_watch_capture.server1
Expand Down
2 changes: 1 addition & 1 deletion tests/availability/processor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function test_processor_ha() {
function test_stop_processor() {
echo "run test case test_stop_processor"
# start a capture server
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --logsuffix test_stop_processor
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
# ensure the server become the owner
ensure $MAX_RETRIES "$CDC_BINARY cli capture list 2>&1 | grep '\"is-owner\": true'"
owner_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')
Expand Down
32 changes: 15 additions & 17 deletions tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@ source $CUR/../_utils/test_prepare
WORK_DIR=$OUT_DIR/$TEST_NAME
CDC_BINARY=cdc.test
SINK_TYPE=$1
MAX_RETRIES=20
MAX_RETRIES=10

function check_changefeed_mark_error() {
function check_changefeed_mark_failed() {
endpoints=$1
changefeedid=$2
error_msg=$3
info=$(cdc cli changefeed query --pd=$endpoints -c $changefeedid -s)
echo "$info"
state=$(echo $info|jq -r '.state')
if [[ ! "$state" == "error" ]]; then
echo "changefeed state $state does not equal to error"
if [[ ! "$state" == "failed" ]]; then
echo "changefeed state $state does not equal to failed"
exit 1
fi
message=$(echo $info|jq -r '.error.message')
Expand Down Expand Up @@ -97,7 +97,7 @@ function check_no_capture() {
fi
}

export -f check_changefeed_mark_error
export -f check_changefeed_mark_failed
export -f check_changefeed_mark_failed_regex
export -f check_changefeed_mark_stopped_regex
export -f check_changefeed_mark_stopped
Expand All @@ -114,8 +114,7 @@ function run() {
start_ts=$(run_cdc_cli tso query --pd=http://$UP_PD_HOST_1:$UP_PD_PORT_1)
run_sql "CREATE DATABASE changefeed_error;" ${UP_TIDB_HOST} ${UP_TIDB_PORT}
go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedNoRetryError=1*return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/NewChangefeedNoRetryError=1*return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedNoRetryError=1*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
capture_pid=$(ps -C $CDC_BINARY -o pid= | awk '{print $1}')

Expand All @@ -130,20 +129,21 @@ function run() {
fi

ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} ".*CDC:ErrStartTsBeforeGC.*"
cdc cli changefeed resume -c $changefeedid
changefeed_info=$(ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} get /tidb/cdc/changefeed/info/${changefeedid}|tail -n 1)
new_info=$(echo $changefeed_info|sed 's/"state":"failed"/"state":"normal"/g')
ETCDCTL_API=3 etcdctl --endpoints=${UP_PD_HOST_1}:${UP_PD_PORT_1} put /tidb/cdc/changefeed/info/${changefeedid} "$new_info"

check_table_exists "changefeed_error.USERTABLE" ${DOWN_TIDB_HOST} ${DOWN_TIDB_PORT}
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

go-ycsb load mysql -P $CUR/conf/workload -p mysql.host=${UP_TIDB_HOST} -p mysql.port=${UP_TIDB_PORT} -p mysql.user=root -p mysql.db=changefeed_error
check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedRetryError=return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/NewChangefeedRetryError=return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/NewChangefeedRetryError=return(true)'
kill $capture_pid
ensure $MAX_RETRIES check_no_capture http://${UP_PD_HOST_1}:${UP_PD_PORT_1}
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
ensure $MAX_RETRIES check_changefeed_mark_error http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failpoint injected retriable error"
ensure $MAX_RETRIES check_changefeed_mark_failed http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid} "failpoint injected retriable error"

cdc cli changefeed remove -c $changefeedid
ensure $MAX_RETRIES check_no_changefeed ${UP_PD_HOST_1}:${UP_PD_PORT_1}
Expand All @@ -152,24 +152,22 @@ function run() {
cleanup_process $CDC_BINARY

# owner DDL error case
# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectChangefeedDDLError=return(true)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectChangefeedDDLError=return(true)' # new owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectChangefeedDDLError=return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
changefeedid_1=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

run_sql "CREATE table changefeed_error.DDLERROR(id int primary key, val int);"
ensure $MAX_RETRIES check_changefeed_mark_error http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_1} "[CDC:ErrExecDDLFailed]exec DDL failed"
ensure $MAX_RETRIES check_changefeed_mark_stopped http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_1} "[CDC:ErrExecDDLFailed]exec DDL failed"

cdc cli changefeed remove -c $changefeedid_1
cleanup_process $CDC_BINARY

# updating GC safepoint failure case
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectActualGCSafePoint=return(9223372036854775807)' # new owner
# export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=return(9223372036854775807)' # old owner
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/InjectActualGCSafePoint=return(9223372036854775807)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeedid_2=$(cdc cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
ensure $MAX_RETRIES check_changefeed_mark_failed_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "[CDC:ErrStartTsBeforeGC]"
ensure $MAX_RETRIES check_changefeed_mark_stopped_regex http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_2} "service safepoint lost"

cdc cli changefeed remove -c $changefeedid_2
export GO_FAILPOINTS=''
Expand Down
11 changes: 7 additions & 4 deletions tests/changefeed_finish/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ MAX_RETRIES=10
function check_changefeed_is_finished() {
pd=$1
changefeed=$2
query=$(cdc cli changefeed query -c=$changefeed)
echo "$query"
state=$(echo "$query"|jq ".state"|tr -d '"')
state=$(cdc cli changefeed query -s -c=$changefeed|jq ".state"|tr -d '"')
if [[ ! "$state" -eq "finished" ]]; then
echo "state $state is not finished"
exit 1
fi

info=$(cdc cli changefeed query -c=$changefeed|sed "/has been deleted/d"|jq ".info")
if [[ ! "$info" -eq "null" ]]; then
echo "unexpected changefeed info $info, should be null"
exit 1
fi

status_length=$(echo "$query"|sed "/has been deleted/d"|jq '."task-status"|length')
status_length=$(cdc cli changefeed query -c=$changefeed|sed "/has been deleted/d"|jq '."task-status"|length')
if [[ ! "$status_length" -eq "0" ]]; then
echo "unexpected task status length $status_length, should be 0"
exit 1
Expand Down
23 changes: 20 additions & 3 deletions tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -128,16 +128,33 @@ EOF
# Resume changefeed
run_cdc_cli changefeed --changefeed-id $uuid resume && sleep 3
jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
if [[ $jobtype != 0 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 0 got ${jobtype} >>>>>"
if [[ $jobtype != 2 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 2 got ${jobtype} >>>>>"
exit 1
fi
check_changefeed_state $uuid "normal"

# Remove changefeed
run_cdc_cli changefeed --changefeed-id $uuid remove && sleep 3
check_changefeed_count http://${UP_PD_HOST_1}:${UP_PD_PORT_1} 0
jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
if [[ $jobtype != 3 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 3 got ${jobtype} >>>>>"
exit 1
fi
check_changefeed_state $uuid "removed"

set +e
# Make sure changefeed can not be created if a removed changefeed with the same name exists
create_log=$(run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="$uuid" 2>&1)
set -e
exists=$(echo $create_log | grep -oE 'already exists')
if [[ -z $exists ]]; then
echo "[$(date)] <<<<< unexpect output got ${create_log} >>>>>"
exit 1
fi

# force remove the changefeed, and re create a new one with the same name
run_cdc_cli changefeed --changefeed-id $uuid remove --force && sleep 3
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid" && sleep 3
check_changefeed_state $uuid "normal"

Expand Down
9 changes: 3 additions & 6 deletions tests/cyclic_abc/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,14 @@ function run() {
--pd "http://${UP_PD_HOST_1}:${UP_PD_PORT_1}" \
--cyclic-replica-id 1 \
--cyclic-filter-replica-ids 2 \
--cyclic-sync-ddl true \
--config $CUR/conf/changefeed.toml
--cyclic-sync-ddl true

run_cdc_cli changefeed create --start-ts=$start_ts \
--sink-uri="mysql://root@${TLS_TIDB_HOST}:${TLS_TIDB_PORT}/?safe-mode=false&ssl-ca=${TLS_DIR}/ca.pem&ssl-cert=${TLS_DIR}/server.pem?ssl-key=${TLS_DIR}/server-key.pem" \
--pd "http://${DOWN_PD_HOST}:${DOWN_PD_PORT}" \
--cyclic-replica-id 2 \
--cyclic-filter-replica-ids 3 \
--cyclic-sync-ddl true \
--config $CUR/conf/changefeed.toml
--cyclic-sync-ddl true

run_cdc_cli changefeed create --start-ts=$start_ts \
--sink-uri="mysql://root@${UP_TIDB_HOST}:${UP_TIDB_PORT}/?safe-mode=false" \
Expand All @@ -122,8 +120,7 @@ function run() {
--key=$TLS_DIR/client-key.pem \
--cyclic-replica-id 3 \
--cyclic-filter-replica-ids 1 \
--cyclic-sync-ddl false \
--config $CUR/conf/changefeed.toml
--cyclic-sync-ddl false

for i in $(seq 6 15); do {
sqlup="START TRANSACTION;"
Expand Down
2 changes: 1 addition & 1 deletion tests/ddl_reentrant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ function ddl_test() {

echo $ddl > ${WORK_DIR}/ddl_temp.sql
ensure 10 check_ddl_executed "${WORK_DIR}/cdc.log" "${WORK_DIR}/ddl_temp.sql" true
ddl_start_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"StartTs\\":[0-9]{18}'|awk -F: '{print $(NF)}')
ddl_start_ts=$(grep "Execute DDL succeeded" ${WORK_DIR}/cdc.log|tail -n 1|grep -oE '"start_ts\\":[0-9]{18}'|awk -F: '{print $(NF)}')
cdc cli changefeed remove --changefeed-id=${changefeedid}
changefeedid=$(cdc cli changefeed create --no-confirm --start-ts=${ddl_start_ts} --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')
echo "create new changefeed ${changefeedid} from ${ddl_start_ts}"
Expand Down
3 changes: 2 additions & 1 deletion tests/gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ function run() {
if [ "$SINK_TYPE" == "kafka" ]; then
run_kafka_consumer $WORK_DIR "kafka://127.0.0.1:9092/$TOPIC_NAME?partition-num=4&version=${KAFKA_VERSION}"
fi
export GO_FAILPOINTS='github.com/pingcap/ticdc/cdc/owner/InjectGcSafepointUpdateInterval=return(500)' # new owner
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY --addr "127.0.0.1:8300" --pd $pd_addr
changefeed_id=$(cdc cli changefeed create --pd=$pd_addr --sink-uri="$SINK_URI" 2>&1|tail -n2|head -n1|awk '{print $2}')

Expand Down Expand Up @@ -114,10 +113,12 @@ function run() {

# remove paused changefeed, the safe_point forward will recover
cdc cli changefeed remove --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "removed"
ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

# remove all changefeeds, the safe_point will be cleared
cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed"
ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id

cleanup_process $CDC_BINARY
Expand Down
Loading

0 comments on commit 4f76708

Please sign in to comment.