From c4f8055178bfd35e728f145a9c27805b130441af Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Tue, 8 Feb 2022 13:25:35 +0800
Subject: [PATCH] metrics(ticdc): add some log and metrics to owner and
 processorManage… (#4402)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

close pingcap/tiflow#3884
---
 cdc/owner/changefeed.go        | 33 ++++++++++++++++++++++++++++++++-
 cdc/owner/metrics.go           | 29 ++++++++++++++++++++++++++++-
 cdc/processor/manager.go       | 21 +++++++++++++++++----
 cdc/processor/metrics.go       | 18 ++++++++++++++++++
 cdc/processor/processor.go     | 12 ++++++++++++
 cdc/sink/sink.go               |  1 +
 pkg/orchestrator/interfaces.go |  8 --------
 7 files changed, 108 insertions(+), 14 deletions(-)

diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 5cead7f9937..c719d761ce4 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -17,6 +17,7 @@ import (
     "context"
     "strings"
     "sync"
+    "time"
 
     "github.com/pingcap/errors"
     "github.com/pingcap/failpoint"
@@ -72,6 +73,7 @@ type changefeed struct {
     metricsChangefeedCheckpointTsLagGauge prometheus.Gauge
     metricsChangefeedResolvedTsGauge      prometheus.Gauge
     metricsChangefeedResolvedTsLagGauge   prometheus.Gauge
+    metricChangefeedTickDuration          prometheus.Observer
 
     newDDLPuller func(ctx cdcContext.Context, startTs uint64) (DDLPuller, error)
     newSink      func() DDLSink
@@ -109,12 +111,25 @@ func newChangefeed4Test(
 }
 
 func (c *changefeed) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo) {
+    startTime := time.Now()
+
     ctx = cdcContext.WithErrorHandler(ctx, func(err error) error {
         c.errCh <- errors.Trace(err)
         return nil
     })
     state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
-    if err := c.tick(ctx, state, captures); err != nil {
+    err := c.tick(ctx, state, captures)
+
+    // The tick duration is recorded only if the changefeed has completed initialization.
+    if c.initialized {
+        costTime := time.Since(startTime)
+        if costTime > changefeedLogsWarnDuration {
+            log.Warn("changefeed tick took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
+        }
+        c.metricChangefeedTickDuration.Observe(costTime.Seconds())
+    }
+
+    if err != nil {
         log.Error("an error occurred in Owner", zap.String("changefeed", c.state.ID), zap.Error(err))
         var code string
         if rfcCode, ok := cerror.RFCCode(err); ok {
@@ -185,7 +200,12 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
         // So we return here.
         return nil
     }
+    startTime := time.Now()
     newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(ctx, c.state, c.schema.AllPhysicalTables(), captures)
+    costTime := time.Since(startTime)
+    if costTime > schedulerLogsWarnDuration {
+        log.Warn("scheduler tick took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
+    }
     if err != nil {
         return errors.Trace(err)
     }
@@ -296,6 +316,7 @@ LOOP:
     c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.WithLabelValues(c.id)
     c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.WithLabelValues(c.id)
     c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.WithLabelValues(c.id)
+    c.metricChangefeedTickDuration = changefeedTickDuration.WithLabelValues(c.id)
 
     // create scheduler
     c.scheduler, err = c.newScheduler(ctx, checkpointTs)
@@ -338,6 +359,9 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
     c.metricsChangefeedResolvedTsGauge = nil
     c.metricsChangefeedResolvedTsLagGauge = nil
 
+    changefeedTickDuration.DeleteLabelValues(c.id)
+    c.metricChangefeedTickDuration = nil
+
     c.initialized = false
 }
 
@@ -548,7 +572,14 @@ func (c *changefeed) updateStatus(currentTs int64, checkpointTs, resolvedTs mode
 }
 
 func (c *changefeed) Close(ctx cdcContext.Context) {
+    startTime := time.Now()
+
     c.releaseResources(ctx)
+    costTime := time.Since(startTime)
+    if costTime > changefeedLogsWarnDuration {
+        log.Warn("changefeed close took too long", zap.String("changefeed", c.id), zap.Duration("duration", costTime))
+    }
+    changefeedCloseDuration.Observe(costTime.Seconds())
 }
 
 func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
diff --git a/cdc/owner/metrics.go b/cdc/owner/metrics.go
index bc1bff8d0aa..fa29fec9e84 100644
--- a/cdc/owner/metrics.go
+++ b/cdc/owner/metrics.go
@@ -13,7 +13,11 @@
 
 package owner
 
-import "github.com/prometheus/client_golang/prometheus"
+import (
+    "time"
+
+    "github.com/prometheus/client_golang/prometheus"
+)
 
 var (
     changefeedCheckpointTsGauge = prometheus.NewGaugeVec(
@@ -65,6 +69,22 @@ var (
             Name:      "status",
             Help:      "The status of changefeeds",
         }, []string{"changefeed"})
+    changefeedTickDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: "ticdc",
+            Subsystem: "owner",
+            Name:      "changefeed_tick_duration",
+            Help:      "Bucketed histogram of the time (s) the owner takes to tick a changefeed reactor.",
+            Buckets:   prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
+        }, []string{"changefeed"})
+    changefeedCloseDuration = prometheus.NewHistogram(
+        prometheus.HistogramOpts{
+            Namespace: "ticdc",
+            Subsystem: "owner",
+            Name:      "changefeed_close_duration",
+            Help:      "Bucketed histogram of the time (s) the owner takes to close a changefeed reactor.",
+            Buckets:   prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
+        })
 )
 
 const (
@@ -72,6 +92,11 @@ const (
     maintainTableTypeTotal string = "total"
     // tables that are dispatched to a processor and have not been finished yet
     maintainTableTypeWip string = "wip"
+    // When heavy operations (such as network IO and serialization) take too much time,
+    // the program should print a warning log, and if necessary, the slowness should be
+    // exposed externally through monitoring metrics.
+    changefeedLogsWarnDuration = 1 * time.Second
+    schedulerLogsWarnDuration  = 1 * time.Second
 )
 
 // InitMetrics registers all metrics used in owner
@@ -83,4 +108,6 @@ func InitMetrics(registry *prometheus.Registry) {
     registry.MustRegister(ownershipCounter)
     registry.MustRegister(ownerMaintainTableNumGauge)
     registry.MustRegister(changefeedStatusGauge)
+    registry.MustRegister(changefeedTickDuration)
+    registry.MustRegister(changefeedCloseDuration)
 }
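Both owner histograms above share one bucket layout. As a quick, standalone sanity check of what prometheus.ExponentialBuckets(0.01, 2, 18) covers (this sketch is illustrative and not part of the patch), the highest bucket boundary is 0.01 * 2^17 ≈ 1311 s, so tick and close durations are resolved from 10 ms up to roughly 22 minutes before falling into the implicit +Inf bucket:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same arguments as the Buckets fields above: start at 10 ms and double 18 times.
	buckets := prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18)
	fmt.Printf("first bucket: %.2fs, last bucket: %.0fs (~%.0f min)\n",
		buckets[0], buckets[17], buckets[17]/60)
	// Output: first bucket: 0.01s, last bucket: 1311s (~22 min)
}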
diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go
index 935e484bb0e..e386f6cc995 100644
--- a/cdc/processor/manager.go
+++ b/cdc/processor/manager.go
@@ -27,6 +27,7 @@ import (
     cdcContext "github.com/pingcap/tiflow/pkg/context"
     cerrors "github.com/pingcap/tiflow/pkg/errors"
     "github.com/pingcap/tiflow/pkg/orchestrator"
+    "github.com/prometheus/client_golang/prometheus"
     "go.uber.org/zap"
 )
 
@@ -36,6 +37,7 @@ const (
     commandTpUnknow commandTp = iota //nolint:varcheck,deadcode
     commandTpClose
     commandTpWriteDebugInfo
+    processorLogsWarnDuration = 1 * time.Second
 )
 
 type command struct {
@@ -53,16 +55,19 @@ type Manager struct {
     newProcessor func(cdcContext.Context) *processor
 
     enableNewScheduler bool
+
+    metricProcessorCloseDuration prometheus.Observer
 }
 
 // NewManager creates a new processor manager
 func NewManager() *Manager {
     conf := config.GetGlobalServerConfig()
     return &Manager{
-        processors:         make(map[model.ChangeFeedID]*processor),
-        commandQueue:       make(chan *command, 4),
-        newProcessor:       newProcessor,
-        enableNewScheduler: conf.Debug.EnableNewScheduler,
+        processors:                   make(map[model.ChangeFeedID]*processor),
+        commandQueue:                 make(chan *command, 4),
+        newProcessor:                 newProcessor,
+        enableNewScheduler:           conf.Debug.EnableNewScheduler,
+        metricProcessorCloseDuration: processorCloseDuration.WithLabelValues(conf.AdvertiseAddr),
     }
 }
 
@@ -129,7 +134,15 @@ func (m *Manager) Tick(stdCtx context.Context, state orchestrator.ReactorState)
 
 func (m *Manager) closeProcessor(changefeedID model.ChangeFeedID) {
     if processor, exist := m.processors[changefeedID]; exist {
+        startTime := time.Now()
+        captureID := processor.captureInfo.ID
         err := processor.Close()
+        costTime := time.Since(startTime)
+        if costTime > processorLogsWarnDuration {
+            log.Warn("processor close took too long", zap.String("changefeed", changefeedID),
+                zap.String("capture", captureID), zap.Duration("duration", costTime))
+        }
+        m.metricProcessorCloseDuration.Observe(costTime.Seconds())
         if err != nil {
             log.Warn("failed to close processor",
                 zap.String("changefeed", changefeedID),
diff --git a/cdc/processor/metrics.go b/cdc/processor/metrics.go
index 5c52b0ccba6..9875929b8d0 100644
--- a/cdc/processor/metrics.go
+++ b/cdc/processor/metrics.go
@@ -81,6 +81,22 @@ var (
             Name:      "schema_storage_gc_ts",
             Help:      "the TS of the currently maintained oldest snapshot in SchemaStorage",
         }, []string{"changefeed", "capture"})
+    processorTickDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: "ticdc",
+            Subsystem: "processor",
+            Name:      "processor_tick_duration",
+            Help:      "Bucketed histogram of the time (s) the processor takes to tick.",
+            Buckets:   prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
+        }, []string{"changefeed", "capture"})
+    processorCloseDuration = prometheus.NewHistogramVec(
+        prometheus.HistogramOpts{
+            Namespace: "ticdc",
+            Subsystem: "processor",
+            Name:      "processor_close_duration",
+            Help:      "Bucketed histogram of the time (s) the processor manager takes to close a processor.",
+            Buckets:   prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
+        }, []string{"capture"})
 )
 
 // InitMetrics registers all metrics used in processor
@@ -94,4 +110,6 @@ func InitMetrics(registry *prometheus.Registry) {
     registry.MustRegister(syncTableNumGauge)
     registry.MustRegister(processorErrorCounter)
     registry.MustRegister(processorSchemaStorageGcTsGauge)
+    registry.MustRegister(processorTickDuration)
+    registry.MustRegister(processorCloseDuration)
 }
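The measure-warn-observe sequence introduced by this patch now appears at four call sites (changefeed tick and close, processor tick and close). Purely to illustrate the shared shape — the patch keeps the call sites inline, and recordDuration below is a hypothetical helper, not tiflow code — the pattern reduces to:

package main

import (
	"fmt"
	"time"

	"github.com/pingcap/log"
	"github.com/prometheus/client_golang/prometheus"
	"go.uber.org/zap"
)

// recordDuration runs fn, warns if it ran longer than warnThreshold, and
// records the elapsed seconds into the given histogram observer.
func recordDuration(name string, warnThreshold time.Duration,
	observer prometheus.Observer, fn func() error) error {
	startTime := time.Now()
	err := fn()
	costTime := time.Since(startTime)
	if costTime > warnThreshold {
		log.Warn(name+" took too long", zap.Duration("duration", costTime))
	}
	observer.Observe(costTime.Seconds())
	return err
}

func main() {
	hist := prometheus.NewHistogram(prometheus.HistogramOpts{
		Name:    "demo_duration",
		Buckets: prometheus.ExponentialBuckets(0.01, 2, 18),
	})
	err := recordDuration("demo tick", time.Second, hist, func() error {
		time.Sleep(20 * time.Millisecond)
		return nil
	})
	fmt.Println("err:", err)
}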
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index b66f7831089..53199727e5b 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -92,6 +92,7 @@ type processor struct {
     metricSyncTableNumGauge      prometheus.Gauge
     metricSchemaStorageGcTsGauge prometheus.Gauge
     metricProcessorErrorCounter  prometheus.Counter
+    metricProcessorTickDuration  prometheus.Observer
 }
 
 // checkReadyForMessages checks whether all necessary Etcd keys have been established.
@@ -246,6 +247,7 @@ func newProcessor(ctx cdcContext.Context) *processor {
         metricSyncTableNumGauge:      syncTableNumGauge.WithLabelValues(changefeedID, advertiseAddr),
         metricProcessorErrorCounter:  processorErrorCounter.WithLabelValues(changefeedID, advertiseAddr),
         metricSchemaStorageGcTsGauge: processorSchemaStorageGcTsGauge.WithLabelValues(changefeedID, advertiseAddr),
+        metricProcessorTickDuration:  processorTickDuration.WithLabelValues(changefeedID, advertiseAddr),
     }
     p.createTablePipeline = p.createTablePipelineImpl
     p.lazyInit = p.lazyInitImpl
@@ -280,6 +282,7 @@ func isProcessorIgnorableError(err error) bool {
 // the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd
 // The main logic of processor is in this function, including the calculation of many kinds of ts, maintain table pipeline, error handling, etc.
 func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedReactorState) (orchestrator.ReactorState, error) {
+    startTime := time.Now()
     p.changefeed = state
     state.CheckCaptureAlive(ctx.GlobalVars().CaptureInfo.ID)
     ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
@@ -287,6 +290,14 @@ func (p *processor) Tick(ctx cdcContext.Context, state *orchestrator.ChangefeedR
         Info: state.Info,
     })
     _, err := p.tick(ctx, state)
+
+    costTime := time.Since(startTime)
+    if costTime > processorLogsWarnDuration {
+        log.Warn("processor tick took too long", zap.String("changefeed", p.changefeedID),
+            zap.String("capture", ctx.GlobalVars().CaptureInfo.ID), zap.Duration("duration", costTime))
+    }
+    p.metricProcessorTickDuration.Observe(costTime.Seconds())
+
     if err == nil {
         return state, nil
     }
@@ -1058,6 +1069,7 @@ func (p *processor) Close() error {
     syncTableNumGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
     processorErrorCounter.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
     processorSchemaStorageGcTsGauge.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
+    processorTickDuration.DeleteLabelValues(p.changefeedID, p.captureInfo.AdvertiseAddr)
     if p.sinkManager != nil {
         // pass a canceled context is ok here, since we don't need to wait Close
         ctx, cancel := context.WithCancel(context.Background())
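Note how the processor pairs every WithLabelValues in newProcessor with a matching DeleteLabelValues in Close, now including the new processorTickDuration series. The standalone sketch below (metric name and label values are made up) shows why the pairing matters: a labelled child is created and cached on first use, and keeps being exported until it is deleted with the same label values in the same order:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// A stand-in for processorTickDuration: labelled by changefeed and capture.
	vec := prometheus.NewHistogramVec(prometheus.HistogramOpts{
		Name:    "tick_duration_demo",
		Buckets: prometheus.ExponentialBuckets(0.01, 2, 18),
	}, []string{"changefeed", "capture"})

	// WithLabelValues creates (and caches) a child series on first use ...
	vec.WithLabelValues("cf-1", "127.0.0.1:8300").Observe(0.05)

	// ... so teardown must delete with identical label values, or the stale
	// per-changefeed series keeps being exported after the processor is gone.
	fmt.Println(vec.DeleteLabelValues("cf-1", "127.0.0.1:8300")) // true
	fmt.Println(vec.DeleteLabelValues("cf-1", "127.0.0.1:8300")) // false: already gone
}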
diff --git a/cdc/sink/sink.go b/cdc/sink/sink.go
index e1254cc5ee1..c557c6e8a46 100644
--- a/cdc/sink/sink.go
+++ b/cdc/sink/sink.go
@@ -41,6 +41,7 @@ type Sink interface {
     // TryEmitRowChangedEvents is thread-safety and non-blocking.
     TryEmitRowChangedEvents(ctx context.Context, rows ...*model.RowChangedEvent) (bool, error)
+    // EmitDDLEvent sends DDL Event to Sink
     // EmitDDLEvent should execute DDL to downstream synchronously
     //
diff --git a/pkg/orchestrator/interfaces.go b/pkg/orchestrator/interfaces.go
index 5e74fab592b..20452cd6f26 100644
--- a/pkg/orchestrator/interfaces.go
+++ b/pkg/orchestrator/interfaces.go
@@ -67,11 +67,3 @@ func (s *SingleDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map
     }
     return nil
 }
-
-// MultiDataPatch represents an update to many keys
-type MultiDataPatch func(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error
-
-// Patch implements the DataPatch interface
-func (m MultiDataPatch) Patch(valueMap map[util.EtcdKey][]byte, changedSet map[util.EtcdKey]struct{}) error {
-	return m(valueMap, changedSet)
-}
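The patch itself ships no tests for the new metrics. If coverage is wanted later, something along these lines would exercise the owner histogram end to end; this is a hypothetical sketch for the cdc/owner package against the unexported changefeedTickDuration vec, using github.com/prometheus/client_golang/prometheus/testutil:

package owner

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestChangefeedTickDurationObserved(t *testing.T) {
	// Observe through the same per-changefeed child the owner would use.
	changefeedTickDuration.WithLabelValues("cf-test").Observe(0.05)

	// CollectAndCount reports how many series the vec currently exports.
	if n := testutil.CollectAndCount(changefeedTickDuration); n != 1 {
		t.Fatalf("expected 1 changefeed_tick_duration series, got %d", n)
	}

	// Clean up so the stand-in changefeed does not leak into other tests.
	changefeedTickDuration.DeleteLabelValues("cf-test")
}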