Skip to content

Commit

Permalink
metrics(ticdc): add some log and metrics to owner and processorManage… (
Browse files Browse the repository at this point in the history
#4402)

close #3884
  • Loading branch information
CharlesCheung96 authored Feb 8, 2022
1 parent 3c410c5 commit c4f8055
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 14 deletions.
33 changes: 32 additions & 1 deletion cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"strings"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
Expand Down Expand Up @@ -72,6 +73,7 @@ type changefeed struct {
metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
metricsChangefeedResolvedTsGauge prometheus.Gauge
metricsChangefeedResolvedTsLagGauge prometheus.Gauge
metricChangefeedTickDuration prometheus.Observer

newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
Expand Down Expand Up @@ -109,12 +111,25 @@ func newChangefeed4Test(
}

func (c *changefeed) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) {
startTime := time.Now()

ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
c.errCh <- errors.Trace(err)
return nil
})
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
if err := c.tick(ctx, state, captures); err != nil {
err := c.tick(ctx, state, captures)

// The tick duration is recorded only if changefeed has completed initialization
if c.initialized {
costTime := time.Since(startTime)
if costTime > changefeedLogsWarnDuration {
log.Warn("changefeed tick took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
}
c.metricChangefeedTickDuration.Observe(costTime.Seconds())
}

if err != nil {
log.Error("an error occurred in Owner", zap.String("changefeed", c.state.ID), zap.Error(err))
var code string
if rfcCode, ok := cerror.RFCCode(err); ok {
Expand Down Expand Up @@ -185,7 +200,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
// So we return here.
return nil
}
startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(ctx, c.state, c.schema.AllPhysicalTables(), captures)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
}
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -296,6 +316,7 @@ LOOP:
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)
c.metricChangefeedTickDuration = changefeedTickDuration.WithLabelValues(c.id)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
Expand Down Expand Up @@ -338,6 +359,9 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

changefeedTickDuration.DeleteLabelValues(c.id)
c.metricChangefeedTickDuration = nil

c.initialized = false
}

Expand Down Expand Up @@ -548,7 +572,14 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
}

func (c *changefeed) Close(ctx cdcContext.Context) {
startTime := time.Now()

c.releaseResources(ctx)
costTime := time.Since(startTime)
if costTime > changefeedLogsWarnDuration {
log.Warn("changefeed close took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
}
changefeedCloseDuration.Observe(costTime.Seconds())
}

func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
Expand Down
29 changes: 28 additions & 1 deletion cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@

package owner

import "github.com/prometheus/client_golang/prometheus"
import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

var (
changefeedCheckpointTsGauge = prometheus.NewGaugeVec(
Expand Down Expand Up @@ -65,13 +69,34 @@ var (
Name: "status",
Help: "The status of changefeeds",
}, []string{"changefeed"})
changefeedTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "changefeed_tick_duration",
Help: "Bucketed histogram of owner tick changefeed reactor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed"})
changefeedCloseDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "changefeed_close_duration",
Help: "Bucketed histogram of owner close changefeed reactor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
})
)

const (
// total tables that have been dispatched to a single processor
maintainTableTypeTotal string = "total"
// tables that are dispatched to a processor and have not been finished yet
maintainTableTypeWip string = "wip"
// When heavy operations (such as network IO and serialization) take too much time, the program
// should print a warning log, and if necessary, the timeout should be exposed externally through
// monitor.
changefeedLogsWarnDuration = 1 * time.Second
schedulerLogsWarnDuration = 1 * time.Second
)

// InitMetrics registers all metrics used in owner
Expand All @@ -83,4 +108,6 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ownershipCounter)
registry.MustRegister(ownerMaintainTableNumGauge)
registry.MustRegister(changefeedStatusGauge)
registry.MustRegister(changefeedTickDuration)
registry.MustRegister(changefeedCloseDuration)
}
21 changes: 17 additions & 4 deletions cdc/processor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand All @@ -36,6 +37,7 @@ const (
commandTpUnknow commandTp = iota //nolint:varcheck,deadcode
commandTpClose
commandTpWriteDebugInfo
processorLogsWarnDuration = 1 * time.Second
)

type command struct {
Expand All @@ -53,16 +55,19 @@ type Manager struct {
newProcessor func(cdcContext.Context) *processor

enableNewScheduler bool

metricProcessorCloseDuration prometheus.Observer
}

// NewManager creates a new processor manager
func NewManager() *Manager {
conf := config.GetGlobalServerConfig()
return &Manager{
processors: make(map[model.ChangeFeedID]*processor),
commandQueue: make(chan *command, 4),
newProcessor: newProcessor,
enableNewScheduler: conf.Debug.EnableNewScheduler,
processors: make(map[model.ChangeFeedID]*processor),
commandQueue: make(chan *command, 4),
newProcessor: newProcessor,
enableNewScheduler: conf.Debug.EnableNewScheduler,
metricProcessorCloseDuration: processorCloseDuration.WithLabelValues(conf.AdvertiseAddr),
}
}

Expand Down Expand Up @@ -129,7 +134,15 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)

func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
if processor, exist := m.processors[changefeedID]; exist {
startTime := time.Now()
captureID := processor.captureInfo.ID
err := processor.Close()
costTime := time.Since(startTime)
if costTime > processorLogsWarnDuration {
log.Warn("processor close took too long", zap.String("changefeed", changefeedID),
zap.String("capture", captureID), zap.Duration("duration", costTime))
}
m.metricProcessorCloseDuration.Observe(costTime.Seconds())
if err != nil {
log.Warn("failed to close processor",
zap.String("changefeed", changefeedID),
Expand Down
18 changes: 18 additions & 0 deletions cdc/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,22 @@ var (
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed", "capture"})
processorTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "processor_tick_duration",
Help: "Bucketed histogram of processorManager tick processor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed", "capture"})
processorCloseDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "processor_close_duration",
Help: "Bucketed histogram of processorManager close processor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"capture"})
)

// InitMetrics registers all metrics used in processor
Expand All @@ -94,4 +110,6 @@ func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(syncTableNumGauge)
registry.MustRegister(processorErrorCounter)
registry.MustRegister(processorSchemaStorageGcTsGauge)
registry.MustRegister(processorTickDuration)
registry.MustRegister(processorCloseDuration)
}
12 changes: 12 additions & 0 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type processor struct {
metricSyncTableNumGauge prometheus.Gauge
metricSchemaStorageGcTsGauge prometheus.Gauge
metricProcessorErrorCounter prometheus.Counter
metricProcessorTickDuration prometheus.Observer
}

// checkReadyForMessages checks whether all necessary Etcd keys have been established.
Expand Down Expand Up @@ -246,6 +247,7 @@ func newProcessor(ctx cdcContext.Context) *processor {
metricSyncTableNumGauge: syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorErrorCounter: processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
metricSchemaStorageGcTsGauge: processorSchemaStorageGcTsGauge.WithLabelValues(changefeedID, advertiseAddr),
metricProcessorTickDuration: processorTickDuration.WithLabelValues(changefeedID, advertiseAddr),
}
p.createTablePipeline = p.createTablePipelineImpl
p.lazyInit = p.lazyInitImpl
Expand Down Expand Up @@ -280,13 +282,22 @@ func isProcessorIgnorableError(err error) bool {
// the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
// The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc.
func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) (orchestrator.ReactorState, error) {
startTime := time.Now()
p.changefeed = state
state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
ID: state.ID,
Info: state.Info,
})
_, err := p.tick(ctx, state)

costTime := time.Since(startTime)
if costTime > processorLogsWarnDuration {
log.Warn("processor tick took too long", zap.String("changefeed", p.changefeedID),
zap.String("capture", ctx.GlobalVars().CaptureInfo.ID), zap.Duration("duration", costTime))
}
p.metricProcessorTickDuration.Observe(costTime.Seconds())

if err == nil {
return state, nil
}
Expand Down Expand Up @@ -1058,6 +1069,7 @@ func (p *processor) Close() error {
syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
processorTickDuration.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
if p.sinkManager != nil {
// pass a canceled context is ok here, since we don't need to wait Close
ctx, cancel := context.WithCancel(context.Background())
Expand Down
1 change: 1 addition & 0 deletions cdc/sink/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Sink interface {

// TryEmitRowChangedEvents is thread-safety and non-blocking.
TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error)

// EmitDDLEvent sends DDL Event to Sink
// EmitDDLEvent should execute DDL to downstream synchronously
//
Expand Down
8 changes: 0 additions & 8 deletions pkg/orchestrator/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,3 @@ func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map
}
return nil
}

// MultiDataPatch represents an update to many keys
type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error

// Patch implements the DataPatch interface
func (m MultiDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error {
return m(valueMap, changedSet)
}

0 comments on commit c4f8055

Please sign in to comment.