From edccee634d1a6d01f130b9abdd3dd8afb81fc2ac Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 25 Aug 2022 14:31:10 +0800 Subject: [PATCH 1/5] owner (ticdc): fix owner stuck --- cdc/owner/changefeed.go | 84 ++++++++++++------- cdc/owner/ddl_sink.go | 3 +- .../integration_tests/changefeed_error/run.sh | 15 ++++ 3 files changed, 71 insertions(+), 31 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index f948f3bdcaa..f3862ba927a 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -87,6 +87,8 @@ type changefeed struct { initialized bool // isRemoved is true if the changefeed is removed isRemoved bool + // isReleased is true if the changefeed's resource is released + isReleased bool // only used for asyncExecDDL function // ddlEventCache is not nil when the changefeed is executing @@ -103,10 +105,9 @@ type changefeed struct { // cancel the running goroutine start by `DDLPuller` cancel context.CancelFunc - // The changefeed will start some backend goroutines in the function `initialize`, - // such as DDLPuller, DDLSink, etc. - // `wg` is used to manage those backend goroutines. - wg sync.WaitGroup + // The changefeed will start a backend goroutine in the function `initialize` for DDLPuller + // `ddlWg` is used to manage this backend goroutine. 
+ ddlWg sync.WaitGroup metricsChangefeedBarrierTsGauge prometheus.Gauge metricsChangefeedCheckpointTsGauge prometheus.Gauge @@ -375,10 +376,21 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* return nil } -func (c *changefeed) initialize(ctx cdcContext.Context) error { +func (c *changefeed) initialize(ctx cdcContext.Context) (err error) { if c.initialized { return nil } + c.isReleased = false + defer func() { + if err != nil { + log.Error("an error occur when changefeed initializing, release resource", + zap.String("namespace", c.id.Namespace), + zap.String("changefeed", c.id.ID), + zap.Error(err), + ) + c.releaseResources(ctx) + } + }() // clean the errCh // When the changefeed is resumed after being stopped, the changefeed instance will be reused, // So we should make sure that the errCh is empty when the changefeed is restarting @@ -416,7 +428,7 @@ LOOP: // // See more gc doc. ensureTTL := int64(10 * 60) - err := gc.EnsureChangefeedStartTsSafety( + err = gc.EnsureChangefeedStartTsSafety( ctx, c.upstream.PDClient, ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceInitializing), c.id, ensureTTL, checkpointTs) @@ -451,7 +463,6 @@ LOOP: c.barriers.Update(ddlJobBarrier, checkpointTs-1) c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs()) - var err error // Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completed executed. // So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires // the lower bound of an open interval, i.e. 
(startTs, ...), we pass checkpointTs-1 as the start-ts to initialize @@ -473,9 +484,9 @@ LOOP: if err != nil { return errors.Trace(err) } - c.wg.Add(1) + c.ddlWg.Add(1) go func() { - defer c.wg.Done() + defer c.ddlWg.Done() ctx.Throw(c.ddlPuller.Run(cancelCtx)) }() @@ -483,6 +494,9 @@ LOOP: redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh) mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts) c.redoManager = mgr + failpoint.Inject("ChangefeedNewRedoManagerError", func() { + err = errors.New("changefeed new redo manager injected error") + }) if err != nil { return err } @@ -511,7 +525,6 @@ LOOP: } c.initialized = true - log.Info("changefeed initialized", zap.String("namespace", c.state.ID.Namespace), zap.String("changefeed", c.state.ID.ID), @@ -522,37 +535,48 @@ LOOP: return nil } +// releaseResources is idempotent. func (c *changefeed) releaseResources(ctx cdcContext.Context) { + if c.isReleased { + return + } // Must clean redo manager before calling cancel, otherwise // the manager can be closed internally. c.cleanupRedoManager(ctx) - - if !c.initialized { - c.cleanupChangefeedServiceGCSafePoints(ctx) - return - } + c.cleanupChangefeedServiceGCSafePoints(ctx) c.cancel() c.cancel = func() {} - c.ddlPuller.Close() - c.schema = nil - c.cleanupChangefeedServiceGCSafePoints(ctx) - canceledCtx, cancel := context.WithCancel(context.Background()) - cancel() - // We don't need to wait sink Close, pass a canceled context is ok - if err := c.sink.close(canceledCtx); err != nil { - log.Warn("owner close sink failed", - zap.String("namespace", c.id.Namespace), - zap.String("changefeed", c.id.ID), - zap.Error(err)) + + if c.ddlPuller != nil { + c.ddlPuller.Close() + c.ddlPuller = nil + } + c.ddlWg.Wait() + + if c.sink != nil { + canceledCtx, cancel := context.WithCancel(context.Background()) + cancel() + // TODO(dongmen): remove ctx from func sink.close(), it is useless. 
+ // We don't need to wait sink Close, pass a canceled context is ok + if err := c.sink.close(canceledCtx); err != nil { + log.Warn("owner close sink failed", + zap.String("namespace", c.id.Namespace), + zap.String("changefeed", c.id.ID), + zap.Error(err)) + } + } + + if c.scheduler != nil { + c.scheduler.Close(ctx) + c.scheduler = nil } - c.wg.Wait() - c.scheduler.Close(ctx) - c.scheduler = nil - c.barriers = nil c.cleanupMetrics() + c.schema = nil + c.barriers = nil c.initialized = false + c.isReleased = true log.Info("changefeed closed", zap.String("namespace", c.id.Namespace), diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index 1007f8cef78..3eb1c38a869 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -338,9 +338,10 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) func (s *ddlSinkImpl) close(ctx context.Context) (err error) { s.cancel() + // sinkV1 and sinkV2 are both nil if the changefeed returned an error during initialization if s.sinkV1 != nil { err = s.sinkV1.Close(ctx) - } else { + } else if s.sinkV2 != nil { err = s.sinkV2.Close() } if s.syncPointStore != nil { diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 4aa1a52a9ae..364ccf285ae 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -102,6 +102,21 @@ function run() { run_cdc_cli changefeed remove -c $changefeedid_2 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY + + # make sure a changefeed initialization error does not leave the owner stuck + export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)' + run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY + + changfeedid_3="changefeed-initialize-error" + run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3 + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} 
"normal" "" "" + run_cdc_cli changefeed pause -c $changefeedid_3 + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "changefeed new redo manager injected error" "" + run_cdc_cli changefeed resume -c $changefeedid_3 + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "" "" + run_cdc_cli changefeed remove -c $changefeedid_3 + export GO_FAILPOINTS='' + cleanup_process $CDC_BINARY } trap stop_tidb_cluster EXIT From d0a0212501dffe69489e777a39eeb5b2cc9009b9 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 25 Aug 2022 15:01:29 +0800 Subject: [PATCH 2/5] fix it error --- tests/integration_tests/changefeed_error/run.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 364ccf285ae..22d389766d3 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -107,7 +107,7 @@ function run() { export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)' run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY - changfeedid_3="changefeed-initialize-error" + changefeedid_3="changefeed-initialize-error" run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3 ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "" "" run_cdc_cli changefeed pause -c $changefeedid_3 From da409a244abf07f43772d53f24503da482706d0e Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 25 Aug 2022 15:22:56 +0800 Subject: [PATCH 3/5] address comment --- cdc/owner/changefeed.go | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index db08caffc54..1f93e7b8785 100644 --- 
a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -85,9 +85,12 @@ type changefeed struct { sink DDLSink ddlPuller puller.DDLPuller initialized bool - // isRemoved is true if the changefeed is removed + // isRemoved is true if the changefeed is removed, + // which means it will be removed from memory forever isRemoved bool // isReleased is true if the changefeed's resource is released + // but it will still be kept in the memory and it will be checked + // in every tick isReleased bool // only used for asyncExecDDL function @@ -385,16 +388,6 @@ func (c *changefeed) initialize(ctx cdcContext.Context) (err error) { return nil } c.isReleased = false - defer func() { - if err != nil { - log.Error("an error occur when changefeed initializing, release resource", - zap.String("namespace", c.id.Namespace), - zap.String("changefeed", c.id.ID), - zap.Error(err), - ) - c.releaseResources(ctx) - } - }() // clean the errCh // When the changefeed is resumed after being stopped, the changefeed instance will be reused, // So we should make sure that the errCh is empty when the changefeed is restarting From c15093f312f2943f71b52dbba77141855a806224 Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 25 Aug 2022 15:40:59 +0800 Subject: [PATCH 4/5] fix it error --- tests/integration_tests/changefeed_error/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration_tests/changefeed_error/run.sh b/tests/integration_tests/changefeed_error/run.sh index 22d389766d3..980dc01eaf6 100755 --- a/tests/integration_tests/changefeed_error/run.sh +++ b/tests/integration_tests/changefeed_error/run.sh @@ -109,11 +109,11 @@ function run() { changefeedid_3="changefeed-initialize-error" run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "" "" + ensure $MAX_RETRIES check_changefeed_state 
http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" "" run_cdc_cli changefeed pause -c $changefeedid_3 ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "changefeed new redo manager injected error" "" run_cdc_cli changefeed resume -c $changefeedid_3 - ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "" "" + ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" "" run_cdc_cli changefeed remove -c $changefeedid_3 export GO_FAILPOINTS='' cleanup_process $CDC_BINARY From 555aca28cc3f7bda5e34f327748858cd5ad76f0f Mon Sep 17 00:00:00 2001 From: dongmen <414110582@qq.com> Date: Thu, 25 Aug 2022 16:40:25 +0800 Subject: [PATCH 5/5] fix ut --- cdc/owner/changefeed.go | 4 +++- cdc/owner/changefeed_test.go | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 1f93e7b8785..d593f58a4ce 100644 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -254,6 +254,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, captures map[model.CaptureID]* if !c.feedStateManager.ShouldRunning() { c.isRemoved = c.feedStateManager.ShouldRemoved() + log.Info("fizz should running is false") c.releaseResources(ctx) return nil } @@ -533,8 +534,10 @@ LOOP: // releaseResources is idempotent. func (c *changefeed) releaseResources(ctx cdcContext.Context) { if c.isReleased { + log.Info("fizzzz") return } + log.Info("fizzzz2") // Must clean redo manager before calling cancel, otherwise // the manager can be closed internally. 
c.cleanupRedoManager(ctx) @@ -545,7 +548,6 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) { if c.ddlPuller != nil { c.ddlPuller.Close() - c.ddlPuller = nil } c.ddlWg.Wait() diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index b64e6455974..33a8cbacbaf 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -24,6 +24,7 @@ import ( "testing" "time" + "github.com/labstack/gommon/log" "github.com/pingcap/errors" timodel "github.com/pingcap/tidb/parser/model" "github.com/pingcap/tiflow/cdc/entry" @@ -533,12 +534,14 @@ func testChangefeedReleaseResource( CfID: cf.id, Type: model.AdminRemove, }) + cf.isReleased = false // changefeed tick will release resources err := cf.tick(ctx, captures) require.Nil(t, err) cancel() // check redo log dir is deleted _, err = os.Stat(redoLogDir) + log.Error(err) require.True(t, os.IsNotExist(err)) }