owner(ticdc): asynchronously create sink (#3598) #3960

Closed
39 changes: 30 additions & 9 deletions cdc/owner/changefeed.go
@@ -42,7 +42,7 @@ type changefeed struct {
gcManager gc.Manager

schema *schemaWrap4Owner
sink AsyncSink
sink DDLSink
ddlPuller DDLPuller
initialized bool

@@ -51,21 +51,25 @@ type changefeed struct {
// After the DDL event has been executed, ddlEventCache will be set to nil.
ddlEventCache *model.DDLEvent

errCh chan error
errCh chan error
// cancel the running goroutine started by `DDLPuller`
cancel context.CancelFunc

// The changefeed will start some backend goroutines in the function `initialize`,
// such as DDLPuller, Sink, etc.
// such as DDLPuller, DDLSink, etc.
// `wg` is used to manage those backend goroutines.
// But it only manages the DDLPuller for now.
// TODO: manage the Sink and other backend goroutines.
wg sync.WaitGroup

metricsChangefeedCheckpointTsGauge prometheus.Gauge
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func(ctx cdcContext.Context) (AsyncSink, error)
newSink func() DDLSink
}

func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
@@ -80,15 +84,20 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
cancel: func() {},

newDDLPuller: newDDLPuller,
newSink: newDDLSink,
}
c.newSink = newAsyncSink
return c
}

func newChangefeed4Test(
id model.ChangeFeedID, gcManager gc.Manager,
newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
newSink func(ctx cdcContext.Context) (AsyncSink, error),
newSink func() DDLSink,
) *changefeed {
c := newChangefeed(id, gcManager)
c.newDDLPuller = newDDLPuller
@@ -161,7 +170,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
default:
}

c.sink.EmitCheckpointTs(ctx, checkpointTs)
c.sink.emitCheckpointTs(ctx, checkpointTs)
barrierTs, err := c.handleBarrier(ctx)
if err != nil {
return errors.Trace(err)
@@ -244,8 +253,10 @@ LOOP:
if err != nil {
return errors.Trace(err)
}

cancelCtx, cancel := cdcContext.WithCancel(ctx)
c.cancel = cancel
c.sink, err = c.newSink(cancelCtx)
if err != nil {
return errors.Trace(err)
@@ -254,6 +265,12 @@ LOOP:
if err != nil {
return errors.Trace(err)
}

c.sink = c.newSink()
c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)

// Refer to the previous comment on why we use (checkpointTs-1).
c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
if err != nil {
@@ -285,7 +302,11 @@ func (c *changefeed) releaseResources() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
// We don't need to wait for sink Close; passing a canceled context is ok
if err := c.sink.Close(ctx); err != nil {
if err := c.sink.close(ctx); err != nil {
log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
}
c.wg.Wait()
@@ -388,7 +409,7 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
return barrierTs, nil
}
nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
if err := c.sink.SinkSyncpoint(ctx, barrierTs); err != nil {
if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
return 0, errors.Trace(err)
}
c.barriers.Update(syncPointBarrier, nextSyncPointTs)
@@ -429,7 +450,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
log.Warn("ignore the DDL job of ineligible table", zap.Reflect("job", job))
return true, nil
}
done, err = c.sink.EmitDDLEvent(ctx, c.ddlEventCache)
done, err = c.sink.emitDDLEvent(ctx, c.ddlEventCache)
if err != nil {
return false, err
}
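Taken together, the new call sites in changefeed.go (run, emitCheckpointTs, emitDDLEvent, emitSyncPoint, close) imply a DDLSink interface roughly like the sketch below. This is a reconstruction from the diff for the reader's convenience; the authoritative definition lives in cdc/owner/ddl_sink.go, which is not included in this diff, so the exact signatures and comments there may differ. (The test mock also keeps a Barrier method; whether it belongs to the interface cannot be determined from this diff alone.)

// DDLSink is the owner-level sink, sketched from the call sites above.
type DDLSink interface {
	// run starts the background goroutines that create the real sink and
	// consume emitted events; it returns immediately instead of blocking
	// the owner while the downstream is being set up.
	run(ctx cdcContext.Context, id model.ChangeFeedID, info *model.ChangeFeedInfo)
	// emitCheckpointTs forwards a new checkpoint ts to the sink.
	emitCheckpointTs(ctx cdcContext.Context, ts uint64)
	// emitDDLEvent submits a DDL event for execution and reports whether
	// the event has finished executing.
	emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error)
	// emitSyncPoint records a sync point at the given checkpoint ts.
	emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error
	// close releases the sink; callers may pass an already-canceled context.
	close(ctx context.Context) error
}

The practical effect shows up in initialize and releaseResources: the old path (c.newSink(cancelCtx) followed by a blocking Initialize call) did its potentially slow downstream setup inline, while the new path constructs the DDLSink synchronously and defers that work to the goroutines started by run — which is what the PR title means by creating the sink asynchronously.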
59 changes: 36 additions & 23 deletions cdc/owner/changefeed_test.go
@@ -15,6 +15,12 @@ package owner

import (
"context"
"sync"
"sync/atomic"
"time"

@@ -62,22 +68,32 @@ func (m *mockDDLPuller) Run(ctx cdcContext.Context) error {
return nil
}

type mockAsyncSink struct {
// AsyncSink
type mockDDLSink struct {
// DDLSink
ddlExecuting *model.DDLEvent
ddlDone bool
checkpointTs model.Ts
syncPoint model.Ts
syncPointHis []model.Ts

wg sync.WaitGroup
}

func (m *mockDDLSink) run(ctx cdcContext.Context, _ model.ChangeFeedID, _ *model.ChangeFeedInfo) {
m.wg.Add(1)
go func() {
<-ctx.Done()
m.wg.Done()
}()
}

func (m *mockAsyncSink) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
func (m *mockDDLSink) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
m.ddlExecuting = ddl
defer func() { m.ddlDone = false }()
return m.ddlDone, nil
}

func (m *mockAsyncSink) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error {
func (m *mockDDLSink) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error {
if checkpointTs == m.syncPoint {
return nil
}
@@ -86,19 +102,16 @@ func (m *mockAsyncSink) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint6
return nil
}

func (m *mockAsyncSink) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
return nil
}

func (m *mockAsyncSink) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) {
func (m *mockDDLSink) emitCheckpointTs(ctx cdcContext.Context, ts uint64) {
atomic.StoreUint64(&m.checkpointTs, ts)
}

func (m *mockAsyncSink) Close(ctx context.Context) error {
func (m *mockDDLSink) close(ctx context.Context) error {
m.wg.Wait()
return nil
}

func (m *mockAsyncSink) Barrier(ctx context.Context) error {
func (m *mockDDLSink) Barrier(ctx context.Context) error {
return nil
}

@@ -116,8 +129,8 @@ func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *mo
gcManager := gc.NewManager(ctx.GlobalVars().PDClient)
cf := newChangefeed4Test(ctx.ChangefeedVars().ID, gcManager, func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
return &mockDDLPuller{resolvedTs: startTs - 1}, nil
}, func(ctx cdcContext.Context) (AsyncSink, error) {
return &mockAsyncSink{}, nil
}, func() DDLSink {
return &mockDDLSink{}
})
state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
tester := orchestrator.NewReactorStateTester(c, state, nil)
@@ -248,7 +261,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
// ddl puller resolved ts grows up
mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
mockDDLPuller.resolvedTs = startTs
mockAsyncSink := cf.sink.(*mockAsyncSink)
mockDDLSink := cf.sink.(*mockDDLSink)
job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
// three ticks to make sure all barriers set in initialize are handled
@@ -258,7 +271,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
c.Assert(cf.schema.AllPhysicalTables(), check.HasLen, 0)

// executing the ddl finished
mockAsyncSink.ddlDone = true
mockDDLSink.ddlDone = true
mockDDLPuller.resolvedTs += 1000
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
@@ -270,10 +283,10 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create database test1")
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create database test1")

// executing the ddl finished
mockAsyncSink.ddlDone = true
mockDDLSink.ddlDone = true
mockDDLPuller.resolvedTs += 1000
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
@@ -285,10 +298,10 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
tickThreeTime()
c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")
c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")

// executing the ddl finished
mockAsyncSink.ddlDone = true
mockDDLSink.ddlDone = true
mockDDLPuller.resolvedTs += 1000
tickThreeTime()
c.Assert(state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, check.HasKey, job.TableID)
@@ -311,19 +324,19 @@ func (s *changefeedSuite) TestSyncPoint(c *check.C) {
tester.MustApplyPatches()

mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
mockAsyncSink := cf.sink.(*mockAsyncSink)
mockDDLSink := cf.sink.(*mockDDLSink)
// add 5s to resolvedTs
mockDDLPuller.resolvedTs = oracle.GoTimeToTS(oracle.GetTimeFromTS(mockDDLPuller.resolvedTs).Add(5 * time.Second))
// tick 20 times
for i := 0; i <= 20; i++ {
cf.Tick(ctx, state, captures)
tester.MustApplyPatches()
}
for i := 1; i < len(mockAsyncSink.syncPointHis); i++ {
for i := 1; i < len(mockDDLSink.syncPointHis); i++ {
// check that the time interval between adjacent sync points is less than or equal to one second
c.Assert(mockAsyncSink.syncPointHis[i]-mockAsyncSink.syncPointHis[i-1], check.LessEqual, uint64(1000<<18))
c.Assert(mockDDLSink.syncPointHis[i]-mockDDLSink.syncPointHis[i-1], check.LessEqual, uint64(1000<<18))
}
c.Assert(len(mockAsyncSink.syncPointHis), check.GreaterEqual, 5)
c.Assert(len(mockDDLSink.syncPointHis), check.GreaterEqual, 5)
}

func (s *changefeedSuite) TestFinished(c *check.C) {
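The mockDDLSink above also illustrates the lifecycle contract the owner relies on: run starts a goroutine bound to the passed context, and close blocks until that goroutine has exited, which is why releaseResources can safely hand close an already-canceled context. Below is a minimal, standalone sketch of that WaitGroup-plus-context pattern; the asyncWorker type and its method names are invented for illustration and are not part of this PR.

package main

import (
	"context"
	"fmt"
	"sync"
)

type asyncWorker struct {
	wg sync.WaitGroup
}

// run starts a background goroutine that exits once the context is canceled;
// a real DDL sink would create the downstream sink and drain events here.
func (w *asyncWorker) run(ctx context.Context) {
	w.wg.Add(1)
	go func() {
		defer w.wg.Done()
		<-ctx.Done()
	}()
}

// close blocks until every goroutine started by run has returned.
func (w *asyncWorker) close() {
	w.wg.Wait()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	w := &asyncWorker{}
	w.run(ctx)
	cancel()  // mirrors the canceled context passed in releaseResources
	w.close() // returns promptly because the goroutine observes ctx.Done()
	fmt.Println("sink closed")
}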
1 change: 1 addition & 0 deletions cdc/owner/ddl_puller.go
@@ -163,5 +163,6 @@ func (h *ddlPullerImpl) PopFrontDDL() (uint64, *timodel.Job) {
}

func (h *ddlPullerImpl) Close() {
log.Info("Close the ddl puller")
h.cancel()
}