Skip to content

Commit

Permalink
leveldb: add message, config and metrics (#3272)
Browse files Browse the repository at this point in the history
  • Loading branch information
overvenus authored Nov 5, 2021
1 parent 28424ed commit 9a8eaf0
Show file tree
Hide file tree
Showing 19 changed files with 647 additions and 64 deletions.
4 changes: 4 additions & 0 deletions cdc/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
tablepipeline "github.com/pingcap/ticdc/cdc/processor/pipeline"
"github.com/pingcap/ticdc/cdc/puller"
"github.com/pingcap/ticdc/cdc/sink"
"github.com/pingcap/ticdc/cdc/sorter"
"github.com/pingcap/ticdc/cdc/sorter/leveldb"
"github.com/pingcap/ticdc/cdc/sorter/memory"
"github.com/pingcap/ticdc/cdc/sorter/unified"
"github.com/pingcap/ticdc/pkg/actor"
Expand All @@ -45,6 +47,8 @@ func init() {
initServerMetrics(registry)
actor.InitMetrics(registry)
// Sorter metrics
sorter.InitMetrics(registry)
memory.InitMetrics(registry)
unified.InitMetrics(registry)
leveldb.InitMetrics(registry)
}
56 changes: 56 additions & 0 deletions cdc/sorter/leveldb/message/task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package message

import (
"fmt"

"github.com/pingcap/ticdc/cdc/sorter/encoding"
Iterator "github.com/syndtr/goleveldb/leveldb/iterator"
lutil "github.com/syndtr/goleveldb/leveldb/util"
"golang.org/x/sync/semaphore"
)

// Task is a leveldb actor task. It carries write and read request.
type Task struct {
UID uint32
TableID uint64

// encoded key -> serde.marshal(event)
// If a value is empty, it deletes the key/value entry in leveldb.
Events map[Key][]byte
// Must be buffered channel to avoid blocking.
IterCh chan LimitedIterator `json:"-"` // Make Task JSON printable.
Irange *lutil.Range
// Set NeedIter whenever caller wants to read something from an iterator.
NeedIter bool
}

// Key is the key that is written to leveldb.
type Key string

// String returns a pretty printed string.
func (k Key) String() string {
uid, tableID, startTs, CRTs := encoding.DecodeKey([]byte(k))
return fmt.Sprintf(
"uid: %d, tableID: %d, startTs: %d, CRTs: %d",
uid, tableID, startTs, CRTs)
}

// LimitedIterator is a wrapper of leveldb iterator that has a sema to limit
// the total number of open iterators.
type LimitedIterator struct {
Iterator.Iterator
Sema *semaphore.Weighted
}
37 changes: 37 additions & 0 deletions cdc/sorter/leveldb/message/task_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package message

import (
"testing"

"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sorter/encoding"
"github.com/stretchr/testify/require"
)

func TestPrint(t *testing.T) {
t.Parallel()
event := model.NewPolymorphicEvent(&model.RawKVEntry{
OpType: model.OpTypeDelete,
Key: []byte{1},
StartTs: 3,
CRTs: 4,
})

require.Equal(t, "uid: 1, tableID: 2, startTs: 3, CRTs: 4",
Key(encoding.EncodeKey(1, 2, event)).String())
require.Equal(t, "uid: 1, tableID: 2, startTs: 0, CRTs: 3",
Key(encoding.EncodeTsKey(1, 2, 3)).String())
}
50 changes: 50 additions & 0 deletions cdc/sorter/leveldb/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package leveldb

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
sorterWriteBytesHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "leveldb_write_bytes",
Help: "Bucketed histogram of sorter write batch bytes",
Buckets: prometheus.ExponentialBuckets(16, 2.0, 20),
}, []string{"capture", "id"})

sorterWriteDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "leveldb_write_duration_seconds",
Help: "Bucketed histogram of sorter write duration",
Buckets: prometheus.ExponentialBuckets(0.004, 2.0, 20),
}, []string{"capture", "id"})

sorterCleanupKVCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "leveldb_cleanup_kv_total",
Help: "The total number of cleaned up kv entries",
}, []string{"capture", "id"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(sorterWriteDurationHistogram)
registry.MustRegister(sorterWriteBytesHistogram)
registry.MustRegister(sorterCleanupKVCounter)
}
69 changes: 69 additions & 0 deletions cdc/sorter/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package sorter

import (
"github.com/prometheus/client_golang/prometheus"
)

var (
// SorterEventCount is the metric that counts events output by the sorter.
SorterEventCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "event_count",
Help: "The number of events output by the sorter",
}, []string{"capture", "changefeed", "type"})

// SorterResolvedTsGauge is the metric that records sorter resolved ts.
SorterResolvedTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "resolved_ts_gauge",
Help: "the resolved ts of the sorter",
}, []string{"capture", "changefeed"})

// SorterInMemoryDataSizeGauge is the metric that records sorter memory usage.
SorterInMemoryDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "in_memory_data_size_gauge",
Help: "The amount of pending data stored in-memory by the sorter",
}, []string{"capture", "id"})

// SorterOnDiskDataSizeGauge is the metric that records sorter disk usage.
SorterOnDiskDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "on_disk_data_size_gauge",
Help: "The amount of pending data stored on-disk by the sorter",
}, []string{"capture", "id"})

// SorterOpenFileCountGauge is the metric that records sorter open files.
SorterOpenFileCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "open_file_count_gauge",
Help: "The number of open file descriptors held by the sorter",
}, []string{"capture", "id"})
)

// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(SorterEventCount)
registry.MustRegister(SorterResolvedTsGauge)
registry.MustRegister(SorterInMemoryDataSizeGauge)
registry.MustRegister(SorterOnDiskDataSizeGauge)
registry.MustRegister(SorterOpenFileCountGauge)
}
7 changes: 4 additions & 3 deletions cdc/sorter/unified/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/sorter"
sorterencoding "github.com/pingcap/ticdc/cdc/sorter/encoding"
"github.com/pingcap/ticdc/pkg/config"
cerrors "github.com/pingcap/ticdc/pkg/errors"
Expand Down Expand Up @@ -93,9 +94,9 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) {
ticker := time.NewTicker(backgroundJobInterval)
defer ticker.Stop()

metricSorterInMemoryDataSizeGauge := sorterInMemoryDataSizeGauge.WithLabelValues(captureAddr)
metricSorterOnDiskDataSizeGauge := sorterOnDiskDataSizeGauge.WithLabelValues(captureAddr)
metricSorterOpenFileCountGauge := sorterOpenFileCountGauge.WithLabelValues(captureAddr)
metricSorterInMemoryDataSizeGauge := sorter.SorterInMemoryDataSizeGauge.WithLabelValues(captureAddr, "0")
metricSorterOnDiskDataSizeGauge := sorter.SorterOnDiskDataSizeGauge.WithLabelValues(captureAddr, "0")
metricSorterOpenFileCountGauge := sorter.SorterOpenFileCountGauge.WithLabelValues(captureAddr, "0")

for {
select {
Expand Down
5 changes: 3 additions & 2 deletions cdc/sorter/unified/merger.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/cdc/model"
"github.com/pingcap/ticdc/cdc/sorter"
cerrors "github.com/pingcap/ticdc/pkg/errors"
"github.com/pingcap/ticdc/pkg/notify"
"github.com/pingcap/ticdc/pkg/util"
Expand All @@ -39,11 +40,11 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch
captureAddr := util.CaptureAddrFromCtx(ctx)
changefeedID := util.ChangefeedIDFromCtx(ctx)

metricSorterEventCount := sorterEventCount.MustCurryWith(map[string]string{
metricSorterEventCount := sorter.SorterEventCount.MustCurryWith(map[string]string{
"capture": captureAddr,
"changefeed": changefeedID,
})
metricSorterResolvedTsGauge := sorterResolvedTsGauge.WithLabelValues(captureAddr, changefeedID)
metricSorterResolvedTsGauge := sorter.SorterResolvedTsGauge.WithLabelValues(captureAddr, changefeedID)
metricSorterMergerStartTsGauge := sorterMergerStartTsGauge.WithLabelValues(captureAddr, changefeedID)
metricSorterMergeCountHistogram := sorterMergeCountHistogram.WithLabelValues(captureAddr, changefeedID)

Expand Down
40 changes: 0 additions & 40 deletions cdc/sorter/unified/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,48 +25,13 @@ var (
Help: "the number of events consumed by the sorter",
}, []string{"capture", "changefeed", "type"})

sorterEventCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "event_count",
Help: "the number of events output by the sorter",
}, []string{"capture", "changefeed", "type"})

sorterResolvedTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "resolved_ts_gauge",
Help: "the resolved ts of the sorter",
}, []string{"capture", "changefeed"})

sorterMergerStartTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "merger_start_ts_gauge",
Help: "the start TS of each merge in the sorter",
}, []string{"capture", "changefeed"})

sorterInMemoryDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "in_memory_data_size_gauge",
Help: "the amount of pending data stored in-memory by the sorter",
}, []string{"capture"})

sorterOnDiskDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "on_disk_data_size_gauge",
Help: "the amount of pending data stored on-disk by the sorter",
}, []string{"capture"})

sorterOpenFileCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Name: "open_file_count_gauge",
Help: "the number of open file descriptors held by the sorter",
}, []string{"capture"})

sorterFlushCountHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "sorter",
Expand All @@ -87,12 +52,7 @@ var (
// InitMetrics registers all metrics in this file
func InitMetrics(registry *prometheus.Registry) {
registry.MustRegister(sorterConsumeCount)
registry.MustRegister(sorterEventCount)
registry.MustRegister(sorterResolvedTsGauge)
registry.MustRegister(sorterMergerStartTsGauge)
registry.MustRegister(sorterInMemoryDataSizeGauge)
registry.MustRegister(sorterOnDiskDataSizeGauge)
registry.MustRegister(sorterOpenFileCountGauge)
registry.MustRegister(sorterFlushCountHistogram)
registry.MustRegister(sorterMergeCountHistogram)
}
4 changes: 2 additions & 2 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -331,9 +331,9 @@ error = '''
get tikv grpc context failed
'''

["CDC:ErrIllegalUnifiedSorterParameter"]
["CDC:ErrIllegalSorterParameter"]
error = '''
illegal parameter for unified sorter: %s
illegal parameter for sorter: %s
'''

["CDC:ErrIndexKeyTableNotFound"]
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ require (
github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14
github.com/swaggo/gin-swagger v1.2.0
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954
github.com/tidwall/gjson v1.9.1
github.com/tidwall/sjson v1.2.2
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211011083157-49c8dd23f1f0
Expand Down
5 changes: 4 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -920,8 +920,9 @@ github.com/swaggo/swag v1.5.1/go.mod h1:1Bl9F/ZBpVWh22nY0zmYyASPO1lI/zIwRDrpZU+t
github.com/swaggo/swag v1.6.3/go.mod h1:wcc83tB4Mb2aNiL/HP4MFeQdpHUrca+Rp/DRNgWAUio=
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476 h1:UjnSXdNPIG+5FJ6xLQODEdk7gSnJlMldu3sPAxxCO+4=
github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476/go.mod h1:xDhTyuFIujYiN3DKWC/H/83xcfHp+UE/IzWWampG7Zc=
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw=
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs=
github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM=
github.com/thoas/go-funk v0.7.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/thoas/go-funk v0.8.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q=
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ=
Expand Down Expand Up @@ -1172,6 +1173,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/
golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
Expand Down Expand Up @@ -1266,6 +1268,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
Loading

0 comments on commit 9a8eaf0

Please sign in to comment.