Skip to content

Commit

Permalink
ddlManager (ticdc): add bootstrap sending function (#10045)
Browse files Browse the repository at this point in the history
ref #9898, close #10107
  • Loading branch information
asddongmen authored Nov 23, 2023
1 parent 5921050 commit 5fc3d25
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 0 deletions.
4 changes: 4 additions & 0 deletions cdc/owner/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,10 @@ func (m *mockDDLSink) emitCheckpointTs(ts uint64, tables []*model.TableInfo) {
m.mu.currentTables = tables
}

func (m *mockDDLSink) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
return nil
}

func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo) {
m.mu.Lock()
defer m.mu.Unlock()
Expand Down
73 changes: 73 additions & 0 deletions cdc/owner/ddl_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -93,6 +94,14 @@ var redoBarrierDDLs = map[timodel.ActionType]struct{}{
timodel.ActionRemovePartitioning: {},
}

type bootstrapState int32

const (
bootstrapStateNone bootstrapState = iota
bootstrapStateRunning
bootstrapStateCompleted
)

// ddlManager holds the pending DDL events of all tables and responsible for
// executing them to downstream.
// It also provides the ability to calculate the barrier of a changefeed.
Expand Down Expand Up @@ -130,6 +139,10 @@ type ddlManager struct {
BDRMode bool
sinkType model.DownstreamType
ddlResolvedTs model.Ts

needBootstrap bool
errCh chan error
bootstrapState int32
}

func newDDLManager(
Expand Down Expand Up @@ -167,6 +180,7 @@ func newDDLManager(
sinkType: model.DB,
tableCheckpoint: make(map[model.TableName]model.Ts),
pendingDDLs: make(map[model.TableName][]*model.DDLEvent),
errCh: make(chan error, 1),
}
}

Expand All @@ -184,6 +198,18 @@ func (m *ddlManager) tick(
checkpointTs model.Ts,
tableCheckpoint map[model.TableName]model.Ts,
) ([]model.TableID, *schedulepb.BarrierWithMinTs, error) {
// needBootstrap is true when the downstream is kafka
// and the protocol is simple protocol.
if m.needBootstrap {
ok, err := m.checkAndBootstrap(ctx)
if err != nil {
return nil, nil, err
}
if !ok {
return nil, nil, nil
}
}

m.justSentDDL = nil
m.updateCheckpointTs(checkpointTs, tableCheckpoint)

Expand Down Expand Up @@ -598,6 +624,53 @@ func (m *ddlManager) cleanCache() {
m.physicalTablesCache = nil
}

func (m *ddlManager) checkAndBootstrap(ctx context.Context) (bool, error) {
if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateCompleted) {
return true, nil
}

select {
case err := <-m.errCh:
return false, err
default:
}

if atomic.LoadInt32(&m.bootstrapState) == int32(bootstrapStateRunning) {
return false, nil
}
// begin bootstrap
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateRunning))
tables, err := m.allTables(ctx)
if err != nil {
return false, err
}
bootstrapEvents := make([]*model.DDLEvent, 0, len(tables))
for _, table := range tables {
ddlEvent := &model.DDLEvent{
StartTs: m.startTs,
CommitTs: m.startTs,
TableInfo: table,
IsBootstrap: true,
}
bootstrapEvents = append(bootstrapEvents, ddlEvent)
}
// send bootstrap events
go func() {
for _, event := range bootstrapEvents {
err := m.ddlSink.emitBootstrapEvent(ctx, event)
if err != nil {
log.Error("emit bootstrap event failed",
zap.Any("bootstrapEvent", event), zap.Error(err))
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateNone))
m.errCh <- err
return
}
}
atomic.StoreInt32(&m.bootstrapState, int32(bootstrapStateCompleted))
}()
return false, nil
}

// getRelatedPhysicalTableIDs get all related physical table ids of a ddl event.
// It is a helper function to calculate tableBarrier.
func getRelatedPhysicalTableIDs(ddl *model.DDLEvent) []model.TableID {
Expand Down
17 changes: 17 additions & 0 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type DDLSink interface {
// the DDL event will be sent to another goroutine and execute to downstream
// the caller of this function can call again and again until a true returned
emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bool, error)
emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error
emitSyncPoint(ctx context.Context, checkpointTs uint64) error
// close the ddlsink, cancel running goroutine.
close(ctx context.Context) error
Expand Down Expand Up @@ -384,6 +385,22 @@ func (s *ddlSinkImpl) emitDDLEvent(ctx context.Context, ddl *model.DDLEvent) (bo
return false, nil
}

// emitBootstrapEvent sent bootstrap event to downstream.
// It is a synchronous operation.
func (s *ddlSinkImpl) emitBootstrapEvent(ctx context.Context, ddl *model.DDLEvent) error {
if !ddl.IsBootstrap {
return nil
}
err := s.sink.WriteDDLEvent(ctx, ddl)
if err != nil {
return errors.Trace(err)
}
// TODO: change this log to debug level after testing complete.
log.Info("emit bootstrap event", zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID), zap.Any("bootstrapEvent", ddl))
return nil
}

func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (err error) {
if checkpointTs == s.lastSyncPoint {
return nil
Expand Down

0 comments on commit 5fc3d25

Please sign in to comment.