From 834e6c8abd8d62edc1080d7fa4d2e5bce6f737c4 Mon Sep 17 00:00:00 2001
From: Ti Chi Robot
Date: Fri, 26 Aug 2022 09:54:21 +0800
Subject: [PATCH] changefeed (ticdc): fix owner stuck when closing a changefeed
 (#6882) (#6887)

close pingcap/tiflow#6859
---
 cdc/owner/changefeed.go                       | 75 +++++++++++--------
 cdc/owner/changefeed_test.go                  |  3 +
 go.mod                                        |  3 +-
 .../integration_tests/changefeed_error/run.sh | 15 ++++
 4 files changed, 65 insertions(+), 31 deletions(-)

diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 6a1550813d0..466853585bd 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -74,8 +74,13 @@ type changefeed struct {
 	sink        DDLSink
 	ddlPuller   DDLPuller
 	initialized bool
-	// isRemoved is true if the changefeed is removed
+	// isRemoved is true if the changefeed is removed,
+	// which means it will be removed from memory forever
 	isRemoved bool
+	// isReleased is true if the changefeed's resources have been released,
+	// but the changefeed is still kept in memory, and the flag is checked
+	// on every tick
+	isReleased bool
 
 	// only used for asyncExecDDL function
 	// ddlEventCache is not nil when the changefeed is executing
@@ -92,10 +97,9 @@ type changefeed struct {
 	// cancel the running goroutine start by `DDLPuller`
 	cancel context.CancelFunc
 
-	// The changefeed will start some backend goroutines in the function `initialize`,
-	// such as DDLPuller, DDLSink, etc.
-	// `wg` is used to manage those backend goroutines.
-	wg sync.WaitGroup
+	// The changefeed will start a backend goroutine in the function `initialize` for DDLPuller
+	// `ddlWg` is used to manage this backend goroutine.
+	ddlWg sync.WaitGroup
 
 	metricsChangefeedBarrierTsGauge    prometheus.Gauge
 	metricsChangefeedCheckpointTsGauge prometheus.Gauge
@@ -330,7 +334,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
 	return nil
 }
 
-func (c *changefeed) initialize(ctx cdcContext.Context) error {
+func (c *changefeed) initialize(ctx cdcContext.Context) (err error) {
 	if c.initialized || c.state.Status == nil {
 		// If `c.state.Status` is nil it means the changefeed struct is just created, it needs to
 		// 1. use startTs as checkpointTs and resolvedTs, if it's a new created changefeed; or
@@ -338,6 +342,7 @@ func (c *changefeed) initialize(ctx cdcContext.Context) error {
 		// And then it can continue to initialize.
 		return nil
 	}
+	c.isReleased = false
 	// clean the errCh
 	// When the changefeed is resumed after being stopped, the changefeed instance will be reused,
 	// So we should make sure that the errCh is empty when the changefeed is restarting
@@ -401,7 +406,7 @@ LOOP:
 	// TODO: get DDL barrier based on resolvedTs.
 	c.barriers.Update(ddlJobBarrier, checkpointTs-1)
 	c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
-	var err error
+
 	// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completed executed.
 	// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
 	// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
@@ -422,9 +427,9 @@ LOOP:
 	if err != nil {
 		return errors.Trace(err)
 	}
-	c.wg.Add(1)
+	c.ddlWg.Add(1)
 	go func() {
-		defer c.wg.Done()
+		defer c.ddlWg.Done()
 		ctx.Throw(c.ddlPuller.Run(cancelCtx))
 	}()
 
@@ -432,6 +437,9 @@ LOOP:
 	redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh)
 	mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
 	c.redoManager = mgr
+	failpoint.Inject("ChangefeedNewRedoManagerError", func() {
+		err = errors.New("changefeed new redo manager injected error")
+	})
 	if err != nil {
 		return err
 	}
@@ -470,35 +478,41 @@ LOOP:
 	return nil
 }
 
+// releaseResources is idempotent.
 func (c *changefeed) releaseResources(ctx cdcContext.Context) {
+	if c.isReleased {
+		return
+	}
 	// Must clean redo manager before calling cancel, otherwise
 	// the manager can be closed internally.
 	c.cleanupRedoManager(ctx)
+	c.cleanupChangefeedServiceGCSafePoints(ctx)
 
-	if !c.initialized {
-		c.cleanupChangefeedServiceGCSafePoints(ctx)
-		return
-	}
-	log.Info("close changefeed",
-		zap.String("namespace", c.state.ID.Namespace),
-		zap.String("changefeed", c.state.ID.ID),
-		zap.Stringer("info", c.state.Info), zap.Bool("isRemoved", c.isRemoved))
 	c.cancel()
 	c.cancel = func() {}
-	c.ddlPuller.Close()
+
+	if c.ddlPuller != nil {
+		c.ddlPuller.Close()
+	}
+	c.ddlWg.Wait()
+
+	if c.sink != nil {
+		canceledCtx, cancel := context.WithCancel(context.Background())
+		cancel()
+		// We don't need to wait sink Close, pass a canceled context is ok
+		if err := c.sink.close(canceledCtx); err != nil {
+			log.Warn("owner close sink failed",
+				zap.String("namespace", c.id.Namespace),
+				zap.String("changefeed", c.id.ID),
+				zap.Error(err))
+		}
+	}
+
+	if c.scheduler != nil {
+		c.scheduler.Close(ctx)
+	}
+
 	c.schema = nil
-	c.cleanupChangefeedServiceGCSafePoints(ctx)
-	canceledCtx, cancel := context.WithCancel(context.Background())
-	cancel()
-	// We don't need to wait sink Close, pass a canceled context is ok
-	if err := c.sink.close(canceledCtx); err != nil {
-		log.Warn("Closing sink failed in Owner",
-			zap.String("namespace", c.state.ID.Namespace),
-			zap.String("changefeed", c.state.ID.ID),
-			zap.Error(err))
-	}
-	c.wg.Wait()
-	c.scheduler.Close(ctx)
 
 	changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
 	changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
@@ -516,6 +530,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
 	changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
 	c.metricsChangefeedBarrierTsGauge = nil
 
+	c.isReleased = true
 	c.initialized = false
 }
 
diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index 3470f2bf745..80e3422696d 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -24,6 +24,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/labstack/gommon/log"
 	"github.com/pingcap/errors"
 	timodel "github.com/pingcap/tidb/parser/model"
 	"github.com/pingcap/tiflow/cdc/entry"
@@ -539,12 +540,14 @@ func testChangefeedReleaseResource(
 		CfID: cf.id,
 		Type: model.AdminRemove,
 	})
+	cf.isReleased = false
 	// changefeed tick will release resources
 	err := cf.tick(ctx, state, captures)
 	require.Nil(t, err)
 	cancel()
 
 	// check redo log dir is deleted
 	_, err = os.Stat(redoLogDir)
+	log.Error(err)
 	require.True(t, os.IsNotExist(err))
 }
diff --git a/go.mod b/go.mod
index 727a34c2b08..a63437fef67 100644
--- a/go.mod
+++ b/go.mod
@@ -95,6 +95,8 @@ require (
 	upper.io/db.v3 v3.7.1+incompatible
 )
 
+require github.com/labstack/gommon v0.3.0
+
 require (
 	cloud.google.com/go v0.100.2 // indirect
 	cloud.google.com/go/compute v1.2.0 // indirect
@@ -182,7 +184,6 @@ require (
 	github.com/kr/pretty v0.3.0 // indirect
 	github.com/kr/text v0.2.0 // indirect
 	github.com/labstack/echo/v4 v4.2.1 // indirect
-	github.com/labstack/gommon v0.3.0 // indirect
 	github.com/leodido/go-urn v1.2.1 // indirect
 	github.com/lufia/plan9stats v0.0.0-20220326011226-f1430873d8db // indirect
 	github.com/mailru/easyjson v0.7.7 // indirect
diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh
index 9a0d1b9f13b..8c1f5487e80 100755
--- a/tests/integration_tests/changefeed_error/run.sh
+++ b/tests/integration_tests/changefeed_error/run.sh
@@ -97,6 +97,21 @@ function run() {
 	run_cdc_cli changefeed remove -c $changefeedid_2
 	export GO_FAILPOINTS=''
 	cleanup_process $CDC_BINARY
+
+	# make sure an error while initializing a changefeed does not get the owner stuck
+	export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)'
+	run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY
+
+	changefeedid_3="changefeed-initialize-error"
+	run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3
+	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
+	run_cdc_cli changefeed pause -c $changefeedid_3
+	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "changefeed new redo manager injected error" ""
+	run_cdc_cli changefeed resume -c $changefeedid_3
+	ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
+	run_cdc_cli changefeed remove -c $changefeedid_3
+	export GO_FAILPOINTS=''
+	cleanup_process $CDC_BINARY
 }
 
 trap stop_tidb_cluster EXIT
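
Note (reviewer sketch, not part of the patch): the heart of the fix is that releaseResources becomes idempotent and tolerant of a partially built changefeed, so the owner can call it unconditionally even when initialize bailed out early, for example through the injected redo-manager error exercised by the new integration test. The Go program below condenses that pattern into a toy type; the names resourceHolder, puller, initialize, and release are illustrative only and are not tiflow APIs.

// Illustrative sketch only: resourceHolder, puller, initialize, and release are
// hypothetical names, not tiflow APIs. They mirror the pattern the patch applies
// to changefeed.initialize and changefeed.releaseResources.
package main

import (
	"errors"
	"fmt"
	"sync"
)

type puller struct{ wg sync.WaitGroup }

// close would stop the background goroutine in the real code; here it is a no-op.
func (p *puller) close() {}

type resourceHolder struct {
	initialized bool
	isReleased  bool
	cancel      func()
	ddlPuller   *puller // stays nil when initialize fails before creating it
}

// initialize can fail halfway, leaving only some members set.
func (r *resourceHolder) initialize(failEarly bool) error {
	r.isReleased = false
	r.cancel = func() {}
	if failEarly {
		// Stand-in for the injected "new redo manager" error.
		return errors.New("injected initialization error")
	}
	r.ddlPuller = &puller{}
	r.initialized = true
	return nil
}

// release is idempotent and nil-tolerant, so the caller can invoke it
// unconditionally without dereferencing nil or blocking forever.
func (r *resourceHolder) release() {
	if r.isReleased {
		return
	}
	if r.cancel != nil {
		r.cancel()
		r.cancel = func() {}
	}
	if r.ddlPuller != nil { // guard: may be nil after a failed initialize
		r.ddlPuller.close()
		r.ddlPuller.wg.Wait()
	}
	r.isReleased = true
	r.initialized = false
}

func main() {
	r := &resourceHolder{}
	_ = r.initialize(true) // fails before the puller exists
	r.release()            // safe: nothing to close, nothing to wait for
	r.release()            // safe: the second call returns immediately
	fmt.Println("released:", r.isReleased, "initialized:", r.initialized)
}

Because the release path is safe to call at any point, the owner's tick can invoke releaseResources for a changefeed whose initialization failed, instead of special-casing !c.initialized as the old code did.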