kafka(ticdc): make kafka metrics more robust (#11533)
close #11532
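
In short: per-changefeed Prometheus series are now created inside the goroutine that updates them and removed with a deferred DeleteLabelValues when that goroutine exits, metrics.NewStatistics drops its context.Context parameter, and the MQ worker constructs and closes its own Statistics. The net effect is that removing a changefeed no longer leaves stale metric series behind.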
3AceShowHand authored Aug 30, 2024
1 parent 6f697c4 commit 8306633
Showing 20 changed files with 328 additions and 344 deletions.
8 changes: 4 additions & 4 deletions cdc/entry/mounter_group.go
@@ -78,7 +78,10 @@ func NewMounterGroup(
}

func (m *mounterGroup) Run(ctx context.Context, _ ...chan<- error) error {
inputChanSize := mounterGroupInputChanSizeGauge.WithLabelValues(m.changefeedID.Namespace, m.changefeedID.ID)
ticker := time.NewTicker(defaultMetricInterval)
defer func() {
ticker.Stop()
mounterGroupInputChanSizeGauge.DeleteLabelValues(m.changefeedID.Namespace, m.changefeedID.ID)
}()
g, ctx := errgroup.WithContext(ctx)
@@ -88,15 +91,12 @@ func (m *mounterGroup) Run(ctx context.Context, _ ...chan<- error) error {
})
}
g.Go(func() error {
metrics := mounterGroupInputChanSizeGauge.WithLabelValues(m.changefeedID.Namespace, m.changefeedID.ID)
ticker := time.NewTicker(defaultMetricInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
metrics.Set(float64(len(m.inputCh)))
inputChanSize.Set(float64(len(m.inputCh)))
}
}
})
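The mounter change above illustrates the pattern used throughout this commit: the gauge series is created when the monitoring goroutine starts and deleted in that same goroutine's defer, so creation and cleanup can never get out of sync. A minimal, self-contained sketch of the idea (identifiers here are illustrative, not tiflow's actual ones):

package metricsdemo

import (
	"context"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// chanSizeGauge stands in for a gauge vec such as mounterGroupInputChanSizeGauge.
var chanSizeGauge = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{Name: "input_chan_size"},
	[]string{"namespace", "changefeed"},
)

func run(ctx context.Context, namespace, id string, inputCh <-chan struct{}) error {
	// Create the per-changefeed series only once the goroutine is running.
	gauge := chanSizeGauge.WithLabelValues(namespace, id)
	ticker := time.NewTicker(15 * time.Second)
	defer func() {
		ticker.Stop()
		// Drop the series on exit so a removed changefeed leaves nothing behind.
		chanSizeGauge.DeleteLabelValues(namespace, id)
	}()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			gauge.Set(float64(len(inputCh)))
		}
	}
}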
6 changes: 5 additions & 1 deletion cdc/processor/memquota/mem_quota.go
@@ -87,7 +87,11 @@ func NewMemQuota(changefeedID model.ChangeFeedID, totalBytes uint64, comp string
m.wg.Add(1)
go func() {
timer := time.NewTicker(3 * time.Second)
defer timer.Stop()
defer func() {
timer.Stop()
MemoryQuota.DeleteLabelValues(changefeedID.Namespace, changefeedID.ID, "total", comp)
MemoryQuota.DeleteLabelValues(changefeedID.Namespace, changefeedID.ID, "used", comp)
}()
for {
select {
case <-timer.C:
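Note that DeleteLabelValues removes exactly one series per call (the one matching the given label values), which is why the "total" and "used" series each need their own delete here.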
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/cloudstorage/cloud_storage_ddl_sink.go
@@ -82,7 +82,7 @@ func newDDLSink(ctx context.Context,
d := &DDLSink{
id: changefeedID,
storage: storage,
statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink),
statistics: metrics.NewStatistics(changefeedID, sink.TxnSink),
cfg: cfg,
lastSendCheckpointTsTime: time.Now(),
}
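This is the first of several call sites updated for the new metrics.NewStatistics signature: the context parameter is gone, so a Statistics' lifetime is no longer tied to a context. The constructor body is not part of this diff, but judging from the rest of the commit, cleanup now happens in Statistics.Close, which the MQ worker calls explicitly in close() (see worker.go below).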
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mq/kafka_ddl_sink.go
@@ -112,7 +112,7 @@ func NewKafkaDDLSink(
}

ddlProducer := producerCreator(ctx, changefeedID, syncProducer)
s := newDDLSink(ctx, changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
s := newDDLSink(changefeedID, ddlProducer, adminClient, topicManager, eventRouter, encoderBuilder, protocol)
log.Info("DDL sink producer client created", zap.Duration("duration", time.Since(start)))
return s, nil
}
4 changes: 2 additions & 2 deletions cdc/sink/ddlsink/mq/mq_ddl_sink.go
@@ -76,7 +76,7 @@ type DDLSink struct {
admin kafka.ClusterAdminClient
}

func newDDLSink(ctx context.Context,
func newDDLSink(
changefeedID model.ChangeFeedID,
producer ddlproducer.DDLProducer,
adminClient kafka.ClusterAdminClient,
@@ -92,7 +92,7 @@ func newDDLSink(ctx context.Context,
topicManager: topicManager,
encoderBuilder: encoderBuilder,
producer: producer,
statistics: metrics.NewStatistics(ctx, changefeedID, sink.RowSink),
statistics: metrics.NewStatistics(changefeedID, sink.RowSink),
admin: adminClient,
}
}
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mq/pulsar_ddl_sink.go
@@ -100,7 +100,7 @@ func NewPulsarDDLSink(
return nil, errors.Trace(err)
}

s := newDDLSink(ctx, changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)
s := newDDLSink(changefeedID, p, nil, topicManager, eventRouter, encoderBuilder, protocol)

return s, nil
}
2 changes: 1 addition & 1 deletion cdc/sink/ddlsink/mysql/mysql_ddl_sink.go
@@ -109,7 +109,7 @@ func NewDDLSink(
id: changefeedID,
db: db,
cfg: cfg,
statistics: metrics.NewStatistics(ctx, changefeedID, sink.TxnSink),
statistics: metrics.NewStatistics(changefeedID, sink.TxnSink),
lastExecutedNormalDDLCache: lruCache,
}

2 changes: 1 addition & 1 deletion cdc/sink/dmlsink/cloudstorage/cloud_storage_dml_sink.go
@@ -150,7 +150,7 @@ func NewDMLSink(ctx context.Context,
outputRawChangeEvent: replicaConfig.Sink.CloudStorageConfig.GetOutputRawChangeEvent(),
encodingWorkers: make([]*encodingWorker, defaultEncodingConcurrency),
workers: make([]*dmlWorker, cfg.WorkerCount),
statistics: metrics.NewStatistics(wgCtx, changefeedID, sink.TxnSink),
statistics: metrics.NewStatistics(changefeedID, sink.TxnSink),
cancel: wgCancel,
dead: make(chan struct{}),
}
3 changes: 1 addition & 2 deletions cdc/sink/dmlsink/cloudstorage/dml_worker_test.go
@@ -51,8 +51,7 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
cfg.FileIndexWidth = 6
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, model.DefaultChangeFeedID("dml-worker-test"),
sink.TxnSink)
statistics := metrics.NewStatistics(model.DefaultChangeFeedID("dml-worker-test"), sink.TxnSink)
pdlock := pdutil.NewMonotonicClock(clock.New())
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", chann.NewAutoDrainChann[eventFragment](), pdlock, statistics)
5 changes: 1 addition & 4 deletions cdc/sink/dmlsink/mq/mq_dml_sink.go
@@ -26,10 +26,8 @@ import (
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/manager"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/transformer"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"go.uber.org/atomic"
@@ -89,8 +87,7 @@ func newDMLSink(
errCh chan error,
) *dmlSink {
ctx, cancel := context.WithCancelCause(ctx)
statistics := metrics.NewStatistics(ctx, changefeedID, sink.RowSink)
worker := newWorker(changefeedID, protocol, producer, encoderGroup, statistics)
worker := newWorker(changefeedID, protocol, producer, encoderGroup)

s := &dmlSink{
id: changefeedID,
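With the context-bound Statistics gone from the sink, the worker now creates its own in newWorker (next file), keeping creation and cleanup of the row-sink metrics in one place.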
55 changes: 21 additions & 34 deletions cdc/sink/dmlsink/mq/worker.go
@@ -27,8 +27,8 @@ import (
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
)
@@ -64,13 +64,6 @@ type worker struct {

// producer is used to send the messages to the Kafka broker.
producer dmlproducer.DMLProducer

// metricMQWorkerSendMessageDuration tracks the time duration cost on send messages.
metricMQWorkerSendMessageDuration prometheus.Observer
// metricMQWorkerBatchSize tracks each batch's size.
metricMQWorkerBatchSize prometheus.Observer
// metricMQWorkerBatchDuration tracks the time duration cost on batch messages.
metricMQWorkerBatchDuration prometheus.Observer
// statistics is used to record DML metrics.
statistics *metrics.Statistics
}
@@ -81,21 +74,16 @@ func newWorker(
protocol config.Protocol,
producer dmlproducer.DMLProducer,
encoderGroup codec.EncoderGroup,
statistics *metrics.Statistics,
) *worker {
w := &worker{
changeFeedID: id,
protocol: protocol,
msgChan: chann.NewAutoDrainChann[mqEvent](),
ticker: time.NewTicker(batchInterval),
encoderGroup: encoderGroup,
producer: producer,
metricMQWorkerSendMessageDuration: mq.WorkerSendMessageDuration.WithLabelValues(id.Namespace, id.ID),
metricMQWorkerBatchSize: mq.WorkerBatchSize.WithLabelValues(id.Namespace, id.ID),
metricMQWorkerBatchDuration: mq.WorkerBatchDuration.WithLabelValues(id.Namespace, id.ID),
statistics: statistics,
changeFeedID: id,
protocol: protocol,
msgChan: chann.NewAutoDrainChann[mqEvent](),
ticker: time.NewTicker(batchInterval),
encoderGroup: encoderGroup,
producer: producer,
statistics: metrics.NewStatistics(id, sink.RowSink),
}

return w
}

@@ -171,6 +159,13 @@ func (w *worker) batchEncodeRun(ctx context.Context) (retErr error) {
zap.String("protocol", w.protocol.String()),
)

metricBatchDuration := mq.WorkerBatchDuration.WithLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
metricBatchSize := mq.WorkerBatchSize.WithLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
defer func() {
mq.WorkerBatchDuration.DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
mq.WorkerBatchSize.DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
}()

msgsBuf := make([]mqEvent, batchSize)
for {
start := time.Now()
@@ -181,9 +176,8 @@ func (w *worker) batchEncodeRun(ctx context.Context) (retErr error) {
if msgCount == 0 {
continue
}

w.metricMQWorkerBatchSize.Observe(float64(msgCount))
w.metricMQWorkerBatchDuration.Observe(time.Since(start).Seconds())
metricBatchSize.Observe(float64(msgCount))
metricBatchDuration.Observe(time.Since(start).Seconds())

msgs := msgsBuf[:msgCount]
// Group messages by its TopicPartitionKey before adding them to the encoder group.
@@ -270,23 +264,15 @@ func (w *worker) group(
}

func (w *worker) sendMessages(ctx context.Context) error {
ticker := time.NewTicker(15 * time.Second)
metric := codec.EncoderGroupOutputChanSizeGauge.
WithLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
defer func() {
ticker.Stop()
codec.EncoderGroupOutputChanSizeGauge.
DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
}()
metricSendMessageDuration := mq.WorkerSendMessageDuration.WithLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
defer mq.WorkerSendMessageDuration.DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)

var err error
outCh := w.encoderGroup.Output()
for {
select {
case <-ctx.Done():
return errors.Trace(ctx.Err())
case <-ticker.C:
metric.Set(float64(len(outCh)))
case future, ok := <-outCh:
if !ok {
log.Warn("MQ sink encoder's output channel closed",
@@ -312,7 +298,7 @@ func (w *worker) sendMessages(ctx context.Context) error {
}); err != nil {
return err
}
w.metricMQWorkerSendMessageDuration.Observe(time.Since(start).Seconds())
metricSendMessageDuration.Observe(time.Since(start).Seconds())
}
}
}
@@ -321,6 +307,7 @@ func (w *worker) sendMessages(ctx context.Context) error {
func (w *worker) close() {
w.msgChan.CloseAndDrain()
w.producer.Close()
w.statistics.Close()
mq.WorkerSendMessageDuration.DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
mq.WorkerBatchSize.DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
mq.WorkerBatchDuration.DeleteLabelValues(w.changeFeedID.Namespace, w.changeFeedID.ID)
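Two details in worker.go are worth noting. First, each run loop now creates the observers it needs and deletes them in its own defer, while close() deletes the same label values once more; the double delete is harmless, since prometheus' DeleteLabelValues simply returns false when the series is already gone. Second, the EncoderGroupOutputChanSizeGauge ticker has been removed from sendMessages, so that gauge is no longer maintained here.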
32 changes: 12 additions & 20 deletions cdc/sink/dmlsink/mq/worker_test.go
@@ -24,17 +24,15 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/dmlsink"
"github.com/pingcap/tiflow/cdc/sink/dmlsink/mq/dmlproducer"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sink/tablesink/state"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/codec"
"github.com/pingcap/tiflow/pkg/sink/codec/builder"
"github.com/pingcap/tiflow/pkg/sink/codec/common"
"github.com/stretchr/testify/require"
)

func newBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlproducer.DMLProducer) {
func newBatchEncodeWorker(t *testing.T) (*worker, dmlproducer.DMLProducer) {
id := model.DefaultChangeFeedID("test")
// 200 is about the size of a rowEvent change.
encoderConfig := common.NewConfig(config.ProtocolOpen).WithMaxMessageBytes(200).WithChangefeedID(id)
@@ -43,14 +41,13 @@ func newBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlproduc
p := dmlproducer.NewDMLMockProducer(context.Background(), id, nil, nil, nil, nil)
require.NoError(t, err)
encoderConcurrency := 4
statistics := metrics.NewStatistics(ctx, id, sink.RowSink)
cfg := config.GetDefaultReplicaConfig()
cfg.Sink.EncoderConcurrency = &encoderConcurrency
encoderGroup := codec.NewEncoderGroup(cfg.Sink, encoderBuilder, id)
return newWorker(id, config.ProtocolOpen, p, encoderGroup, statistics), p
return newWorker(id, config.ProtocolOpen, p, encoderGroup), p
}

func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlproducer.DMLProducer) {
func newNonBatchEncodeWorker(t *testing.T) (*worker, dmlproducer.DMLProducer) {
id := model.DefaultChangeFeedID("test")
// 300 is about the size of a rowEvent change.
encoderConfig := common.NewConfig(config.ProtocolCanalJSON).WithMaxMessageBytes(300).WithChangefeedID(id)
@@ -59,11 +56,10 @@ func newNonBatchEncodeWorker(ctx context.Context, t *testing.T) (*worker, dmlpro
p := dmlproducer.NewDMLMockProducer(context.Background(), id, nil, nil, nil, nil)
require.NoError(t, err)
encoderConcurrency := 4
statistics := metrics.NewStatistics(ctx, id, sink.RowSink)
cfg := config.GetDefaultReplicaConfig()
cfg.Sink.EncoderConcurrency = &encoderConcurrency
encoderGroup := codec.NewEncoderGroup(cfg.Sink, encoderBuilder, id)
return newWorker(id, config.ProtocolOpen, p, encoderGroup, statistics), p
return newWorker(id, config.ProtocolOpen, p, encoderGroup), p
}

func TestNonBatchEncode_SendMessages(t *testing.T) {
@@ -77,7 +73,7 @@ func TestNonBatchEncode_SendMessages(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

worker, p := newNonBatchEncodeWorker(ctx, t)
worker, p := newNonBatchEncodeWorker(t)
defer worker.close()

key := model.TopicPartitionKey{
@@ -135,7 +131,7 @@ func TestBatchEncode_Batch(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, _ := newBatchEncodeWorker(ctx, t)
worker, _ := newBatchEncodeWorker(t)
defer worker.close()
key := model.TopicPartitionKey{
Topic: "test",
Expand Down Expand Up @@ -183,9 +179,7 @@ func TestBatchEncode_Group(t *testing.T) {
Topic: "test1",
Partition: 2,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, _ := newBatchEncodeWorker(ctx, t)
worker, _ := newBatchEncodeWorker(t)
defer worker.close()

tableStatus := state.TableSinkSinking
Expand Down Expand Up @@ -285,9 +279,7 @@ func TestBatchEncode_GroupWhenTableStopping(t *testing.T) {
Topic: "test",
Partition: 2,
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, _ := newBatchEncodeWorker(ctx, t)
worker, _ := newBatchEncodeWorker(t)
defer worker.close()
replicatingStatus := state.TableSinkSinking
stoppedStatus := state.TableSinkStopping
Expand Down Expand Up @@ -359,7 +351,7 @@ func TestBatchEncode_SendMessages(t *testing.T) {
tableStatus := state.TableSinkSinking
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, p := newBatchEncodeWorker(ctx, t)
worker, p := newBatchEncodeWorker(t)
defer worker.close()

helper := entry.NewSchemaTestHelper(t)
Expand Down Expand Up @@ -477,7 +469,7 @@ func TestBatchEncodeWorker_Abort(t *testing.T) {
t.Parallel()

ctx, cancel := context.WithCancel(context.Background())
worker, _ := newBatchEncodeWorker(ctx, t)
worker, _ := newBatchEncodeWorker(t)
defer worker.close()

var wg sync.WaitGroup
@@ -503,7 +495,7 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
worker, p := newNonBatchEncodeWorker(ctx, t)
worker, p := newNonBatchEncodeWorker(t)
defer worker.close()
replicatingStatus := state.TableSinkSinking
stoppedStatus := state.TableSinkStopping
@@ -573,7 +565,7 @@ func TestNonBatchEncode_SendMessagesWhenTableStopping(t *testing.T) {

func TestNonBatchEncodeWorker_Abort(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
worker, _ := newBatchEncodeWorker(ctx, t)
worker, _ := newBatchEncodeWorker(t)
defer worker.close()

var wg sync.WaitGroup
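Since the worker now builds its own Statistics, the test helpers no longer need a context threaded through. To assert that cleanup really removes the series, prometheus' testutil package could be used; a hedged sketch, not part of this commit:

package metricsdemo

import (
	"testing"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func TestSeriesDeletedOnCleanup(t *testing.T) {
	g := prometheus.NewGaugeVec(
		prometheus.GaugeOpts{Name: "demo_gauge"},
		[]string{"namespace", "changefeed"},
	)
	g.WithLabelValues("default", "cf-1").Set(1)
	if n := testutil.CollectAndCount(g); n != 1 {
		t.Fatalf("expected 1 series, got %d", n)
	}
	// Deleting is idempotent: a second call just returns false.
	g.DeleteLabelValues("default", "cf-1")
	g.DeleteLabelValues("default", "cf-1")
	if n := testutil.CollectAndCount(g); n != 0 {
		t.Fatalf("expected 0 series, got %d", n)
	}
}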
(The remaining 8 of the 20 changed files are not shown here.)
