Skip to content

Commit

Permalink
processor(ticdc): replace tableID with Span in sourcemanager and sink…
Browse files Browse the repository at this point in the history
…manager (#7978)

ref #7720
  • Loading branch information
overvenus authored Jan 4, 2023
1 parent 7f07d7d commit 7294c37
Show file tree
Hide file tree
Showing 28 changed files with 678 additions and 570 deletions.
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ type pullerNode struct {
}

func newPullerNode(
tableID tablepb.Span,
span tablepb.Span,
startTs model.Ts,
tableName string,
changefeed model.ChangeFeedID,
) *pullerNode {
return &pullerNode{
span: tableID,
span: span,
startTs: startTs,
tableName: tableName,
changefeed: changefeed,
Expand Down
74 changes: 38 additions & 36 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (p *processor) AddTableSpan(
var alreadyExist bool
var state tablepb.TableState
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(span.TableID)
state, alreadyExist = p.sinkManager.GetTableState(span)
} else {
table, ok := p.tableSpans.Get(span)
if ok {
Expand All @@ -176,7 +176,7 @@ func (p *processor) AddTableSpan(
// be stopped on original capture already, it's safe to start replicating data now.
if !isPrepare {
if p.pullBasedSinking {
if err := p.sinkManager.StartTable(span.TableID, startTs); err != nil {
if err := p.sinkManager.StartTable(span, startTs); err != nil {
return false, errors.Trace(err)
}
} else {
Expand Down Expand Up @@ -221,12 +221,12 @@ func (p *processor) AddTableSpan(

if p.pullBasedSinking {
p.sinkManager.AddTable(
span.TableID, startTs, p.changefeed.Info.TargetTs)
span, startTs, p.changefeed.Info.TargetTs)
if p.redoManager.Enabled() {
p.redoManager.AddTable(span, startTs)
}
p.sourceManager.AddTable(
ctx.(cdcContext.Context), span.TableID, p.getTableName(ctx, span.TableID), startTs)
ctx.(cdcContext.Context), span, p.getTableName(ctx, span.TableID), startTs)
} else {
table, err := p.createTablePipeline(
ctx.(cdcContext.Context), span, &model.TableReplicaInfo{StartTs: startTs})
Expand All @@ -246,7 +246,7 @@ func (p *processor) RemoveTableSpan(span tablepb.Span) bool {
}

if p.pullBasedSinking {
_, exist := p.sinkManager.GetTableState(span.TableID)
_, exist := p.sinkManager.GetTableState(span)
if !exist {
log.Warn("Table which will be deleted is not found",
zap.String("capture", p.captureInfo.ID),
Expand All @@ -255,7 +255,7 @@ func (p *processor) RemoveTableSpan(span tablepb.Span) bool {
zap.Stringer("span", &span))
return true
}
p.sinkManager.AsyncStopTable(span.TableID)
p.sinkManager.AsyncStopTable(span)
return true
}
table, ok := p.tableSpans.Get(span)
Expand Down Expand Up @@ -297,9 +297,9 @@ func (p *processor) IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) bo
done := func() bool {
var alreadyExist bool
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(span.TableID)
state, alreadyExist = p.sinkManager.GetTableState(span)
if alreadyExist {
stats := p.sinkManager.GetTableStats(span.TableID)
stats := p.sinkManager.GetTableStats(span)
tableResolvedTs = stats.ResolvedTs
tableCheckpointTs = stats.CheckpointTs
}
Expand Down Expand Up @@ -372,9 +372,9 @@ func (p *processor) IsRemoveTableSpanFinished(span tablepb.Span) (model.Ts, bool
var state tablepb.TableState
var tableCheckpointTs uint64
if p.pullBasedSinking {
state, alreadyExist = p.sinkManager.GetTableState(span.TableID)
state, alreadyExist = p.sinkManager.GetTableState(span)
if alreadyExist {
stats := p.sinkManager.GetTableStats(span.TableID)
stats := p.sinkManager.GetTableStats(span)
tableCheckpointTs = stats.CheckpointTs
}
} else {
Expand Down Expand Up @@ -407,12 +407,12 @@ func (p *processor) IsRemoveTableSpanFinished(span tablepb.Span) (model.Ts, bool
}

if p.pullBasedSinking {
stats := p.sinkManager.GetTableStats(span.TableID)
stats := p.sinkManager.GetTableStats(span)
if p.redoManager.Enabled() {
p.redoManager.RemoveTable(span)
}
p.sinkManager.RemoveTable(span.TableID)
p.sourceManager.RemoveTable(span.TableID)
p.sinkManager.RemoveTable(span)
p.sourceManager.RemoveTable(span)
log.Info("table removed",
zap.String("captureID", p.captureInfo.ID),
zap.String("namespace", p.changefeedID.Namespace),
Expand Down Expand Up @@ -441,7 +441,7 @@ func (p *processor) IsRemoveTableSpanFinished(span tablepb.Span) (model.Ts, bool
// GetTableSpanCount implements TableExecutor interface.
func (p *processor) GetTableSpanCount() int {
if p.pullBasedSinking {
return len(p.sinkManager.GetAllCurrentTableIDs())
return len(p.sinkManager.GetAllCurrentTableSpans())
}
return p.tableSpans.Len()
}
Expand All @@ -454,18 +454,18 @@ func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) {
// GetTableSpanStatus implements TableExecutor interface
func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tablepb.TableStatus {
if p.pullBasedSinking {
state, exist := p.sinkManager.GetTableState(span.TableID)
state, exist := p.sinkManager.GetTableState(span)
if !exist {
return tablepb.TableStatus{
TableID: span.TableID,
Span: span,
State: tablepb.TableStateAbsent,
}
}
sinkStats := p.sinkManager.GetTableStats(span.TableID)
sinkStats := p.sinkManager.GetTableStats(span)
stats := tablepb.Stats{}
if collectStat {
stats = p.getStatsFromSourceManagerAndSinkManager(span.TableID, sinkStats)
stats = p.getStatsFromSourceManagerAndSinkManager(span, sinkStats)
}
return tablepb.TableStatus{
TableID: span.TableID,
Expand Down Expand Up @@ -502,8 +502,10 @@ func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tabl
}
}

func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableID, sinkStats sinkmanager.TableStats) tablepb.Stats {
pullerStats := p.sourceManager.GetTablePullerStats(tableID)
func (p *processor) getStatsFromSourceManagerAndSinkManager(
span tablepb.Span, sinkStats sinkmanager.TableStats,
) tablepb.Stats {
pullerStats := p.sourceManager.GetTablePullerStats(span)
now, _ := p.upstream.PDClock.CurrentTime()

stats := tablepb.Stats{
Expand All @@ -526,7 +528,7 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(tableID model.TableI
},
}

sortStats := p.sourceManager.GetTableSorterStats(tableID)
sortStats := p.sourceManager.GetTableSorterStats(span)
stats.StageCheckpoints["sorter-ingress"] = tablepb.Checkpoint{
CheckpointTs: sortStats.ReceivedMaxCommitTs,
ResolvedTs: sortStats.ReceivedMaxResolvedTs,
Expand Down Expand Up @@ -1068,21 +1070,21 @@ func (p *processor) handlePosition(currentTs int64) {
minCheckpointTs := minResolvedTs
minCheckpointTableID := int64(0)
if p.pullBasedSinking {
tableIDs := p.sinkManager.GetAllCurrentTableIDs()
for _, tableID := range tableIDs {
stats := p.sinkManager.GetTableStats(tableID)
spans := p.sinkManager.GetAllCurrentTableSpans()
for _, span := range spans {
stats := p.sinkManager.GetTableStats(span)
log.Debug("sink manager gets table stats",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID),
zap.Stringer("span", &span),
zap.Any("stats", stats))
if stats.ResolvedTs < minResolvedTs {
minResolvedTs = stats.ResolvedTs
minResolvedTableID = tableID
minResolvedTableID = span.TableID
}
if stats.CheckpointTs < minCheckpointTs {
minCheckpointTs = stats.CheckpointTs
minCheckpointTableID = tableID
minCheckpointTableID = span.TableID
}
}
} else {
Expand Down Expand Up @@ -1225,8 +1227,8 @@ func (p *processor) removeTable(table tablepb.TablePipeline, span tablepb.Span)
p.redoManager.RemoveTable(span)
}
if p.pullBasedSinking {
p.sinkManager.RemoveTable(span.TableID)
p.sourceManager.RemoveTable(span.TableID)
p.sinkManager.RemoveTable(span)
p.sourceManager.RemoveTable(span)
} else {
table.Cancel()
table.Wait()
Expand Down Expand Up @@ -1264,8 +1266,8 @@ func (p *processor) doGCSchemaStorage() {

func (p *processor) refreshMetrics() {
if p.pullBasedSinking {
tables := p.sinkManager.GetAllCurrentTableIDs()
p.metricSyncTableNumGauge.Set(float64(len(tables)))
tableSpans := p.sinkManager.GetAllCurrentTableSpans()
p.metricSyncTableNumGauge.Set(float64(len(tableSpans)))
sortEngineReceivedEvents := p.sourceManager.ReceivedEvents()
tableSinksReceivedEvents := p.sinkManager.ReceivedEvents()
p.metricRemainKVEventGauge.Set(float64(sortEngineReceivedEvents - tableSinksReceivedEvents))
Expand Down Expand Up @@ -1443,13 +1445,13 @@ func (p *processor) cleanupMetrics() {
func (p *processor) WriteDebugInfo(w io.Writer) error {
fmt.Fprintf(w, "%+v\n", *p.changefeed)
if p.pullBasedSinking {
tables := p.sinkManager.GetAllCurrentTableIDs()
for _, tableID := range tables {
state, _ := p.sinkManager.GetTableState(tableID)
stats := p.sinkManager.GetTableStats(tableID)
spans := p.sinkManager.GetAllCurrentTableSpans()
for _, span := range spans {
state, _ := p.sinkManager.GetTableState(span)
stats := p.sinkManager.GetTableStats(span)
// TODO: add table name.
fmt.Fprintf(w, "tableID: %d, resolvedTs: %d, checkpointTs: %d, state: %s\n",
tableID, stats.ResolvedTs, stats.CheckpointTs, state)
fmt.Fprintf(w, "span: %s, resolvedTs: %d, checkpointTs: %d, state: %s\n",
&span, stats.ResolvedTs, stats.CheckpointTs, state)
}
} else {
p.tableSpans.Ascend(func(span tablepb.Span, tablePipeline tablepb.TablePipeline) bool {
Expand Down
Loading

0 comments on commit 7294c37

Please sign in to comment.