Skip to content

Commit

Permalink
processor(ticdc): add min resolved/checkpoint table ID metrics (pingc…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Jan 18, 2022
1 parent 3acebd2 commit 8957631
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 89 deletions.
16 changes: 16 additions & 0 deletions cdc/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ var (
Name: "resolved_ts_lag",
Help: "local resolved ts lag of processor",
}, []string{"changefeed", "capture"})
resolvedTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_resolved_table_id",
Help: "ID of the minimum resolved table",
}, []string{"changefeed", "capture"})
checkpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand All @@ -46,6 +53,13 @@ var (
Name: "checkpoint_ts_lag",
Help: "global checkpoint ts lag of processor",
}, []string{"changefeed", "capture"})
checkpointTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_checkpoint_table_id",
Help: "ID of the minimum checkpoint table",
}, []string{"changefeed", "capture"})
syncTableNumGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand All @@ -66,8 +80,10 @@ var (
// InitMetrics registers the processor-level collectors with the given
// registry. The collectors are passed to a single MustRegister call in a
// fixed order: the resolved-ts gauges, the checkpoint-ts gauges, then the
// table-count gauge and the error counter. MustRegister panics if any
// collector cannot be registered.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		resolvedTsGauge,
		resolvedTsLagGauge,
		resolvedTsMinTableIDGauge,
		checkpointTsGauge,
		checkpointTsLagGauge,
		checkpointTsMinTableIDGauge,
		syncTableNumGauge,
		processorErrorCounter,
	)
}
8 changes: 0 additions & 8 deletions cdc/processor/pipeline/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,6 @@ import (
)

var (
tableResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "table_resolved_ts",
Help: "local resolved ts of processor",
}, []string{"changefeed", "capture", "table"})
txnCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Expand All @@ -44,7 +37,6 @@ var (

// InitMetrics registers the table pipeline collectors (the per-table
// resolved-ts gauge, the txn counter and the table memory histogram) with
// the given registry. MustRegister panics if any collector cannot be
// registered.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		tableResolvedTsGauge,
		txnCounter,
		tableMemoryHistogram,
	)
}
6 changes: 0 additions & 6 deletions cdc/processor/pipeline/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"context"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/puller"
cdcContext "github.com/pingcap/tiflow/pkg/context"
Expand Down Expand Up @@ -58,7 +57,6 @@ func (n *pullerNode) tableSpan(ctx cdcContext.Context) []regionspan.Span {
}

func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
metricTableResolvedTsGauge := tableResolvedTsGauge.WithLabelValues(ctx.ChangefeedVars().ID, ctx.GlobalVars().CaptureInfo.AdvertiseAddr, n.tableName)
ctxC, cancel := context.WithCancel(ctx)
ctxC = util.PutTableInfoInCtx(ctxC, n.tableID, n.tableName)
ctxC = util.PutCaptureAddrInCtx(ctxC, ctx.GlobalVars().CaptureInfo.AdvertiseAddr)
Expand All @@ -80,9 +78,6 @@ func (n *pullerNode) Init(ctx pipeline.NodeContext) error {
if rawKV == nil {
continue
}
if rawKV.OpType == model.OpTypeResolved {
metricTableResolvedTsGauge.Set(float64(oracle.ExtractPhysical(rawKV.CRTs)))
}
pEvent := model.NewPolymorphicEvent(rawKV)
ctx.SendToNextNode(pipeline.PolymorphicEventMessage(pEvent))
}
Expand All @@ -100,7 +95,6 @@ func (n *pullerNode) Receive(ctx pipeline.NodeContext) error {
}

// Destroy removes this table's series from tableResolvedTsGauge, invokes the
// node's cancel function, and returns the result of waiting on n.wg.
func (n *pullerNode) Destroy(ctx pipeline.NodeContext) error {
	// The series is keyed by changefeed ID, capture address and table name.
	changefeedID := ctx.ChangefeedVars().ID
	captureAddr := ctx.GlobalVars().CaptureInfo.AdvertiseAddr
	tableResolvedTsGauge.DeleteLabelValues(changefeedID, captureAddr, n.tableName)

	n.cancel()
	return n.wg.Wait()
}
34 changes: 22 additions & 12 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,14 @@ type processor struct {
lazyInit func(ctx cdcContext.Context) error
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error)

metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricResolvedTsGauge prometheus.Gauge
metricResolvedTsLagGauge prometheus.Gauge
metricMinResolvedTableIDGuage prometheus.Gauge
metricCheckpointTsGauge prometheus.Gauge
metricCheckpointTsLagGauge prometheus.Gauge
metricMinCheckpointTableIDGuage prometheus.Gauge
metricSyncTableNumGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
}

// newProcessor creates a new processor
Expand All @@ -90,12 +92,14 @@ func newProcessor(ctx cdcContext.Context) *processor {
captureInfo: ctx.GlobalVars().CaptureInfo,
cancel: func() {},

metricResolvedTsGauge: resolvedTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsLagGauge: resolvedTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsGauge: checkpointTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsLagGauge: checkpointTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsGauge: resolvedTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricResolvedTsLagGauge: resolvedTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricMinResolvedTableIDGuage: resolvedTsMinTableIDGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsGauge: checkpointTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricCheckpointTsLagGauge: checkpointTsLagGauge.WithLabelValues(changefeedID, advertiseAddr),
metricMinCheckpointTableIDGuage: checkpointTsMinTableIDGauge.WithLabelValues(changefeedID, advertiseAddr),
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
}
p.createTablePipeline = p.createTablePipelineImpl
p.lazyInit = p.lazyInitImpl
Expand Down Expand Up @@ -569,31 +573,37 @@ func (p *processor) checkTablesNum(ctx cdcContext.Context) error {
// handlePosition calculates the local resolved ts and local checkpoint ts
func (p *processor) handlePosition(currentTs int64) {
minResolvedTs := uint64(math.MaxUint64)
minResolvedTableID := int64(0)
if p.schemaStorage != nil {
minResolvedTs = p.schemaStorage.ResolvedTs()
}
for _, table := range p.tables {
ts := table.ResolvedTs()
if ts < minResolvedTs {
minResolvedTs = ts
minResolvedTableID, _ = table.ID()
}
}

minCheckpointTs := minResolvedTs
minCheckpointTableID := int64(0)
for _, table := range p.tables {
ts := table.CheckpointTs()
if ts < minCheckpointTs {
minCheckpointTs = ts
minCheckpointTableID, _ = table.ID()
}
}

resolvedPhyTs := oracle.ExtractPhysical(minResolvedTs)
p.metricResolvedTsLagGauge.Set(float64(currentTs-resolvedPhyTs) / 1e3)
p.metricResolvedTsGauge.Set(float64(resolvedPhyTs))
p.metricMinResolvedTableIDGuage.Set(float64(minResolvedTableID))

checkpointPhyTs := oracle.ExtractPhysical(minCheckpointTs)
p.metricCheckpointTsLagGauge.Set(float64(currentTs-checkpointPhyTs) / 1e3)
p.metricCheckpointTsGauge.Set(float64(checkpointPhyTs))
p.metricMinCheckpointTableIDGuage.Set(float64(minCheckpointTableID))

// minResolvedTs and minCheckpointTs may be less than the global resolved ts and global checkpoint ts when a new table is added, because the startTs of the new table is less than the global checkpoint ts.
if minResolvedTs != p.changefeed.TaskPositions[p.captureInfo.ID].ResolvedTs ||
Expand Down
Loading

0 comments on commit 8957631

Please sign in to comment.