From c807e892497c4d97d20564503f962c6239ca044d Mon Sep 17 00:00:00 2001 From: leoppro Date: Tue, 15 Jun 2021 21:38:36 +0800 Subject: [PATCH] owner: deprecate the removed changefeed state (#1990) --- cdc/model/changefeed.go | 2 +- cdc/owner/feed_state_manager.go | 19 ++++++++++--------- cdc/owner/feed_state_manager_test.go | 28 +--------------------------- tests/cli/run.sh | 19 +------------------ tests/gc_safepoint/run.sh | 2 -- 5 files changed, 13 insertions(+), 57 deletions(-) diff --git a/cdc/model/changefeed.go b/cdc/model/changefeed.go index f144b7fb892..59edeaccd3b 100644 --- a/cdc/model/changefeed.go +++ b/cdc/model/changefeed.go @@ -48,7 +48,7 @@ const ( StateError FeedState = "error" StateFailed FeedState = "failed" StateStopped FeedState = "stopped" - StateRemoved FeedState = "removed" + StateRemoved FeedState = "removed" // deprecated, will be removed in the next version StateFinished FeedState = "finished" ) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index a21b79c1f19..4527949a9a1 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -112,15 +112,16 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) { m.shouldBeRunning = false jobsPending = true m.patchState(model.StateRemoved) - if job.Opts != nil && job.Opts.ForceRemove { - // remove changefeed info and state - m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { - return nil, true, nil - }) - m.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { - return nil, true, nil - }) - } + // remove changefeed info and state + m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { + return nil, true, nil + }) + m.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { + return nil, true, nil + }) + checkpointTs := m.state.Info.GetCheckpointTs(m.state.Status) + log.Info("the changefeed removed", zap.String("changefeed-id", m.state.ID), zap.Uint64("checkpoint-ts", checkpointTs)) + case model.AdminResume: switch m.state.Info.State { case model.StateFailed, model.StateError, model.StateStopped, model.StateFinished: diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 61c8f2f2eac..f16cf41594d 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -96,33 +96,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) { manager.Tick(state) tester.MustApplyPatches() c.Assert(manager.ShouldRunning(), check.IsFalse) - c.Assert(state.Info.State, check.Equals, model.StateRemoved) - c.Assert(state.Info.AdminJobType, check.Equals, model.AdminRemove) - c.Assert(state.Status.AdminJobType, check.Equals, model.AdminRemove) - - // a removed changefeed can not be stop - manager.PushAdminJob(&model.AdminJob{ - CfID: ctx.ChangefeedVars().ID, - Type: model.AdminStop, - }) - manager.Tick(state) - tester.MustApplyPatches() - c.Assert(manager.ShouldRunning(), check.IsFalse) - c.Assert(state.Info.State, check.Equals, model.StateRemoved) - c.Assert(state.Info.AdminJobType, check.Equals, model.AdminRemove) - c.Assert(state.Status.AdminJobType, check.Equals, model.AdminRemove) - - // force remove a changefeed - manager.PushAdminJob(&model.AdminJob{ - CfID: ctx.ChangefeedVars().ID, - Type: model.AdminRemove, - Opts: &model.AdminJobOption{ForceRemove: true}, - }) - manager.Tick(state) - tester.MustApplyPatches() - c.Assert(manager.ShouldRunning(), check.IsFalse) - c.Assert(state.Info, check.IsNil) - c.Assert(state.Status, check.IsNil) + c.Assert(state.Exist(), check.IsFalse) } func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) { diff --git a/tests/cli/run.sh b/tests/cli/run.sh index 1804091a7b6..d60113eb83f 100644 --- a/tests/cli/run.sh +++ b/tests/cli/run.sh @@ -136,25 +136,8 @@ EOF # Remove changefeed run_cdc_cli changefeed --changefeed-id $uuid remove && sleep 3 - jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1) - if [[ $jobtype != 3 ]]; then - echo "[$(date)] <<<<< unexpect admin job type! expect 3 got ${jobtype} >>>>>" - exit 1 - fi - check_changefeed_state $uuid "removed" - - set +e - # Make sure changefeed can not be created if a removed changefeed with the same name exists - create_log=$(run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="$uuid" 2>&1) - set -e - exists=$(echo $create_log | grep -oE 'already exists') - if [[ -z $exists ]]; then - echo "[$(date)] <<<<< unexpect output got ${create_log} >>>>>" - exit 1 - fi + check_changefeed_count http://${UP_PD_HOST_1}:${UP_PD_PORT_1} 0 - # force remove the changefeed, and re create a new one with the same name - run_cdc_cli changefeed --changefeed-id $uuid remove --force && sleep 3 run_cdc_cli changefeed create --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid" && sleep 3 check_changefeed_state $uuid "normal" diff --git a/tests/gc_safepoint/run.sh b/tests/gc_safepoint/run.sh index d498abd2c72..c49c060d783 100755 --- a/tests/gc_safepoint/run.sh +++ b/tests/gc_safepoint/run.sh @@ -114,12 +114,10 @@ function run() { # remove paused changefeed, the safe_point forward will recover cdc cli changefeed remove --changefeed-id=$changefeed_id --pd=$pd_addr - ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "removed" ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id # remove all changefeeds, the safe_point will be cleared cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr - ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed" ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id cleanup_process $CDC_BINARY