Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

barrier handle bootstrap event #438

Merged
merged 1 commit into from
Nov 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 33 additions & 0 deletions maintainer/barrier.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,39 @@ func (b *Barrier) HandleStatus(from node.ID,
})
}

// HandleBootstrapResponse rebuild the block event from the bootstrap response
func (b *Barrier) HandleBootstrapResponse(bootstrapRespMap map[node.ID]*heartbeatpb.MaintainerBootstrapResponse) {
for _, resp := range bootstrapRespMap {
for _, span := range resp.Spans {
// we only care about the WAITING, WRITING and DONE stage
if span.BlockState != nil && span.BlockState.Stage != heartbeatpb.BlockStage_NONE {
blockState := span.BlockState
key := getEventKey(blockState.BlockTs, blockState.IsSyncPoint)
event, ok := b.blockedTs[key]
if !ok {
event = NewBlockEvent(resp.ChangefeedID, b.controller, blockState, b.splitTableEnabled)
b.blockedTs[key] = event
}
switch blockState.Stage {
case heartbeatpb.BlockStage_WAITING:
// it's the dispatcher's responsibility to resend the block event
event.markDispatcherEventDone(common.NewDispatcherIDFromPB(span.ID))
case heartbeatpb.BlockStage_WRITING:
// it's in writing stage, must be the writer dispatcher
// it's the maintainer's responsibility to resend the write action
event.selected = true
event.writerDispatcher = common.NewDispatcherIDFromPB(span.ID)
case heartbeatpb.BlockStage_DONE:
// it's the maintainer's responsibility to resend the pass action
event.selected = true
event.writerDispatcherAdvanced = true
event.markDispatcherEventDone(common.NewDispatcherIDFromPB(span.ID))
}
}
}
}
}

// Resend resends the message to the dispatcher manger, the pass action is handle here
func (b *Barrier) Resend() []*messaging.TargetMessage {
var msgs []*messaging.TargetMessage
Expand Down
4 changes: 4 additions & 0 deletions maintainer/barrier_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,10 @@ func NewBlockEvent(cfID string, controller *Controller,
}
}
}
log.Info("new block event is created",
zap.String("changefeedID", cfID),
zap.Uint64("block-ts", event.commitTs),
zap.Bool("sync-point", event.isSyncPoint))
return event
}

Expand Down
109 changes: 27 additions & 82 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/logservice/schemastore"
"github.com/pingcap/ticdc/maintainer/replica"
"github.com/pingcap/ticdc/maintainer/split"
"github.com/pingcap/ticdc/pkg/bootstrap"
"github.com/pingcap/ticdc/pkg/common"
appcontext "github.com/pingcap/ticdc/pkg/common/context"
commonEvent "github.com/pingcap/ticdc/pkg/common/event"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/filter"
"github.com/pingcap/ticdc/pkg/messaging"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/pkg/node"
"github.com/pingcap/ticdc/server/watcher"
"github.com/pingcap/ticdc/utils"
"github.com/pingcap/ticdc/utils/dynstream"
"github.com/pingcap/ticdc/utils/threadpool"
"github.com/pingcap/tiflow/cdc/model"
Expand Down Expand Up @@ -78,7 +74,7 @@ type Maintainer struct {

removed *atomic.Bool

initialized bool
bootstrapped bool

// startCheckpointTs is the check point ts when the maintainer is created
// it's will be sent to dispatcher manager to initialize the checkpoint ts and get the real checkpoint ts
Expand Down Expand Up @@ -146,7 +142,7 @@ func NewMaintainer(cfID model.ChangeFeedID,
taskScheduler: taskScheduler,
startCheckpointTs: checkpointTs,
controller: NewController(cfID.ID, checkpointTs, pdAPI, regionCache, taskScheduler,
cfg.Config.Scheduler, ddlSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval)),
cfg.Config, ddlSpan, conf.AddTableBatchSize, time.Duration(conf.CheckBalanceInterval)),
mc: mc,
state: heartbeatpb.ComponentState_Working,
removed: atomic.NewBool(false),
Expand Down Expand Up @@ -178,7 +174,6 @@ func NewMaintainer(cfID model.ChangeFeedID,
handleEventDuration: metrics.MaintainerHandleEventDuration.WithLabelValues(cfID.Namespace, cfID.ID),
}
m.bootstrapper = bootstrap.NewBootstrapper[heartbeatpb.MaintainerBootstrapResponse](m.id.ID, m.getNewBootstrapFn())
m.barrier = NewBarrier(m.controller, cfg.Config.Scheduler.EnableTableAcrossNodes)
log.Info("maintainer is created", zap.String("id", cfID.String()))
metrics.MaintainerGauge.WithLabelValues(cfID.Namespace, cfID.ID).Inc()
return m
Expand Down Expand Up @@ -267,18 +262,6 @@ func (m *Maintainer) initialize() error {
start := time.Now()
log.Info("start to initialize changefeed maintainer",
zap.String("id", m.id.String()))
tables, err := m.initTables()
if err != nil {
return errors.Trace(err)
}
m.controller.SetInitialTables(tables)

log.Info("changefeed maintainer initialized",
zap.String("id", m.id.String()),
zap.Duration("duration", time.Since(start)))
m.initialized = true
m.state = heartbeatpb.ComponentState_Working
m.statusChanged.Store(true)

// detect the capture changes
m.nodeManager.RegisterNodeChangeHandler(node.ID("maintainer-"+m.id.ID), func(allNodes map[node.ID]*node.Info) {
Expand All @@ -298,6 +281,11 @@ func (m *Maintainer) initialize() error {
changefeedID: m.id.ID,
eventType: EventPeriod,
}, time.Now().Add(time.Millisecond*500))
log.Info("changefeed maintainer initialized",
zap.String("id", m.id.String()),
zap.Duration("duration", time.Since(start)))
m.state = heartbeatpb.ComponentState_Working
m.statusChanged.Store(true)
return nil
}

Expand All @@ -312,20 +300,11 @@ func (m *Maintainer) cleanupMetrics() {
}

func (m *Maintainer) onInit() bool {
// already initialized
if m.initialized {
return false
err := m.initialize()
if err != nil {
m.handleError(err)
}
// async initialize the changefeed
go func() {
err := m.initialize()
if err != nil {
m.handleError(err)
}
m.stream.Wake() <- m.id.ID
log.Info("stream waked", zap.String("changefeed", m.id.String()))
}()
return true
return false
}

func (m *Maintainer) onMessage(msg *messaging.TargetMessage) {
Expand Down Expand Up @@ -406,7 +385,7 @@ func (m *Maintainer) calCheckpointTs() {
// 1. node change
// 2. ddl
// 3. interval scheduling, like balance, split
if time.Since(m.lastCheckpointTsTime) < 2*time.Second ||
if !m.bootstrapped || time.Since(m.lastCheckpointTsTime) < 2*time.Second ||
!m.controller.ScheduleFinished() {
return
}
Expand Down Expand Up @@ -460,6 +439,10 @@ func (m *Maintainer) sendMessages(msgs []*messaging.TargetMessage) {
}

func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) {
// ignore the heartbeat if the maintianer not bootstrapped
if !m.bootstrapped {
return
}
req := msg.Message[0].(*heartbeatpb.HeartBeatRequest)
if req.Watermark != nil {
m.checkpointTsByCapture[msg.From] = *req.Watermark
Expand All @@ -478,6 +461,10 @@ func (m *Maintainer) onHeartBeatRequest(msg *messaging.TargetMessage) {
}

func (m *Maintainer) onBlockStateRequest(msg *messaging.TargetMessage) {
// the barrier is not initialized
if !m.bootstrapped {
return
}
req := msg.Message[0].(*heartbeatpb.BlockStatusRequest)
ackMsg := m.barrier.HandleStatus(msg.From, req)
m.sendMessages([]*messaging.TargetMessage{ackMsg})
Expand All @@ -495,57 +482,12 @@ func (m *Maintainer) onBootstrapDone(cachedResp map[node.ID]*heartbeatpb.Maintai
if cachedResp == nil {
return
}
log.Info("all nodes have sent bootstrap response",
zap.String("changefeed", m.id.ID),
zap.Int("size", len(cachedResp)))
workingMap := make(map[int64]utils.Map[*heartbeatpb.TableSpan, *replica.SpanReplication])
for server, bootstrapMsg := range cachedResp {
log.Info("received bootstrap response",
zap.String("changefeed", m.id.ID),
zap.Any("server", server),
zap.Int("size", len(bootstrapMsg.Spans)))
for _, info := range bootstrapMsg.Spans {
dispatcherID := common.NewDispatcherIDFromPB(info.ID)
if dispatcherID == m.tableTriggerEventDispatcherID {
log.Info(
"skip table trigger event dispatcher",
zap.String("changefeed", m.id.ID),
zap.String("dispatcher", dispatcherID.String()),
zap.String("server", server.String()))
continue
}
status := &heartbeatpb.TableSpanStatus{
ComponentStatus: info.ComponentStatus,
ID: info.ID,
CheckpointTs: info.CheckpointTs,
}
span := info.Span

//working on remote, the state must be absent or working since it's reported by remote
stm := replica.NewWorkingReplicaSet(m.id, dispatcherID, info.SchemaID, span, status, server)
tableMap, ok := workingMap[span.TableID]
if !ok {
tableMap = utils.NewBtreeMap[*heartbeatpb.TableSpan, *replica.SpanReplication](heartbeatpb.LessTableSpan)
workingMap[span.TableID] = tableMap
}
tableMap.ReplaceOrInsert(span, stm)
}
}
m.controller.FinishBootstrap(workingMap)
}

// initTableIDs get tables ids base on the filter and checkpoint ts
func (m *Maintainer) initTables() ([]commonEvent.Table, error) {
startTs := m.watermark.CheckpointTs
f, err := filter.NewFilter(m.config.Config.Filter, "", m.config.Config.ForceReplicate)
barrier, err := m.controller.FinishBootstrap(cachedResp)
if err != nil {
return nil, errors.Cause(err)
m.handleError(err)
}

schemaStore := appcontext.GetService[schemastore.SchemaStore](appcontext.SchemaStore)
tables, err := schemaStore.GetAllPhysicalTables(startTs, f)
log.Info("get table ids", zap.Int("count", len(tables)), zap.String("changefeed", m.id.String()))
return tables, nil
m.barrier = barrier
m.bootstrapped = true
}

func (m *Maintainer) onNodeClosed(from node.ID, response *heartbeatpb.MaintainerCloseResponse) {
Expand All @@ -563,6 +505,9 @@ func (m *Maintainer) handleResendMessage() {
if m.removing {
m.sendMaintainerCloseRequestToAllNode()
}
if !m.bootstrapped {
return
}
// resend barrier ack messages
m.sendMessages(m.barrier.Resend())
}
Expand Down
Loading
Loading