diff --git a/cdc/owner/async_sink.go b/cdc/owner/async_sink.go
deleted file mode 100644
index c2dc5459e0f..00000000000
--- a/cdc/owner/async_sink.go
+++ /dev/null
@@ -1,187 +0,0 @@
-// Copyright 2021 PingCAP, Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package owner
-
-import (
-    "context"
-    "sync"
-    "sync/atomic"
-    "time"
-
-    "github.com/pingcap/errors"
-    "github.com/pingcap/failpoint"
-    "github.com/pingcap/log"
-    "github.com/pingcap/tiflow/cdc/model"
-    "github.com/pingcap/tiflow/cdc/sink"
-    cdcContext "github.com/pingcap/tiflow/pkg/context"
-    cerror "github.com/pingcap/tiflow/pkg/errors"
-    "github.com/pingcap/tiflow/pkg/filter"
-    "go.uber.org/zap"
-)
-
-const (
-    defaultErrChSize = 1024
-)
-
-// AsyncSink is a async sink design for owner
-// The EmitCheckpointTs and EmitDDLEvent is asynchronous function for now
-// Other functions are still synchronization
-type AsyncSink interface {
-    // EmitCheckpointTs emits the checkpoint Ts to downstream data source
-    // this function will return after recording the checkpointTs specified in memory immediately
-    // and the recorded checkpointTs will be sent and updated to downstream data source every second
-    EmitCheckpointTs(ctx cdcContext.Context, ts uint64)
-    // EmitDDLEvent emits DDL event asynchronously and return true if the DDL is executed
-    // the DDL event will be sent to another goroutine and execute to downstream
-    // the caller of this function can call again and again until a true returned
-    EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error)
-    SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error
-    Close(ctx context.Context) error
-}
-
-type asyncSinkImpl struct {
-    sink           sink.Sink
-    syncpointStore sink.SyncpointStore
-
-    checkpointTs model.Ts
-
-    lastSyncPoint model.Ts
-
-    ddlCh         chan *model.DDLEvent
-    ddlFinishedTs model.Ts
-    ddlSentTs     model.Ts
-
-    cancel context.CancelFunc
-    wg     sync.WaitGroup
-    errCh  chan error
-}
-
-func newAsyncSink(ctx cdcContext.Context) (AsyncSink, error) {
-    ctx, cancel := cdcContext.WithCancel(ctx)
-    changefeedID := ctx.ChangefeedVars().ID
-    changefeedInfo := ctx.ChangefeedVars().Info
-    filter, err := filter.NewFilter(changefeedInfo.Config)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    errCh := make(chan error, defaultErrChSize)
-    s, err := sink.NewSink(ctx, changefeedID, changefeedInfo.SinkURI, filter, changefeedInfo.Config, changefeedInfo.Opts, errCh)
-    if err != nil {
-        return nil, errors.Trace(err)
-    }
-    asyncSink := &asyncSinkImpl{
-        sink:   s,
-        ddlCh:  make(chan *model.DDLEvent, 1),
-        errCh:  errCh,
-        cancel: cancel,
-    }
-    if changefeedInfo.SyncPointEnabled {
-        asyncSink.syncpointStore, err = sink.NewSyncpointStore(ctx, changefeedID, changefeedInfo.SinkURI)
-        if err != nil {
-            return nil, errors.Trace(err)
-        }
-        if err := asyncSink.syncpointStore.CreateSynctable(ctx); err != nil {
-            return nil, errors.Trace(err)
-        }
-    }
-    asyncSink.wg.Add(1)
-    go asyncSink.run(ctx)
-    return asyncSink, nil
-}
-
-func (s *asyncSinkImpl) run(ctx cdcContext.Context) {
-    defer s.wg.Done()
-    // TODO make the tick duration configurable
-    ticker := time.NewTicker(time.Second)
-    defer ticker.Stop()
-    var lastCheckpointTs model.Ts
-    for {
-        select {
-        case <-ctx.Done():
-            return
-        case err := <-s.errCh:
-            ctx.Throw(err)
-            return
-        case <-ticker.C:
-            checkpointTs := atomic.LoadUint64(&s.checkpointTs)
-            if checkpointTs == 0 || checkpointTs <= lastCheckpointTs {
-                continue
-            }
-            lastCheckpointTs = checkpointTs
-            if err := s.sink.EmitCheckpointTs(ctx, checkpointTs); err != nil {
-                ctx.Throw(errors.Trace(err))
-                return
-            }
-        case ddl := <-s.ddlCh:
-            err := s.sink.EmitDDLEvent(ctx, ddl)
-            failpoint.Inject("InjectChangefeedDDLError", func() {
-                err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
-            })
-            if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) {
-                log.Info("Execute DDL succeeded", zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Bool("ignored", err != nil), zap.Reflect("ddl", ddl))
-                atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs)
-            } else {
-                // If DDL executing failed, and the error can not be ignored, throw an error and pause the changefeed
-                log.Error("Execute DDL failed",
-                    zap.String("ChangeFeedID", ctx.ChangefeedVars().ID),
-                    zap.Error(err),
-                    zap.Reflect("ddl", ddl))
-                ctx.Throw(errors.Trace(err))
-                return
-            }
-        }
-    }
-}
-
-func (s *asyncSinkImpl) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) {
-    atomic.StoreUint64(&s.checkpointTs, ts)
-}
-
-func (s *asyncSinkImpl) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
-    ddlFinishedTs := atomic.LoadUint64(&s.ddlFinishedTs)
-    if ddl.CommitTs <= ddlFinishedTs {
-        // the DDL event is executed successfully, and done is true
-        return true, nil
-    }
-    if ddl.CommitTs <= s.ddlSentTs {
-        // the DDL event is executing and not finished yes, return false
-        return false, nil
-    }
-    select {
-    case <-ctx.Done():
-        return false, errors.Trace(ctx.Err())
-    case s.ddlCh <- ddl:
-    }
-    s.ddlSentTs = ddl.CommitTs
-    return false, nil
-}
-
-func (s *asyncSinkImpl) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error {
-    if checkpointTs == s.lastSyncPoint {
-        return nil
-    }
-    s.lastSyncPoint = checkpointTs
-    // TODO implement async sink syncpoint
-    return s.syncpointStore.SinkSyncpoint(ctx, ctx.ChangefeedVars().ID, checkpointTs)
-}
-
-func (s *asyncSinkImpl) Close(ctx context.Context) (err error) {
-    s.cancel()
-    err = s.sink.Close(ctx)
-    if s.syncpointStore != nil {
-        err = s.syncpointStore.Close()
-    }
-    s.wg.Wait()
-    return
-}
diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 161320d019a..7e10ba35161 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -42,7 +42,7 @@ type changefeed struct {
     gcManager gc.Manager
 
     schema    *schemaWrap4Owner
-    sink      AsyncSink
+    sink      DDLSink
     ddlPuller DDLPuller
     initialized bool
@@ -51,14 +51,13 @@ type changefeed struct {
     // After the DDL event has been executed, ddlEventCache will be set to nil.
     ddlEventCache *model.DDLEvent
 
-    errCh chan error
+    errCh chan error
+    // cancel the running goroutine started by `DDLPuller`
     cancel context.CancelFunc
     // The changefeed will start some backend goroutines in the function `initialize`,
-    // such as DDLPuller, Sink, etc.
+    // such as DDLPuller, DDLSink, etc.
     // `wg` is used to manage those backend goroutines.
-    // But it only manages the DDLPuller for now.
-    // TODO: manage the Sink and other backend goroutines.
     wg sync.WaitGroup
 
     metricsChangefeedCheckpointTsGauge prometheus.Gauge
@@ -67,7 +66,7 @@ type changefeed struct {
     metricsChangefeedResolvedTsLagGauge prometheus.Gauge
 
     newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
-    newSink      func(ctx cdcContext.Context) (AsyncSink, error)
+    newSink      func() DDLSink
 }
 
 func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
@@ -82,15 +81,15 @@ func newChangefeed(id model.ChangeFeedID, gcManager gc.Manager) *changefeed {
         cancel: func() {},
 
         newDDLPuller: newDDLPuller,
+        newSink:      newDDLSink,
     }
-    c.newSink = newAsyncSink
     return c
 }
 
 func newChangefeed4Test(
     id model.ChangeFeedID, gcManager gc.Manager,
     newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
-    newSink func(ctx cdcContext.Context) (AsyncSink, error),
+    newSink func() DDLSink,
 ) *changefeed {
     c := newChangefeed(id, gcManager)
     c.newDDLPuller = newDDLPuller
@@ -163,7 +162,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *model.ChangefeedReactor
     default:
     }
-    c.sink.EmitCheckpointTs(ctx, checkpointTs)
+    c.sink.emitCheckpointTs(ctx, checkpointTs)
     barrierTs, err := c.handleBarrier(ctx)
     if err != nil {
         return errors.Trace(err)
     }
@@ -246,12 +245,12 @@ LOOP:
     if err != nil {
         return errors.Trace(err)
     }
+
     cancelCtx, cancel := cdcContext.WithCancel(ctx)
     c.cancel = cancel
-    c.sink, err = c.newSink(cancelCtx)
-    if err != nil {
-        return errors.Trace(err)
-    }
+
+    c.sink = c.newSink()
+    c.sink.run(cancelCtx, cancelCtx.ChangefeedVars().ID, cancelCtx.ChangefeedVars().Info)
 
     // Refer to the previous comment on why we use (checkpointTs-1).
     c.ddlPuller, err = c.newDDLPuller(cancelCtx, checkpointTs-1)
@@ -287,7 +286,7 @@ func (c *changefeed) releaseResources() {
     ctx, cancel := context.WithCancel(context.Background())
     cancel()
     // We don't need to wait sink Close, pass a canceled context is ok
-    if err := c.sink.Close(ctx); err != nil {
+    if err := c.sink.close(ctx); err != nil {
         log.Warn("Closing sink failed in Owner", zap.String("changefeedID", c.state.ID), zap.Error(err))
     }
     c.wg.Wait()
@@ -397,7 +396,7 @@ func (c *changefeed) handleBarrier(ctx cdcContext.Context) (uint64, error) {
             return barrierTs, nil
         }
         nextSyncPointTs := oracle.GoTimeToTS(oracle.GetTimeFromTS(barrierTs).Add(c.state.Info.SyncPointInterval))
-        if err := c.sink.SinkSyncpoint(ctx, barrierTs); err != nil {
+        if err := c.sink.emitSyncPoint(ctx, barrierTs); err != nil {
             return 0, errors.Trace(err)
         }
         c.barriers.Update(syncPointBarrier, nextSyncPointTs)
@@ -438,7 +437,7 @@ func (c *changefeed) asyncExecDDL(ctx cdcContext.Context, job *timodel.Job) (don
         log.Warn("ignore the DDL job of ineligible table", zap.Reflect("job", job))
         return true, nil
     }
-    done, err = c.sink.EmitDDLEvent(ctx, c.ddlEventCache)
+    done, err = c.sink.emitDDLEvent(ctx, c.ddlEventCache)
     if err != nil {
         return false, err
     }
diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index 7cb56fde56c..799b421322a 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -15,6 +15,7 @@ package owner
 
 import (
     "context"
+    "sync"
     "sync/atomic"
     "time"
 
@@ -62,22 +63,32 @@ func (m *mockDDLPuller) Run(ctx cdcContext.Context) error {
     return nil
 }
 
-type mockAsyncSink struct {
-    // AsyncSink
+type mockDDLSink struct {
+    // DDLSink
     ddlExecuting *model.DDLEvent
     ddlDone      bool
     checkpointTs model.Ts
     syncPoint    model.Ts
     syncPointHis []model.Ts
+
+    wg sync.WaitGroup
+}
+
+func (m *mockDDLSink) run(ctx cdcContext.Context, _ model.ChangeFeedID, _ *model.ChangeFeedInfo) {
+    m.wg.Add(1)
+    go func() {
+        <-ctx.Done()
+        m.wg.Done()
+    }()
 }
 
-func (m *mockAsyncSink) EmitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
+func (m *mockDDLSink) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
     m.ddlExecuting = ddl
     defer func() { m.ddlDone = false }()
     return m.ddlDone, nil
 }
 
-func (m *mockAsyncSink) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint64) error {
+func (m *mockDDLSink) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error {
     if checkpointTs == m.syncPoint {
         return nil
     }
@@ -86,19 +97,16 @@ func (m *mockAsyncSink) SinkSyncpoint(ctx cdcContext.Context, checkpointTs uint6
     return nil
 }
 
-func (m *mockAsyncSink) Initialize(ctx cdcContext.Context, tableInfo []*model.SimpleTableInfo) error {
-    return nil
-}
-
-func (m *mockAsyncSink) EmitCheckpointTs(ctx cdcContext.Context, ts uint64) {
+func (m *mockDDLSink) emitCheckpointTs(ctx cdcContext.Context, ts uint64) {
     atomic.StoreUint64(&m.checkpointTs, ts)
 }
 
-func (m *mockAsyncSink) Close(ctx context.Context) error {
+func (m *mockDDLSink) close(ctx context.Context) error {
+    m.wg.Wait()
     return nil
 }
 
-func (m *mockAsyncSink) Barrier(ctx context.Context) error {
+func (m *mockDDLSink) Barrier(ctx context.Context) error {
     return nil
 }
 
@@ -117,8 +125,8 @@ func createChangefeed4Test(ctx cdcContext.Context, c *check.C) (*changefeed, *mo
     gcManager := gc.NewManager(ctx.GlobalVars().PDClient)
     cf := newChangefeed4Test(ctx.ChangefeedVars().ID, gcManager, func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
         return &mockDDLPuller{resolvedTs: startTs - 1}, nil
-    }, func(ctx cdcContext.Context) (AsyncSink, error) {
-        return &mockAsyncSink{}, nil
+    }, func() DDLSink {
+        return &mockDDLSink{}
     })
     state := model.NewChangefeedReactorState(ctx.ChangefeedVars().ID)
     tester := orchestrator.NewReactorStateTester(c, state, nil)
@@ -249,7 +257,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
     // ddl puller resolved ts grow uo
     mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
     mockDDLPuller.resolvedTs = startTs
-    mockAsyncSink := cf.sink.(*mockAsyncSink)
+    mockDDLSink := cf.sink.(*mockDDLSink)
     job.BinlogInfo.FinishedTS = mockDDLPuller.resolvedTs
     mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
     // three tick to make sure all barriers set in initialize is handled
@@ -259,7 +267,7 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
     c.Assert(cf.schema.AllPhysicalTables(), check.HasLen, 0)
 
     // executing the ddl finished
-    mockAsyncSink.ddlDone = true
+    mockDDLSink.ddlDone = true
     mockDDLPuller.resolvedTs += 1000
     tickThreeTime()
     c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
@@ -271,10 +279,10 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
     mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
     tickThreeTime()
     c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
-    c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create database test1")
+    c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create database test1")
 
     // executing the ddl finished
-    mockAsyncSink.ddlDone = true
+    mockDDLSink.ddlDone = true
     mockDDLPuller.resolvedTs += 1000
     tickThreeTime()
     c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
@@ -286,10 +294,10 @@ func (s *changefeedSuite) TestExecDDL(c *check.C) {
     mockDDLPuller.ddlQueue = append(mockDDLPuller.ddlQueue, job)
     tickThreeTime()
     c.Assert(state.Status.CheckpointTs, check.Equals, mockDDLPuller.resolvedTs)
-    c.Assert(mockAsyncSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")
+    c.Assert(mockDDLSink.ddlExecuting.Query, check.Equals, "create table test1.test1(id int primary key)")
 
     // executing the ddl finished
-    mockAsyncSink.ddlDone = true
+    mockDDLSink.ddlDone = true
     mockDDLPuller.resolvedTs += 1000
     tickThreeTime()
     c.Assert(state.TaskStatuses[ctx.GlobalVars().CaptureInfo.ID].Tables, check.HasKey, job.TableID)
@@ -312,7 +320,7 @@ func (s *changefeedSuite) TestSyncPoint(c *check.C) {
     tester.MustApplyPatches()
 
     mockDDLPuller := cf.ddlPuller.(*mockDDLPuller)
-    mockAsyncSink := cf.sink.(*mockAsyncSink)
+    mockDDLSink := cf.sink.(*mockDDLSink)
     // add 5s to resolvedTs
     mockDDLPuller.resolvedTs = oracle.GoTimeToTS(oracle.GetTimeFromTS(mockDDLPuller.resolvedTs).Add(5 * time.Second))
     // tick 20 times
@@ -320,11 +328,11 @@ func (s *changefeedSuite) TestSyncPoint(c *check.C) {
         cf.Tick(ctx, state, captures)
         tester.MustApplyPatches()
     }
-    for i := 1; i < len(mockAsyncSink.syncPointHis); i++ {
+    for i := 1; i < len(mockDDLSink.syncPointHis); i++ {
         // check the time interval between adjacent sync points is less or equal than one second
-        c.Assert(mockAsyncSink.syncPointHis[i]-mockAsyncSink.syncPointHis[i-1], check.LessEqual, uint64(1000<<18))
+        c.Assert(mockDDLSink.syncPointHis[i]-mockDDLSink.syncPointHis[i-1], check.LessEqual, uint64(1000<<18))
     }
-    c.Assert(len(mockAsyncSink.syncPointHis), check.GreaterEqual, 5)
+    c.Assert(len(mockDDLSink.syncPointHis), check.GreaterEqual, 5)
 }
 
 func (s *changefeedSuite) TestFinished(c *check.C) {
diff --git a/cdc/owner/ddl_puller.go b/cdc/owner/ddl_puller.go
index a7c0f2c60b2..a3bd3d1ff61 100644
--- a/cdc/owner/ddl_puller.go
+++ b/cdc/owner/ddl_puller.go
@@ -191,5 +191,6 @@ func (h *ddlPullerImpl) PopFrontDDL() (uint64, *timodel.Job) {
 }
 
 func (h *ddlPullerImpl) Close() {
+    log.Info("Close the ddl puller")
     h.cancel()
 }
diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go
new file mode 100644
index 00000000000..00646f19823
--- /dev/null
+++ b/cdc/owner/ddl_sink.go
@@ -0,0 +1,218 @@
+// Copyright 2021 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package owner
+
+import (
+    "context"
+    "sync"
+    "sync/atomic"
+    "time"
+
+    "github.com/pingcap/errors"
+    "github.com/pingcap/failpoint"
+    "github.com/pingcap/log"
+    "github.com/pingcap/tiflow/cdc/model"
+    "github.com/pingcap/tiflow/cdc/sink"
+    cdcContext "github.com/pingcap/tiflow/pkg/context"
+    cerror "github.com/pingcap/tiflow/pkg/errors"
+    "github.com/pingcap/tiflow/pkg/filter"
+    "go.uber.org/zap"
+)
+
+const (
+    defaultErrChSize = 1024
+)
+
+// DDLSink is a wrapper of the `Sink` interface for the owner.
+// DDLSink should send `DDLEvent` and `CheckpointTs` to the downstream sink.
+// If `SyncPointEnabled` is set, it should also send `syncPoint` to the downstream.
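+//
+// Note that emitCheckpointTs and emitDDLEvent are asynchronous: a background
+// goroutine started by `run` flushes the latest recorded checkpointTs about
+// once per second and executes queued DDL events one at a time.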
+type DDLSink interface {
+    // run the DDLSink
+    run(ctx cdcContext.Context, id model.ChangeFeedID, info *model.ChangeFeedInfo)
+    // emitCheckpointTs emits the checkpoint Ts to the downstream data source.
+    // This function records the given checkpointTs in memory and returns immediately;
+    // the recorded checkpointTs is sent to the downstream data source every second.
+    emitCheckpointTs(ctx cdcContext.Context, ts uint64)
+    // emitDDLEvent emits the DDL event asynchronously and returns true once the DDL has been executed.
+    // The DDL event is sent to another goroutine and executed downstream,
+    // so the caller should call this function repeatedly until it returns true.
+    emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error)
+    emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error
+    // close the sink and cancel the running goroutine.
+    close(ctx context.Context) error
+}
+
+type ddlSinkImpl struct {
+    lastSyncPoint  model.Ts
+    syncPointStore sink.SyncpointStore
+
+    checkpointTs  model.Ts
+    ddlFinishedTs model.Ts
+    ddlSentTs     model.Ts
+
+    ddlCh chan *model.DDLEvent
+    errCh chan error
+
+    sink sink.Sink
+    // `sinkInitHandler` can be helpful in unit testing.
+    sinkInitHandler ddlSinkInitHandler
+
+    // cancel is used to cancel the goroutine started by `run`
+    cancel context.CancelFunc
+    wg     sync.WaitGroup
+}
+
+func newDDLSink() DDLSink {
+    return &ddlSinkImpl{
+        ddlCh:           make(chan *model.DDLEvent, 1),
+        errCh:           make(chan error, defaultErrChSize),
+        sinkInitHandler: ddlSinkInitializer,
+        cancel:          func() {},
+    }
+}
+
+type ddlSinkInitHandler func(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error
+
+func ddlSinkInitializer(ctx cdcContext.Context, a *ddlSinkImpl, id model.ChangeFeedID, info *model.ChangeFeedInfo) error {
+    filter, err := filter.NewFilter(info.Config)
+    if err != nil {
+        return errors.Trace(err)
+    }
+
+    s, err := sink.NewSink(ctx, id, info.SinkURI, filter, info.Config, info.Opts, a.errCh)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    a.sink = s
+
+    if !info.SyncPointEnabled {
+        return nil
+    }
+    syncPointStore, err := sink.NewSyncpointStore(ctx, id, info.SinkURI)
+    if err != nil {
+        return errors.Trace(err)
+    }
+    a.syncPointStore = syncPointStore
+
+    if err := a.syncPointStore.CreateSynctable(ctx); err != nil {
+        return errors.Trace(err)
+    }
+    return nil
+}
+
+func (s *ddlSinkImpl) run(ctx cdcContext.Context, id model.ChangeFeedID, info *model.ChangeFeedInfo) {
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    s.cancel = cancel
+
+    s.wg.Add(1)
+    go func() {
+        defer s.wg.Done()
+
+        start := time.Now()
+        if err := s.sinkInitHandler(ctx, s, id, info); err != nil {
+            log.Warn("ddl sink initialize failed", zap.Duration("elapsed", time.Since(start)))
+            ctx.Throw(err)
+            return
+        }
+        log.Info("ddl sink initialized, start processing...", zap.Duration("elapsed", time.Since(start)))
+
+        // TODO make the tick duration configurable
+        ticker := time.NewTicker(time.Second)
+        defer ticker.Stop()
+        var lastCheckpointTs model.Ts
+        for {
+            select {
+            case <-ctx.Done():
+                return
+            case err := <-s.errCh:
+                ctx.Throw(err)
+                return
+            case <-ticker.C:
+                checkpointTs := atomic.LoadUint64(&s.checkpointTs)
+                if checkpointTs == 0 || checkpointTs <= lastCheckpointTs {
+                    continue
+                }
+                lastCheckpointTs = checkpointTs
+                if err := s.sink.EmitCheckpointTs(ctx, checkpointTs); err != nil {
+                    ctx.Throw(errors.Trace(err))
+                    return
+                }
+            case ddl := <-s.ddlCh:
+                err := s.sink.EmitDDLEvent(ctx, ddl)
+                failpoint.Inject("InjectChangefeedDDLError", func() {
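+                    // inject an error here to simulate a DDL execution failure (used by failpoint tests)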
+                    err = cerror.ErrExecDDLFailed.GenWithStackByArgs()
+                })
+                if err == nil || cerror.ErrDDLEventIgnored.Equal(errors.Cause(err)) {
+                    log.Info("Execute DDL succeeded", zap.String("changefeed", ctx.ChangefeedVars().ID), zap.Bool("ignored", err != nil), zap.Reflect("ddl", ddl))
+                    atomic.StoreUint64(&s.ddlFinishedTs, ddl.CommitTs)
+                    continue
+                }
+                // If the DDL execution failed and the error cannot be ignored, throw an error and pause the changefeed
+                log.Error("Execute DDL failed",
+                    zap.String("ChangeFeedID", ctx.ChangefeedVars().ID),
+                    zap.Error(err),
+                    zap.Reflect("ddl", ddl))
+                ctx.Throw(errors.Trace(err))
+                return
+            }
+        }
+    }()
+}
+
+func (s *ddlSinkImpl) emitCheckpointTs(ctx cdcContext.Context, ts uint64) {
+    atomic.StoreUint64(&s.checkpointTs, ts)
+}
+
+func (s *ddlSinkImpl) emitDDLEvent(ctx cdcContext.Context, ddl *model.DDLEvent) (bool, error) {
+    ddlFinishedTs := atomic.LoadUint64(&s.ddlFinishedTs)
+    if ddl.CommitTs <= ddlFinishedTs {
+        // the DDL event is executed successfully, and done is true
+        return true, nil
+    }
+    if ddl.CommitTs <= s.ddlSentTs {
+        // the DDL event is executing and not finished yet, return false
+        return false, nil
+    }
+    select {
+    case <-ctx.Done():
+        return false, errors.Trace(ctx.Err())
+    case s.ddlCh <- ddl:
+        s.ddlSentTs = ddl.CommitTs
+    default:
+        // if this is hit, we think that ddlCh is full,
+        // just return false and send the ddl in the next round.
+    }
+    return false, nil
+}
+
+func (s *ddlSinkImpl) emitSyncPoint(ctx cdcContext.Context, checkpointTs uint64) error {
+    if checkpointTs == s.lastSyncPoint {
+        return nil
+    }
+    s.lastSyncPoint = checkpointTs
+    // TODO implement async sink syncPoint
+    return s.syncPointStore.SinkSyncpoint(ctx, ctx.ChangefeedVars().ID, checkpointTs)
+}
+
+func (s *ddlSinkImpl) close(ctx context.Context) (err error) {
+    s.cancel()
+    if s.sink != nil {
+        err = s.sink.Close(ctx)
+    }
+    if s.syncPointStore != nil {
+        err = s.syncPointStore.Close()
+    }
+    s.wg.Wait()
+    return err
+}
diff --git a/cdc/owner/async_sink_test.go b/cdc/owner/ddl_sink_test.go
similarity index 56%
rename from cdc/owner/async_sink_test.go
rename to cdc/owner/ddl_sink_test.go
index 795cd066267..94dff72381b 100644
--- a/cdc/owner/async_sink_test.go
+++ b/cdc/owner/ddl_sink_test.go
@@ -23,17 +23,15 @@ import (
     "github.com/pingcap/errors"
     "github.com/pingcap/tiflow/cdc/model"
     "github.com/pingcap/tiflow/cdc/sink"
-    "github.com/pingcap/tiflow/pkg/config"
     cdcContext "github.com/pingcap/tiflow/pkg/context"
     cerror "github.com/pingcap/tiflow/pkg/errors"
     "github.com/pingcap/tiflow/pkg/retry"
     "github.com/pingcap/tiflow/pkg/util/testleak"
 )
 
-var _ = check.Suite(&asyncSinkSuite{})
+var _ = check.Suite(&ddlSinkSuite{})
 
-type asyncSinkSuite struct {
-}
+type ddlSinkSuite struct{}
 
 type mockSink struct {
     sink.Sink
@@ -70,23 +68,26 @@ func (m *mockSink) GetDDL() *model.DDLEvent {
     return m.ddl
 }
 
-func newAsyncSink4Test(ctx cdcContext.Context, c *check.C) (cdcContext.Context, AsyncSink, *mockSink) {
-    ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
-        ID:   "test-changefeed",
-        Info: &model.ChangeFeedInfo{SinkURI: "blackhole://", Config: config.GetDefaultReplicaConfig()},
-    })
-    sink, err := newAsyncSink(ctx)
-    c.Assert(err, check.IsNil)
+func newDDLSink4Test() (DDLSink, *mockSink) {
     mockSink := &mockSink{}
-    sink.(*asyncSinkImpl).sink = mockSink
-    return ctx, sink, mockSink
+    ddlSink := newDDLSink()
+    ddlSink.(*ddlSinkImpl).sinkInitHandler = func(ctx cdcContext.Context, a *ddlSinkImpl, _ model.ChangeFeedID, _ *model.ChangeFeedInfo) error {
+        a.sink = mockSink
+        return nil
+    }
+    return ddlSink, mockSink
 }
 
-func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
+func (s *ddlSinkSuite) TestCheckpoint(c *check.C) {
     defer testleak.AfterTest(c)()
-    ctx := cdcContext.NewBackendContext4Test(false)
-    ctx, sink, mSink := newAsyncSink4Test(ctx, c)
-    defer sink.Close(ctx)
+    ddlSink, mSink := newDDLSink4Test()
+    ctx := cdcContext.NewBackendContext4Test(true)
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    defer func() {
+        cancel()
+        ddlSink.close(ctx)
+    }()
+    ddlSink.run(ctx, ctx.ChangefeedVars().ID, ctx.ChangefeedVars().Info)
 
     waitCheckpointGrowingUp := func(m *mockSink, targetTs model.Ts) error {
         return retry.Do(context.Background(), func() error {
@@ -96,88 +97,92 @@ func (s *asyncSinkSuite) TestCheckpoint(c *check.C) {
             return nil
         }, retry.WithBackoffBaseDelay(100), retry.WithMaxTries(30))
     }
-    sink.EmitCheckpointTs(ctx, 1)
+    ddlSink.emitCheckpointTs(ctx, 1)
     c.Assert(waitCheckpointGrowingUp(mSink, 1), check.IsNil)
-    sink.EmitCheckpointTs(ctx, 10)
+    ddlSink.emitCheckpointTs(ctx, 10)
     c.Assert(waitCheckpointGrowingUp(mSink, 10), check.IsNil)
 }
 
-func (s *asyncSinkSuite) TestExecDDL(c *check.C) {
+func (s *ddlSinkSuite) TestExecDDL(c *check.C) {
     defer testleak.AfterTest(c)()
-    ctx := cdcContext.NewBackendContext4Test(false)
-    ctx, sink, mSink := newAsyncSink4Test(ctx, c)
-    defer sink.Close(ctx)
-    ddl1 := &model.DDLEvent{CommitTs: 1}
-    for {
-        done, err := sink.EmitDDLEvent(ctx, ddl1)
-        c.Assert(err, check.IsNil)
-        if done {
-            c.Assert(mSink.GetDDL(), check.DeepEquals, ddl1)
-            break
-        }
+    ddlSink, mSink := newDDLSink4Test()
+    ctx := cdcContext.NewBackendContext4Test(true)
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    defer func() {
+        cancel()
+        ddlSink.close(ctx)
+    }()
+    ddlSink.run(ctx, ctx.ChangefeedVars().ID, ctx.ChangefeedVars().Info)
+
+    ddlEvents := []*model.DDLEvent{
+        {CommitTs: 1},
+        {CommitTs: 2},
+        {CommitTs: 3},
     }
-    ddl2 := &model.DDLEvent{CommitTs: 2}
-    ddl3 := &model.DDLEvent{CommitTs: 3}
-    _, err := sink.EmitDDLEvent(ctx, ddl2)
-    c.Assert(err, check.IsNil)
-    for {
-        done, err := sink.EmitDDLEvent(ctx, ddl2)
-        c.Assert(err, check.IsNil)
-        if done {
-            c.Assert(mSink.GetDDL(), check.DeepEquals, ddl2)
-            break
-        }
-    }
-    _, err = sink.EmitDDLEvent(ctx, ddl3)
-    c.Assert(err, check.IsNil)
-    for {
-        done, err := sink.EmitDDLEvent(ctx, ddl3)
-        c.Assert(err, check.IsNil)
-        if done {
-            c.Assert(mSink.GetDDL(), check.DeepEquals, ddl3)
-            break
+
+    for _, event := range ddlEvents {
+        for {
+            done, err := ddlSink.emitDDLEvent(ctx, event)
+            c.Assert(err, check.IsNil)
+            if done {
+                c.Assert(mSink.GetDDL(), check.DeepEquals, event)
+                break
+            }
         }
     }
 }
 
-func (s *asyncSinkSuite) TestExecDDLError(c *check.C) {
+func (s *ddlSinkSuite) TestExecDDLError(c *check.C) {
     defer testleak.AfterTest(c)()
-    ctx := cdcContext.NewBackendContext4Test(false)
-    var resultErr error
-    var resultErrMu sync.Mutex
-    getResultErr := func() error {
+    ctx := cdcContext.NewBackendContext4Test(true)
+
+    var (
+        resultErr   error
+        resultErrMu sync.Mutex
+    )
+    readResultErr := func() error {
        resultErrMu.Lock()
        defer resultErrMu.Unlock()
        return resultErr
    }
+
+    ddlSink, mSink := newDDLSink4Test()
     ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
        resultErrMu.Lock()
        defer resultErrMu.Unlock()
        resultErr = err
        return nil
    })
-    ctx, sink, mSink := newAsyncSink4Test(ctx, c)
-    defer sink.Close(ctx)
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    defer func() {
+        cancel()
+        ddlSink.close(ctx)
+    }()
+
+    ddlSink.run(ctx, ctx.ChangefeedVars().ID, ctx.ChangefeedVars().Info)
+
     mSink.ddlError = cerror.ErrDDLEventIgnored.GenWithStackByArgs()
     ddl1 := &model.DDLEvent{CommitTs: 1}
     for {
-        done, err := sink.EmitDDLEvent(ctx, ddl1)
+        done, err := ddlSink.emitDDLEvent(ctx, ddl1)
         c.Assert(err, check.IsNil)
         if done {
             c.Assert(mSink.GetDDL(), check.DeepEquals, ddl1)
             break
         }
     }
-    c.Assert(getResultErr(), check.IsNil)
+    c.Assert(resultErr, check.IsNil)
+
     mSink.ddlError = cerror.ErrExecDDLFailed.GenWithStackByArgs()
     ddl2 := &model.DDLEvent{CommitTs: 2}
     for {
-        done, err := sink.EmitDDLEvent(ctx, ddl2)
+        done, err := ddlSink.emitDDLEvent(ctx, ddl2)
         c.Assert(err, check.IsNil)
-        if done || getResultErr() != nil {
+
+        if done || readResultErr() != nil {
             c.Assert(mSink.GetDDL(), check.DeepEquals, ddl2)
             break
         }
     }
-    c.Assert(cerror.ErrExecDDLFailed.Equal(errors.Cause(getResultErr())), check.IsTrue)
+    c.Assert(cerror.ErrExecDDLFailed.Equal(readResultErr()), check.IsTrue)
 }
diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go
index 53ba3bbf62e..6b79c407125 100644
--- a/cdc/owner/owner.go
+++ b/cdc/owner/owner.go
@@ -97,7 +97,7 @@ func NewOwner(pdClient pd.Client) *Owner {
 // NewOwner4Test creates a new Owner for test
 func NewOwner4Test(
     newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error),
-    newSink func(ctx cdcContext.Context) (AsyncSink, error),
+    newSink func() DDLSink,
     pdClient pd.Client,
 ) *Owner {
     o := NewOwner(pdClient)
diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go
index 98cf891b0db..39a8a269cab 100644
--- a/cdc/owner/owner_test.go
+++ b/cdc/owner/owner_test.go
@@ -57,8 +57,8 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *model.Global
     }
     cf := NewOwner4Test(func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error) {
         return &mockDDLPuller{resolvedTs: startTs - 1}, nil
-    }, func(ctx cdcContext.Context) (AsyncSink, error) {
-        return &mockAsyncSink{}, nil
+    }, func() DDLSink {
+        return &mockDDLSink{}
     },
         ctx.GlobalVars().PDClient,
     )
@@ -79,6 +79,9 @@ func createOwner4Test(ctx cdcContext.Context, c *check.C) (*Owner, *model.Global
 func (s *ownerSuite) TestCreateRemoveChangefeed(c *check.C) {
     defer testleak.AfterTest(c)()
     ctx := cdcContext.NewBackendContext4Test(false)
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    defer cancel()
+
     owner, state, tester := createOwner4Test(ctx, c)
     changefeedID := "test-changefeed"
     changefeedInfo := &model.ChangeFeedInfo{
@@ -146,6 +149,9 @@ func (s *ownerSuite) TestStopChangefeed(c *check.C) {
     defer testleak.AfterTest(c)()
     ctx := cdcContext.NewBackendContext4Test(false)
     owner, state, tester := createOwner4Test(ctx, c)
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    defer cancel()
+
     changefeedID := "test-changefeed"
     changefeedInfo := &model.ChangeFeedInfo{
         StartTs: oracle.GoTimeToTS(time.Now()),
@@ -229,6 +235,9 @@ func (s *ownerSuite) TestCheckClusterVersion(c *check.C) {
     defer testleak.AfterTest(c)()
     ctx := cdcContext.NewBackendContext4Test(false)
     owner, state, tester := createOwner4Test(ctx, c)
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    defer cancel()
+
     tester.MustUpdate("/tidb/cdc/capture/6bbc01c8-0605-4f86-a0f9-b3119109b225",
         []byte(`{"id":"6bbc01c8-0605-4f86-a0f9-b3119109b225","address":"127.0.0.1:8300","version":"v6.0.0"}`))
     changefeedID := "test-changefeed"
@@ -263,6 +272,9 @@ func (s *ownerSuite) TestCheckClusterVersion(c *check.C) {
 func (s *ownerSuite) TestAdminJob(c *check.C) {
     defer testleak.AfterTest(c)()
     ctx := cdcContext.NewBackendContext4Test(false)
+    ctx, cancel := cdcContext.WithCancel(ctx)
+    defer cancel()
+
     owner, _, _ := createOwner4Test(ctx, c)
     owner.EnqueueJob(model.AdminJob{
CfID: "test-changefeed1", @@ -310,6 +322,8 @@ func (s *ownerSuite) TestUpdateGCSafePoint(c *check.C) { o := NewOwner(mockPDClient) o.gcManager = gc.NewManager(mockPDClient) ctx := cdcContext.NewBackendContext4Test(true) + ctx, cancel := cdcContext.WithCancel(ctx) + defer cancel() state := model.NewGlobalState().(*model.GlobalReactorState) tester := orchestrator.NewReactorStateTester(c, state, nil) diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index 308b39b6133..d130a20a462 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -324,6 +324,7 @@ func (k *kafkaSaramaProducer) stop() { if atomic.SwapInt32(&k.closing, kafkaProducerClosing) == kafkaProducerClosing { return } + log.Info("kafka producer closing...") close(k.closeCh) }