Skip to content

Commit

Permalink
owner: deprecate the removed changefeed state (#1990)
Browse files Browse the repository at this point in the history
  • Loading branch information
leoppro authored and ti-chi-bot committed Jun 23, 2021
1 parent cdc59e8 commit c807e89
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 57 deletions.
2 changes: 1 addition & 1 deletion cdc/model/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
StateError FeedState = "error"
StateFailed FeedState = "failed"
StateStopped FeedState = "stopped"
StateRemoved FeedState = "removed"
StateRemoved FeedState = "removed" // deprecated, will be removed in the next version
StateFinished FeedState = "finished"
)

Expand Down
19 changes: 10 additions & 9 deletions cdc/owner/feed_state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,15 +112,16 @@ func (m *feedStateManager) handleAdminJob() (jobsPending bool) {
m.shouldBeRunning = false
jobsPending = true
m.patchState(model.StateRemoved)
if job.Opts != nil && job.Opts.ForceRemove {
// remove changefeed info and state
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return nil, true, nil
})
m.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return nil, true, nil
})
}
// remove changefeed info and state
m.state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) {
return nil, true, nil
})
m.state.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
return nil, true, nil
})
checkpointTs := m.state.Info.GetCheckpointTs(m.state.Status)
log.Info("the changefeed removed", zap.String("changefeed-id", m.state.ID), zap.Uint64("checkpoint-ts", checkpointTs))

case model.AdminResume:
switch m.state.Info.State {
case model.StateFailed, model.StateError, model.StateStopped, model.StateFinished:
Expand Down
28 changes: 1 addition & 27 deletions cdc/owner/feed_state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,7 @@ func (s *feedStateManagerSuite) TestHandleJob(c *check.C) {
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(state.Info.State, check.Equals, model.StateRemoved)
c.Assert(state.Info.AdminJobType, check.Equals, model.AdminRemove)
c.Assert(state.Status.AdminJobType, check.Equals, model.AdminRemove)

// a removed changefeed can not be stop
manager.PushAdminJob(&model.AdminJob{
CfID: ctx.ChangefeedVars().ID,
Type: model.AdminStop,
})
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(state.Info.State, check.Equals, model.StateRemoved)
c.Assert(state.Info.AdminJobType, check.Equals, model.AdminRemove)
c.Assert(state.Status.AdminJobType, check.Equals, model.AdminRemove)

// force remove a changefeed
manager.PushAdminJob(&model.AdminJob{
CfID: ctx.ChangefeedVars().ID,
Type: model.AdminRemove,
Opts: &model.AdminJobOption{ForceRemove: true},
})
manager.Tick(state)
tester.MustApplyPatches()
c.Assert(manager.ShouldRunning(), check.IsFalse)
c.Assert(state.Info, check.IsNil)
c.Assert(state.Status, check.IsNil)
c.Assert(state.Exist(), check.IsFalse)
}

func (s *feedStateManagerSuite) TestMarkFinished(c *check.C) {
Expand Down
19 changes: 1 addition & 18 deletions tests/cli/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -136,25 +136,8 @@ EOF

# Remove changefeed
run_cdc_cli changefeed --changefeed-id $uuid remove && sleep 3
jobtype=$(run_cdc_cli changefeed --changefeed-id $uuid query 2>&1 | grep 'admin-job-type' | grep -oE '[0-9]' | head -1)
if [[ $jobtype != 3 ]]; then
echo "[$(date)] <<<<< unexpect admin job type! expect 3 got ${jobtype} >>>>>"
exit 1
fi
check_changefeed_state $uuid "removed"

set +e
# Make sure changefeed can not be created if a removed changefeed with the same name exists
create_log=$(run_cdc_cli changefeed create --start-ts=$start_ts --sink-uri="$SINK_URI" --changefeed-id="$uuid" 2>&1)
set -e
exists=$(echo $create_log | grep -oE 'already exists')
if [[ -z $exists ]]; then
echo "[$(date)] <<<<< unexpect output got ${create_log} >>>>>"
exit 1
fi
check_changefeed_count http://${UP_PD_HOST_1}:${UP_PD_PORT_1} 0

# force remove the changefeed, and re create a new one with the same name
run_cdc_cli changefeed --changefeed-id $uuid remove --force && sleep 3
run_cdc_cli changefeed create --sink-uri="$SINK_URI" --tz="Asia/Shanghai" -c="$uuid" && sleep 3
check_changefeed_state $uuid "normal"

Expand Down
2 changes: 0 additions & 2 deletions tests/gc_safepoint/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,10 @@ function run() {

# remove paused changefeed, the safe_point forward will recover
cdc cli changefeed remove --changefeed-id=$changefeed_id --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id "removed"
ensure $MAX_RETRIES check_safepoint_forward $pd_addr $pd_cluster_id

# remove all changefeeds, the safe_point will be cleared
cdc cli changefeed remove --changefeed-id=$changefeed_id2 --pd=$pd_addr
ensure $MAX_RETRIES check_changefeed_state $pd_addr $changefeed_id2 "removed"
ensure $MAX_RETRIES check_safepoint_cleared $pd_addr $pd_cluster_id

cleanup_process $CDC_BINARY
Expand Down

0 comments on commit c807e89

Please sign in to comment.