From 5fc3d25f573b88e21686fe43638df38db106670b Mon Sep 17 00:00:00 2001
From: dongmen <20351731+asddongmen@users.noreply.github.com>
Date: Thu, 23 Nov 2023 10:07:10 +0800
Subject: [PATCH] ddlManager (ticdc): add bootstrap sending function (#10045)

ref pingcap/tiflow#9898, close pingcap/tiflow#10107
---
 cdc/owner/changefeed_test.go |  4 ++
 cdc/owner/ddl_manager.go     | 73 ++++++++++++++++++++++++++++++++++++
 cdc/owner/ddl_sink.go        | 17 +++++++++
 3 files changed, 94 insertions(+)

diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go
index 28eaec00a03..14c3615a39e 100644
--- a/cdc/owner/changefeed_test.go
+++ b/cdc/owner/changefeed_test.go
@@ -140,6 +140,10 @@ func (m *mockDDLSink) emitCheckpointTs(ts uint64, tables []*model.TableInfo) {
 	m.mu.currentTables = tables
 }
 
+func (m *mockDDLSink) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
+	return nil
+}
+
 func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo) {
 	m.mu.Lock()
 	defer m.mu.Unlock()
diff --git a/cdc/owner/ddl_manager.go b/cdc/owner/ddl_manager.go
index 0d65fb61f48..c466f228c7d 100644
--- a/cdc/owner/ddl_manager.go
+++ b/cdc/owner/ddl_manager.go
@@ -17,6 +17,7 @@ import (
 	"context"
 	"math/rand"
 	"sort"
+	"sync/atomic"
 	"time"
 
 	"github.com/pingcap/errors"
@@ -93,6 +94,14 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
 	timodel.ActionRemovePartitioning: {},
 }
 
+type bootstrapState int32
+
+const (
+	bootstrapStateNone bootstrapState = iota
+	bootstrapStateRunning
+	bootstrapStateCompleted
+)
+
 // ddlManager holds the pending DDL events of all tables and responsible for
 // executing them to downstream.
 // It also provides the ability to calculate the barrier of a changefeed.
@@ -130,6 +139,10 @@ type ddlManager struct {
 	BDRMode       bool
 	sinkType      model.DownstreamType
 	ddlResolvedTs model.Ts
+
+	needBootstrap  bool
+	errCh          chan error
+	bootstrapState int32
 }
 
 func newDDLManager(
@@ -167,6 +180,7 @@ func newDDLManager(
 		sinkType:        model.DB,
 		tableCheckpoint: make(map[model.TableName]model.Ts),
 		pendingDDLs:     make(map[model.TableName][]*model.DDLEvent),
+		errCh:           make(chan error, 1),
 	}
 }
 
@@ -184,6 +198,18 @@ func (m *ddlManager) tick(
 	checkpointTs model.Ts,
 	tableCheckpoint map[model.TableName]model.Ts,
 ) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
+	// needBootstrap is true when the downstream is Kafka
+	// and the protocol is the simple protocol.
+	if m.needBootstrap {
+		ok, err := m.checkAndBootstrap(ctx)
+		if err != nil {
+			return nil, nil, err
+		}
+		if !ok {
+			return nil, nil, nil
+		}
+	}
+
 	m.justSentDDL = nil
 	m.updateCheckpointTs(checkpointTs, tableCheckpoint)
 
@@ -598,6 +624,53 @@ func (m *ddlManager) cleanCache() {
 	m.physicalTablesCache = nil
 }
 
+func (m *ddlManager) checkAndBootstrap(ctx context.Context) (bool, error) {
+	if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateCompleted) {
+		return true, nil
+	}
+
+	select {
+	case err := <-m.errCh:
+		return false, err
+	default:
+	}
+
+	if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateRunning) {
+		return false, nil
+	}
+	// begin bootstrap
+	atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateRunning))
+	tables, err := m.allTables(ctx)
+	if err != nil {
+		return false, err
+	}
+	bootstrapEvents := make([]*model.DDLEvent, 0, len(tables))
+	for _, table := range tables {
+		ddlEvent := &model.DDLEvent{
+			StartTs:     m.startTs,
+			CommitTs:    m.startTs,
+			TableInfo:   table,
+			IsBootstrap: true,
+		}
+		bootstrapEvents = append(bootstrapEvents, ddlEvent)
+	}
+	// send bootstrap events asynchronously
+	go func() {
+		for _, event := range bootstrapEvents {
+			err := m.ddlSink.emitBootstrapEvent(ctx, event)
+			if err != nil {
+				log.Error("emit bootstrap event failed",
+					zap.Any("bootstrapEvent", event), zap.Error(err))
+				atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateNone))
+				m.errCh <- err
+				return
+			}
+		}
+		atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateCompleted))
+	}()
+	return false, nil
+}
+
 // getRelatedPhysicalTableIDs get all related physical table ids of a ddl event.
 // It is a helper function to calculate tableBarrier.
 func getRelatedPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID {
diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go
index f06d01e6f3a..1c67c0ffad6 100644
--- a/cdc/owner/ddl_sink.go
+++ b/cdc/owner/ddl_sink.go
@@ -54,6 +54,7 @@ type DDLSink interface {
 	// the DDL event will be sent to another goroutine and execute to downstream
 	// the caller of this function can call again and again until a true returned
 	emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
+	emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error
 	emitSyncPoint(ctx context.Context, checkpointTs uint64) error
 	// close the ddlsink, cancel running goroutine.
 	close(ctx context.Context) error
@@ -384,6 +385,22 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
 	return false, nil
 }
 
+// emitBootstrapEvent sends a bootstrap event to the downstream.
+// It is a synchronous operation.
+func (s *ddlSinkImpl) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
+	if !ddl.IsBootstrap {
+		return nil
+	}
+	err := s.sink.WriteDDLEvent(ctx, ddl)
+	if err != nil {
+		return errors.Trace(err)
+	}
+	// TODO: change this log to debug level after testing is complete.
+	log.Info("emit bootstrap event", zap.String("namespace", s.changefeedID.Namespace),
+		zap.String("changefeed", s.changefeedID.ID), zap.Any("bootstrapEvent", ddl))
+	return nil
+}
+
 func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (err error) {
 	if checkpointTs == s.lastSyncPoint {
 		return nil
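
The core of this change is checkAndBootstrap: a small lock-free state machine that tick polls on every call. A completed bootstrap reports (true, nil); an error parked by a failed sender is drained and returned once; an in-flight send reports (false, nil); and the first call through flips the state to running and starts a single background sender goroutine. The following is a minimal, standalone Go sketch of that pattern, not code from this patch: bootstrapper, its send callback, and the string events are hypothetical stand-ins for ddlManager, ddlSink.emitBootstrapEvent, and the per-table bootstrap DDL events.

package main

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"
)

// States mirror bootstrapStateNone/Running/Completed in ddl_manager.go.
const (
	stateNone int32 = iota
	stateRunning
	stateCompleted
)

// bootstrapper is a hypothetical stand-in for ddlManager: an atomic state
// word, a buffered error channel, and a pluggable send function take the
// places of bootstrapState, errCh, and ddlSink.emitBootstrapEvent.
type bootstrapper struct {
	state int32
	errCh chan error
	send  func(ctx context.Context, event string) error
}

// checkAndBootstrap follows the same state machine as the patch:
// completed -> (true, nil); a parked error -> (false, err);
// running -> (false, nil); otherwise start the sender and return (false, nil).
func (b *bootstrapper) checkAndBootstrap(ctx context.Context, events []string) (bool, error) {
	if atomic.LoadInt32(&b.state) == stateCompleted {
		return true, nil
	}
	// Drain an error left behind by a failed sender, if any.
	select {
	case err := <-b.errCh:
		return false, err
	default:
	}
	if atomic.LoadInt32(&b.state) == stateRunning {
		return false, nil
	}
	atomic.StoreInt32(&b.state, stateRunning)
	go func() {
		for _, ev := range events {
			if err := b.send(ctx, ev); err != nil {
				// Reset the state so a later poll can retry, and park the
				// error for the next caller; the channel is buffered, so
				// the sender goroutine never blocks here.
				atomic.StoreInt32(&b.state, stateNone)
				b.errCh <- err
				return
			}
		}
		atomic.StoreInt32(&b.state, stateCompleted)
	}()
	return false, nil
}

func main() {
	b := &bootstrapper{
		errCh: make(chan error, 1),
		send: func(_ context.Context, event string) error {
			fmt.Println("emit bootstrap event:", event)
			return nil // return an error here to exercise the retry path
		},
	}
	// Poll until bootstrap completes, the way ddlManager.tick returns
	// early on every call while the bootstrap goroutine is in flight.
	for {
		ok, err := b.checkAndBootstrap(context.Background(), []string{"t1", "t2", "t3"})
		if err != nil {
			fmt.Println("bootstrap failed:", err)
			return
		}
		if ok {
			fmt.Println("bootstrap completed; tick can proceed")
			return
		}
		time.Sleep(10 * time.Millisecond)
	}
}

The buffered channel of size one is the piece that lets the sender goroutine fail without blocking: the goroutine resets the state to none so a later tick can restart the bootstrap, parks exactly one error, and the next checkAndBootstrap call drains and surfaces it.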