diff --git a/cdc/api/v2/model.go b/cdc/api/v2/model.go index dd86998cb0b..13552322fba 100644 --- a/cdc/api/v2/model.go +++ b/cdc/api/v2/model.go @@ -291,6 +291,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig( res.Scheduler = &config.ChangefeedSchedulerConfig{ EnableTableAcrossNodes: c.Scheduler.EnableTableAcrossNodes, RegionThreshold: c.Scheduler.RegionThreshold, + WriteKeyThreshold: c.Scheduler.WriteKeyThreshold, } } return res @@ -410,6 +411,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig { res.Scheduler = &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: cloned.Scheduler.EnableTableAcrossNodes, RegionThreshold: cloned.Scheduler.RegionThreshold, + WriteKeyThreshold: cloned.Scheduler.WriteKeyThreshold, } } return res @@ -569,6 +571,8 @@ type ChangefeedSchedulerConfig struct { EnableTableAcrossNodes bool `toml:"enable_table_across_nodes" json:"enable_table_across_nodes"` // RegionThreshold is the region count threshold of splitting a table. RegionThreshold int `toml:"region_threshold" json:"region_threshold"` + // WriteKeyThreshold is the written keys threshold of splitting a table. + WriteKeyThreshold int `toml:"write_key_threshold" json:"write_key_threshold"` } // EtcdData contains key/value pair of etcd data diff --git a/cdc/api/v2/model_test.go b/cdc/api/v2/model_test.go index f74c195b1b9..e8e66baa5d4 100644 --- a/cdc/api/v2/model_test.go +++ b/cdc/api/v2/model_test.go @@ -64,6 +64,8 @@ var defaultAPIConfig = &ReplicaConfig{ Scheduler.EnableTableAcrossNodes, RegionThreshold: config.GetDefaultReplicaConfig(). Scheduler.RegionThreshold, + WriteKeyThreshold: config.GetDefaultReplicaConfig(). + Scheduler.WriteKeyThreshold, }, } @@ -136,7 +138,7 @@ func TestToAPIReplicaConfig(t *testing.T) { } cfg.Mounter = &config.MounterConfig{WorkerNum: 11} cfg.Scheduler = &config.ChangefeedSchedulerConfig{ - EnableTableAcrossNodes: true, RegionThreshold: 10001, + EnableTableAcrossNodes: true, RegionThreshold: 10001, WriteKeyThreshold: 10001, } cfg2 := ToAPIReplicaConfig(cfg).ToInternalReplicaConfig() require.Equal(t, "", cfg2.Sink.DispatchRules[0].DispatcherRule) diff --git a/cdc/model/changefeed_test.go b/cdc/model/changefeed_test.go index ed33a59e429..8193ea54c95 100644 --- a/cdc/model/changefeed_test.go +++ b/cdc/model/changefeed_test.go @@ -808,11 +808,15 @@ func TestFixSchedulerIncompatible(t *testing.T) { CreatorVersion: "6.7.0", SinkURI: "mysql://root:test@127.0.0.1:3306/", Config: &config.ReplicaConfig{ - Scheduler: &config.ChangefeedSchedulerConfig{RegionThreshold: 1000}, - Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, + Scheduler: &config.ChangefeedSchedulerConfig{ + RegionThreshold: 1000, WriteKeyThreshold: 1000, + }, + Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()}, }, }, - expectedScheduler: &config.ChangefeedSchedulerConfig{RegionThreshold: 1000}, + expectedScheduler: &config.ChangefeedSchedulerConfig{ + RegionThreshold: 1000, WriteKeyThreshold: 1000, + }, }, } diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go index 18ffbe17793..85794c301b4 100644 --- a/cdc/scheduler/internal/v3/coordinator.go +++ b/cdc/scheduler/internal/v3/coordinator.go @@ -88,9 +88,12 @@ func NewCoordinator( } coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg) coord.trans = trans - coord.reconciler = keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings) coord.pdClock = up.PDClock coord.changefeedEpoch = changefeedEpoch + coord.reconciler, err = 
keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings) + if err != nil { + return nil, errors.Trace(err) + } return coord, nil } diff --git a/cdc/scheduler/internal/v3/keyspan/mock.go b/cdc/scheduler/internal/v3/keyspan/mock.go index 88c19522fc0..a767508d17f 100644 --- a/cdc/scheduler/internal/v3/keyspan/mock.go +++ b/cdc/scheduler/internal/v3/keyspan/mock.go @@ -81,6 +81,6 @@ func NewReconcilerForTests( return &Reconciler{ tableSpans: make(map[int64]splittedSpans), config: config, - splitter: newRegionCountSplitter(model.ChangeFeedID{}, cache), + splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache)}, } } diff --git a/cdc/scheduler/internal/v3/keyspan/reconciler.go b/cdc/scheduler/internal/v3/keyspan/reconciler.go index c91c3bc1612..88187debd71 100644 --- a/cdc/scheduler/internal/v3/keyspan/reconciler.go +++ b/cdc/scheduler/internal/v3/keyspan/reconciler.go @@ -23,6 +23,8 @@ import ( "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member" "github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication" "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" "go.uber.org/zap" @@ -49,7 +51,7 @@ type Reconciler struct { changefeedID model.ChangeFeedID config *config.ChangefeedSchedulerConfig - splitter splitter + splitter []splitter } // NewReconciler returns a Reconciler. @@ -57,13 +59,20 @@ func NewReconciler( changefeedID model.ChangeFeedID, up *upstream.Upstream, config *config.ChangefeedSchedulerConfig, -) *Reconciler { +) (*Reconciler, error) { + pdapi, err := pdutil.NewPDAPIClient(up.PDClient, up.SecurityConfig) + if err != nil { + return nil, errors.Trace(err) + } return &Reconciler{ tableSpans: make(map[int64]splittedSpans), changefeedID: changefeedID, config: config, - splitter: newRegionCountSplitter(changefeedID, up.RegionCache), - } + splitter: []splitter{ + newWriteSplitter(changefeedID, pdapi), + newRegionCountSplitter(changefeedID, up.RegionCache), + }, + }, nil } // Reconcile spans that need to be replicated based on current cluster status. @@ -108,7 +117,12 @@ func (m *Reconciler) Reconcile( tableSpan := spanz.TableIDToComparableSpan(tableID) spans := []tablepb.Span{tableSpan} if compat.CheckSpanReplicationEnabled() { - spans = m.splitter.split(ctx, tableSpan, len(aliveCaptures), m.config) + for _, splitter := range m.splitter { + spans = splitter.split(ctx, tableSpan, len(aliveCaptures), m.config) + if len(spans) > 1 { + break + } + } } m.tableSpans[tableID] = splittedSpans{ byAddTable: true, diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write.go b/cdc/scheduler/internal/v3/keyspan/splitter_write.go new file mode 100644 index 00000000000..8f7bc3a9a2d --- /dev/null +++ b/cdc/scheduler/internal/v3/keyspan/splitter_write.go @@ -0,0 +1,150 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. 
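The reconciler above now walks its splitters in order and keeps the first result that yields more than one span, so the write-based splitter in this new file takes precedence and the region-count splitter remains the fallback. A minimal sketch of the behavior this file implements, written as a test in the same package (the test name and the numbers are illustrative, not part of this patch; it relies on splitRegionsByWrittenKeys below and the pdutil.NewTestRegionInfo helper this patch adds):

```go
package keyspan

import (
	"testing"

	"github.com/pingcap/tiflow/pkg/pdutil"
	"github.com/stretchr/testify/require"
)

// A hot first region (900 written keys) is isolated on its own span,
// while the two cold regions are grouped together on the second span.
func TestWriteSplitIsolatesHotRegion(t *testing.T) {
	regions := []pdutil.RegionInfo{
		pdutil.NewTestRegionInfo(2, []byte("a"), []byte("b"), 900),
		pdutil.NewTestRegionInfo(3, []byte("b"), []byte("c"), 50),
		pdutil.NewTestRegionInfo(4, []byte("c"), []byte("d"), 50),
	}
	// Total writes (1000) exceed the threshold (100), so the split happens.
	info := splitRegionsByWrittenKeys(1, regions, 100, 2)
	require.Equal(t, []int{1, 2}, info.Counts)
	require.Equal(t, []int{901, 102}, info.Weights) // +1 baseline per region
}
```

The +1 baseline per region keeps the weight accounting stable for cold regions, which is why the weights come out as 901 and 102 rather than 900 and 100.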
+
+package keyspan
+
+import (
+	"context"
+	"encoding/hex"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/processor/tablepb"
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/pingcap/tiflow/pkg/pdutil"
+	"go.uber.org/zap"
+)
+
+type writeSplitter struct {
+	changefeedID model.ChangeFeedID
+	pdAPIClient  pdutil.PDAPIClient
+}
+
+func newWriteSplitter(
+	changefeedID model.ChangeFeedID, pdAPIClient pdutil.PDAPIClient,
+) *writeSplitter {
+	return &writeSplitter{
+		changefeedID: changefeedID,
+		pdAPIClient:  pdAPIClient,
+	}
+}
+
+func (m *writeSplitter) split(
+	ctx context.Context, span tablepb.Span, totalCaptures int,
+	config *config.ChangefeedSchedulerConfig,
+) []tablepb.Span {
+	if config.WriteKeyThreshold == 0 {
+		return nil
+	}
+	regions, err := m.pdAPIClient.ScanRegions(ctx, span)
+	if err != nil {
+		// Skip split.
+		return nil
+	}
+	if totalCaptures <= 1 {
+		return []tablepb.Span{span}
+	}
+	info := splitRegionsByWrittenKeys(span.TableID, regions, config.WriteKeyThreshold, totalCaptures)
+	log.Info("schedulerv3: split span by written keys",
+		zap.Ints("counts", info.Counts),
+		zap.Ints("weights", info.Weights),
+		zap.String("span", span.String()))
+	return info.Spans
+}
+
+type splitRegionsInfo struct {
+	Counts  []int
+	Weights []int
+	Spans   []tablepb.Span
+}
+
+// splitRegionsByWrittenKeys returns spans that evenly split the range by
+// written keys, together with the region count and write weight of each span.
+func splitRegionsByWrittenKeys(
+	tableID model.TableID, regions []pdutil.RegionInfo, writeKeyThreshold int, pages int,
+) *splitRegionsInfo {
+	decodeKey := func(hexkey string) []byte {
+		key, _ := hex.DecodeString(hexkey)
+		return key
+	}
+	totalWrite, totalWriteNormalized := uint64(0), uint64(0)
+	for i := range regions {
+		totalWrite += regions[i].WrittenKeys
+		// Add 1 to every region's written keys to reflect the baseline
+		// cost of a region; it also keeps the split even when there are
+		// no writes at all.
+		regions[i].WrittenKeys++
+		totalWriteNormalized += regions[i].WrittenKeys
+	}
+	if totalWrite < uint64(writeKeyThreshold) {
+		return &splitRegionsInfo{
+			Counts:  []int{len(regions)},
+			Weights: []int{int(totalWriteNormalized)},
+			Spans: []tablepb.Span{{
+				TableID:  tableID,
+				StartKey: tablepb.Key(decodeKey(regions[0].StartKey)),
+				EndKey:   tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)),
+			}},
+		}
+	}
+
+	writtenKeysPerPage := totalWriteNormalized / uint64(pages)
+	counts := make([]int, 0, pages)
+	weights := make([]int, 0, pages)
+	spans := make([]tablepb.Span, 0, pages)
+	accWrittenKeys, pageWrittenKeys := uint64(0), uint64(0)
+	pageStartIdx, pageLastIdx := 0, 0
+	for i := 1; i < pages; i++ {
+		for idx := pageStartIdx; idx < len(regions); idx++ {
+			restPages := pages - i
+			restRegions := len(regions) - idx
+			pageLastIdx = idx
+			currentWrittenKeys := regions[idx].WrittenKeys
+			if (idx > pageStartIdx) &&
+				((restPages >= restRegions) ||
+					(accWrittenKeys+currentWrittenKeys > writtenKeysPerPage)) {
+				spans = append(spans, tablepb.Span{
+					TableID:  tableID,
+					StartKey: tablepb.Key(decodeKey(regions[pageStartIdx].StartKey)),
+					EndKey:   tablepb.Key(decodeKey(regions[idx-1].EndKey)),
+				})
+				counts = append(counts, idx-pageStartIdx)
+				weights = append(weights, int(pageWrittenKeys))
+				pageWrittenKeys = 0
+				pageStartIdx = idx
+				writtenKeysPerPage = (totalWriteNormalized - accWrittenKeys) / uint64(restPages)
+				accWrittenKeys = 0
+				break
+			}
+			pageWrittenKeys += currentWrittenKeys
+			accWrittenKeys += currentWrittenKeys
+		}
+	}
+	// Always end with the last region.
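+	// (The loop above emits at most pages-1 spans; everything from
+	// pageLastIdx onwards is folded into one final span so the whole
+	// input range stays covered, with its count and weight computed below.)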
+ spans = append(spans, tablepb.Span{ + TableID: tableID, + StartKey: tablepb.Key(decodeKey(regions[pageLastIdx].StartKey)), + EndKey: tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)), + }) + counts = append(counts, len(regions)-pageLastIdx) + pageWrittenKeys = 0 + for idx := pageLastIdx; idx < len(regions); idx++ { + pageWrittenKeys += regions[idx].WrittenKeys + } + weights = append(weights, int(pageWrittenKeys)) + + return &splitRegionsInfo{ + Counts: counts, + Weights: weights, + Spans: spans, + } +} diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go new file mode 100644 index 00000000000..791207f3c55 --- /dev/null +++ b/cdc/scheduler/internal/v3/keyspan/splitter_write_test.go @@ -0,0 +1,211 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package keyspan + +import ( + "context" + "encoding/hex" + "math" + "testing" + + "github.com/pingcap/tiflow/cdc/processor/tablepb" + "github.com/pingcap/tiflow/pkg/config" + "github.com/pingcap/tiflow/pkg/pdutil" + "github.com/stretchr/testify/require" +) + +func prepareRegionsInfo(writtenKeys [7]int) ([]pdutil.RegionInfo, map[int][]byte, map[int][]byte) { + regions := []pdutil.RegionInfo{ + pdutil.NewTestRegionInfo(2, []byte("a"), []byte("b"), uint64(writtenKeys[0])), + pdutil.NewTestRegionInfo(3, []byte("b"), []byte("c"), uint64(writtenKeys[1])), + pdutil.NewTestRegionInfo(4, []byte("c"), []byte("d"), uint64(writtenKeys[2])), + pdutil.NewTestRegionInfo(5, []byte("e"), []byte("f"), uint64(writtenKeys[3])), + pdutil.NewTestRegionInfo(6, []byte("f"), []byte("fa"), uint64(writtenKeys[4])), + pdutil.NewTestRegionInfo(7, []byte("fa"), []byte("fc"), uint64(writtenKeys[5])), + pdutil.NewTestRegionInfo(8, []byte("fc"), []byte("ff"), uint64(writtenKeys[6])), + } + startKeys := map[int][]byte{} + endKeys := map[int][]byte{} + for _, r := range regions { + b, _ := hex.DecodeString(r.StartKey) + startKeys[int(r.ID)] = b + } + for _, r := range regions { + b, _ := hex.DecodeString(r.EndKey) + endKeys[int(r.ID)] = b + } + return regions, startKeys, endKeys +} + +func cloneRegions(info []pdutil.RegionInfo) []pdutil.RegionInfo { + return append([]pdutil.RegionInfo{}, info...) 
+}
+
+func TestSplitRegionsByWrittenKeysUniform(t *testing.T) {
+	t.Parallel()
+	re := require.New(t)
+
+	regions, startKeys, endKeys := prepareRegionsInfo(
+		[7]int{100, 100, 100, 100, 100, 100, 100})
+
+	info := splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 1)
+	re.Len(info.Counts, 1)
+	re.EqualValues(7, info.Counts[0])
+	re.Len(info.Spans, 1)
+	re.EqualValues(startKeys[2], info.Spans[0].StartKey)
+	re.EqualValues(endKeys[8], info.Spans[0].EndKey)
+
+	info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 2) // [2,3,4], [5,6,7,8]
+	re.Len(info.Counts, 2)
+	re.EqualValues(3, info.Counts[0])
+	re.EqualValues(4, info.Counts[1])
+	re.Len(info.Weights, 2)
+	re.EqualValues(303, info.Weights[0])
+	re.EqualValues(404, info.Weights[1])
+	re.Len(info.Spans, 2)
+	re.EqualValues(startKeys[2], info.Spans[0].StartKey)
+	re.EqualValues(endKeys[4], info.Spans[0].EndKey)
+	re.EqualValues(startKeys[5], info.Spans[1].StartKey)
+	re.EqualValues(endKeys[8], info.Spans[1].EndKey)
+
+	info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, 3) // [2,3], [4,5], [6,7,8]
+	re.Len(info.Counts, 3)
+	re.EqualValues(2, info.Counts[0])
+	re.EqualValues(2, info.Counts[1])
+	re.EqualValues(3, info.Counts[2])
+	re.Len(info.Weights, 3)
+	re.EqualValues(202, info.Weights[0])
+	re.EqualValues(202, info.Weights[1])
+	re.EqualValues(303, info.Weights[2])
+	re.Len(info.Spans, 3)
+	re.EqualValues(startKeys[2], info.Spans[0].StartKey)
+	re.EqualValues(endKeys[3], info.Spans[0].EndKey)
+	re.EqualValues(startKeys[4], info.Spans[1].StartKey)
+	re.EqualValues(endKeys[5], info.Spans[1].EndKey)
+	re.EqualValues(startKeys[6], info.Spans[2].StartKey)
+	re.EqualValues(endKeys[8], info.Spans[2].EndKey)
+
+	// Pages > regions
+	for p := 7; p <= 10; p++ {
+		info = splitRegionsByWrittenKeys(0, cloneRegions(regions), 0, p)
+		re.Len(info.Counts, 7)
+		for _, c := range info.Counts {
+			re.EqualValues(1, c)
+		}
+		re.Len(info.Weights, 7)
+		for _, w := range info.Weights {
+			re.EqualValues(101, w, info)
+		}
+		re.Len(info.Spans, 7)
+		for i, r := range info.Spans {
+			re.EqualValues(startKeys[2+i], r.StartKey)
+			re.EqualValues(endKeys[2+i], r.EndKey)
+		}
+	}
+}
+
+func TestSplitRegionsByWrittenKeysHotspot1(t *testing.T) {
+	t.Parallel()
+	re := require.New(t)
+
+	// Hotspots
+	regions, startKeys, endKeys := prepareRegionsInfo(
+		[7]int{100, 1, 100, 1, 1, 1, 100})
+
+	info := splitRegionsByWrittenKeys(0, regions, 0, 4) // [2], [3], [4,5,6,7], [8]
+	re.Len(info.Counts, 4)
+	re.EqualValues(1, info.Counts[0])
+	re.EqualValues(1, info.Counts[1])
+	re.EqualValues(4, info.Counts[2])
+	re.EqualValues(1, info.Counts[3])
+	re.Len(info.Weights, 4)
+	re.EqualValues(101, info.Weights[0])
+	re.EqualValues(2, info.Weights[1])
+	re.EqualValues(107, info.Weights[2])
+	re.EqualValues(101, info.Weights[3])
+	re.Len(info.Spans, 4)
+	re.EqualValues(startKeys[2], info.Spans[0].StartKey)
+	re.EqualValues(endKeys[2], info.Spans[0].EndKey)
+	re.EqualValues(startKeys[3], info.Spans[1].StartKey)
+	re.EqualValues(endKeys[3], info.Spans[1].EndKey)
+	re.EqualValues(startKeys[4], info.Spans[2].StartKey)
+	re.EqualValues(endKeys[7], info.Spans[2].EndKey)
+	re.EqualValues(startKeys[8], info.Spans[3].StartKey)
+	re.EqualValues(endKeys[8], info.Spans[3].EndKey)
+}
+
+func TestSplitRegionsByWrittenKeysHotspot2(t *testing.T) {
+	t.Parallel()
+	re := require.New(t)
+
+	// Hotspots
+	regions, startKeys, endKeys := prepareRegionsInfo(
+		[7]int{1000, 1, 1, 1, 100, 1, 99})
+
+	info := splitRegionsByWrittenKeys(0, regions, 0, 4) // [2], [3,4,5], [6,7], [8]
+
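+	// With written keys {1000, 1, 1, 1, 100, 1, 99} (plus the +1 baseline
+	// per region), the 1001-weight hotspot is fenced off first and the
+	// per-page budget is recomputed from what remains, giving counts of
+	// [1, 3, 2, 1] across the four captures.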
re.Len(info.Spans, 4) + re.EqualValues(startKeys[2], info.Spans[0].StartKey) + re.EqualValues(endKeys[2], info.Spans[0].EndKey) + re.EqualValues(startKeys[3], info.Spans[1].StartKey) + re.EqualValues(endKeys[5], info.Spans[1].EndKey) + re.EqualValues(startKeys[6], info.Spans[2].StartKey) + re.EqualValues(endKeys[7], info.Spans[2].EndKey) + re.EqualValues(startKeys[8], info.Spans[3].StartKey) + re.EqualValues(endKeys[8], info.Spans[3].EndKey) +} + +func TestSplitRegionsByWrittenKeysCold(t *testing.T) { + t.Parallel() + re := require.New(t) + + regions, startKeys, endKeys := prepareRegionsInfo([7]int{}) + info := splitRegionsByWrittenKeys(0, regions, 0, 3) // [2,3], [4,5], [6,7,8] + re.Len(info.Counts, 3) + re.EqualValues(2, info.Counts[0], info) + re.EqualValues(2, info.Counts[1]) + re.EqualValues(3, info.Counts[2]) + re.Len(info.Weights, 3) + re.EqualValues(2, info.Weights[0]) + re.EqualValues(2, info.Weights[1]) + re.EqualValues(3, info.Weights[2]) + re.Len(info.Spans, 3) + re.EqualValues(startKeys[2], info.Spans[0].StartKey) + re.EqualValues(endKeys[3], info.Spans[0].EndKey) + re.EqualValues(startKeys[4], info.Spans[1].StartKey) + re.EqualValues(endKeys[5], info.Spans[1].EndKey) + re.EqualValues(startKeys[6], info.Spans[2].StartKey) + re.EqualValues(endKeys[8], info.Spans[2].EndKey) +} + +func TestSplitRegionsByWrittenKeysConfig(t *testing.T) { + t.Parallel() + re := require.New(t) + + regions, startKeys, endKeys := prepareRegionsInfo([7]int{1, 1, 1, 1, 1, 1, 1}) + info := splitRegionsByWrittenKeys(1, regions, math.MaxInt, 3) // [2,3,4,5,6,7,8] + re.Len(info.Counts, 1) + re.EqualValues(7, info.Counts[0], info) + re.Len(info.Weights, 1) + re.EqualValues(14, info.Weights[0]) + re.Len(info.Spans, 1) + re.EqualValues(startKeys[2], info.Spans[0].StartKey) + re.EqualValues(endKeys[8], info.Spans[0].EndKey) + re.EqualValues(1, info.Spans[0].TableID) + + s := writeSplitter{} + spans := s.split(context.Background(), tablepb.Span{}, 3, &config.ChangefeedSchedulerConfig{ + WriteKeyThreshold: 0, + }) + require.Empty(t, spans) +} diff --git a/cdc/scheduler/internal/v3/replication/replication_manager.go b/cdc/scheduler/internal/v3/replication/replication_manager.go index 588f43ec730..54ea317056c 100644 --- a/cdc/scheduler/internal/v3/replication/replication_manager.go +++ b/cdc/scheduler/internal/v3/replication/replication_manager.go @@ -548,6 +548,10 @@ func (r *Manager) AdvanceCheckpoint( log.Warn("schedulerv3: cannot advance checkpoint since missing span", zap.String("namespace", r.changefeedID.Namespace), zap.String("changefeed", r.changefeedID.ID), + zap.Bool("tableSpanFound", tableSpanFound), + zap.Bool("tableSpanStartFound", tableSpanStartFound), + zap.Bool("tableSpanEndFound", tableSpanEndFound), + zap.Bool("tableHasHole", tableHasHole), zap.Int64("tableID", tableID)) cannotProceed = true return false diff --git a/docs/swagger/docs.go b/docs/swagger/docs.go index 5c42d262627..809f97c4a3c 100644 --- a/docs/swagger/docs.go +++ b/docs/swagger/docs.go @@ -1856,6 +1856,10 @@ var doc = `{ "region_threshold": { "description": "RegionThreshold is the region count threshold of splitting a table.", "type": "integer" + }, + "write_key_threshold": { + "description": "WriteKeyThreshold is the written keys threshold of splitting a table.", + "type": "integer" } } }, diff --git a/docs/swagger/swagger.json b/docs/swagger/swagger.json index 750866ed840..12e27b2e9af 100644 --- a/docs/swagger/swagger.json +++ b/docs/swagger/swagger.json @@ -1837,6 +1837,10 @@ "region_threshold": { "description": "RegionThreshold 
is the region count threshold of splitting a table.", "type": "integer" + }, + "write_key_threshold": { + "description": "WriteKeyThreshold is the written keys threshold of splitting a table.", + "type": "integer" } } }, diff --git a/docs/swagger/swagger.yaml b/docs/swagger/swagger.yaml index f6cbe6da96e..2dd2ba57db5 100644 --- a/docs/swagger/swagger.yaml +++ b/docs/swagger/swagger.yaml @@ -378,6 +378,10 @@ definitions: description: RegionThreshold is the region count threshold of splitting a table. type: integer + write_key_threshold: + description: WriteKeyThreshold is the written keys threshold of splitting + a table. + type: integer type: object v2.ColumnSelector: properties: diff --git a/pkg/config/config_test_data.go b/pkg/config/config_test_data.go index 0916756f876..ce1927d7e9b 100644 --- a/pkg/config/config_test_data.go +++ b/pkg/config/config_test_data.go @@ -201,7 +201,8 @@ const ( "scheduler": { "enable-table-across-nodes": true, "region-per-span": 0, - "region-threshold": 100001 + "region-threshold": 100001, + "write-key-threshold": 100001 } }` @@ -258,7 +259,8 @@ const ( }, "scheduler": { "enable-table-across-nodes": true, - "region-threshold": 100001 + "region-threshold": 100001, + "write-key-threshold": 100001 } }` ) diff --git a/pkg/config/replica_config.go b/pkg/config/replica_config.go index b2e05134851..6609d30a583 100644 --- a/pkg/config/replica_config.go +++ b/pkg/config/replica_config.go @@ -71,6 +71,7 @@ var defaultReplicaConfig = &ReplicaConfig{ Scheduler: &ChangefeedSchedulerConfig{ EnableTableAcrossNodes: false, RegionThreshold: 100_000, + WriteKeyThreshold: 0, }, } diff --git a/pkg/config/replica_config_test.go b/pkg/config/replica_config_test.go index 1bd4cf91c25..28542419ecd 100644 --- a/pkg/config/replica_config_test.go +++ b/pkg/config/replica_config_test.go @@ -55,6 +55,7 @@ func TestReplicaConfigMarshal(t *testing.T) { conf.Sink.EnablePartitionSeparator = true conf.Scheduler.EnableTableAcrossNodes = true conf.Scheduler.RegionThreshold = 100001 + conf.Scheduler.WriteKeyThreshold = 100001 b, err := conf.Marshal() require.Nil(t, err) diff --git a/pkg/config/scheduler_config.go b/pkg/config/scheduler_config.go index 96a70cf0e93..566ffef2169 100644 --- a/pkg/config/scheduler_config.go +++ b/pkg/config/scheduler_config.go @@ -26,6 +26,8 @@ type ChangefeedSchedulerConfig struct { EnableTableAcrossNodes bool `toml:"enable-table-across-nodes" json:"enable-table-across-nodes"` // RegionThreshold is the region count threshold of splitting a table. RegionThreshold int `toml:"region-threshold" json:"region-threshold"` + // WriteKeyThreshold is the written keys threshold of splitting a table. + WriteKeyThreshold int `toml:"write-key-threshold" json:"write-key-threshold"` // Deprecated. 
	RegionPerSpan int `toml:"region-per-span" json:"region-per-span"`
 }
diff --git a/pkg/pdutil/api_client.go b/pkg/pdutil/api_client.go
index 261743302cc..d8da13a5476 100644
--- a/pkg/pdutil/api_client.go
+++ b/pkg/pdutil/api_client.go
@@ -16,16 +16,24 @@ package pdutil
 import (
 	"bytes"
 	"context"
+	"encoding/hex"
 	"encoding/json"
 	"fmt"
+	"io"
 	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
 	"time"
 
 	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/processor/tablepb"
 	"github.com/pingcap/tiflow/pkg/config"
+	cerror "github.com/pingcap/tiflow/pkg/errors"
 	"github.com/pingcap/tiflow/pkg/httputil"
 	"github.com/pingcap/tiflow/pkg/retry"
+	"github.com/pingcap/tiflow/pkg/spanz"
 	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 )
@@ -34,6 +42,7 @@ const (
 	regionLabelPrefix     = "/pd/api/v1/config/region-label/rules"
 	gcServiceSafePointURL = "/pd/api/v1/gc/safepoint"
 	healthyAPI            = "/pd/api/v1/health"
+	scanRegionAPI         = "/pd/api/v1/regions/key"
 
 	// Split the default rule by following keys to keep metadata region isolated
 	// from the normal data area.
@@ -85,6 +94,16 @@ const (
 	defaultRequestTimeout = 5 * time.Second
 )
 
+// PDAPIClient is the client for the PD HTTP API.
+type PDAPIClient interface {
+	UpdateMetaLabel(ctx context.Context) error
+	ListGcServiceSafePoint(ctx context.Context) (*ListServiceGCSafepoint, error)
+	CollectMemberEndpoints(ctx context.Context) ([]string, error)
+	Healthy(ctx context.Context, endpoint string) error
+	ScanRegions(ctx context.Context, span tablepb.Span) ([]RegionInfo, error)
+	Close()
+}
+
 // pdAPIClient is the api client of Placement Driver, include grpc client and http client.
 type pdAPIClient struct {
 	grpcClient pd.Client
@@ -92,7 +111,7 @@ type pdAPIClient struct {
 }
 
 // NewPDAPIClient create a new pdAPIClient.
-func NewPDAPIClient(pdClient pd.Client, conf *config.SecurityConfig) (*pdAPIClient, error) {
+func NewPDAPIClient(pdClient pd.Client, conf *config.SecurityConfig) (PDAPIClient, error) {
 	dialClient, err := httputil.NewClient(conf)
 	if err != nil {
 		return nil, errors.Trace(err)
@@ -132,6 +151,128 @@ func (pc *pdAPIClient) UpdateMetaLabel(ctx context.Context) error {
 	return err
 }
 
+// NewTestRegionInfo creates a new RegionInfo for test purpose.
+func NewTestRegionInfo(regionID uint64, start, end []byte, writtenKeys uint64) RegionInfo {
+	return RegionInfo{
+		ID:          regionID,
+		StartKey:    hex.EncodeToString(start),
+		EndKey:      hex.EncodeToString(end),
+		WrittenKeys: writtenKeys,
+	}
+}
+
+// RegionInfo records detail region info for api usage.
+// NOTE: This type is a copy of github.com/tikv/pd/server/api.RegionInfo.
+// To reduce dependency tree, we do not import the api package directly.
+type RegionInfo struct {
+	ID          uint64 `json:"id"`
+	StartKey    string `json:"start_key"`
+	EndKey      string `json:"end_key"`
+	WrittenKeys uint64 `json:"written_keys"`
+}
+
+// RegionsInfo contains some regions with the detailed region info.
+// NOTE: This type is a copy of github.com/tikv/pd/server/api.RegionsInfo.
+// To reduce dependency tree, we do not import the api package directly.
+type RegionsInfo struct {
+	Count   int          `json:"count"`
+	Regions []RegionInfo `json:"regions"`
+}
+
+// ScanRegions is a reentrant function that scans all regions covering the
+// given span via the PD HTTP API.
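+// Regions are fetched page by page (up to scanLimit per request); the cursor
+// advances to the end key of the last region returned, and on a request
+// error the next PD member endpoint is tried.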
+func (pc *pdAPIClient) ScanRegions(ctx context.Context, span tablepb.Span) ([]RegionInfo, error) {
+	scanLimit := 1024
+	endpoints, err := pc.CollectMemberEndpoints(ctx)
+	if err != nil {
+		log.Warn("fail to collect pd member endpoints")
+		return nil, errors.Trace(err)
+	}
+	return pc.scanRegions(ctx, span, endpoints, scanLimit)
+}
+
+func (pc *pdAPIClient) scanRegions(
+	ctx context.Context, span tablepb.Span, endpoints []string, scanLimit int,
+) ([]RegionInfo, error) {
+	scan := func(endpoint string, startKey, endKey []byte) ([]RegionInfo, error) {
+		query := url.Values{}
+		query.Add("key", string(startKey))
+		query.Add("end_key", string(endKey))
+		query.Add("limit", strconv.Itoa(scanLimit))
+		u, _ := url.Parse(endpoint + scanRegionAPI)
+		u.RawQuery = query.Encode()
+		resp, err := pc.httpClient.Get(ctx, u.String())
+		if err != nil {
+			log.Warn("fail to scan regions",
+				zap.String("endpoint", endpoint), zap.Any("span", span))
+			return nil, errors.Trace(err)
+		}
+		defer resp.Body.Close()
+		data, err := io.ReadAll(resp.Body)
+		if err != nil {
+			log.Warn("fail to scan regions",
+				zap.String("endpoint", endpoint), zap.Any("span", span))
+			return nil, errors.Trace(err)
+		}
+		regions := &RegionsInfo{}
+		err = json.Unmarshal(data, regions)
+		if err != nil {
+			log.Warn("fail to scan regions",
+				zap.String("endpoint", endpoint), zap.Any("span", span))
+			return nil, errors.Trace(err)
+		}
+		return regions.Regions, nil
+	}
+
+	regions := []RegionInfo{}
+	startKey := span.StartKey
+	startKeyHex := strings.ToUpper(hex.EncodeToString(startKey))
+	isFirstStartKey := true
+	for spanz.EndCompare(startKey, span.EndKey) < 0 || (len(startKey) == 0 && isFirstStartKey) {
+		for i, endpoint := range endpoints {
+			r, err := scan(endpoint, startKey, span.EndKey)
+			if err != nil && i+1 == len(endpoints) {
+				return nil, errors.Trace(err)
+			}
+			if err != nil {
+				// Try the next PD member endpoint.
+				continue
+			}
+
+			if len(r) == 0 {
+				// Because start key is less than end key, there must be some regions.
+				log.Error("fail to scan region, missing region",
+					zap.String("endpoint", endpoint))
+				return nil, cerror.WrapError(cerror.ErrInternalServerError,
+					fmt.Errorf("fail to scan region, missing region"))
+			}
+			if r[0].StartKey != startKeyHex {
+				log.Info("start key mismatch, adjust start key",
+					zap.String("startKey", startKeyHex),
+					zap.String("regionStartKey", r[0].StartKey),
+					zap.Uint64("regionID", r[0].ID))
+				r[0].StartKey = startKeyHex
+			}
+			regions = append(regions, r...)
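+			// Advance the scan cursor to the end key of the last region
+			// fetched so the next page starts where this one stopped.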
+			key, err := hex.DecodeString(regions[len(regions)-1].EndKey)
+			if err != nil {
+				log.Info("fail to decode region end key",
+					zap.String("endKey", regions[len(regions)-1].EndKey),
+					zap.Uint64("regionID", regions[len(regions)-1].ID))
+				return nil, errors.Trace(err)
+			}
+			startKey = tablepb.Key(key)
+			startKeyHex = strings.ToUpper(hex.EncodeToString(startKey))
+			isFirstStartKey = false
+			break
+		}
+	}
+	spanEndKeyHex := strings.ToUpper(hex.EncodeToString(span.EndKey))
+	if regions[len(regions)-1].EndKey != spanEndKeyHex {
+		log.Info("end key mismatch, adjust end key",
+			zap.String("endKey", spanEndKeyHex),
+			zap.String("regionEndKey", regions[len(regions)-1].EndKey),
+			zap.Uint64("regionID", regions[len(regions)-1].ID))
+		regions[len(regions)-1].EndKey = spanEndKeyHex
+	}
+
+	return regions, nil
+}
+
 // ServiceSafePoint contains gc service safe point
 type ServiceSafePoint struct {
 	ServiceID string `json:"service_id"`
diff --git a/pkg/pdutil/api_client_test.go b/pkg/pdutil/api_client_test.go
index 6001650b1d6..f5e338cf665 100644
--- a/pkg/pdutil/api_client_test.go
+++ b/pkg/pdutil/api_client_test.go
@@ -19,11 +19,14 @@ import (
 	"encoding/json"
 	"net/http"
 	"net/http/httptest"
+	"strconv"
 	"testing"
 
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/codec"
+	"github.com/pingcap/tiflow/cdc/processor/tablepb"
 	cerror "github.com/pingcap/tiflow/pkg/errors"
+	"github.com/pingcap/tiflow/pkg/httputil"
 	"github.com/pingcap/tiflow/pkg/spanz"
 	"github.com/stretchr/testify/require"
 	pd "github.com/tikv/pd/client"
@@ -84,12 +87,12 @@ func TestMetaLabelFail(t *testing.T) {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 	// test url error
-	err = pc.patchMetaLabel(ctx)
+	err = pc.(*pdAPIClient).patchMetaLabel(ctx)
 	require.Error(t, err)
 
 	// test 404
 	mockClient.url = mockClient.testServer.URL
-	err = pc.patchMetaLabel(ctx)
+	err = pc.(*pdAPIClient).patchMetaLabel(ctx)
 	require.Regexp(t, ".*404.*", err)
 
 	err = pc.UpdateMetaLabel(ctx)
@@ -109,7 +112,7 @@ func TestListGcServiceSafePoint(t *testing.T) {
 	require.NoError(t, err)
 	defer pc.Close()
 	_, err = pc.ListGcServiceSafePoint(ctx)
-	require.Nil(t, err)
+	require.NoError(t, err)
 	mockClient.testServer.Close()
 }
 
@@ -148,17 +151,109 @@ func TestMetaLabelDecodeJSON(t *testing.T) {
 	require.Len(t, meta.SetRules, 2)
 	keys := meta.SetRules[1].Data.([]interface{})[0].(map[string]interface{})
 	startKey, err := hex.DecodeString(keys["start_key"].(string))
-	require.Nil(t, err)
+	require.NoError(t, err)
 	endKey, err := hex.DecodeString(keys["end_key"].(string))
-	require.Nil(t, err)
+	require.NoError(t, err)
 	_, startKey, err = codec.DecodeBytes(startKey, nil)
-	require.Nil(t, err)
+	require.NoError(t, err)
 	require.EqualValues(
 		t, spanz.JobTableID, tablecodec.DecodeTableID(startKey), keys["start_key"].(string))
 
 	_, endKey, err = codec.DecodeBytes(endKey, nil)
-	require.Nil(t, err)
+	require.NoError(t, err)
 	require.EqualValues(
 		t, spanz.JobTableID+1, tablecodec.DecodeTableID(endKey), keys["end_key"].(string))
 }
+
+func TestScanRegions(t *testing.T) {
+	t.Parallel()
+
+	regions := []RegionInfo{
+		NewTestRegionInfo(2, []byte(""), []byte{0, 1}, 0),
+		NewTestRegionInfo(3, []byte{0, 1}, []byte{0, 2}, 1),
+		NewTestRegionInfo(4, []byte{0, 2}, []byte{0, 3}, 2),
+		NewTestRegionInfo(5, []byte{0, 2}, []byte{0, 4}, 3), // a merged region.
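+		// Region 5 deliberately overlaps region 4 (both start at {0, 2}):
+		// it stands in for a region that was merged between two paged scans,
+		// which scanRegions must tolerate.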
+		NewTestRegionInfo(6, []byte{0, 4}, []byte{1, 0}, 4),
+		NewTestRegionInfo(7, []byte{1, 0}, []byte{1, 1}, 5),
+		NewTestRegionInfo(8, []byte{1, 1}, []byte(""), 6),
+	}
+	var handler func() RegionsInfo
+	mockPDServer := httptest.NewServer(http.HandlerFunc(
+		func(w http.ResponseWriter, r *http.Request) {
+			w.WriteHeader(http.StatusOK)
+			startKey, _ := hex.DecodeString(r.URL.Query()["key"][0])
+			endKey, _ := hex.DecodeString(r.URL.Query()["end_key"][0])
+			limit, _ := strconv.Atoi(r.URL.Query()["limit"][0])
+			t.Log(startKey, endKey, limit)
+			info := handler()
+			info.Count = len(info.Regions)
+			data, _ := json.Marshal(info)
+			t.Logf("%s", string(data))
+			_, _ = w.Write(data)
+		},
+	))
+	defer mockPDServer.Close()
+
+	httpcli, _ := httputil.NewClient(nil)
+	pc := pdAPIClient{httpClient: httpcli}
+
+	i := 0
+	handler = func() RegionsInfo {
+		start := i
+		end := i + 1
+		i++
+		if end > len(regions) {
+			return RegionsInfo{Regions: regions[start:]}
+		}
+		return RegionsInfo{Regions: regions[start:end]}
+	}
+	rs, err := pc.scanRegions(context.Background(), tablepb.Span{}, []string{mockPDServer.URL}, 1)
+	require.NoError(t, err)
+	require.Equal(t, 7, len(rs))
+
+	handler = func() RegionsInfo {
+		return RegionsInfo{Regions: regions}
+	}
+	rs, err = pc.scanRegions(context.Background(), tablepb.Span{}, []string{mockPDServer.URL}, 1024)
+	require.NoError(t, err)
+	require.Equal(t, 7, len(rs))
+
+	i = 0
+	handler = func() RegionsInfo {
+		if i != 0 {
+			require.FailNow(t, "must only request once")
+		}
+		i++
+		return RegionsInfo{Regions: regions[2:3]}
+	}
+	rs, err = pc.scanRegions(
+		context.Background(),
+		tablepb.Span{StartKey: []byte{0, 2, 0}, EndKey: []byte{0, 3}},
+		[]string{mockPDServer.URL}, 1)
+	require.NoError(t, err)
+	require.Equal(t, 1, len(rs))
+
+	i = 0
+	handler = func() RegionsInfo {
+		if i == 0 {
+			i++
+			return RegionsInfo{Regions: regions[2:3]}
+		} else if i == 1 {
+			i++
+			return RegionsInfo{Regions: regions[3:4]}
+		} else if i == 2 {
+			i++
+			return RegionsInfo{Regions: regions[4:5]}
+		} else {
+			require.FailNow(t, "must only request three times")
+			return RegionsInfo{}
+		}
+	}
+	rs, err = pc.scanRegions(
+		context.Background(),
+		tablepb.Span{StartKey: []byte{0, 2, 0}, EndKey: []byte{0, 4, 0}},
+		[]string{mockPDServer.URL}, 1)
+	require.NoError(t, err)
+	require.Equal(t, 3, len(rs))
+}
diff --git a/pkg/spanz/span.go b/pkg/spanz/span.go
index 280852ea327..018e54d43fe 100644
--- a/pkg/spanz/span.go
+++ b/pkg/spanz/span.go
@@ -120,17 +120,17 @@ func KeyInSpan(k tablepb.Key, span tablepb.Span) bool {
 // StartCompare compares two start keys.
 // The result will be 0 if lhs==rhs, -1 if lhs < rhs, and +1 if lhs > rhs
 func StartCompare(lhs []byte, rhs []byte) int {
-	if lhs == nil && rhs == nil {
+	if len(lhs) == 0 && len(rhs) == 0 {
 		return 0
 	}
 
 	// Nil means Negative infinity.
 	// It's difference with EndCompare.
-	if lhs == nil {
+	if len(lhs) == 0 {
 		return -1
 	}
 
-	if rhs == nil {
+	if len(rhs) == 0 {
 		return 1
 	}
 
@@ -140,17 +140,17 @@ func StartCompare(lhs []byte, rhs []byte) int {
 // EndCompare compares two end keys.
 // The result will be 0 if lhs==rhs, -1 if lhs < rhs, and +1 if lhs > rhs
 func EndCompare(lhs []byte, rhs []byte) int {
-	if lhs == nil && rhs == nil {
+	if len(lhs) == 0 && len(rhs) == 0 {
 		return 0
 	}
 
 	// Nil means Positive infinity.
 	// It's difference with StartCompare.
-	if lhs == nil {
+	if len(lhs) == 0 {
 		return 1
 	}
 
-	if rhs == nil {
+	if len(rhs) == 0 {
 		return -1
 	}
 
@@ -160,8 +160,8 @@ func EndCompare(lhs []byte, rhs []byte) int {
 // Intersect return to intersect part of lhs and rhs span.
// Return error if there's no intersect part func Intersect(lhs tablepb.Span, rhs tablepb.Span) (span tablepb.Span, err error) { - if lhs.StartKey != nil && EndCompare(lhs.StartKey, rhs.EndKey) >= 0 || - rhs.StartKey != nil && EndCompare(rhs.StartKey, lhs.EndKey) >= 0 { + if len(lhs.StartKey) != 0 && EndCompare(lhs.StartKey, rhs.EndKey) >= 0 || + len(rhs.StartKey) != 0 && EndCompare(rhs.StartKey, lhs.EndKey) >= 0 { return tablepb.Span{}, errors.ErrIntersectNoOverlap.GenWithStackByArgs(lhs, rhs) } diff --git a/pkg/spanz/span_test.go b/pkg/spanz/span_test.go index ffd61e54c6d..4a2bdb8cb42 100644 --- a/pkg/spanz/span_test.go +++ b/pkg/spanz/span_test.go @@ -31,8 +31,8 @@ func TestStartCompare(t *testing.T) { res int }{ {nil, nil, 0}, - {nil, []byte{}, -1}, - {[]byte{}, nil, 1}, + {nil, []byte{}, 0}, + {[]byte{}, nil, 0}, {[]byte{}, []byte{}, 0}, {[]byte{1}, []byte{2}, -1}, {[]byte{2}, []byte{1}, 1}, @@ -53,8 +53,8 @@ func TestEndCompare(t *testing.T) { res int }{ {nil, nil, 0}, - {nil, []byte{}, 1}, - {[]byte{}, nil, -1}, + {nil, []byte{}, 0}, + {[]byte{}, nil, 0}, {[]byte{}, []byte{}, 0}, {[]byte{1}, []byte{2}, -1}, {[]byte{2}, []byte{1}, 1}, diff --git a/tests/integration_tests/api_v2/model.go b/tests/integration_tests/api_v2/model.go index 26bfc1fb7e6..e8e4e84ab90 100644 --- a/tests/integration_tests/api_v2/model.go +++ b/tests/integration_tests/api_v2/model.go @@ -287,6 +287,8 @@ type ChangefeedSchedulerConfig struct { EnableTableAcrossNodes bool `toml:"enable_table_across_nodes" json:"enable_table_across_nodes"` // RegionThreshold is the region count threshold of splitting a table. RegionThreshold int `toml:"region_threshold" json:"region_threshold"` + // WriteKeyThreshold is the written keys threshold of splitting a table. + WriteKeyThreshold int `toml:"write_key_threshold" json:"write_key_threshold"` } // ChangeFeedInfo describes the detail of a ChangeFeed