Skip to content

Commit

Permalink
sink(ticdc): Add metrics for cloud storage sink (#7735)
Browse files Browse the repository at this point in the history
ref #6797
  • Loading branch information
zhaoxinyu authored Nov 29, 2022
1 parent 92523b2 commit 5cbc97f
Show file tree
Hide file tree
Showing 8 changed files with 314 additions and 29 deletions.
1 change: 1 addition & 0 deletions cdc/sink/codec/csv/csv_encoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func (b *BatchEncoder) Build() (messages []*common.Message) {
}
b.valueBuf.Reset()
b.callbackBuf = make([]func(), 0)
b.batchSize = 0
}
return []*common.Message{ret}
}
Expand Down
25 changes: 16 additions & 9 deletions cdc/sinkv2/eventsink/cloudstorage/cloud_storage_dml_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import (
"github.com/pingcap/tiflow/cdc/sink/codec/builder"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state"
"github.com/pingcap/tiflow/cdc/sinkv2/util"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
)

Expand All @@ -39,7 +41,7 @@ const (
)

// Assert EventSink[E event.TableEvent] implementation
var _ eventsink.EventSink[*model.SingleTableTxn] = (*sink)(nil)
var _ eventsink.EventSink[*model.SingleTableTxn] = (*dmlSink)(nil)

// versionedTable is used to wrap TableName with a version
type versionedTable struct {
Expand All @@ -62,9 +64,9 @@ type eventFragment struct {
encodedMsgs []*common.Message
}

// sink is the cloud storage sink.
// dmlSink is the cloud storage sink.
// It will send the events to cloud storage systems.
type sink struct {
type dmlSink struct {
// msgCh is a channel to hold eventFragment.
msgCh chan eventFragment
// encodingWorkers defines a group of workers for encoding events.
Expand All @@ -73,7 +75,8 @@ type sink struct {
defragmenter *defragmenter
// writer is a dmlWriter which manages a group of dmlWorkers and
// sends encoded messages to individual dmlWorkers.
writer *dmlWriter
writer *dmlWriter
statistics *metrics.Statistics
// last sequence number
lastSeqNum uint64
}
Expand All @@ -83,8 +86,8 @@ func NewCloudStorageSink(ctx context.Context,
sinkURI *url.URL,
replicaConfig *config.ReplicaConfig,
errCh chan error,
) (*sink, error) {
s := &sink{}
) (*dmlSink, error) {
s := &dmlSink{}
// create cloud storage config and then apply the params of sinkURI to it.
cfg := cloudstorage.NewConfig()
err := cfg.Apply(ctx, sinkURI, replicaConfig)
Expand Down Expand Up @@ -130,7 +133,8 @@ func NewCloudStorageSink(ctx context.Context,
s.msgCh = make(chan eventFragment, defaultChannelSize)
s.defragmenter = newDefragmenter(ctx)
orderedCh := s.defragmenter.orderedOut()
s.writer = newDMLWriter(ctx, changefeedID, storage, cfg, ext, orderedCh, errCh)
s.statistics = metrics.NewStatistics(ctx, sink.TxnSink)
s.writer = newDMLWriter(ctx, changefeedID, storage, cfg, ext, s.statistics, orderedCh, errCh)

// create a group of encoding workers.
for i := 0; i < defaultEncodingConcurrency; i++ {
Expand All @@ -144,7 +148,7 @@ func NewCloudStorageSink(ctx context.Context,
}

// WriteEvents writes events to the cloud storage sink.
func (s *sink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.SingleTableTxn]) error {
func (s *dmlSink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.SingleTableTxn]) error {
var tbl versionedTable

for _, txn := range txns {
Expand Down Expand Up @@ -172,11 +176,14 @@ func (s *sink) WriteEvents(txns ...*eventsink.CallbackableEvent[*model.SingleTab
}

// Close closes the cloud storage sink.
func (s *sink) Close() error {
func (s *dmlSink) Close() error {
s.defragmenter.close()
for _, w := range s.encodingWorkers {
w.close()
}
s.writer.close()
if s.statistics != nil {
s.statistics.Close()
}
return nil
}
54 changes: 37 additions & 17 deletions cdc/sinkv2/eventsink/cloudstorage/dml_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
mcloudstorage "github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

Expand All @@ -45,11 +48,14 @@ type dmlWorker struct {
// fileIndex maintains a mapping of <table, index number>.
fileIndex map[versionedTable]uint64
// fileSize maintains a mapping of <table, file size>.
fileSize map[versionedTable]uint64
wg sync.WaitGroup
isClosed uint64
errCh chan<- error
extension string
fileSize map[versionedTable]uint64
wg sync.WaitGroup
isClosed uint64
errCh chan<- error
extension string
statistics *metrics.Statistics
metricWriteBytes prometheus.Gauge
metricFileCount prometheus.Gauge
}

type tableEventsMap struct {
Expand All @@ -74,19 +80,23 @@ func newDMLWorker(
storage storage.ExternalStorage,
config *cloudstorage.Config,
extension string,
statistics *metrics.Statistics,
errCh chan<- error,
) *dmlWorker {
d := &dmlWorker{
id: id,
changeFeedID: changefeedID,
storage: storage,
config: config,
flushNotifyCh: make(chan flushTask, 1),
tableEvents: newTableEventsMap(),
fileIndex: make(map[versionedTable]uint64),
fileSize: make(map[versionedTable]uint64),
extension: extension,
errCh: errCh,
id: id,
changeFeedID: changefeedID,
storage: storage,
config: config,
tableEvents: newTableEventsMap(),
flushNotifyCh: make(chan flushTask, 1),
fileIndex: make(map[versionedTable]uint64),
fileSize: make(map[versionedTable]uint64),
extension: extension,
errCh: errCh,
statistics: statistics,
metricWriteBytes: mcloudstorage.CloudStorageWriteBytesGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricFileCount: mcloudstorage.CloudStorageFileCountGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID),
}

return d
Expand Down Expand Up @@ -129,9 +139,13 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) {
continue
}

rowsCnt := 0
for _, frag := range events {
msgs := frag.encodedMsgs
d.statistics.ObserveRows(frag.event.Event.Rows...)
for _, msg := range msgs {
d.metricWriteBytes.Add(float64(len(msg.Value)))
rowsCnt += msg.GetRowsCount()
buf.Write(msg.Value)
callbacks = append(callbacks, msg.Callback)
}
Expand All @@ -156,11 +170,17 @@ func (d *dmlWorker) backgroundFlushMsgs(ctx context.Context) {
}

path := d.generateDataFilePath(table)
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
if err := d.statistics.RecordBatchExecution(func() (int, error) {
err := d.storage.WriteFile(ctx, path, buf.Bytes())
if err != nil {
return 0, err
}
return rowsCnt, nil
}); err != nil {
d.errCh <- err
return
}
d.metricFileCount.Add(1)

for _, cb := range callbacks {
if cb != nil {
Expand Down
5 changes: 4 additions & 1 deletion cdc/sinkv2/eventsink/cloudstorage/dml_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,10 @@ import (
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink/codec/common"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
"github.com/stretchr/testify/require"
)
Expand All @@ -47,8 +49,9 @@ func testDMLWorker(ctx context.Context, t *testing.T, dir string) *dmlWorker {
err = cfg.Apply(context.TODO(), sinkURI, config.GetDefaultReplicaConfig())
require.Nil(t, err)

statistics := metrics.NewStatistics(ctx, sink.TxnSink)
d := newDMLWorker(1, model.DefaultChangeFeedID("dml-worker-test"), storage,
cfg, ".json", errCh)
cfg, ".json", statistics, errCh)
return d
}

Expand Down
4 changes: 3 additions & 1 deletion cdc/sinkv2/eventsink/cloudstorage/dml_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics"
"github.com/pingcap/tiflow/pkg/chann"
"github.com/pingcap/tiflow/pkg/hash"
"github.com/pingcap/tiflow/pkg/sink/cloudstorage"
Expand All @@ -42,6 +43,7 @@ func newDMLWriter(ctx context.Context,
storage storage.ExternalStorage,
config *cloudstorage.Config,
extension string,
statistics *metrics.Statistics,
inputCh <-chan eventFragment,
errCh chan<- error,
) *dmlWriter {
Expand All @@ -62,7 +64,7 @@ func newDMLWriter(ctx context.Context,
}()

for i := 0; i < config.WorkerCount; i++ {
d := newDMLWorker(i, changefeedID, storage, w.config, extension, errCh)
d := newDMLWorker(i, changefeedID, storage, w.config, extension, statistics, errCh)
w.workerChannels[i] = chann.New[eventFragment]()
d.run(ctx, w.workerChannels[i])
w.workers = append(w.workers, d)
Expand Down
45 changes: 45 additions & 0 deletions cdc/sinkv2/metrics/cloudstorage/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package cloudstorage

import "github.com/prometheus/client_golang/prometheus"

// Shared naming components for the metrics declared in this file.
const (
	// namespace is used as the Namespace option of each metric.
	namespace = "ticdc"
	// subsystem is used as the Subsystem option of each metric.
	subsystem = "sinkv2"
)

// Collectors exposed by the cloud storage sink. Both are gauge vectors keyed
// by the "namespace" and "changefeed" labels.
var (
	// CloudStorageWriteBytesGauge tracks how many bytes have been written to
	// cloud storage.
	//
	// NOTE(review): the metric name ends in "_total" although this is a gauge;
	// Prometheus naming conventions use that suffix for counters. Confirm
	// whether a counter was intended before changing the type, since callers
	// may depend on the gauge API.
	CloudStorageWriteBytesGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "cloud_storage_write_bytes_total",
			Help:      "Total number of bytes written to cloud storage",
		},
		[]string{"namespace", "changefeed"},
	)

	// CloudStorageFileCountGauge tracks how many files the cloud storage sink
	// has generated.
	CloudStorageFileCountGauge = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "cloud_storage_file_count",
			Help:      "Total number of files managed by a cloud storage sink",
		},
		[]string{"namespace", "changefeed"},
	)
)

// InitMetrics adds every cloud storage sink collector declared in this file
// to the supplied registry, write-bytes gauge first, then file-count gauge.
func InitMetrics(registry *prometheus.Registry) {
	registry.MustRegister(
		CloudStorageWriteBytesGauge,
		CloudStorageFileCountGauge,
	)
}
2 changes: 2 additions & 0 deletions cdc/sinkv2/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package metrics

import (
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/cloudstorage"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/mq"
"github.com/pingcap/tiflow/cdc/sinkv2/metrics/txn"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -73,4 +74,5 @@ func InitMetrics(registry *prometheus.Registry) {

txn.InitMetrics(registry)
mq.InitMetrics(registry)
cloudstorage.InitMetrics(registry)
}
Loading

0 comments on commit 5cbc97f

Please sign in to comment.