
Commit

Merge branch 'master' into span-replication/maps
Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus committed Jan 12, 2023
2 parents 6b765be + a7600c4 commit 5df46fe
Showing 9 changed files with 57 additions and 584 deletions.
48 changes: 0 additions & 48 deletions cdc/processor/metrics.go
@@ -20,48 +20,6 @@ import (
)

var (
resolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "resolved_ts",
Help: "local resolved ts of processor",
}, []string{"namespace", "changefeed"})
resolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "resolved_ts_lag",
Help: "local resolved ts lag of processor",
}, []string{"namespace", "changefeed"})
resolvedTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_resolved_table_id",
Help: "ID of the minimum resolved table",
}, []string{"namespace", "changefeed"})
checkpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts",
Help: "global checkpoint ts of processor",
}, []string{"namespace", "changefeed"})
checkpointTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts_lag",
Help: "global checkpoint ts lag of processor",
}, []string{"namespace", "changefeed"})
checkpointTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_checkpoint_table_id",
Help: "ID of the minimum checkpoint table",
}, []string{"namespace", "changefeed"})
syncTableNumGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
@@ -128,12 +86,6 @@ var (

// InitMetrics registers all metrics used in processor
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(resolvedTsGauge)
registry.MustRegister(resolvedTsLagGauge)
registry.MustRegister(resolvedTsMinTableIDGauge)
registry.MustRegister(checkpointTsGauge)
registry.MustRegister(checkpointTsLagGauge)
registry.MustRegister(checkpointTsMinTableIDGauge)
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
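The block deleted above follows the usual client_golang lifecycle for per-changefeed metrics: a GaugeVec is declared with namespace/changefeed labels, registered once in InitMetrics, bound to a concrete label pair when a processor is created, and released again when the processor closes. A minimal, self-contained sketch of that pattern, using an illustrative metric name rather than one of the TiCDC metrics removed here:

package metricsketch

import "github.com/prometheus/client_golang/prometheus"

// exampleGauge is a per-changefeed gauge keyed by the same label set the
// processor metrics use: namespace and changefeed.
var exampleGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Namespace: "ticdc",
		Subsystem: "processor",
		Name:      "example_metric",
		Help:      "illustrative per-changefeed gauge",
	}, []string{"namespace", "changefeed"})

// initExampleMetrics registers the vector once at startup, as InitMetrics does.
func initExampleMetrics(registry *prometheus.Registry) {
	registry.MustRegister(exampleGauge)
}

// bindExampleMetric resolves the labelled child when a processor is created,
// mirroring the WithLabelValues calls in newProcessor.
func bindExampleMetric(namespace, changefeed string) prometheus.Gauge {
	return exampleGauge.WithLabelValues(namespace, changefeed)
}

// cleanupExampleMetric drops the labelled child when the processor closes,
// mirroring cleanupMetrics in processor.go.
func cleanupExampleMetric(namespace, changefeed string) {
	exampleGauge.DeleteLabelValues(namespace, changefeed)
}

Deleting the labelled children on close matters because a GaugeVec keeps every label combination it has seen; without DeleteLabelValues, series for removed changefeeds would linger in the registry.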
126 changes: 11 additions & 115 deletions cdc/processor/processor.go
@@ -17,7 +17,6 @@ import (
"context"
"fmt"
"io"
"math"
"strconv"
"sync"
"time"
@@ -96,25 +95,17 @@ type processor struct {
newAgent func(cdcContext.Context, *model.Liveness) (scheduler.Agent, error)
cfg *config.SchedulerConfig

liveness *model.Liveness
agent scheduler.Agent
checkpointTs model.Ts
resolvedTs model.Ts

metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricMinResolvedTableIDGauge prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricMinCheckpointTableIDGauge prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricProcessorTickDuration prometheus.Observer
metricsTableSinkTotalRows prometheus.Counter
metricsTableMemoryHistogram prometheus.Observer
metricsProcessorMemoryGauge prometheus.Gauge
metricRemainKVEventGauge prometheus.Gauge
liveness *model.Liveness
agent scheduler.Agent

metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricProcessorTickDuration prometheus.Observer
metricsTableSinkTotalRows prometheus.Counter
metricsTableMemoryHistogram prometheus.Observer
metricsProcessorMemoryGauge prometheus.Gauge
metricRemainKVEventGauge prometheus.Gauge
}

// checkReadyForMessages checks whether all necessary Etcd keys have been established.
@@ -287,9 +278,7 @@ func (p *processor) IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) bo
return false
}

localResolvedTs := p.resolvedTs
globalResolvedTs := p.changefeed.Status.ResolvedTs
localCheckpointTs := p.agent.GetLastSentCheckpointTs()
globalCheckpointTs := p.changefeed.Status.CheckpointTs

var tableResolvedTs, tableCheckpointTs uint64
@@ -336,10 +325,8 @@ func (p *processor) IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) bo
zap.String("changefeed", p.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("tableResolvedTs", tableResolvedTs),
zap.Uint64("localResolvedTs", localResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("tableCheckpointTs", tableCheckpointTs),
zap.Uint64("localCheckpointTs", localCheckpointTs),
zap.Uint64("globalCheckpointTs", globalCheckpointTs),
zap.Any("state", state),
zap.Bool("isPrepare", isPrepare))
@@ -352,10 +339,8 @@ func (p *processor) IsAddTableSpanFinished(span tablepb.Span, isPrepare bool) bo
zap.String("changefeed", p.changefeedID.ID),
zap.Stringer("span", &span),
zap.Uint64("tableResolvedTs", tableResolvedTs),
zap.Uint64("localResolvedTs", localResolvedTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("tableCheckpointTs", tableCheckpointTs),
zap.Uint64("localCheckpointTs", localCheckpointTs),
zap.Uint64("globalCheckpointTs", globalCheckpointTs),
zap.Any("state", state),
zap.Bool("isPrepare", isPrepare))
@@ -446,11 +431,6 @@ func (p *processor) GetTableSpanCount() int {
return p.tableSpans.Len()
}

// GetCheckpoint implements TableExecutor interface.
func (p *processor) GetCheckpoint() (checkpointTs, resolvedTs model.Ts) {
return p.checkpointTs, p.resolvedTs
}

// GetTableSpanStatus implements TableExecutor interface
func (p *processor) GetTableSpanStatus(span tablepb.Span, collectStat bool) tablepb.TableStatus {
if p.pullBasedSinking {
@@ -560,18 +540,6 @@ func newProcessor(
cancel: func() {},
liveness: liveness,

metricResolvedTsGauge: resolvedTsGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricResolvedTsLagGauge: resolvedTsLagGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricMinResolvedTableIDGauge: resolvedTsMinTableIDGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTsGauge: checkpointTsGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricCheckpointTsLagGauge: checkpointTsLagGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricMinCheckpointTableIDGauge: checkpointTsMinTableIDGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricSyncTableNumGauge: syncTableNumGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricProcessorErrorCounter: processorErrorCounter.
@@ -721,10 +689,6 @@ func (p *processor) tick(ctx cdcContext.Context) error {
return errors.Trace(err)
}
p.pushResolvedTs2Table()
// there is no need to check the error here, because we will use
// the local time when an error is returned, which is acceptable
pdTime, _ := p.upstream.PDClock.CurrentTime()
p.handlePosition(oracle.GetPhysical(pdTime))

p.doGCSchemaStorage()

@@ -1057,66 +1021,6 @@ func (p *processor) sendError(err error) {
}
}

// handlePosition calculates the local resolved ts and local checkpoint ts.
// resolvedTs = min(schemaStorage's resolvedTs, all tables' resolvedTs).
// a table's resolvedTs = redo's resolvedTs if redo is enabled, else the sorter's resolvedTs.
// checkpointTs = min(resolvedTs, all tables' checkpointTs).
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
minResolvedTableID := int64(0)
if p.schemaStorage != nil {
minResolvedTs = p.schemaStorage.ResolvedTs()
}
minCheckpointTs := minResolvedTs
minCheckpointTableID := int64(0)
if p.pullBasedSinking {
spans := p.sinkManager.GetAllCurrentTableSpans()
for _, span := range spans {
stats := p.sinkManager.GetTableStats(span)
log.Debug("sink manager gets table stats",
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Stringer("span", &span),
zap.Any("stats", stats))
if stats.ResolvedTs < minResolvedTs {
minResolvedTs = stats.ResolvedTs
minResolvedTableID = span.TableID
}
if stats.CheckpointTs < minCheckpointTs {
minCheckpointTs = stats.CheckpointTs
minCheckpointTableID = span.TableID
}
}
} else {
p.tableSpans.Range(func(span tablepb.Span, table tablepb.TablePipeline) bool {
rts := table.ResolvedTs()
if rts < minResolvedTs {
minResolvedTs = rts
minResolvedTableID = table.ID()
}
cts := table.CheckpointTs()
if cts < minCheckpointTs {
minCheckpointTs = cts
minCheckpointTableID = table.ID()
}
return true
})
}

resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3)
p.metricResolvedTsGauge.Set(float64(resolvedPhyTs))
p.metricMinResolvedTableIDGauge.Set(float64(minResolvedTableID))

checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs)
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))
p.metricMinCheckpointTableIDGauge.Set(float64(minCheckpointTableID))

p.checkpointTs = minCheckpointTs
p.resolvedTs = minResolvedTs
}

// pushResolvedTs2Table sends global resolved ts to all the table pipelines.
func (p *processor) pushResolvedTs2Table() {
resolvedTs := p.changefeed.Status.ResolvedTs
@@ -1416,14 +1320,6 @@ func (p *processor) Close(ctx cdcContext.Context) error {
}

func (p *processor) cleanupMetrics() {
resolvedTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
resolvedTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
resolvedTsMinTableIDGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)

checkpointTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
checkpointTsLagGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
checkpointTsMinTableIDGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)

syncTableNumGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorErrorCounter.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID.Namespace, p.changefeedID.ID)
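The removed handlePosition implemented the watermark rule spelled out in its doc comment: resolvedTs is the minimum of the schema storage's resolved ts and every table's resolved ts, and checkpointTs is the minimum of that starting point and every table's checkpoint ts; the lag gauges were then derived from the physical part of the TSO. A standalone sketch of that aggregation, using a hypothetical tableStats type and assuming the standard TSO layout (physical milliseconds in the high bits, 18 logical bits) instead of calling into the tikv oracle package:

package main

import (
	"fmt"
	"time"
)

// tableStats is a hypothetical stand-in for the per-table stats that the sink
// manager (or table pipeline) reported to handlePosition.
type tableStats struct {
	TableID      int64
	ResolvedTs   uint64
	CheckpointTs uint64
}

// minWatermarks mirrors the removed aggregation:
// resolvedTs   = min(schema storage resolved ts, all tables' resolved ts)
// checkpointTs = min(that starting point, all tables' checkpoint ts)
func minWatermarks(schemaResolvedTs uint64, tables []tableStats) (resolvedTs, checkpointTs uint64) {
	resolvedTs = schemaResolvedTs
	checkpointTs = schemaResolvedTs
	for _, t := range tables {
		if t.ResolvedTs < resolvedTs {
			resolvedTs = t.ResolvedTs
		}
		if t.CheckpointTs < checkpointTs {
			checkpointTs = t.CheckpointTs
		}
	}
	return resolvedTs, checkpointTs
}

// extractPhysical returns the physical millisecond component of a TSO,
// assuming 18 logical bits in the low end (what oracle.ExtractPhysical does).
func extractPhysical(ts uint64) int64 { return int64(ts >> 18) }

// lagSeconds converts the gap between "now" (physical ms) and a watermark
// into the seconds value the removed lag gauges reported.
func lagSeconds(nowPhysicalMs int64, ts uint64) float64 {
	return float64(nowPhysicalMs-extractPhysical(ts)) / 1e3
}

func main() {
	// Build illustrative TSOs from wall-clock time so the lags come out small.
	now := time.Now()
	mkTs := func(behind time.Duration) uint64 {
		return uint64(now.Add(-behind).UnixMilli()) << 18
	}
	tables := []tableStats{
		{TableID: 1, ResolvedTs: mkTs(1 * time.Second), CheckpointTs: mkTs(3 * time.Second)},
		{TableID: 2, ResolvedTs: mkTs(2 * time.Second), CheckpointTs: mkTs(2 * time.Second)},
	}
	resolved, checkpoint := minWatermarks(mkTs(500*time.Millisecond), tables)
	fmt.Printf("resolved lag: %.1fs, checkpoint lag: %.1fs\n",
		lagSeconds(now.UnixMilli(), resolved), lagSeconds(now.UnixMilli(), checkpoint))
	// Prints roughly "resolved lag: 2.0s, checkpoint lag: 3.0s".
}

The diff above removes both this aggregation and the gauges it fed, along with the GetCheckpoint accessor that exposed the result.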
53 changes: 4 additions & 49 deletions cdc/processor/processor_test.go
@@ -248,24 +248,18 @@ type mockAgent struct {
// dummy to satisfy the interface
scheduler.Agent

executor scheduler.TableExecutor
lastCheckpointTs model.Ts
liveness *model.Liveness
isClosed bool
executor scheduler.TableExecutor
liveness *model.Liveness
isClosed bool
}

func (a *mockAgent) Tick(_ context.Context) error {
if a.executor.GetTableSpanCount() == 0 {
return nil
}
a.lastCheckpointTs, _ = a.executor.GetCheckpoint()
return nil
}

func (a *mockAgent) GetLastSentCheckpointTs() (checkpointTs model.Ts) {
return a.lastCheckpointTs
}

func (a *mockAgent) Close() error {
a.isClosed = true
return nil
@@ -305,9 +299,6 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {

require.Equal(t, 1, p.tableSpans.Len())

checkpointTs := p.agent.GetLastSentCheckpointTs()
require.Equal(t, checkpointTs, model.Ts(0))

done := p.IsAddTableSpanFinished(spanz.TableIDToComparableSpan(1), true)
require.False(t, done)
require.Equal(t, tablepb.TableStatePreparing, table1.State())
@@ -323,10 +314,6 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.True(t, done)
require.Equal(t, tablepb.TableStatePrepared, table1.State())

// no table is `replicating`
checkpointTs = p.agent.GetLastSentCheckpointTs()
require.Equal(t, checkpointTs, model.Ts(20))

ok, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), 30, true)
require.NoError(t, err)
require.True(t, ok)
@@ -347,9 +334,6 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) {
require.True(t, done)
require.Equal(t, tablepb.TableStateReplicating, table1.State())

checkpointTs = p.agent.GetLastSentCheckpointTs()
require.Equal(t, table1.CheckpointTs(), checkpointTs)

err = p.Close(ctx)
require.Nil(t, err)
require.Nil(t, p.agent)
@@ -446,15 +430,9 @@ func TestProcessorClose(t *testing.T) {
return status, true, nil
})
tester.MustApplyPatches()
p.tableSpans.GetV(spanz.TableIDToComparableSpan(1)).(*mockTablePipeline).resolvedTs = 110
p.tableSpans.GetV(spanz.TableIDToComparableSpan(2)).(*mockTablePipeline).resolvedTs = 90
p.tableSpans.GetV(spanz.TableIDToComparableSpan(1)).(*mockTablePipeline).checkpointTs = 90
p.tableSpans.GetV(spanz.TableIDToComparableSpan(2)).(*mockTablePipeline).checkpointTs = 95
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()
require.EqualValues(t, p.checkpointTs, 90)
require.EqualValues(t, p.resolvedTs, 90)
require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID)

require.Nil(t, p.Close(ctx))
@@ -516,23 +494,6 @@ func TestPositionDeleted(t *testing.T) {
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()

table1 := p.tableSpans.GetV(spanz.TableIDToComparableSpan(1)).(*mockTablePipeline)
table2 := p.tableSpans.GetV(spanz.TableIDToComparableSpan(2)).(*mockTablePipeline)

table1.resolvedTs++
table2.resolvedTs++

table1.checkpointTs++
table2.checkpointTs++

// cal position
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()

require.Equal(t, model.Ts(31), p.checkpointTs)
require.Equal(t, model.Ts(31), p.resolvedTs)
require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID)

// some others delete the task position
@@ -541,18 +502,12 @@ func TestPositionDeleted(t *testing.T) {
return nil, true, nil
})
tester.MustApplyPatches()

// position created again
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()
require.Equal(t, &model.TaskPosition{}, p.changefeed.TaskPositions[p.captureInfo.ID])

// cal position
err = p.Tick(ctx)
require.Nil(t, err)
tester.MustApplyPatches()
require.Equal(t, model.Ts(31), p.checkpointTs)
require.Equal(t, model.Ts(31), p.resolvedTs)
require.Contains(t, p.changefeed.TaskPositions, p.captureInfo.ID)
}
