Skip to content

Commit

Permalink
sinkv2(ticdc): add mq worker flush duration metric
Browse files Browse the repository at this point in the history
  • Loading branch information
Rustin170506 committed Sep 29, 2022
1 parent f249c6d commit 1c06141
Show file tree
Hide file tree
Showing 11 changed files with 152 additions and 87 deletions.
2 changes: 1 addition & 1 deletion cdc/sinkv2/ddlsink/mq/ddlproducer/kafka_ddl_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
collector "github.com/pingcap/tiflow/cdc/sinkv2/metrics/kafka"
collector "github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq/kafka"
cerror "github.com/pingcap/tiflow/pkg/errors"
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/util"
Expand Down
2 changes: 1 addition & 1 deletion cdc/sinkv2/eventsink/mq/dmlproducer/kafka_dml_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
collector "github.com/pingcap/tiflow/cdc/sinkv2/metrics/kafka"
collector "github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq/kafka"
cerror "github.com/pingcap/tiflow/pkg/errors"
pkafka "github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/util"
Expand Down
21 changes: 15 additions & 6 deletions cdc/sinkv2/eventsink/mq/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand All @@ -50,6 +52,9 @@ type worker struct {
encoder codec.EventBatchEncoder
// producer is used to send the messages to the Kafka broker.
producer dmlproducer.DMLProducer
// metricMQWorkerFlushDuration is the metric of the flush duration.
// We record the flush duration for each batch.
metricMQWorkerFlushDuration prometheus.Observer
// statistics is used to record DML metrics.
statistics *metrics.Statistics
}
Expand All @@ -62,12 +67,13 @@ func newWorker(
statistics *metrics.Statistics,
) *worker {
w := &worker{
changeFeedID: id,
msgChan: chann.New[mqEvent](),
ticker: time.NewTicker(mqv1.FlushInterval),
encoder: encoder,
producer: producer,
statistics: statistics,
changeFeedID: id,
msgChan: chann.New[mqEvent](),
ticker: time.NewTicker(mqv1.FlushInterval),
encoder: encoder,
producer: producer,
metricMQWorkerFlushDuration: mq.WorkerFlushDuration.WithLabelValues(id.Namespace, id.ID),
statistics: statistics,
}

return w
Expand All @@ -87,6 +93,7 @@ func (w *worker) run(ctx context.Context) (retErr error) {
// Fixed size of the batch.
eventsBuf := make([]mqEvent, mqv1.FlushBatchSize)
for {
start := time.Now()
endIndex, err := w.batch(ctx, eventsBuf)
if err != nil {
return errors.Trace(err)
Expand All @@ -103,6 +110,8 @@ func (w *worker) run(ctx context.Context) (retErr error) {
if err != nil {
return errors.Trace(err)
}
duration := time.Since(start)
w.metricMQWorkerFlushDuration.Observe(duration.Seconds())
}
}

Expand Down
5 changes: 3 additions & 2 deletions cdc/sinkv2/eventsink/txn/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/quotes"
Expand Down Expand Up @@ -100,8 +101,8 @@ func NewMySQLBackends(
dmlMaxRetry: defaultDMLMaxRetry,
statistics: statistics,

metricTxnSinkDMLBatchCommit: metrics.TxnSinkDMLBatchCommit.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnSinkDMLBatchCallback: metrics.TxnSinkDMLBatchCallback.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnSinkDMLBatchCommit: txn.SinkDMLBatchCommit.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnSinkDMLBatchCallback: txn.SinkDMLBatchCallback.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
})
}

Expand Down
10 changes: 5 additions & 5 deletions cdc/sinkv2/eventsink/txn/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/pkg/chann"
cerror "github.com/pingcap/tiflow/pkg/errors"
Expand Down Expand Up @@ -73,10 +73,10 @@ func newWorker(ctx context.Context, ID int, backend backend, errCh chan<- error,
backend: backend,
errCh: errCh,

metricConflictDetectDuration: metrics.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerFlushDuration: metrics.TxnWorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerBusyRatio: metrics.TxnWorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerHandledRows: metrics.TxnWorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),
metricConflictDetectDuration: txn.ConflictDetectDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerFlushDuration: txn.WorkerFlushDuration.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerBusyRatio: txn.WorkerBusyRatio.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTxnWorkerHandledRows: txn.WorkerHandledRows.WithLabelValues(changefeedID.Namespace, changefeedID.ID, wid),

flushInterval: backend.MaxFlushInterval(),
hasPending: false,
Expand Down
72 changes: 5 additions & 67 deletions cdc/sinkv2/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,69 +14,14 @@
package metrics

import (
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/kafka"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn"
"github.com/prometheus/client_golang/prometheus"
)

// rowSizeLowBound is set to 128K, only track data event with size not smaller than it.
const largeRowSizeLowBound = 128 * 1024

// ---------- Metrics for txn sink and backends. ---------- //
var (
// ConflictDetectDuration records the duration of detecting conflict.
ConflictDetectDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_conflict_detect_duration",
Help: "Bucketed histogram of conflict detect time (s) for single DML statement.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~1000s
}, []string{"namespace", "changefeed"})

TxnWorkerFlushDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_worker_flush_duration",
Help: "Flush duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~1000s
}, []string{"namespace", "changefeed"})

TxnWorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_worker_busy_ratio",
Help: "Busy ratio (X ms in 1s) for all workers.",
}, []string{"namespace", "changefeed"})

TxnWorkerHandledRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_worker_handled_rows",
Help: "Busy ratio (X ms in 1s) for all workers.",
}, []string{"namespace", "changefeed", "id"})

TxnSinkDMLBatchCommit = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_sink_dml_batch_commit",
Help: "Duration of committing a DML batch",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 18), // 10ms~1000s
}, []string{"namespace", "changefeed"})

TxnSinkDMLBatchCallback = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_sink_dml_batch_callback",
Help: "Duration of execuing a batch of callbacks",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 18), // 10ms~1000s
}, []string{"namespace", "changefeed"})
)

// ---------- Metrics used in Statistics. ---------- //
var (
// ExecBatchHistogram records batch size of a txn.
Expand Down Expand Up @@ -119,20 +64,13 @@ var (
}, []string{"namespace", "changefeed", "type"}) // type is for `sinkType`
)

// InitMetrics registers all metrics in this file
// InitMetrics registers all metrics in this file.
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ConflictDetectDuration)
registry.MustRegister(TxnWorkerFlushDuration)
registry.MustRegister(TxnWorkerBusyRatio)
registry.MustRegister(TxnWorkerHandledRows)
registry.MustRegister(TxnSinkDMLBatchCommit)
registry.MustRegister(TxnSinkDMLBatchCallback)

registry.MustRegister(ExecBatchHistogram)
registry.MustRegister(ExecDDLHistogram)
registry.MustRegister(LargeRowSizeHistogram)
registry.MustRegister(ExecutionErrorCounter)

// Register Kafka producer and broker metrics.
kafka.InitMetrics(registry)
txn.InitMetrics(registry)
mq.InitMetrics(registry)
}
File renamed without changes.
File renamed without changes.
35 changes: 35 additions & 0 deletions cdc/sinkv2/metrics/mq/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package mq

import (
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq/kafka"
"github.com/prometheus/client_golang/prometheus"
)

// WorkerFlushDuration records the duration of flushing a group messages.
var WorkerFlushDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "mq_worker_flush_duration",
Help: "Flush duration(s) for MQ worker.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~1000s
}, []string{"namespace", "changefeed"})

// InitMetrics registers all metrics in this file.
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(WorkerFlushDuration)
kafka.InitMetrics(registry)
}
10 changes: 5 additions & 5 deletions cdc/sinkv2/metrics/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func NewStatistics(ctx context.Context, sinkType sink.Type) *Statistics {
changefeedID: contextutil.ChangefeedIDFromCtx(ctx),
}

namespace := statistics.changefeedID.Namespace
namespcae := statistics.changefeedID.Namespace
changefeedID := statistics.changefeedID.ID
s := sinkType.String()
statistics.metricExecDDLHis = ExecDDLHistogram.WithLabelValues(namespace, changefeedID, s)
statistics.metricExecBatchHis = ExecBatchHistogram.WithLabelValues(namespace, changefeedID, s)
statistics.metricRowSizeHis = LargeRowSizeHistogram.WithLabelValues(namespace, changefeedID, s)
statistics.metricExecErrCnt = ExecutionErrorCounter.WithLabelValues(namespace, changefeedID, s)
statistics.metricExecDDLHis = ExecDDLHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricExecBatchHis = ExecBatchHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricRowSizeHis = LargeRowSizeHistogram.WithLabelValues(namespcae, changefeedID, s)
statistics.metricExecErrCnt = ExecutionErrorCounter.WithLabelValues(namespcae, changefeedID, s)
return statistics
}

Expand Down
82 changes: 82 additions & 0 deletions cdc/sinkv2/metrics/txn/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package txn

import "github.com/prometheus/client_golang/prometheus"

// ---------- Metrics for txn sink and backends. ---------- //
var (
// ConflictDetectDuration records the duration of detecting conflict.
ConflictDetectDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_conflict_detect_duration",
Help: "Bucketed histogram of conflict detect time (s) for single DML statement.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~1000s
}, []string{"namespace", "changefeed"})

WorkerFlushDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_worker_flush_duration",
Help: "Flush duration (s) for txn worker.",
Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), // 1ms~1000s
}, []string{"namespace", "changefeed"})

WorkerBusyRatio = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_worker_busy_ratio",
Help: "Busy ratio (X ms in 1s) for all workers.",
}, []string{"namespace", "changefeed"})

WorkerHandledRows = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_worker_handled_rows",
Help: "Busy ratio (X ms in 1s) for all workers.",
}, []string{"namespace", "changefeed", "id"})

SinkDMLBatchCommit = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_sink_dml_batch_commit",
Help: "Duration of committing a DML batch",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 18), // 10ms~1000s
}, []string{"namespace", "changefeed"})

SinkDMLBatchCallback = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sinkv2",
Name: "txn_sink_dml_batch_callback",
Help: "Duration of execuing a batch of callbacks",
Buckets: prometheus.ExponentialBuckets(0.01, 2, 18), // 10ms~1000s
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics in this file.
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(ConflictDetectDuration)
registry.MustRegister(WorkerFlushDuration)
registry.MustRegister(WorkerBusyRatio)
registry.MustRegister(WorkerHandledRows)
registry.MustRegister(SinkDMLBatchCommit)
registry.MustRegister(SinkDMLBatchCallback)
}

0 comments on commit 1c06141

Please sign in to comment.