From 9a8eaf07209dc1668468e6657aaae6eecf7a1e4a Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Fri, 5 Nov 2021 20:55:04 +0800 Subject: [PATCH] leveldb: add message, config and metrics (#3272) --- cdc/metrics.go | 4 + cdc/sorter/leveldb/message/task.go | 56 +++++++ cdc/sorter/leveldb/message/task_test.go | 37 +++++ cdc/sorter/leveldb/metrics.go | 50 ++++++ cdc/sorter/metrics.go | 69 ++++++++ cdc/sorter/unified/backend_pool.go | 7 +- cdc/sorter/unified/merger.go | 5 +- cdc/sorter/unified/metrics.go | 40 ----- errors.toml | 4 +- go.mod | 2 +- go.sum | 5 +- pkg/actor/message/message.go | 23 +++ pkg/actor/message/message_test.go | 8 + pkg/cmd/server/server_test.go | 60 +++++++ pkg/config/config.go | 20 +++ pkg/config/config_data_test.go | 199 ++++++++++++++++++++++++ pkg/config/config_test.go | 35 ++++- pkg/config/sorter.go | 81 +++++++++- pkg/errors/errors.go | 6 +- 19 files changed, 647 insertions(+), 64 deletions(-) create mode 100644 cdc/sorter/leveldb/message/task.go create mode 100644 cdc/sorter/leveldb/message/task_test.go create mode 100644 cdc/sorter/leveldb/metrics.go create mode 100644 cdc/sorter/metrics.go create mode 100644 pkg/config/config_data_test.go diff --git a/cdc/metrics.go b/cdc/metrics.go index ae7cfac34e7..220de00f247 100644 --- a/cdc/metrics.go +++ b/cdc/metrics.go @@ -21,6 +21,8 @@ import ( tablepipeline "github.com/pingcap/ticdc/cdc/processor/pipeline" "github.com/pingcap/ticdc/cdc/puller" "github.com/pingcap/ticdc/cdc/sink" + "github.com/pingcap/ticdc/cdc/sorter" + "github.com/pingcap/ticdc/cdc/sorter/leveldb" "github.com/pingcap/ticdc/cdc/sorter/memory" "github.com/pingcap/ticdc/cdc/sorter/unified" "github.com/pingcap/ticdc/pkg/actor" @@ -45,6 +47,8 @@ func init() { initServerMetrics(registry) actor.InitMetrics(registry) // Sorter metrics + sorter.InitMetrics(registry) memory.InitMetrics(registry) unified.InitMetrics(registry) + leveldb.InitMetrics(registry) } diff --git a/cdc/sorter/leveldb/message/task.go b/cdc/sorter/leveldb/message/task.go new file mode 100644 index 00000000000..24795e94067 --- /dev/null +++ b/cdc/sorter/leveldb/message/task.go @@ -0,0 +1,56 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package message + +import ( + "fmt" + + "github.com/pingcap/ticdc/cdc/sorter/encoding" + Iterator "github.com/syndtr/goleveldb/leveldb/iterator" + lutil "github.com/syndtr/goleveldb/leveldb/util" + "golang.org/x/sync/semaphore" +) + +// Task is a leveldb actor task. It carries write and read request. +type Task struct { + UID uint32 + TableID uint64 + + // encoded key -> serde.marshal(event) + // If a value is empty, it deletes the key/value entry in leveldb. + Events map[Key][]byte + // Must be buffered channel to avoid blocking. + IterCh chan LimitedIterator `json:"-"` // Make Task JSON printable. + Irange *lutil.Range + // Set NeedIter whenever caller wants to read something from an iterator. + NeedIter bool +} + +// Key is the key that is written to leveldb. +type Key string + +// String returns a pretty printed string. +func (k Key) String() string { + uid, tableID, startTs, CRTs := encoding.DecodeKey([]byte(k)) + return fmt.Sprintf( + "uid: %d, tableID: %d, startTs: %d, CRTs: %d", + uid, tableID, startTs, CRTs) +} + +// LimitedIterator is a wrapper of leveldb iterator that has a sema to limit +// the total number of open iterators. +type LimitedIterator struct { + Iterator.Iterator + Sema *semaphore.Weighted +} diff --git a/cdc/sorter/leveldb/message/task_test.go b/cdc/sorter/leveldb/message/task_test.go new file mode 100644 index 00000000000..8b0de95283c --- /dev/null +++ b/cdc/sorter/leveldb/message/task_test.go @@ -0,0 +1,37 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package message + +import ( + "testing" + + "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/sorter/encoding" + "github.com/stretchr/testify/require" +) + +func TestPrint(t *testing.T) { + t.Parallel() + event := model.NewPolymorphicEvent(&model.RawKVEntry{ + OpType: model.OpTypeDelete, + Key: []byte{1}, + StartTs: 3, + CRTs: 4, + }) + + require.Equal(t, "uid: 1, tableID: 2, startTs: 3, CRTs: 4", + Key(encoding.EncodeKey(1, 2, event)).String()) + require.Equal(t, "uid: 1, tableID: 2, startTs: 0, CRTs: 3", + Key(encoding.EncodeTsKey(1, 2, 3)).String()) +} diff --git a/cdc/sorter/leveldb/metrics.go b/cdc/sorter/leveldb/metrics.go new file mode 100644 index 00000000000..65dd6cb0001 --- /dev/null +++ b/cdc/sorter/leveldb/metrics.go @@ -0,0 +1,50 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package leveldb + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + sorterWriteBytesHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "leveldb_write_bytes", + Help: "Bucketed histogram of sorter write batch bytes", + Buckets: prometheus.ExponentialBuckets(16, 2.0, 20), + }, []string{"capture", "id"}) + + sorterWriteDurationHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "leveldb_write_duration_seconds", + Help: "Bucketed histogram of sorter write duration", + Buckets: prometheus.ExponentialBuckets(0.004, 2.0, 20), + }, []string{"capture", "id"}) + + sorterCleanupKVCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "leveldb_cleanup_kv_total", + Help: "The total number of cleaned up kv entries", + }, []string{"capture", "id"}) +) + +// InitMetrics registers all metrics in this file +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(sorterWriteDurationHistogram) + registry.MustRegister(sorterWriteBytesHistogram) + registry.MustRegister(sorterCleanupKVCounter) +} diff --git a/cdc/sorter/metrics.go b/cdc/sorter/metrics.go new file mode 100644 index 00000000000..f4e30109101 --- /dev/null +++ b/cdc/sorter/metrics.go @@ -0,0 +1,69 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package sorter + +import ( + "github.com/prometheus/client_golang/prometheus" +) + +var ( + // SorterEventCount is the metric that counts events output by the sorter. + SorterEventCount = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "event_count", + Help: "The number of events output by the sorter", + }, []string{"capture", "changefeed", "type"}) + + // SorterResolvedTsGauge is the metric that records sorter resolved ts. + SorterResolvedTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "resolved_ts_gauge", + Help: "the resolved ts of the sorter", + }, []string{"capture", "changefeed"}) + + // SorterInMemoryDataSizeGauge is the metric that records sorter memory usage. + SorterInMemoryDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "in_memory_data_size_gauge", + Help: "The amount of pending data stored in-memory by the sorter", + }, []string{"capture", "id"}) + + // SorterOnDiskDataSizeGauge is the metric that records sorter disk usage. + SorterOnDiskDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "on_disk_data_size_gauge", + Help: "The amount of pending data stored on-disk by the sorter", + }, []string{"capture", "id"}) + + // SorterOpenFileCountGauge is the metric that records sorter open files. + SorterOpenFileCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: "ticdc", + Subsystem: "sorter", + Name: "open_file_count_gauge", + Help: "The number of open file descriptors held by the sorter", + }, []string{"capture", "id"}) +) + +// InitMetrics registers all metrics in this file +func InitMetrics(registry *prometheus.Registry) { + registry.MustRegister(SorterEventCount) + registry.MustRegister(SorterResolvedTsGauge) + registry.MustRegister(SorterInMemoryDataSizeGauge) + registry.MustRegister(SorterOnDiskDataSizeGauge) + registry.MustRegister(SorterOpenFileCountGauge) +} diff --git a/cdc/sorter/unified/backend_pool.go b/cdc/sorter/unified/backend_pool.go index 3f5add5f70b..689d423e742 100644 --- a/cdc/sorter/unified/backend_pool.go +++ b/cdc/sorter/unified/backend_pool.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/log" + "github.com/pingcap/ticdc/cdc/sorter" sorterencoding "github.com/pingcap/ticdc/cdc/sorter/encoding" "github.com/pingcap/ticdc/pkg/config" cerrors "github.com/pingcap/ticdc/pkg/errors" @@ -93,9 +94,9 @@ func newBackEndPool(dir string, captureAddr string) (*backEndPool, error) { ticker := time.NewTicker(backgroundJobInterval) defer ticker.Stop() - metricSorterInMemoryDataSizeGauge := sorterInMemoryDataSizeGauge.WithLabelValues(captureAddr) - metricSorterOnDiskDataSizeGauge := sorterOnDiskDataSizeGauge.WithLabelValues(captureAddr) - metricSorterOpenFileCountGauge := sorterOpenFileCountGauge.WithLabelValues(captureAddr) + metricSorterInMemoryDataSizeGauge := sorter.SorterInMemoryDataSizeGauge.WithLabelValues(captureAddr, "0") + metricSorterOnDiskDataSizeGauge := sorter.SorterOnDiskDataSizeGauge.WithLabelValues(captureAddr, "0") + metricSorterOpenFileCountGauge := sorter.SorterOpenFileCountGauge.WithLabelValues(captureAddr, "0") for { select { diff --git a/cdc/sorter/unified/merger.go b/cdc/sorter/unified/merger.go index 97ff7baa9ac..e331f1c125e 100644 --- a/cdc/sorter/unified/merger.go +++ b/cdc/sorter/unified/merger.go @@ -26,6 +26,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/log" "github.com/pingcap/ticdc/cdc/model" + "github.com/pingcap/ticdc/cdc/sorter" cerrors "github.com/pingcap/ticdc/pkg/errors" "github.com/pingcap/ticdc/pkg/notify" "github.com/pingcap/ticdc/pkg/util" @@ -39,11 +40,11 @@ func runMerger(ctx context.Context, numSorters int, in <-chan *flushTask, out ch captureAddr := util.CaptureAddrFromCtx(ctx) changefeedID := util.ChangefeedIDFromCtx(ctx) - metricSorterEventCount := sorterEventCount.MustCurryWith(map[string]string{ + metricSorterEventCount := sorter.SorterEventCount.MustCurryWith(map[string]string{ "capture": captureAddr, "changefeed": changefeedID, }) - metricSorterResolvedTsGauge := sorterResolvedTsGauge.WithLabelValues(captureAddr, changefeedID) + metricSorterResolvedTsGauge := sorter.SorterResolvedTsGauge.WithLabelValues(captureAddr, changefeedID) metricSorterMergerStartTsGauge := sorterMergerStartTsGauge.WithLabelValues(captureAddr, changefeedID) metricSorterMergeCountHistogram := sorterMergeCountHistogram.WithLabelValues(captureAddr, changefeedID) diff --git a/cdc/sorter/unified/metrics.go b/cdc/sorter/unified/metrics.go index d510ccf78a1..d4f9a23e2cf 100644 --- a/cdc/sorter/unified/metrics.go +++ b/cdc/sorter/unified/metrics.go @@ -25,20 +25,6 @@ var ( Help: "the number of events consumed by the sorter", }, []string{"capture", "changefeed", "type"}) - sorterEventCount = prometheus.NewCounterVec(prometheus.CounterOpts{ - Namespace: "ticdc", - Subsystem: "sorter", - Name: "event_count", - Help: "the number of events output by the sorter", - }, []string{"capture", "changefeed", "type"}) - - sorterResolvedTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "sorter", - Name: "resolved_ts_gauge", - Help: "the resolved ts of the sorter", - }, []string{"capture", "changefeed"}) - sorterMergerStartTsGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ Namespace: "ticdc", Subsystem: "sorter", @@ -46,27 +32,6 @@ var ( Help: "the start TS of each merge in the sorter", }, []string{"capture", "changefeed"}) - sorterInMemoryDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "sorter", - Name: "in_memory_data_size_gauge", - Help: "the amount of pending data stored in-memory by the sorter", - }, []string{"capture"}) - - sorterOnDiskDataSizeGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "sorter", - Name: "on_disk_data_size_gauge", - Help: "the amount of pending data stored on-disk by the sorter", - }, []string{"capture"}) - - sorterOpenFileCountGauge = prometheus.NewGaugeVec(prometheus.GaugeOpts{ - Namespace: "ticdc", - Subsystem: "sorter", - Name: "open_file_count_gauge", - Help: "the number of open file descriptors held by the sorter", - }, []string{"capture"}) - sorterFlushCountHistogram = prometheus.NewHistogramVec(prometheus.HistogramOpts{ Namespace: "ticdc", Subsystem: "sorter", @@ -87,12 +52,7 @@ var ( // InitMetrics registers all metrics in this file func InitMetrics(registry *prometheus.Registry) { registry.MustRegister(sorterConsumeCount) - registry.MustRegister(sorterEventCount) - registry.MustRegister(sorterResolvedTsGauge) registry.MustRegister(sorterMergerStartTsGauge) - registry.MustRegister(sorterInMemoryDataSizeGauge) - registry.MustRegister(sorterOnDiskDataSizeGauge) - registry.MustRegister(sorterOpenFileCountGauge) registry.MustRegister(sorterFlushCountHistogram) registry.MustRegister(sorterMergeCountHistogram) } diff --git a/errors.toml b/errors.toml index 57860d8a95a..72d977fa499 100755 --- a/errors.toml +++ b/errors.toml @@ -331,9 +331,9 @@ error = ''' get tikv grpc context failed ''' -["CDC:ErrIllegalUnifiedSorterParameter"] +["CDC:ErrIllegalSorterParameter"] error = ''' -illegal parameter for unified sorter: %s +illegal parameter for sorter: %s ''' ["CDC:ErrIndexKeyTableNotFound"] diff --git a/go.mod b/go.mod index eeacc6091c8..9e730fe431c 100644 --- a/go.mod +++ b/go.mod @@ -70,7 +70,7 @@ require ( github.com/swaggo/files v0.0.0-20190704085106-630677cd5c14 github.com/swaggo/gin-swagger v1.2.0 github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476 - github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 + github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 github.com/tidwall/gjson v1.9.1 github.com/tidwall/sjson v1.2.2 github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211011083157-49c8dd23f1f0 diff --git a/go.sum b/go.sum index b6b572356d4..b9ea6b4f78b 100644 --- a/go.sum +++ b/go.sum @@ -920,8 +920,9 @@ github.com/swaggo/swag v1.5.1/go.mod h1:1Bl9F/ZBpVWh22nY0zmYyASPO1lI/zIwRDrpZU+t github.com/swaggo/swag v1.6.3/go.mod h1:wcc83tB4Mb2aNiL/HP4MFeQdpHUrca+Rp/DRNgWAUio= github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476 h1:UjnSXdNPIG+5FJ6xLQODEdk7gSnJlMldu3sPAxxCO+4= github.com/swaggo/swag v1.6.6-0.20200529100950-7c765ddd0476/go.mod h1:xDhTyuFIujYiN3DKWC/H/83xcfHp+UE/IzWWampG7Zc= -github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965 h1:1oFLiOyVl+W7bnBzGhf7BbIv9loSFQcieWWYIjLqcAw= github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA= +github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954 h1:xQdMZ1WLrgkkvOZ/LDQxjVxMLdby7osSh4ZEVa5sIjs= +github.com/syndtr/goleveldb v1.0.1-0.20210305035536-64b5b1c73954/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= github.com/thoas/go-funk v0.7.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/thoas/go-funk v0.8.0/go.mod h1:+IWnUfUmFO1+WVYQWQtIJHeRRdaIyyYglZN7xzUPe4Q= github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJfDRtkanvQPiooDH8HvJ2FBh+iKT/OmiQQ= @@ -1172,6 +1173,7 @@ golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/ golang.org/x/net v0.0.0-20200602114024-627f9648deb9/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A= golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200707034311-ab3426394381/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20200813134508-3edf25e44fcc/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20200904194848-62affa334b73/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= @@ -1266,6 +1268,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200814200057-3d37ad5750ed/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/pkg/actor/message/message.go b/pkg/actor/message/message.go index 3e44a4f4604..ff88692305e 100644 --- a/pkg/actor/message/message.go +++ b/pkg/actor/message/message.go @@ -15,6 +15,7 @@ package message import ( "github.com/pingcap/ticdc/cdc/model" + sorter "github.com/pingcap/ticdc/cdc/sorter/leveldb/message" ) // Type is the type of Message @@ -24,7 +25,9 @@ type Type int const ( TypeUnknown Type = iota TypeTick + TypeStop TypeBarrier + TypeSorterTask // Add a new type when adding a new message. ) @@ -34,6 +37,11 @@ type Message struct { Tp Type // BarrierTs BarrierTs model.Ts + // Leveldb sorter task + // TODO: find a way to hide it behind an interface while saving + // memory allocation. + // See https://cs.opensource.google/go/go/+/refs/tags/go1.17.2:src/runtime/iface.go;l=325 + SorterTask sorter.Task } // TickMessage creates the message of Tick @@ -43,6 +51,13 @@ func TickMessage() Message { } } +// StopMessage creates the message of Stop +func StopMessage() Message { + return Message{ + Tp: TypeStop, + } +} + // BarrierMessage creates the message of Command func BarrierMessage(barrierTs model.Ts) Message { return Message{ @@ -50,3 +65,11 @@ func BarrierMessage(barrierTs model.Ts) Message { BarrierTs: barrierTs, } } + +// SorterMessage creates the message of sorter +func SorterMessage(task sorter.Task) Message { + return Message{ + Tp: TypeSorterTask, + SorterTask: task, + } +} diff --git a/pkg/actor/message/message_test.go b/pkg/actor/message/message_test.go index 4551ee9dfa1..18f1e575d88 100644 --- a/pkg/actor/message/message_test.go +++ b/pkg/actor/message/message_test.go @@ -17,6 +17,7 @@ import ( "encoding/json" "testing" + sorter "github.com/pingcap/ticdc/cdc/sorter/leveldb/message" "github.com/pingcap/ticdc/pkg/leakutil" "github.com/stretchr/testify/require" ) @@ -41,3 +42,10 @@ func TestBarrierMessage(t *testing.T) { msg := BarrierMessage(1) require.Equal(t, TypeBarrier, msg.Tp) } + +func TestSorterMessage(t *testing.T) { + task := sorter.Task{UID: 1, TableID: 2} + msg := SorterMessage(task) + require.Equal(t, TypeSorterTask, msg.Tp) + require.Equal(t, task, msg.SorterTask) +} diff --git a/pkg/cmd/server/server_test.go b/pkg/cmd/server/server_test.go index 08cac96c4cf..4286e4ee26b 100644 --- a/pkg/cmd/server/server_test.go +++ b/pkg/cmd/server/server_test.go @@ -15,6 +15,7 @@ package server import ( "fmt" + "math" "os" "path/filepath" "testing" @@ -174,6 +175,21 @@ func (s *serverSuite) TestParseCfg(c *check.C) { MaxMemoryConsumption: 60000, NumWorkerPoolGoroutine: 90, SortDir: config.DefaultSortDir, + EnableLevelDB: false, + LevelDB: config.LevelDBConfig{ + LevelDBCount: 16, + LevelDBConcurrency: 256, + MaxOpenFiles: 10000, + BlockSize: 65536, + BlockCacheSize: 0, + WriterBufferSize: 8388608, + Compression: "snappy", + TargetFileSizeBase: 8388608, + CompactionL0Trigger: 160, + WriteL0SlowdownTrigger: math.MaxInt32, + WriteL0PauseTrigger: math.MaxInt32, + CleanupSpeedLimit: 10000, + }, }, Security: &config.SecurityConfig{ CertPath: "bb", @@ -221,6 +237,20 @@ max-memory-percentage = 3 num-concurrent-worker = 4 num-workerpool-goroutine = 5 sort-dir = "/tmp/just_a_test" +enable-leveldb-sorter = false +[sorter.leveldb] +leveldb-count = 5 +leveldb-concurrency = 6 +max-open-files = 7 +block-size = 32768 # 32 KB +block-cache-size = 8 +writer-buffer-size = 9 +compression = "none" +target-file-size-base = 10 +compaction-l0-trigger = 11 +write-l0-slowdown-trigger = 12 +write-l0-pause-trigger = 13 +cleanup-speed-limit = 14 `, dataDir) err := os.WriteFile(configPath, []byte(configContent), 0o644) c.Assert(err, check.IsNil) @@ -260,6 +290,21 @@ sort-dir = "/tmp/just_a_test" MaxMemoryConsumption: 2000000, NumWorkerPoolGoroutine: 5, SortDir: config.DefaultSortDir, + EnableLevelDB: false, + LevelDB: config.LevelDBConfig{ + LevelDBCount: 5, + LevelDBConcurrency: 6, + MaxOpenFiles: 7, + BlockSize: 32768, + BlockCacheSize: 8, + WriterBufferSize: 9, + Compression: "none", + TargetFileSizeBase: 10, + CompactionL0Trigger: 11, + WriteL0SlowdownTrigger: 12, + WriteL0PauseTrigger: 13, + CleanupSpeedLimit: 14, + }, }, Security: &config.SecurityConfig{}, PerTableMemoryQuota: 10 * 1024 * 1024, // 10M @@ -363,6 +408,21 @@ cert-allowed-cn = ["dd","ee"] MaxMemoryConsumption: 60000000, NumWorkerPoolGoroutine: 5, SortDir: config.DefaultSortDir, + EnableLevelDB: false, + LevelDB: config.LevelDBConfig{ + LevelDBCount: 16, + LevelDBConcurrency: 256, + MaxOpenFiles: 10000, + BlockSize: 65536, + BlockCacheSize: 0, + WriterBufferSize: 8388608, + Compression: "snappy", + TargetFileSizeBase: 8388608, + CompactionL0Trigger: 160, + WriteL0SlowdownTrigger: math.MaxInt32, + WriteL0PauseTrigger: math.MaxInt32, + CleanupSpeedLimit: 10000, + }, }, Security: &config.SecurityConfig{ CertPath: "bb", diff --git a/pkg/config/config.go b/pkg/config/config.go index c1c41b61481..9be1f9b6ad7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -16,6 +16,7 @@ package config import ( "encoding/json" "fmt" + "math" "net" "strings" "sync/atomic" @@ -194,6 +195,25 @@ var defaultServerConfig = &ServerConfig{ MaxMemoryConsumption: 16 * 1024 * 1024 * 1024, // 16GB NumWorkerPoolGoroutine: 16, SortDir: DefaultSortDir, + + // Default leveldb sorter config + EnableLevelDB: false, + LevelDB: LevelDBConfig{ + LevelDBCount: 16, + // Following configs are optimized for write throughput. + // Users should not change them. + LevelDBConcurrency: 256, + MaxOpenFiles: 10000, + BlockSize: 65536, + BlockCacheSize: 0, + WriterBufferSize: 8388608, + Compression: "snappy", + TargetFileSizeBase: 8388608, + CompactionL0Trigger: 160, + WriteL0SlowdownTrigger: math.MaxInt32, + WriteL0PauseTrigger: math.MaxInt32, + CleanupSpeedLimit: 10000, + }, }, Security: &SecurityConfig{}, PerTableMemoryQuota: 10 * 1024 * 1024, // 10MB diff --git a/pkg/config/config_data_test.go b/pkg/config/config_data_test.go new file mode 100644 index 00000000000..e0382a47ccc --- /dev/null +++ b/pkg/config/config_data_test.go @@ -0,0 +1,199 @@ +// Copyright 2021 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +const ( + testCfgTestReplicaConfigOutDated = `{ + "case-sensitive": false, + "enable-old-value": true, + "force-replicate": true, + "check-gc-safe-point": true, + "filter": { + "rules": [ + "1.1" + ], + "ignore-txn-start-ts": null, + "ddl-allow-list": null + }, + "mounter": { + "worker-num": 3 + }, + "sink": { + "dispatch-rules": [ + { + "db-name": "a", + "tbl-name": "b", + "rule": "r1" + }, + { + "db-name": "a", + "tbl-name": "c", + "rule": "r2" + }, + { + "db-name": "a", + "tbl-name": "d", + "rule": "r2" + } + ], + "protocol": "default" + }, + "cyclic-replication": { + "enable": false, + "replica-id": 0, + "filter-replica-ids": null, + "id-buckets": 0, + "sync-ddl": false + }, + "scheduler": { + "type": "table-number", + "polling-time": -1 + }, + "consistent": { + "level": "none", + "max-log-size": 64, + "flush-interval": 1000, + "storage": "" + } +}` + + testCfgTestServerConfigMarshal = `{ + "addr": "192.155.22.33:8887", + "advertise-addr": "", + "log-file": "", + "log-level": "info", + "log": { + "file": { + "max-size": 300, + "max-days": 0, + "max-backups": 0 + } + }, + "data-dir": "", + "gc-ttl": 86400, + "tz": "System", + "capture-session-ttl": 10, + "owner-flush-interval": 200000000, + "processor-flush-interval": 100000000, + "sorter": { + "num-concurrent-worker": 4, + "chunk-size-limit": 999, + "max-memory-percentage": 30, + "max-memory-consumption": 17179869184, + "num-workerpool-goroutine": 16, + "sort-dir": "/tmp/sorter", + "enable-leveldb-sorter": false, + "leveldb": { + "leveldb-count": 16, + "leveldb-concurrency": 256, + "max-open-files": 10000, + "block-size": 65536, + "block-cache-size": 0, + "writer-buffer-size": 8388608, + "compression": "snappy", + "target-file-size-base": 8388608, + "compaction-l0-trigger": 160, + "write-l0-slowdown-trigger": 2147483647, + "write-l0-pause-trigger": 2147483647, + "cleanup-speed-limit": 10000 + } + }, + "security": { + "ca-path": "", + "cert-path": "", + "key-path": "", + "cert-allowed-cn": null + }, + "per-table-memory-quota": 10485760, + "kv-client": { + "worker-concurrent": 8, + "worker-pool-size": 0, + "region-scan-limit": 40 + } +}` + + testCfgTestReplicaConfigMarshal1 = `{ + "case-sensitive": false, + "enable-old-value": true, + "force-replicate": true, + "check-gc-safe-point": true, + "filter": { + "rules": [ + "1.1" + ], + "ignore-txn-start-ts": null + }, + "mounter": { + "worker-num": 3 + }, + "sink": { + "dispatchers": null, + "protocol": "default" + }, + "cyclic-replication": { + "enable": false, + "replica-id": 0, + "filter-replica-ids": null, + "id-buckets": 0, + "sync-ddl": false + }, + "scheduler": { + "type": "table-number", + "polling-time": -1 + }, + "consistent": { + "level": "none", + "max-log-size": 64, + "flush-interval": 1000, + "storage": "" + } +}` + + testCfgTestReplicaConfigMarshal2 = `{ + "case-sensitive": false, + "enable-old-value": true, + "force-replicate": true, + "check-gc-safe-point": true, + "filter": { + "rules": [ + "1.1" + ], + "ignore-txn-start-ts": null + }, + "mounter": { + "worker-num": 3 + }, + "sink": { + "dispatchers": null, + "protocol": "default" + }, + "cyclic-replication": { + "enable": false, + "replica-id": 0, + "filter-replica-ids": null, + "id-buckets": 0, + "sync-ddl": false + }, + "scheduler": { + "type": "table-number", + "polling-time": -1 + }, + "consistent": { + "level": "none", + "max-log-size": 64, + "flush-interval": 1000, + "storage": "" + } +}` +) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 09430135d3a..e8193db6283 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -14,11 +14,20 @@ package config import ( + "bytes" + "encoding/json" "testing" "github.com/stretchr/testify/require" ) +func mustIdentJSON(t *testing.T, j string) string { + var buf bytes.Buffer + err := json.Indent(&buf, []byte(j), "", " ") + require.Nil(t, err) + return buf.String() +} + func TestReplicaConfigMarshal(t *testing.T) { t.Parallel() conf := GetDefaultReplicaConfig() @@ -28,9 +37,9 @@ func TestReplicaConfigMarshal(t *testing.T) { conf.Mounter.WorkerNum = 3 b, err := conf.Marshal() require.Nil(t, err) - require.Equal(t, `{"case-sensitive":false,"enable-old-value":true,"force-replicate":true,"check-gc-safe-point":true,"filter":{"rules":["1.1"],"ignore-txn-start-ts":null},"mounter":{"worker-num":3},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}}`, b) + require.Equal(t, testCfgTestReplicaConfigMarshal1, mustIdentJSON(t, b)) conf2 := new(ReplicaConfig) - err = conf2.Unmarshal([]byte(`{"case-sensitive":false,"enable-old-value":true,"force-replicate":true,"check-gc-safe-point":true,"filter":{"rules":["1.1"],"ignore-txn-start-ts":null},"mounter":{"worker-num":3},"sink":{"dispatchers":null,"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}}`)) + err = conf2.Unmarshal([]byte(testCfgTestReplicaConfigMarshal2)) require.Nil(t, err) require.Equal(t, conf, conf2) } @@ -51,7 +60,7 @@ func TestReplicaConfigClone(t *testing.T) { func TestReplicaConfigOutDated(t *testing.T) { t.Parallel() conf2 := new(ReplicaConfig) - err := conf2.Unmarshal([]byte(`{"case-sensitive":false,"enable-old-value":true,"force-replicate":true,"check-gc-safe-point":true,"filter":{"rules":["1.1"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":3},"sink":{"dispatch-rules":[{"db-name":"a","tbl-name":"b","rule":"r1"},{"db-name":"a","tbl-name":"c","rule":"r2"},{"db-name":"a","tbl-name":"d","rule":"r2"}],"protocol":"default"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1},"consistent":{"level":"none","max-log-size":64,"flush-interval":1000,"storage":""}}`)) + err := conf2.Unmarshal([]byte(testCfgTestReplicaConfigOutDated)) require.Nil(t, err) conf := GetDefaultReplicaConfig() @@ -69,8 +78,7 @@ func TestReplicaConfigOutDated(t *testing.T) { func TestServerConfigMarshal(t *testing.T) { t.Parallel() - - rawConfig := `{"addr":"192.155.22.33:8887","advertise-addr":"","log-file":"","log-level":"info","log":{"file":{"max-size":300,"max-days":0,"max-backups":0}},"data-dir":"","gc-ttl":86400,"tz":"System","capture-session-ttl":10,"owner-flush-interval":200000000,"processor-flush-interval":100000000,"sorter":{"num-concurrent-worker":4,"chunk-size-limit":999,"max-memory-percentage":30,"max-memory-consumption":17179869184,"num-workerpool-goroutine":16,"sort-dir":"/tmp/sorter"},"security":{"ca-path":"","cert-path":"","key-path":"","cert-allowed-cn":null},"per-table-memory-quota":10485760,"kv-client":{"worker-concurrent":8,"worker-pool-size":0,"region-scan-limit":40}}` + rawConfig := testCfgTestServerConfigMarshal conf := GetDefaultServerConfig() conf.Addr = "192.155.22.33:8887" @@ -78,7 +86,7 @@ func TestServerConfigMarshal(t *testing.T) { b, err := conf.Marshal() require.Nil(t, err) - require.Equal(t, rawConfig, b) + require.Equal(t, rawConfig, mustIdentJSON(t, b)) conf2 := new(ServerConfig) err = conf2.Unmarshal([]byte(rawConfig)) require.Nil(t, err) @@ -124,3 +132,18 @@ func TestServerConfigValidateAndAdjust(t *testing.T) { require.Nil(t, conf.ValidateAndAdjust()) require.EqualValues(t, GetDefaultServerConfig().PerTableMemoryQuota, conf.PerTableMemoryQuota) } + +func TestSorterConfigValidateAndAdjust(t *testing.T) { + t.Parallel() + conf := GetDefaultServerConfig().Clone().Sorter + + require.Nil(t, conf.ValidateAndAdjust()) + conf.LevelDB.Compression = "none" + require.Nil(t, conf.ValidateAndAdjust()) + conf.LevelDB.Compression = "snappy" + require.Nil(t, conf.ValidateAndAdjust()) + conf.LevelDB.Compression = "invalid" + require.Error(t, conf.ValidateAndAdjust()) + conf.LevelDB.CleanupSpeedLimit = 0 + require.Error(t, conf.ValidateAndAdjust()) +} diff --git a/pkg/config/sorter.go b/pkg/config/sorter.go index 7ddef8b9218..9035fb66420 100644 --- a/pkg/config/sorter.go +++ b/pkg/config/sorter.go @@ -29,27 +29,96 @@ type SorterConfig struct { NumWorkerPoolGoroutine int `toml:"num-workerpool-goroutine" json:"num-workerpool-goroutine"` // the directory used to store the temporary files generated by the sorter SortDir string `toml:"sort-dir" json:"sort-dir"` + + // EnableLevelDB enables leveldb sorter. + // + // The default value is true. + // TODO: turn on after GA. + EnableLevelDB bool `toml:"enable-leveldb-sorter" json:"enable-leveldb-sorter"` + LevelDB LevelDBConfig `toml:"leveldb" json:"leveldb"` +} + +// LevelDBConfig represents leveldb sorter config. +type LevelDBConfig struct { + // LevelDBCount is the number of leveldb count. + // + // The default value is 16. + LevelDBCount int `toml:"leveldb-count" json:"leveldb-count"` + // LevelDBConcurrency is the maximum write and read concurrency. + // + // The default value is 256. + LevelDBConcurrency int `toml:"leveldb-concurrency" json:"leveldb-concurrency"` + // MaxOpenFiles is the maximum number of open FD by leveldb sorter. + // + // The default value is 10000. + MaxOpenFiles int `toml:"max-open-files" json:"max-open-files"` + // BlockSize the block size of leveldb sorter. + // + // The default value is 65536, 64KB. + BlockSize int `toml:"block-size" json:"block-size"` + // BlockCacheSize is the capacity of leveldb block cache. + // + // The default value is 0. + BlockCacheSize int `toml:"block-cache-size" json:"block-cache-size"` + // WriterBufferSize is the size of memory table of leveldb. + // + // The default value is 8388608, 8MiB. + WriterBufferSize int `toml:"writer-buffer-size" json:"writer-buffer-size"` + // Compression is the compression algorithm that is used by leveldb. + // Valid values are "none" or "snappy". + // + // The default value is "snappy". + Compression string `toml:"compression" json:"compression"` + // TargetFileSizeBase limits size of leveldb sst file that compaction generates. + // + // The default value is 8388608, 8MiB. + TargetFileSizeBase int `toml:"target-file-size-base" json:"target-file-size-base"` + // CompactionL0Trigger defines number of leveldb sst file at level-0 that will + // trigger compaction. + // + // The default value is 160. + CompactionL0Trigger int `toml:"compaction-l0-trigger" json:"compaction-l0-trigger"` + // WriteL0SlowdownTrigger defines number of leveldb sst file at level-0 that + // will trigger write slowdown. + // + // The default value is 1<<31 - 1. + WriteL0SlowdownTrigger int `toml:"write-l0-slowdown-trigger" json:"write-l0-slowdown-trigger"` + // WriteL0PauseTrigger defines number of leveldb sst file at level-0 that will + // pause write. + // + // The default value is 1<<31 - 1. + WriteL0PauseTrigger int `toml:"write-l0-pause-trigger" json:"write-l0-pause-trigger"` + // CleanupSpeedLimit limits clean up speed, based on key value entry count. + // + // The default value is 10000. + CleanupSpeedLimit int `toml:"cleanup-speed-limit" json:"cleanup-speed-limit"` } // ValidateAndAdjust validates and adjusts the sorter configuration func (c *SorterConfig) ValidateAndAdjust() error { if c.ChunkSizeLimit < 1*1024*1024 { - return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("chunk-size-limit should be at least 1MB") + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("chunk-size-limit should be at least 1MB") } if c.NumConcurrentWorker < 1 { - return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("num-concurrent-worker should be at least 1") + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("num-concurrent-worker should be at least 1") } if c.NumWorkerPoolGoroutine > 4096 { - return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("num-workerpool-goroutine should be at most 4096") + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("num-workerpool-goroutine should be at most 4096") } if c.NumConcurrentWorker > c.NumWorkerPoolGoroutine { - return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("num-concurrent-worker larger than num-workerpool-goroutine is useless") + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("num-concurrent-worker larger than num-workerpool-goroutine is useless") } if c.NumWorkerPoolGoroutine < 1 { - return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("num-workerpool-goroutine should be at least 1, larger than 8 is recommended") + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("num-workerpool-goroutine should be at least 1, larger than 8 is recommended") } if c.MaxMemoryPressure < 0 || c.MaxMemoryPressure > 100 { - return cerror.ErrIllegalUnifiedSorterParameter.GenWithStackByArgs("max-memory-percentage should be a percentage") + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("max-memory-percentage should be a percentage") + } + if c.LevelDB.Compression != "none" && c.LevelDB.Compression != "snappy" { + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("sorter.compression must be \"none\" or \"snappy\"") + } + if c.LevelDB.CleanupSpeedLimit <= 1 { + return cerror.ErrIllegalSorterParameter.GenWithStackByArgs("sorter.cleanup-speed-limit must be larger than 1") } return nil diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index 1d6abe860a6..780be7948fe 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -236,11 +236,11 @@ var ( ErrInvalidS3URI = errors.Normalize("invalid s3 uri: %s", errors.RFCCodeText("CDC:ErrInvalidS3URI")) ErrBufferLogTimeout = errors.Normalize("send row changed events to log buffer timeout", errors.RFCCodeText("CDC:ErrBufferLogTimeout")) - // unified sorter errors + // sorter errors ErrUnifiedSorterBackendTerminating = errors.Normalize("unified sorter backend is terminating", errors.RFCCodeText("CDC:ErrUnifiedSorterBackendTerminating")) - ErrIllegalUnifiedSorterParameter = errors.Normalize("illegal parameter for unified sorter: %s", errors.RFCCodeText("CDC:ErrIllegalUnifiedSorterParameter")) - ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled")) ErrUnifiedSorterIOError = errors.Normalize("unified sorter IO error. Make sure your sort-dir is configured correctly by passing a valid argument or toml file to `cdc server`, or if you use TiUP, review the settings in `tiup cluster edit-config`. Details: %s", errors.RFCCodeText("CDC:ErrUnifiedSorterIOError")) + ErrIllegalSorterParameter = errors.Normalize("illegal parameter for sorter: %s", errors.RFCCodeText("CDC:ErrIllegalSorterParameter")) + ErrAsyncIOCancelled = errors.Normalize("asynchronous IO operation is cancelled. Internal use only, report a bug if seen in log", errors.RFCCodeText("CDC:ErrAsyncIOCancelled")) ErrConflictingFileLocks = errors.Normalize("file lock conflict: %s", errors.RFCCodeText("ErrConflictingFileLocks")) ErrSortDirLockError = errors.Normalize("error encountered when locking sort-dir", errors.RFCCodeText("ErrSortDirLockError"))