Skip to content

Commit

Permalink
changefeed (ticdc): fix owner stuck when closing a changefeed (#6882) (
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot committed Aug 26, 2022
1 parent 8db8104 commit 834e6c8
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 31 deletions.
75 changes: 45 additions & 30 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,13 @@ type changefeed struct {
sink DDLSink
ddlPuller DDLPuller
initialized bool
// isRemoved is true if the changefeed is removed
// isRemoved is true if the changefeed is removed,
// which means it will be removed from memory forever
isRemoved bool
// isReleased is true if the changefeed's resources have been released.
// The changefeed is still kept in memory and will be checked
// in every tick.
isReleased bool

// only used for asyncExecDDL function
// ddlEventCache is not nil when the changefeed is executing
Expand All @@ -92,10 +97,9 @@ type changefeed struct {
// cancel the running goroutine started by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
wg sync.WaitGroup
// The changefeed will start a backend goroutine in the function `initialize` for DDLPuller
// `ddlWg` is used to manage this backend goroutine.
ddlWg sync.WaitGroup

metricsChangefeedBarrierTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsGauge prometheus.Gauge
Expand Down Expand Up @@ -330,14 +334,15 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
return nil
}

func (c *changefeed) initialize(ctx cdcContext.Context) error {
func (c *changefeed) initialize(ctx cdcContext.Context) (err error) {
if c.initialized || c.state.Status == nil {
// If `c.state.Status` is nil it means the changefeed struct is just created, it needs to
// 1. use startTs as checkpointTs and resolvedTs, if it's a newly created changefeed; or
// 2. load checkpointTs and resolvedTs from etcd, if it's an existing changefeed.
// And then it can continue to initialize.
return nil
}
c.isReleased = false
// clean the errCh
// When the changefeed is resumed after being stopped, the changefeed instance will be reused,
// so we should make sure that the errCh is empty when the changefeed is restarting.
Expand Down Expand Up @@ -401,7 +406,7 @@ LOOP:
// TODO: get DDL barrier based on resolvedTs.
c.barriers.Update(ddlJobBarrier, checkpointTs-1)
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
var err error

// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completely executed.
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
Expand All @@ -422,16 +427,19 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
c.wg.Add(1)
c.ddlWg.Add(1)
go func() {
defer c.wg.Done()
defer c.ddlWg.Done()
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh)
mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
c.redoManager = mgr
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
Expand Down Expand Up @@ -470,35 +478,41 @@ LOOP:
return nil
}

// releaseResources is idempotent.
func (c *changefeed) releaseResources(ctx cdcContext.Context) {
if c.isReleased {
return
}
// Must clean redo manager before calling cancel, otherwise
// the manager can be closed internally.
c.cleanupRedoManager(ctx)
c.cleanupChangefeedServiceGCSafePoints(ctx)

if !c.initialized {
c.cleanupChangefeedServiceGCSafePoints(ctx)
return
}
log.Info("close changefeed",
zap.String("namespace", c.state.ID.Namespace),
zap.String("changefeed", c.state.ID.ID),
zap.Stringer("info", c.state.Info), zap.Bool("isRemoved", c.isRemoved))
c.cancel()
c.cancel = func() {}
c.ddlPuller.Close()

if c.ddlPuller != nil {
c.ddlPuller.Close()
}
c.ddlWg.Wait()

if c.sink != nil {
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait for the sink to close, so passing a canceled context is OK.
if err := c.sink.close(canceledCtx); err != nil {
log.Warn("owner close sink failed",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Error(err))
}
}

if c.scheduler != nil {
c.scheduler.Close(ctx)
}

c.schema = nil
c.cleanupChangefeedServiceGCSafePoints(ctx)
canceledCtx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait for the sink to close, so passing a canceled context is OK.
if err := c.sink.close(canceledCtx); err != nil {
log.Warn("Closing sink failed in Owner",
zap.String("namespace", c.state.ID.Namespace),
zap.String("changefeed", c.state.ID.ID),
zap.Error(err))
}
c.wg.Wait()
c.scheduler.Close(ctx)

changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
Expand All @@ -516,6 +530,7 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedBarrierTsGauge = nil

c.isReleased = true
c.initialized = false
}

Expand Down
3 changes: 3 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

"github.com/labstack/gommon/log"
"github.com/pingcap/errors"
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/entry"
Expand Down Expand Up @@ -539,12 +540,14 @@ func testChangefeedReleaseResource(
CfID: cf.id,
Type: model.AdminRemove,
})
cf.isReleased = false
// changefeed tick will release resources
err := cf.tick(ctx, state, captures)
require.Nil(t, err)
cancel()
// check redo log dir is deleted
_, err = os.Stat(redoLogDir)
log.Error(err)
require.True(t, os.IsNotExist(err))
}

Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ require (
upper.io/db.v3 v3.7.1+incompatible
)

require github.com/labstack/gommon v0.3.0

require (
cloud.google.com/go v0.100.2 // indirect
cloud.google.com/go/compute v1.2.0 // indirect
Expand Down Expand Up @@ -182,7 +184,6 @@ require (
github.com/kr/pretty v0.3.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/labstack/echo/v4 v4.2.1 // indirect
github.com/labstack/gommon v0.3.0 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/lufia/plan9stats v0.0.0-20220326011226-f1430873d8db // indirect
github.com/mailru/easyjson v0.7.7 // indirect
Expand Down
15 changes: 15 additions & 0 deletions tests/integration_tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,21 @@ function run() {
run_cdc_cli changefeed remove -c $changefeedid_2
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY

# make sure an error while initializing a changefeed will not leave the owner stuck
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeedid_3="changefeed-initialize-error"
run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
run_cdc_cli changefeed pause -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "changefeed new redo manager injected error" ""
run_cdc_cli changefeed resume -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
run_cdc_cli changefeed remove -c $changefeedid_3
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
Expand Down

0 comments on commit 834e6c8

Please sign in to comment.