Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#6882
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
  • Loading branch information
asddongmen authored and ti-chi-bot committed Aug 25, 2022
1 parent 719e8e3 commit 874d8e8
Show file tree
Hide file tree
Showing 4 changed files with 244 additions and 6 deletions.
152 changes: 146 additions & 6 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,16 @@ type changefeed struct {
sink DDLSink
ddlPuller DDLPuller
initialized bool
<<<<<<< HEAD
=======
// isRemoved is true if the changefeed is removed,
// which means it will be removed from memory forever
isRemoved bool
// isReleased is true if the changefeed's resources are released,
// but it will still be kept in memory and it will be checked
// in every tick
isReleased bool
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))

// only used for asyncExecDDL function
// ddlEventCache is not nil when the changefeed is executing a DDL event asynchronously
Expand All @@ -57,10 +67,9 @@ type changefeed struct {
// cancel the running goroutine started by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
wg sync.WaitGroup
// The changefeed will start a backend goroutine in the function `initialize` for DDLPuller
// `ddlWg` is used to manage this backend goroutine.
ddlWg sync.WaitGroup

metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
Expand Down Expand Up @@ -147,7 +156,17 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
}

if !c.feedStateManager.ShouldRunning() {
<<<<<<< HEAD
c.releaseResources()
=======
c.isRemoved = c.feedStateManager.ShouldRemoved()
log.Info("fizz should running is false")
c.releaseResources(ctx)
return nil
}

if adminJobPending {
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
return nil
}

Expand Down Expand Up @@ -189,10 +208,20 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
return nil
}

<<<<<<< HEAD
func (c *changefeed) initialize(ctx cdcContext.Context) error {
if c.initialized {
=======
func (c *changefeed) initialize(ctx cdcContext.Context) (err error) {
if c.initialized || c.state.Status == nil {
// If `c.state.Status` is nil it means the changefeed struct is just created, it needs to
// 1. use startTs as checkpointTs and resolvedTs, if it's a new created changefeed; or
// 2. load checkpointTs and resolvedTs from etcd, if it's an existing changefeed.
// And then it can continue to initialize.
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
return nil
}
c.isReleased = false
// clean the errCh
// When the changefeed is resumed after being stopped, the changefeed instance will be reused,
// so we should make sure that the errCh is empty when the changefeed is restarting.
Expand Down Expand Up @@ -226,8 +255,32 @@ LOOP:
//
// See more gc doc.
ensureTTL := int64(10 * 60)
<<<<<<< HEAD
err := gc.EnsureChangefeedStartTsSafety(
ctx, ctx.GlobalVars().PDClient, c.state.ID, ensureTTL, checkpointTs)
=======
err = gc.EnsureChangefeedStartTsSafety(
ctx, c.upstream.PDClient,
ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceInitializing),
c.id, ensureTTL, checkpointTs)
if err != nil {
return errors.Trace(err)
}
// clean service GC safepoint '-creating-' and '-resuming-' if there are any.
err = gc.UndoEnsureChangefeedStartTsSafety(
ctx, c.upstream.PDClient,
ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceCreating),
c.id,
)
if err != nil {
return errors.Trace(err)
}
err = gc.UndoEnsureChangefeedStartTsSafety(
ctx, c.upstream.PDClient,
ctx.GlobalVars().EtcdClient.GetEnsureGCServiceID(gc.EnsureGCServiceResuming),
c.id,
)
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
if err != nil {
return errors.Trace(err)
}
Expand All @@ -240,7 +293,11 @@ LOOP:
// the DDL barrier to the correct start point.
c.barriers.Update(ddlJobBarrier, checkpointTs-1)
c.barriers.Update(finishBarrier, c.state.Info.GetTargetTs())
<<<<<<< HEAD
var err error
=======

>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
// Note that (checkpointTs == ddl.FinishedTs) DOES NOT imply that the DDL has been completely executed.
// So we need to process all DDLs from the range [checkpointTs, ...), but since the semantics of start-ts requires
// the lower bound of an open interval, i.e. (startTs, ...), we pass checkpointTs-1 as the start-ts to initialize
Expand All @@ -261,19 +318,37 @@ LOOP:
if err != nil {
return errors.Trace(err)
}
c.wg.Add(1)
c.ddlWg.Add(1)
go func() {
defer c.wg.Done()
defer c.ddlWg.Done()
ctx.Throw(c.ddlPuller.Run(cancelCtx))
}()

<<<<<<< HEAD
=======
stdCtx := contextutil.PutChangefeedIDInCtx(cancelCtx, c.id)
redoManagerOpts := redo.NewOwnerManagerOptions(c.errCh)
mgr, err := redo.NewManager(stdCtx, c.state.Info.Config.Consistent, redoManagerOpts)
c.redoManager = mgr
failpoint.Inject("ChangefeedNewRedoManagerError", func() {
err = errors.New("changefeed new redo manager injected error")
})
if err != nil {
return err
}
log.Info("owner creates redo manager",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID))

>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
// init metrics
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.WithLabelValues(c.id)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)

c.initialized = true
<<<<<<< HEAD
return nil
}

Expand All @@ -297,6 +372,71 @@ func (c *changefeed) releaseResources() {

changefeedCheckpointTsGauge.DeleteLabelValues(c.id)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id)
=======
log.Info("changefeed initialized",
zap.String("namespace", c.state.ID.Namespace),
zap.String("changefeed", c.state.ID.ID),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", resolvedTs),
zap.Stringer("info", c.state.Info))

return nil
}

// releaseResources tears down the resources held by the changefeed (redo
// manager, service GC safepoints, DDL puller, DDL sink, scheduler, metrics)
// and marks the changefeed as released.
//
// It is idempotent: once isReleased is set, subsequent calls return
// immediately without touching anything.
func (c *changefeed) releaseResources(ctx cdcContext.Context) {
	if c.isReleased {
		return
	}
	// Must clean redo manager before calling cancel, otherwise
	// the manager can be closed internally.
	c.cleanupRedoManager(ctx)
	c.cleanupChangefeedServiceGCSafePoints(ctx)

	c.cancel()
	// Swap in a no-op so that the stale cancel func is never invoked again.
	c.cancel = func() {}

	if c.ddlPuller != nil {
		c.ddlPuller.Close()
	}
	// Block until the goroutine tracked by ddlWg has exited.
	c.ddlWg.Wait()

	if c.sink != nil {
		canceledCtx, cancel := context.WithCancel(context.Background())
		cancel()
		// TODO(dongmen): remove ctx from func sink.close(), it is useless.
		// We don't need to wait sink Close, pass a canceled context is ok
		if err := c.sink.close(canceledCtx); err != nil {
			log.Warn("owner close sink failed",
				zap.String("namespace", c.id.Namespace),
				zap.String("changefeed", c.id.ID),
				zap.Error(err))
		}
	}

	if c.scheduler != nil {
		c.scheduler.Close(ctx)
		c.scheduler = nil
	}

	c.cleanupMetrics()
	c.schema = nil
	c.barriers = nil
	c.initialized = false
	c.isReleased = true

	log.Info("changefeed closed",
		zap.String("namespace", c.id.Namespace),
		zap.String("changefeed", c.id.ID),
		zap.Stringer("info", c.state.Info), zap.Bool("isRemoved", c.isRemoved))
}

func (c *changefeed) cleanupMetrics() {
changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil

Expand Down
75 changes: 75 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ import (
"sync/atomic"
"time"

<<<<<<< HEAD
"github.com/pingcap/check"
=======
"github.com/labstack/gommon/log"
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
"github.com/pingcap/errors"
timodel "github.com/pingcap/parser/model"
"github.com/pingcap/tiflow/cdc/entry"
Expand Down Expand Up @@ -362,8 +366,79 @@ func (s *changefeedSuite) TestFinished(c *check.C) {
c.Assert(state.Info.State, check.Equals, model.StateFinished)
}

<<<<<<< HEAD
func (s *changefeedSuite) TestAddSpecialComment(c *check.C) {
defer testleak.AfterTest(c)()
=======
// TestRemoveChangefeed runs the shared release-resource scenario against a
// changefeed that uses the default test info with an "eventual" consistent
// (redo) config, and expects the changefeed to have been initialized before
// it is removed.
func TestRemoveChangefeed(t *testing.T) {
	rootCtx, cancel := context.WithCancel(context.Background())
	redoDir := t.TempDir()

	cdcCtx := cdcContext.NewContext4Test(rootCtx, true)
	cfInfo := cdcCtx.ChangefeedVars().Info
	cfInfo.Config.Consistent = &config.ConsistentConfig{
		Level:   "eventual",
		Storage: filepath.Join("nfs://", redoDir),
	}

	vars := &cdcContext.ChangefeedVars{
		ID:   cdcCtx.ChangefeedVars().ID,
		Info: cfInfo,
	}
	cdcCtx = cdcContext.WithChangefeedVars(cdcCtx, vars)

	testChangefeedReleaseResource(t, cdcCtx, cancel, redoDir, true /*expectedInitialized*/)
}

// TestRemovePausedChangefeed runs the shared release-resource scenario against
// a changefeed whose info state is model.StateStopped, and expects the
// changefeed NOT to have been initialized before it is removed.
func TestRemovePausedChangefeed(t *testing.T) {
	rootCtx, cancel := context.WithCancel(context.Background())
	redoDir := t.TempDir()

	cdcCtx := cdcContext.NewContext4Test(rootCtx, true)
	cfInfo := cdcCtx.ChangefeedVars().Info
	cfInfo.State = model.StateStopped
	cfInfo.Config.Consistent = &config.ConsistentConfig{
		Level:   "eventual",
		Storage: filepath.Join("nfs://", redoDir),
	}

	vars := &cdcContext.ChangefeedVars{
		ID:   cdcCtx.ChangefeedVars().ID,
		Info: cfInfo,
	}
	cdcCtx = cdcContext.WithChangefeedVars(cdcCtx, vars)

	testChangefeedReleaseResource(t, cdcCtx, cancel, redoDir, false /*expectedInitialized*/)
}

// testChangefeedReleaseResource drives a test changefeed through two ticks,
// pushes an AdminRemove job and ticks once more, then asserts that the redo
// log directory no longer exists.
//
// Parameters:
//   - ctx: the cdc context the changefeed is created and ticked with.
//   - cancel: cancels the base context; invoked after the final tick.
//   - redoLogDir: directory that must be gone once resources are released.
//   - expectedInitialized: the value cf.initialized must have after the
//     second tick.
func testChangefeedReleaseResource(
	t *testing.T,
	ctx cdcContext.Context,
	cancel context.CancelFunc,
	redoLogDir string,
	expectedInitialized bool,
) {
	cf, captures, tester := createChangefeed4Test(ctx, t)

	// pre check
	cf.Tick(ctx, captures)
	tester.MustApplyPatches()

	// initialize
	cf.Tick(ctx, captures)
	tester.MustApplyPatches()
	// Expected value goes first so that a failure message labels the
	// values correctly.
	require.Equal(t, expectedInitialized, cf.initialized)

	// remove changefeed from state manager by admin job
	cf.feedStateManager.PushAdminJob(&model.AdminJob{
		CfID: cf.id,
		Type: model.AdminRemove,
	})
	// Clear the released flag so that the next tick runs the release path
	// instead of hitting the early return in releaseResources.
	cf.isReleased = false
	// changefeed tick will release resources
	err := cf.tick(ctx, captures)
	require.NoError(t, err)
	cancel()
	// check redo log dir is deleted
	_, err = os.Stat(redoLogDir)
	log.Error(err)
	require.True(t, os.IsNotExist(err))
}

func TestAddSpecialComment(t *testing.T) {
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
testCase := []struct {
input string
result string
Expand Down
8 changes: 8 additions & 0 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,16 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64)

func (s *ddlSinkImpl) close(ctx context.Context) (err error) {
s.cancel()
<<<<<<< HEAD
if s.sink != nil {
err = s.sink.Close(ctx)
=======
// they will both be nil if the changefeed returns an error during initialization
if s.sinkV1 != nil {
err = s.sinkV1.Close(ctx)
} else if s.sinkV2 != nil {
err = s.sinkV2.Close()
>>>>>>> 48a12ea8a (changefeed (ticdc): fix owner stuck when closing a changefeed (#6882))
}
if s.syncPointStore != nil {
err = s.syncPointStore.Close()
Expand Down
15 changes: 15 additions & 0 deletions tests/integration_tests/changefeed_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@ function run() {
run_cdc_cli changefeed remove -c $changefeedid_2
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY

# make sure an error while initializing a changefeed will not get the owner stuck
export GO_FAILPOINTS='github.com/pingcap/tiflow/cdc/owner/ChangefeedNewRedoManagerError=2*return(true)'
run_cdc_server --workdir $WORK_DIR --binary $CDC_BINARY

changefeedid_3="changefeed-initialize-error"
run_cdc_cli changefeed create --start-ts=0 --sink-uri="$SINK_URI" -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
run_cdc_cli changefeed pause -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "stopped" "changefeed new redo manager injected error" ""
run_cdc_cli changefeed resume -c $changefeedid_3
ensure $MAX_RETRIES check_changefeed_state http://${UP_PD_HOST_1}:${UP_PD_PORT_1} ${changefeedid_3} "normal" "null" ""
run_cdc_cli changefeed remove -c $changefeedid_3
export GO_FAILPOINTS=''
cleanup_process $CDC_BINARY
}

trap stop_tidb_cluster EXIT
Expand Down

0 comments on commit 874d8e8

Please sign in to comment.