Skip to content

Commit

Permalink
cdc: refactor TableExecutor interface and agent
Browse files Browse the repository at this point in the history
* Replace TableID with tablepb.Span in TableExecutor
* Replace TableID with tablepb.Span in agent

Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Dec 6, 2022
1 parent 15afac0 commit 56d985b
Show file tree
Hide file tree
Showing 10 changed files with 476 additions and 296 deletions.
2 changes: 1 addition & 1 deletion cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (m *managerImpl) handleCommand(ctx cdcContext.Context) error {
case commandTpQueryTableCount:
count := 0
for _, p := range m.processors {
count += len(p.GetAllCurrentTables())
count += p.GetAllCurrentTables()
}
select {
case cmd.payload.(chan int) <- count:
Expand Down
130 changes: 67 additions & 63 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ func (p *processor) checkReadyForMessages() bool {
return p.changefeed != nil && p.changefeed.Status != nil
}

var _ scheduler.TableExecutor = (*processor)(nil)

// AddTable implements TableExecutor interface.
// AddTable may cause by the following scenario
// 1. `Create Table`, a new table dispatched to the processor, `isPrepare` should be false
// 2. Prepare phase for 2 phase scheduling, `isPrepare` should be true.
// 3. Replicating phase for 2 phase scheduling, `isPrepare` should be false
func (p *processor) AddTable(
ctx context.Context, tableID model.TableID, startTs model.Ts, isPrepare bool,
ctx context.Context, span tablepb.Span, startTs model.Ts, isPrepare bool,
) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
Expand All @@ -135,17 +137,17 @@ func (p *processor) AddTable(
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("checkpointTs", startTs),
zap.Bool("isPrepare", isPrepare))
}

var alreadyExist bool
var state tablepb.TableState
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(tableID)
state, alreadyExist = p.sinkManager.GetTableState(span.TableID)
} else {
table, ok := p.tables[tableID]
table, ok := p.tables[span.TableID]
if ok {
alreadyExist = true
state = table.State()
Expand All @@ -161,7 +163,7 @@ func (p *processor) AddTable(
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("checkpointTs", startTs),
zap.Bool("isPrepare", isPrepare))
return true, nil
Expand All @@ -170,11 +172,11 @@ func (p *processor) AddTable(
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
if p.pullBasedSinking {
if err := p.sinkManager.StartTable(tableID, startTs); err != nil {
if err := p.sinkManager.StartTable(span.TableID, startTs); err != nil {
return false, errors.Trace(err)
}
} else {
p.tables[tableID].Start(startTs)
p.tables[span.TableID].Start(startTs)
}
}
return true, nil
Expand All @@ -183,7 +185,7 @@ func (p *processor) AddTable(
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("checkpointTs", startTs),
zap.Bool("isPrepare", isPrepare))
return true, nil
Expand All @@ -192,10 +194,10 @@ func (p *processor) AddTable(
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("checkpointTs", startTs),
zap.Bool("isPrepare", isPrepare))
p.removeTable(p.tables[tableID], tableID)
p.removeTable(p.tables[span.TableID], span.TableID)
}
}

Expand All @@ -208,36 +210,37 @@ func (p *processor) AddTable(
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("checkpointTs", startTs),
zap.Bool("isPrepare", isPrepare))
}

if p.pullBasedSinking {
p.sourceManager.AddTable(ctx.(cdcContext.Context), tableID, p.getTableName(ctx, tableID), startTs)
p.sourceManager.AddTable(
ctx.(cdcContext.Context), span.TableID, p.getTableName(ctx, span.TableID), startTs)
if p.redoManager.Enabled() {
p.redoManager.AddTable(tableID, startTs)
p.redoManager.AddTable(span.TableID, startTs)
}
p.sinkManager.AddTable(tableID, startTs, p.changefeed.Info.TargetTs)
p.sinkManager.AddTable(span.TableID, startTs, p.changefeed.Info.TargetTs)
if !isPrepare {
if err := p.sinkManager.StartTable(tableID, startTs); err != nil {
if err := p.sinkManager.StartTable(span.TableID, startTs); err != nil {
return false, errors.Trace(err)
}
}
} else {
table, err := p.createTablePipeline(
ctx.(cdcContext.Context), tableID, &model.TableReplicaInfo{StartTs: startTs})
ctx.(cdcContext.Context), span.TableID, &model.TableReplicaInfo{StartTs: startTs})
if err != nil {
return false, errors.Trace(err)
}
p.tables[tableID] = table
p.tables[span.TableID] = table
if !isPrepare {
table.Start(startTs)
log.Debug("start table",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("startTs", startTs))
}
}
Expand All @@ -246,31 +249,31 @@ func (p *processor) AddTable(
}

// RemoveTable implements TableExecutor interface.
func (p *processor) RemoveTable(tableID model.TableID) bool {
func (p *processor) RemoveTable(span tablepb.Span) bool {
if !p.checkReadyForMessages() {
return false
}

if p.pullBasedSinking {
_, exist := p.sinkManager.GetTableState(tableID)
_, exist := p.sinkManager.GetTableState(span.TableID)
if !exist {
log.Warn("Table which will be deleted is not found",
zap.String("capture", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))
zap.Int64("tableID", span.TableID))
return true
}
p.sinkManager.AsyncStopTable(tableID)
p.sinkManager.AsyncStopTable(span.TableID)
return true
}
table, ok := p.tables[tableID]
table, ok := p.tables[span.TableID]
if !ok {
log.Warn("table which will be deleted is not found",
zap.String("capture", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))
zap.Int64("tableID", span.TableID))
return true
}
if !table.AsyncStop() {
Expand All @@ -281,14 +284,14 @@ func (p *processor) RemoveTable(tableID model.TableID) bool {
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Uint64("checkpointTs", table.CheckpointTs()),
zap.Int64("tableID", tableID))
zap.Int64("tableID", span.TableID))
return false
}
return true
}

// IsAddTableFinished implements TableExecutor interface.
func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bool {
func (p *processor) IsAddTableFinished(span tablepb.Span, isPrepare bool) bool {
if !p.checkReadyForMessages() {
return false
}
Expand All @@ -303,14 +306,14 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
done := func() bool {
var alreadyExist bool
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(tableID)
state, alreadyExist = p.sinkManager.GetTableState(span.TableID)
if alreadyExist {
stats := p.sinkManager.GetTableStats(tableID)
stats := p.sinkManager.GetTableStats(span.TableID)
tableResolvedTs = stats.ResolvedTs
tableCheckpointTs = stats.CheckpointTs
}
} else {
table, ok := p.tables[tableID]
table, ok := p.tables[span.TableID]
if ok {
alreadyExist = true
state = table.State()
Expand All @@ -324,7 +327,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Bool("isPrepare", isPrepare))
}

Expand All @@ -340,7 +343,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("tableResolvedTs", tableResolvedTs),
zap.Uint64("localResolvedTs", localResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
Expand All @@ -356,7 +359,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("tableResolvedTs", tableResolvedTs),
zap.Uint64("localResolvedTs", localResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
Expand All @@ -369,7 +372,7 @@ func (p *processor) IsAddTableFinished(tableID model.TableID, isPrepare bool) bo
}

// IsRemoveTableFinished implements TableExecutor interface.
func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool) {
func (p *processor) IsRemoveTableFinished(span tablepb.Span) (model.Ts, bool) {
if !p.checkReadyForMessages() {
return 0, false
}
Expand All @@ -378,13 +381,13 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
var state tablepb.TableState
var tableCheckpointTs uint64
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(tableID)
state, alreadyExist = p.sinkManager.GetTableState(span.TableID)
if alreadyExist {
stats := p.sinkManager.GetTableStats(tableID)
stats := p.sinkManager.GetTableStats(span.TableID)
tableCheckpointTs = stats.CheckpointTs
}
} else {
table, ok := p.tables[tableID]
table, ok := p.tables[span.TableID]
if ok {
alreadyExist = true
state = table.State()
Expand All @@ -397,7 +400,7 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))
zap.Int64("tableID", span.TableID))
return 0, true
}

Expand All @@ -407,53 +410,49 @@ func (p *processor) IsRemoveTableFinished(tableID model.TableID) (model.Ts, bool
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Uint64("checkpointTs", tableCheckpointTs),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Any("tableStatus", state))
return 0, false
}

if p.pullBasedSinking {
stats := p.sinkManager.GetTableStats(tableID)
p.sourceManager.RemoveTable(tableID)
p.sinkManager.RemoveTable(tableID)
stats := p.sinkManager.GetTableStats(span.TableID)
p.sourceManager.RemoveTable(span.TableID)
p.sinkManager.RemoveTable(span.TableID)
if p.redoManager.Enabled() {
p.redoManager.RemoveTable(tableID)
p.redoManager.RemoveTable(span.TableID)
}
log.Info("table removed",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("checkpointTs", stats.CheckpointTs))

return stats.CheckpointTs, true
}
table := p.tables[tableID]
table := p.tables[span.TableID]
p.metricRemainKVEventGauge.Sub(float64(table.RemainEvents()))
table.Cancel()
table.Wait()
delete(p.tables, tableID)
delete(p.tables, span.TableID)

checkpointTs := table.CheckpointTs()
log.Info("table removed",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Int64("tableID", span.TableID),
zap.Uint64("checkpointTs", checkpointTs))
return checkpointTs, true
}

// GetAllCurrentTables implements TableExecutor interface.
func (p *processor) GetAllCurrentTables() []model.TableID {
func (p *processor) GetAllCurrentTables() int {
if p.pullBasedSinking {
return p.sinkManager.GetAllCurrentTableIDs()
}
ret := make([]model.TableID, 0, len(p.tables))
for tableID := range p.tables {
ret = append(ret, tableID)
return len(p.sinkManager.GetAllCurrentTableIDs())
}
return ret
return len(p.tables)
}

// GetCheckpoint implements TableExecutor interface.
Expand All @@ -462,35 +461,39 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) {
}

// GetTableStatus implements TableExecutor interface
func (p *processor) GetTableStatus(tableID model.TableID) tablepb.TableStatus {
func (p *processor) GetTableStatus(span tablepb.Span) tablepb.TableStatus {
if p.pullBasedSinking {
state, exist := p.sinkManager.GetTableState(tableID)
state, exist := p.sinkManager.GetTableState(span.TableID)
if !exist {
return tablepb.TableStatus{
TableID: tableID,
TableID: span.TableID,
Span: span,
State: tablepb.TableStateAbsent,
}
}
sinkStats := p.sinkManager.GetTableStats(tableID)
sinkStats := p.sinkManager.GetTableStats(span.TableID)
return tablepb.TableStatus{
TableID: tableID,
TableID: span.TableID,
Span: span,
Checkpoint: tablepb.Checkpoint{
CheckpointTs: sinkStats.CheckpointTs,
ResolvedTs: sinkStats.ResolvedTs,
},
State: state,
Stats: p.getStatsFromSourceManagerAndSinkManager(tableID, sinkStats),
Stats: p.getStatsFromSourceManagerAndSinkManager(span.TableID, sinkStats),
}
}
table, ok := p.tables[tableID]
table, ok := p.tables[span.TableID]
if !ok {
return tablepb.TableStatus{
TableID: tableID,
TableID: span.TableID,
Span: span,
State: tablepb.TableStateAbsent,
}
}
return tablepb.TableStatus{
TableID: tableID,
TableID: span.TableID,
Span: span,
Checkpoint: tablepb.Checkpoint{
CheckpointTs: table.CheckpointTs(),
ResolvedTs: table.ResolvedTs(),
Expand Down Expand Up @@ -924,9 +927,10 @@ func (p *processor) newAgentImpl(
messageRouter := ctx.GlobalVars().MessageRouter
etcdClient := ctx.GlobalVars().EtcdClient
captureID := ctx.GlobalVars().CaptureInfo.ID
cfg := config.GetGlobalServerConfig().Debug.Scheduler
ret, err = scheduler.NewAgent(
ctx, captureID, liveness,
messageServer, messageRouter, etcdClient, p, p.changefeedID)
messageServer, messageRouter, etcdClient, p, p.changefeedID, cfg)
return ret, errors.Trace(err)
}

Expand Down
Loading

0 comments on commit 56d985b

Please sign in to comment.