barrier handle bootstrap event (#438)
sdojjy authored Nov 2, 2024
1 parent 4249191 commit 1dba381
Showing 5 changed files with 155 additions and 97 deletions.
33 changes: 33 additions & 0 deletions maintainer/barrier.go
@@ -92,6 +92,39 @@ func (b *Barrier) HandleStatus(from node.ID,
})
}

// HandleBootstrapResponse rebuilds the block events from the bootstrap responses
func (b *Barrier) HandleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbeatpb.MaintainerBootstrapResponse) {
for _, resp := range bootstrapRespMap {
for _, span := range resp.Spans {
// we only care about the WAITING, WRITING and DONE stages
if span.BlockState != nil && span.BlockState.Stage != heartbeatpb.BlockStage_NONE {
blockState := span.BlockState
key := getEventKey(blockState.BlockTs, blockState.IsSyncPoint)
event, ok := b.blockedTs[key]
if !ok {
event = NewBlockEvent(resp.ChangefeedID, b.controller, blockState, b.splitTableEnabled)
b.blockedTs[key] = event
}
switch blockState.Stage {
case heartbeatpb.BlockStage_WAITING:
// it's the dispatcher's responsibility to resend the block event
event.markDispatcherEventDone(common.NewDispatcherIDFromPB(span.ID))
case heartbeatpb.BlockStage_WRITING:
// it's in the writing stage, so it must be the writer dispatcher
// it's the maintainer's responsibility to resend the write action
event.selected = true
event.writerDispatcher = common.NewDispatcherIDFromPB(span.ID)
case heartbeatpb.BlockStage_DONE:
// it's the maintainer's responsibility to resend the pass action
event.selected = true
event.writerDispatcherAdvanced = true
event.markDispatcherEventDone(common.NewDispatcherIDFromPB(span.ID))
}
}
}
}
}

// Resend resends the messages to the dispatcher manager; the pass action is handled here
func (b *Barrier) Resend() []*messaging.TargetMessage {
var msgs []*messaging.TargetMessage
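A hedged usage sketch of the new API above (not code from this commit): once a caller has collected the bootstrap responses from every dispatcher manager, it can rebuild the barrier state and re-drive any pending actions. The variable names and the sender call below are assumptions for illustration; in this change the actual wiring is done by the controller (see maintainer.go below).

// cachedResp is a map[node.ID]*heartbeatpb.MaintainerBootstrapResponse collected elsewhere.
barrier := NewBarrier(controller, enableTableAcrossNodes)
// Rebuild the in-flight block events: WAITING spans are marked done for their
// dispatcher, a WRITING span becomes the writer dispatcher, and DONE spans mark
// the writer dispatcher as already advanced.
barrier.HandleBootstrapResponse(cachedResp)
// Resend then re-sends the pending write/pass actions for the rebuilt events.
msgs := barrier.Resend()
sendMessages(msgs) // hypothetical sender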
4 changes: 4 additions & 0 deletions maintainer/barrier_event.go
@@ -104,6 +104,10 @@ func NewBlockEvent(cfID string, controller *Controller,
}
}
}
log.Info("new block event is created",
zap.String("changefeedID", cfID),
zap.Uint64("block-ts", event.commitTs),
zap.Bool("sync-point", event.isSyncPoint))
return event
}

109 changes: 27 additions & 82 deletions maintainer/maintainer.go
@@ -21,20 +21,16 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/schemastore"
"github.com/pingcap/ticdc/maintainer/replica"
"github.com/pingcap/ticdc/maintainer/split"
"github.com/pingcap/ticdc/pkg/bootstrap"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/pingcap/tiflow/cdc/model"
@@ -78,7 +74,7 @@ type Maintainer struct {

removed *atomic.Bool

initialized bool
bootstrapped bool

// startCheckpointTs is the checkpoint ts when the maintainer is created
// it will be sent to the dispatcher manager to initialize the checkpoint ts and get the real checkpoint ts
@@ -146,7 +142,7 @@ func NewMaintainer(cfID model.ChangeFeedID,
taskScheduler: taskScheduler,
startCheckpointTs: checkpointTs,
controller: NewController(cfID.ID, checkpointTs, pdAPI, regionCache, taskScheduler,
cfg.Config.Scheduler, ddlSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval)),
cfg.Config, ddlSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval)),
mc: mc,
state: heartbeatpb.ComponentState_Working,
removed: atomic.NewBool(false),
@@ -178,7 +174,6 @@ func NewMaintainer(cfID model.ChangeFeedID,
handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace, cfID.ID),
}
m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.ID, m.getNewBootstrapFn())
m.barrier = NewBarrier(m.controller, cfg.Config.Scheduler.EnableTableAcrossNodes)
log.Info("maintainer is created", zap.String("id", cfID.String()))
metrics.MaintainerGauge.WithLabelValues(cfID.Namespace, cfID.ID).Inc()
return m
@@ -267,18 +262,6 @@ func (m *Maintainer) initialize() error {
start := time.Now()
log.Info("start to initialize changefeed maintainer",
zap.String("id", m.id.String()))
tables, err := m.initTables()
if err != nil {
return errors.Trace(err)
}
m.controller.SetInitialTables(tables)

log.Info("changefeed maintainer initialized",
zap.String("id", m.id.String()),
zap.Duration("duration", time.Since(start)))
m.initialized = true
m.state = heartbeatpb.ComponentState_Working
m.statusChanged.Store(true)

// detect the capture changes
m.nodeManager.RegisterNodeChangeHandler(node.ID("maintainer-"+m.id.ID), func(allNodes map[node.ID]*node.Info) {
@@ -298,6 +281,11 @@ func (m *Maintainer) initialize() error {
changefeedID: m.id.ID,
eventType: EventPeriod,
}, time.Now().Add(time.Millisecond*500))
log.Info("changefeed maintainer initialized",
zap.String("id", m.id.String()),
zap.Duration("duration", time.Since(start)))
m.state = heartbeatpb.ComponentState_Working
m.statusChanged.Store(true)
return nil
}

@@ -312,20 +300,11 @@ func (m *Maintainer) cleanupMetrics() {
}

func (m *Maintainer) onInit() bool {
// already initialized
if m.initialized {
return false
err := m.initialize()
if err != nil {
m.handleError(err)
}
// async initialize the changefeed
go func() {
err := m.initialize()
if err != nil {
m.handleError(err)
}
m.stream.Wake() <- m.id.ID
log.Info("stream waked", zap.String("changefeed", m.id.String()))
}()
return true
return false
}

func (m *Maintainer) onMessage(msg *messaging.TargetMessage) {
@@ -406,7 +385,7 @@ func (m *Maintainer) calCheckpointTs() {
// 1. node change
// 2. ddl
// 3. interval scheduling, like balance, split
if time.Since(m.lastCheckpointTsTime) < 2*time.Second ||
if !m.bootstrapped || time.Since(m.lastCheckpointTsTime) < 2*time.Second ||
!m.controller.ScheduleFinished() {
return
}
@@ -460,6 +439,10 @@ func (m *Maintainer) sendMessages(msgs []*messaging.TargetMessage) {
}

func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) {
// ignore the heartbeat if the maintainer is not bootstrapped yet
if !m.bootstrapped {
return
}
req := msg.Message[0].(*heartbeatpb.HeartBeatRequest)
if req.Watermark != nil {
m.checkpointTsByCapture[msg.From] = *req.Watermark
@@ -478,6 +461,10 @@
}

func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) {
// ignore the block status request if the barrier is not initialized yet
if !m.bootstrapped {
return
}
req := msg.Message[0].(*heartbeatpb.BlockStatusRequest)
ackMsg := m.barrier.HandleStatus(msg.From, req)
m.sendMessages([]*messaging.TargetMessage{ackMsg})
@@ -495,57 +482,12 @@ func (m *Maintainer) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.Maintai
if cachedResp == nil {
return
}
log.Info("all nodes have sent bootstrap response",
zap.String("changefeed", m.id.ID),
zap.Int("size", len(cachedResp)))
workingMap := make(map[int64]utils.Map[*heartbeatpb.TableSpan, *replica.SpanReplication])
for server, bootstrapMsg := range cachedResp {
log.Info("received bootstrap response",
zap.String("changefeed", m.id.ID),
zap.Any("server", server),
zap.Int("size", len(bootstrapMsg.Spans)))
for _, info := range bootstrapMsg.Spans {
dispatcherID := common.NewDispatcherIDFromPB(info.ID)
if dispatcherID == m.tableTriggerEventDispatcherID {
log.Info(
"skip table trigger event dispatcher",
zap.String("changefeed", m.id.ID),
zap.String("dispatcher", dispatcherID.String()),
zap.String("server", server.String()))
continue
}
status := &heartbeatpb.TableSpanStatus{
ComponentStatus: info.ComponentStatus,
ID: info.ID,
CheckpointTs: info.CheckpointTs,
}
span := info.Span

//working on remote, the state must be absent or working since it's reported by remote
stm := replica.NewWorkingReplicaSet(m.id, dispatcherID, info.SchemaID, span, status, server)
tableMap, ok := workingMap[span.TableID]
if !ok {
tableMap = utils.NewBtreeMap[*heartbeatpb.TableSpan, *replica.SpanReplication](heartbeatpb.LessTableSpan)
workingMap[span.TableID] = tableMap
}
tableMap.ReplaceOrInsert(span, stm)
}
}
m.controller.FinishBootstrap(workingMap)
}

// initTableIDs get tables ids base on the filter and checkpoint ts
func (m *Maintainer) initTables() ([]commonEvent.Table, error) {
startTs := m.watermark.CheckpointTs
f, err := filter.NewFilter(m.config.Config.Filter, "", m.config.Config.ForceReplicate)
barrier, err := m.controller.FinishBootstrap(cachedResp)
if err != nil {
return nil, errors.Cause(err)
m.handleError(err)
}

schemaStore := appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore)
tables, err := schemaStore.GetAllPhysicalTables(startTs, f)
log.Info("get table ids", zap.Int("count", len(tables)), zap.String("changefeed", m.id.String()))
return tables, nil
m.barrier = barrier
m.bootstrapped = true
}
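// Note: Controller.FinishBootstrap now returns the rebuilt barrier, but its
// implementation lives in one of the changed files not rendered above. The
// following is only a hedged sketch of what it presumably does; every name
// other than NewBarrier and HandleBootstrapResponse is an assumption.
//
//	func (c *Controller) FinishBootstrap(
//		cachedResp map[node.ID]*heartbeatpb.MaintainerBootstrapResponse,
//	) (*Barrier, error) {
//		// ... rebuild the working replica sets from the reported spans
//		// (previously done inline in Maintainer.onBootstrapDone) ...
//
//		// then rebuild the blocked events before scheduling resumes
//		barrier := NewBarrier(c, c.splitTableEnabled) // assumed field name
//		barrier.HandleBootstrapResponse(cachedResp)
//		return barrier, nil
//	}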

func (m *Maintainer) onNodeClosed(from node.ID, response *heartbeatpb.MaintainerCloseResponse) {
@@ -563,6 +505,9 @@ func (m *Maintainer) handleResendMessage() {
if m.removing {
m.sendMaintainerCloseRequestToAllNode()
}
if !m.bootstrapped {
return
}
// resend barrier ack messages
m.sendMessages(m.barrier.Resend())
}