Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix a backoff bug & enable pause resume test #498

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion .github/workflows/check_and_build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@ jobs:
pwd && ls -l bin/ && ls -l tools/bin/
export TICDC_NEWARCH=true && make integration_test CASE=charset_gbk

- name: Test changefeed_pause_resume
if: ${{ success() }}
run: |
export TICDC_NEWARCH=true && make integration_test CASE=changefeed_pause_resume

# only upload logs of the last case
- name: Copy logs to hack permission
if: ${{ always() }}
Expand All @@ -99,6 +104,7 @@ jobs:
tail -n 10 $DIR/cdc.log
sudo cp -r -L $DIR/{cdc.log,stdout.log,sync_diff} ./logs/$CASE/
sudo chown -R runner ./logs
sudo tar -czvf ./logs.tar.gz ./logs

# Uploading logs as an artifact seems unstable, so we set `continue-on-error: true` here.
- name: Upload logs
Expand All @@ -107,4 +113,4 @@ jobs:
with:
name: upstream-switch-logs
path: |
./logs
./logs.tar.gz
11 changes: 6 additions & 5 deletions coordinator/changefeed/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,7 @@ func (m *Backoff) ShouldRun() bool {
func (m *Backoff) shouldFailWhenRetry() bool {
// NextBackOff() returns Stop (-1) once MaxElapsedTime has elapsed;
// in that case the changefeed should be set to the failed state.
if m.backoffInterval == m.errBackoff.Stop {
return true
}
return false
return m.backoffInterval == m.errBackoff.Stop
}

// resetErrRetry resets the error-retry related fields
Expand Down Expand Up @@ -154,6 +151,11 @@ func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbea
log.Warn("changefeed meets an error, will be stopped",
zap.Any("error", errs))

if !m.isRestarting.Load() {
// errBackoff may be stopped, reset it before the first retry.
m.resetErrRetry()
m.isRestarting.Store(true)
}
// set the next retry time
m.backoffInterval = m.errBackoff.NextBackOff()
m.nextRetryTime = atomic.NewTime(time.Now().Add(m.backoffInterval))
Expand All @@ -169,7 +171,6 @@ func (m *Backoff) HandleError(errs []*heartbeatpb.RunningError) (bool, *heartbea
)
return true, lastError
}
m.isRestarting.Store(true)
// patch the last error to changefeed info
return false, lastError
}
8 changes: 8 additions & 0 deletions coordinator/changefeed/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"time"

"github.com/benbjohnson/clock"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -26,6 +27,13 @@ import (
func TestShouldFailWhenRetry(t *testing.T) {
backoff := NewBackoff(common.NewChangeFeedIDWithName("test"), time.Minute*30, 1)
require.True(t, backoff.ShouldRun())

// stop the backoff
mc := clock.NewMock()
mc.Set(time.Now())
backoff.errBackoff.Clock = mc
mc.Add(backoff.errBackoff.MaxElapsedTime + 1)

changefeed, state, err := backoff.CheckStatus(&heartbeatpb.MaintainerStatus{
CheckpointTs: 1,
Err: []*heartbeatpb.RunningError{
Expand Down
1 change: 1 addition & 0 deletions coordinator/changefeed/changefeed_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (db *ChangefeedDB) StopByChangefeedID(cfID common.ChangeFeedID, remove bool
db.changefeeds[cfID] = cf
db.stopped[cfID] = cf
}
log.Info("stop changefeed", zap.String("changefeed", cfID.String()))

nodeID := cf.GetNodeID()
if cf.GetNodeID() == "" {
Expand Down
42 changes: 15 additions & 27 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ type Controller struct {

bootstrapper *bootstrap.Bootstrapper[heartbeatpb.CoordinatorBootstrapResponse]

stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool
operatorControllerHandle *threadpool.TaskHandle
schedulerHandle *threadpool.TaskHandle
backend changefeed.Backend
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool
taskHandlers []*threadpool.TaskHandle
backend changefeed.Backend

updatedChangefeedCh chan map[common.ChangeFeedID]*changefeed.Changefeed
stateChangedCh chan *ChangefeedStateChangeEvent
Expand Down Expand Up @@ -125,9 +124,7 @@ func NewController(
for _, msg := range c.bootstrapper.HandleNewNodes(newNodes) {
_ = c.messageCenter.SendCommand(msg)
}
submitScheduledEvent(c.taskScheduler, c.stream, &Event{
eventType: EventPeriod,
}, time.Now().Add(time.Millisecond*500))
c.submitPeriodTask()
return c
}

Expand Down Expand Up @@ -160,9 +157,6 @@ func (c *Controller) onPeriodTask() {
// resend bootstrap message
c.sendMessages(c.bootstrapper.ResendBootstrapMessage())
c.collectMetrics()
submitScheduledEvent(c.taskScheduler, c.stream, &Event{
eventType: EventPeriod,
}, time.Now().Add(time.Millisecond*500))
}

func (c *Controller) onMessage(msg *messaging.TargetMessage) {
Expand Down Expand Up @@ -361,17 +355,15 @@ func (c *Controller) FinishBootstrap(workingMap map[common.ChangeFeedID]remoteMa
}

// start operator and scheduler
c.operatorControllerHandle = c.taskScheduler.Submit(c.cfScheduller, time.Now())
c.schedulerHandle = c.taskScheduler.Submit(c.operatorController, time.Now())
operatorControllerHandle := c.taskScheduler.Submit(c.cfScheduller, time.Now())
schedulerHandle := c.taskScheduler.Submit(c.operatorController, time.Now())
c.taskHandlers = append(c.taskHandlers, operatorControllerHandle, schedulerHandle)
c.bootstrapped.Store(true)
}

func (c *Controller) Stop() {
if c.operatorControllerHandle != nil {
c.operatorControllerHandle.Cancel()
}
if c.schedulerHandle != nil {
c.schedulerHandle.Cancel()
for _, h := range c.taskHandlers {
h.Cancel()
}
}

Expand Down Expand Up @@ -505,17 +497,13 @@ func (c *Controller) RemoveNode(id node.ID) {
c.operatorController.OnNodeRemoved(id)
}

// submitScheduledEvent submits a task to controller pool to send a future event
func submitScheduledEvent(
scheduler threadpool.ThreadPool,
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler],
event *Event,
scheduleTime time.Time) {
func (c *Controller) submitPeriodTask() {
task := func() time.Time {
stream.In() <- event
return time.Time{}
c.stream.In() <- &Event{eventType: EventPeriod}
return time.Now().Add(time.Millisecond * 500)
}
scheduler.SubmitFunc(task, scheduleTime)
periodTaskhandler := c.taskScheduler.SubmitFunc(task, time.Now().Add(time.Millisecond*500))
c.taskHandlers = append(c.taskHandlers, periodTaskhandler)
}

func (c *Controller) newBootstrapMessage(id node.ID) *messaging.TargetMessage {
Expand Down
9 changes: 0 additions & 9 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/ticdc/coordinator/changefeed"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
"github.com/pingcap/ticdc/pkg/config"
Expand Down Expand Up @@ -247,14 +246,6 @@ func (c *coordinator) sendMessages(msgs []*messaging.TargetMessage) {
}
}

func (c *coordinator) newBootstrapMessage(id node.ID) *messaging.TargetMessage {
log.Info("send coordinator bootstrap request", zap.Any("to", id))
return messaging.NewSingleTargetMessage(
id,
messaging.MaintainerManagerTopic,
&heartbeatpb.CoordinatorBootstrapRequest{Version: c.version})
}

func (c *coordinator) updateGCSafepoint(
ctx context.Context,
) error {
Expand Down
File renamed without changes.
4 changes: 2 additions & 2 deletions coordinator/operator/operator_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,9 @@ func (oc *Controller) AddOperator(op operator.Operator[common.ChangeFeedID, *hea
oc.lock.Lock()
defer oc.lock.Unlock()

if _, ok := oc.operators[op.ID()]; ok {
if pre, ok := oc.operators[op.ID()]; ok {
log.Info("add operator failed, operator already exists",
zap.String("operator", op.String()))
zap.Stringer("operator", op), zap.Stringer("previousOperator", pre.OP))
return false
}
cf := oc.changefeedDB.GetByID(op.ID())
Expand Down
2 changes: 1 addition & 1 deletion coordinator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *Scheduler) Execute() time.Time {
absent, nodeSize := s.changefeedDB.GetWaitingSchedulingChangefeeds(s.absent, availableSize)
// add alive nodes that are missing from the node size map, with a size of 0
// todo: use the bootstrap nodes
for id, _ := range s.nodeManager.GetAliveNodes() {
for id := range s.nodeManager.GetAliveNodes() {
if _, ok := nodeSize[id]; !ok {
nodeSize[id] = 0
}
Expand Down
6 changes: 6 additions & 0 deletions tests/integration_tests/changefeed_pause_resume/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ function run() {
done

for i in $(seq 1 10); do
echo "Run $i test" # && read
cdc cli changefeed pause --changefeed-id=$changefeed_id --pd=$pd_addr

for j in $(seq 1 $TABLE_COUNT); do
Expand All @@ -66,6 +67,11 @@ function run() {
cdc cli changefeed resume --changefeed-id=$changefeed_id --pd=$pd_addr

check_sync_diff $WORK_DIR $CUR/conf/diff_config.toml

# 1. wait for the checkpoint ts to be updated in etcd
# 2. wait for the dispatcher to be closed
# NOTE: remove this sleep once safemode is supported in the dispatcher
sleep 10
done

cleanup_process $CDC_BINARY
Expand Down
Loading