
changefeed (ticdc): fix owner stuck when closing a changefeed #6882

Merged
merged 9 commits on Aug 25, 2022
84 changes: 54 additions & 30 deletions cdc/owner/changefeed.go
@@ -87,6 +87,8 @@ type changefeed struct {
initialized bool
// isRemoved is true if the changefeed is removed
isRemoved bool
// isReleased is true if the changefeed's resources have been released
isReleased bool

// only used for asyncExecDDL function
// ddlEventCache is not nil when the changefeed is executing
@@ -103,10 +105,9 @@ type changefeed struct {
// cancel the running goroutine started by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
wg sync.WaitGroup
// The changefeed will start a backend goroutine in the function `initialize` for DDLPuller.
// `ddlWg` is used to manage this backend goroutine.
ddlWg sync.WaitGroup

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsGauge prometheus.Gauge
@@ -375,14 +376,25 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]*
return nil
}

func (c *changefeed) initialize(ctx cdcContext.Context) error {
func (c *changefeed) initialize(ctx cdcContext.Context) (err error) {
if c.initialized || c.state.Status == nil {
// If `c.state.Status` is nil it means the changefeed struct was just created; it needs to
// 1. use startTs as checkpointTs and resolvedTs, if it's a newly created changefeed; or
// 2. load checkpointTs and resolvedTs from etcd, if it's an existing changefeed.
// And then it can continue to initialize.
return nil
}
c.isReleased = false
defer func() {
if err != nil {
log.Error("an error occur when changefeed initializing, release resource",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err),
)
c.releaseResources(ctx)
}
}()
// clean the errCh
// When the changefeed is resumed after being stopped, the changefeed instance will be reused,
// So we should make sure that the errCh is empty when the changefeed is restarting
@@ -417,7 +429,7 @@ LOOP:
//
// See the gc docs for more details.
ensureTTL := int64(10 * 60)
err := gc.EnsureChangefeedStartTsSafety(
err = gc.EnsureChangefeedStartTsSafety(
ctx, c.upstream.PDClient,
ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceInitializing),
c.id, ensureTTL, checkpointTs)
@@ -453,7 +465,6 @@ LOOP:
c.barriers.Update(ddlJobBarrier, checkpointTs-1)
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())

var err error
// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completely executed.
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
@@ -475,16 +486,19 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
c.wg.Add(1)
c.ddlWg.Add(1)
go func() {
defer c.wg.Done()
defer c.ddlWg.Done()
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh)
mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
c.redoManager = mgr
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
@@ -513,7 +527,6 @@ LOOP:
}

c.initialized = true

log.Info("changefeed initialized",
zap.String("namespace", c.state.ID.Namespace),
zap.String("changefeed", c.state.ID.ID),
@@ -524,37 +537,48 @@ LOOP:
return nil
}
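
The `(err error)` named return plus `defer` above is a common Go idiom for releasing partially-acquired resources when initialization fails partway through. A minimal, self-contained sketch of the pattern; the `component`, `startPuller`, and `startSink` names are illustrative, not TiCDC APIs:

```go
package main

import (
	"errors"
	"fmt"
)

type component struct {
	initialized bool
}

// initialize uses a named return value so the deferred function can
// observe whether any step failed and release what was already acquired.
func (c *component) initialize() (err error) {
	defer func() {
		if err != nil {
			c.release() // undo partial initialization on any error path
		}
	}()
	if err = c.startPuller(); err != nil {
		return err
	}
	if err = c.startSink(); err != nil {
		return err // the defer above cleans up the puller
	}
	c.initialized = true
	return nil
}

func (c *component) startPuller() error { return nil }
func (c *component) startSink() error   { return errors.New("sink failed") }
func (c *component) release()           { fmt.Println("resources released") }

func main() {
	c := &component{}
	fmt.Println(c.initialize()) // prints "resources released", then "sink failed"
}
```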

// releaseResources is idempotent.
func (c *changefeed) releaseResources(ctx cdcContext.Context) {
if c.isReleased {
return
}
// Must clean up the redo manager before calling cancel, otherwise
// the manager may be closed internally.
c.cleanupRedoManager(ctx)

if !c.initialized {
c.cleanupChangefeedServiceGCSafePoints(ctx)
return
}
c.cleanupChangefeedServiceGCSafePoints(ctx)

c.cancel()
c.cancel = func() {}
c.ddlPuller.Close()
c.schema = nil
c.cleanupChangefeedServiceGCSafePoints(ctx)
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait sink Close, pass a canceled context is ok
if err := c.sink.close(canceledCtx); err != nil {
log.Warn("owner close sink failed",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))

if c.ddlPuller != nil {
c.ddlPuller.Close()
c.ddlPuller = nil
}
c.ddlWg.Wait()

if c.sink != nil {
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
// TODO(dongmen): remove ctx from func sink.close(), it is useless.
// We don't need to wait for the sink to close; passing a canceled context is OK
if err := c.sink.close(canceledCtx); err != nil {
log.Warn("owner close sink failed",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
}
}

if c.scheduler != nil {
c.scheduler.Close(ctx)
c.scheduler = nil
}
c.wg.Wait()
c.scheduler.Close(ctx)
c.scheduler = nil
c.barriers = nil

c.cleanupMetrics()
c.schema = nil
c.barriers = nil
c.initialized = false
c.isReleased = true
asddongmen marked this conversation as resolved.
Show resolved Hide resolved

log.Info("changefeed closed",
zap.String("namespace", c.id.Namespace),
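
Taken together, the `isReleased` guard and the per-component nil checks make `releaseResources` safe to call repeatedly and safe to call after a failed `initialize`. A minimal sketch of that shape with simplified stand-in types; only `isReleased` mirrors the PR, the other names are illustrative:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type puller struct{}

func (p *puller) Close() {}

type feed struct {
	isReleased bool
	cancel     context.CancelFunc
	ddlPuller  *puller
	ddlWg      sync.WaitGroup
}

func (f *feed) releaseResources() {
	if f.isReleased {
		return // idempotent: repeated calls are no-ops
	}
	f.cancel()
	f.cancel = func() {}
	// Each component may be nil if initialize failed before creating it.
	if f.ddlPuller != nil {
		f.ddlPuller.Close()
		f.ddlPuller = nil
	}
	f.ddlWg.Wait() // wait for the DDL goroutine, if any, to exit
	f.isReleased = true
}

func main() {
	_, cancel := context.WithCancel(context.Background())
	f := &feed{cancel: cancel, ddlPuller: &puller{}}
	f.releaseResources()
	f.releaseResources() // second call returns immediately
	fmt.Println("released twice without blocking or panicking")
}
```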
3 changes: 2 additions & 1 deletion cdc/owner/ddl_sink.go
@@ -338,9 +338,10 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64)

func (s *ddlSinkImpl) close(ctx context.Context) (err error) {
s.cancel()
// both may be nil if the changefeed returned an error during initialization
if s.sinkV1 != nil {
err = s.sinkV1.Close(ctx)
} else {
} else if s.sinkV2 != nil {
Reviewer comment (Contributor): this may be nil in the test.

err = s.sinkV2.Close()
}
if s.syncPointStore != nil {
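
The guard added here matters because, after a failed initialization, `close` can run before either sink was ever constructed. A minimal sketch with simplified stand-in types; the `sink` interface below is illustrative, not the real TiCDC sink API:

```go
package main

import (
	"context"
	"fmt"
)

type sink interface {
	Close(ctx context.Context) error
}

type ddlSink struct {
	sinkV1 sink
	sinkV2 sink
}

func (s *ddlSink) close(ctx context.Context) (err error) {
	// Both fields may be nil if the changefeed failed during initialization.
	if s.sinkV1 != nil {
		err = s.sinkV1.Close(ctx)
	} else if s.sinkV2 != nil {
		err = s.sinkV2.Close(ctx)
	}
	return err
}

func main() {
	s := &ddlSink{}                            // neither sink was ever created
	fmt.Println(s.close(context.Background())) // nil: safe no-op
}
```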
15 changes: 15 additions & 0 deletions tests/integration_tests/changefeed_error/run.sh
@@ -102,6 +102,21 @@ function run() {
run_cdc_cli changefeed remove -c $changefeedid_2
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY

# make sure a changefeed initialization error does not get the owner stuck
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeedid_3="changefeed-initialize-error"
run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "" ""
run_cdc_cli changefeed pause -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "changefeed new redo manager injected error" ""
run_cdc_cli changefeed resume -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "" ""
run_cdc_cli changefeed remove -c $changefeedid_3
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
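
The failpoint term `2*return(true)` in `GO_FAILPOINTS` means the injected error fires on the first two evaluations only and is a no-op afterwards, so initialization fails twice and then succeeds once the term is exhausted. A minimal sketch of those semantics without the real failpoint machinery; the `firesNTimes` helper is illustrative:

```go
package main

import (
	"errors"
	"fmt"
)

// firesNTimes mimics the failpoint term `n*return(true)`: it evaluates to
// true for the first n calls and to false afterwards.
func firesNTimes(n int) func() bool {
	remaining := n
	return func() bool {
		if remaining > 0 {
			remaining--
			return true
		}
		return false
	}
}

func main() {
	fp := firesNTimes(2) // equivalent in spirit to `2*return(true)`
	for attempt := 1; attempt <= 3; attempt++ {
		var err error
		if fp() {
			err = errors.New("changefeed new redo manager injected error")
		}
		fmt.Printf("attempt %d: err = %v\n", attempt, err)
	}
	// The first two attempts fail; the third succeeds.
}
```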