Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

owner(ticdc): asynchronously create sink (#3598) #3961

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 0 additions & 187 deletions cdc/owner/async_sink.go

This file was deleted.

31 changes: 15 additions & 16 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type changefeed struct {
gcManager gc.Manager

schema *schemaWrap4Owner
sink AsyncSink
sink DDLSink
ddlPuller DDLPuller
initialized bool

Expand All @@ -51,14 +51,13 @@ type changefeed struct {
// After the DDL event has been executed, ddlEventCache will be set to nil.
ddlEventCache *model.DDLEvent

errCh chan error
errCh chan error
// cancel the running goroutine start by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, Sink, etc.
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
// But it only manages the DDLPuller for now.
// TODO: manage the Sink and other backend goroutines.
wg sync.WaitGroup

metricsChangefeedCheckpointTsGauge prometheus.Gauge
Expand All @@ -67,7 +66,7 @@ type changefeed struct {
metricsChangefeedResolvedTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func(ctx cdcContext.Context) (AsyncSink, error)
newSink func() DDLSink
}

func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
Expand All @@ -82,15 +81,15 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
cancel: func() {},

newDDLPuller: newDDLPuller,
newSink: newDDLSink,
}
c.newSink = newAsyncSink
return c
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
newSink func() DDLSink,
) *changefeed {
c := newChangefeed(id, gcManager)
c.newDDLPuller = newDDLPuller
Expand Down Expand Up @@ -163,7 +162,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
default:
}

c.sink.EmitCheckpointTs(ctx, checkpointTs)
c.sink.emitCheckpointTs(ctx, checkpointTs)
barrierTs, err := c.handleBarrier(ctx)
if err != nil {
return errors.Trace(err)
Expand Down Expand Up @@ -246,12 +245,12 @@ LOOP:
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel
c.sink, err = c.newSink(cancelCtx)
if err != nil {
return errors.Trace(err)
}

c.sink = c.newSink()
c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
Expand Down Expand Up @@ -287,7 +286,7 @@ func (c *changefeed) releaseResources() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait sink Close, pass a canceled context is ok
if err := c.sink.Close(ctx); err != nil {
if err := c.sink.close(ctx); err != nil {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()
Expand Down Expand Up @@ -397,7 +396,7 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
if err := c.sink.SinkSyncpoint(ctx, barrierTs); err != nil {
if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
Expand Down Expand Up @@ -438,7 +437,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
log.Warn("ignore the DDL job of ineligible table", zap.Reflect("job", job))
return true, nil
}
done, err = c.sink.EmitDDLEvent(ctx, c.ddlEventCache)
done, err = c.sink.emitDDLEvent(ctx, c.ddlEventCache)
if err != nil {
return false, err
}
Expand Down
Loading