Skip to content

Commit

Permalink
scheduler(ticdc): split span by region written keys (#8496)
Browse files Browse the repository at this point in the history
ref #7720
  • Loading branch information
overvenus authored Mar 15, 2023
1 parent 0eae523 commit 4891272
Show file tree
Hide file tree
Showing 21 changed files with 681 additions and 33 deletions.
4 changes: 4 additions & 0 deletions cdc/api/v2/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ func (c *ReplicaConfig) toInternalReplicaConfigWithOriginConfig(
res.Scheduler = &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: c.Scheduler.EnableTableAcrossNodes,
RegionThreshold: c.Scheduler.RegionThreshold,
WriteKeyThreshold: c.Scheduler.WriteKeyThreshold,
}
}
return res
Expand Down Expand Up @@ -410,6 +411,7 @@ func ToAPIReplicaConfig(c *config.ReplicaConfig) *ReplicaConfig {
res.Scheduler = &ChangefeedSchedulerConfig{
EnableTableAcrossNodes: cloned.Scheduler.EnableTableAcrossNodes,
RegionThreshold: cloned.Scheduler.RegionThreshold,
WriteKeyThreshold: cloned.Scheduler.WriteKeyThreshold,
}
}
return res
Expand Down Expand Up @@ -569,6 +571,8 @@ type ChangefeedSchedulerConfig struct {
EnableTableAcrossNodes bool `toml:"enable_table_across_nodes" json:"enable_table_across_nodes"`
// RegionThreshold is the region count threshold of splitting a table.
RegionThreshold int `toml:"region_threshold" json:"region_threshold"`
// WriteKeyThreshold is the written keys threshold of splitting a table.
WriteKeyThreshold int `toml:"write_key_threshold" json:"write_key_threshold"`
}

// EtcdData contains key/value pair of etcd data
Expand Down
4 changes: 3 additions & 1 deletion cdc/api/v2/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var defaultAPIConfig = &ReplicaConfig{
Scheduler.EnableTableAcrossNodes,
RegionThreshold: config.GetDefaultReplicaConfig().
Scheduler.RegionThreshold,
WriteKeyThreshold: config.GetDefaultReplicaConfig().
Scheduler.WriteKeyThreshold,
},
}

Expand Down Expand Up @@ -136,7 +138,7 @@ func TestToAPIReplicaConfig(t *testing.T) {
}
cfg.Mounter = &config.MounterConfig{WorkerNum: 11}
cfg.Scheduler = &config.ChangefeedSchedulerConfig{
EnableTableAcrossNodes: true, RegionThreshold: 10001,
EnableTableAcrossNodes: true, RegionThreshold: 10001, WriteKeyThreshold: 10001,
}
cfg2 := ToAPIReplicaConfig(cfg).ToInternalReplicaConfig()
require.Equal(t, "", cfg2.Sink.DispatchRules[0].DispatcherRule)
Expand Down
10 changes: 7 additions & 3 deletions cdc/model/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,11 +808,15 @@ func TestFixSchedulerIncompatible(t *testing.T) {
CreatorVersion: "6.7.0",
SinkURI: "mysql://root:test@127.0.0.1:3306/",
Config: &config.ReplicaConfig{
Scheduler: &config.ChangefeedSchedulerConfig{RegionThreshold: 1000},
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
Scheduler: &config.ChangefeedSchedulerConfig{
RegionThreshold: 1000, WriteKeyThreshold: 1000,
},
Sink: &config.SinkConfig{Protocol: config.ProtocolDefault.String()},
},
},
expectedScheduler: &config.ChangefeedSchedulerConfig{RegionThreshold: 1000},
expectedScheduler: &config.ChangefeedSchedulerConfig{
RegionThreshold: 1000, WriteKeyThreshold: 1000,
},
},
}

Expand Down
5 changes: 4 additions & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ func NewCoordinator(
}
coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
coord.trans = trans
coord.reconciler = keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings)
coord.pdClock = up.PDClock
coord.changefeedEpoch = changefeedEpoch
coord.reconciler, err = keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings)
if err != nil {
return nil, errors.Trace(err)
}
return coord, nil
}

Expand Down
2 changes: 1 addition & 1 deletion cdc/scheduler/internal/v3/keyspan/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,6 @@ func NewReconcilerForTests(
return &Reconciler{
tableSpans: make(map[int64]splittedSpans),
config: config,
splitter: newRegionCountSplitter(model.ChangeFeedID{}, cache),
splitter: []splitter{newRegionCountSplitter(model.ChangeFeedID{}, cache)},
}
}
24 changes: 19 additions & 5 deletions cdc/scheduler/internal/v3/keyspan/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"go.uber.org/zap"
Expand All @@ -49,21 +51,28 @@ type Reconciler struct {
changefeedID model.ChangeFeedID
config *config.ChangefeedSchedulerConfig

splitter splitter
splitter []splitter
}

// NewReconciler returns a Reconciler.
func NewReconciler(
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
config *config.ChangefeedSchedulerConfig,
) *Reconciler {
) (*Reconciler, error) {
pdapi, err := pdutil.NewPDAPIClient(up.PDClient, up.SecurityConfig)
if err != nil {
return nil, errors.Trace(err)
}
return &Reconciler{
tableSpans: make(map[int64]splittedSpans),
changefeedID: changefeedID,
config: config,
splitter: newRegionCountSplitter(changefeedID, up.RegionCache),
}
splitter: []splitter{
newWriteSplitter(changefeedID, pdapi),
newRegionCountSplitter(changefeedID, up.RegionCache),
},
}, nil
}

// Reconcile spans that need to be replicated based on current cluster status.
Expand Down Expand Up @@ -108,7 +117,12 @@ func (m *Reconciler) Reconcile(
tableSpan := spanz.TableIDToComparableSpan(tableID)
spans := []tablepb.Span{tableSpan}
if compat.CheckSpanReplicationEnabled() {
spans = m.splitter.split(ctx, tableSpan, len(aliveCaptures), m.config)
for _, splitter := range m.splitter {
spans = splitter.split(ctx, tableSpan, len(aliveCaptures), m.config)
if len(spans) > 1 {
break
}
}
}
m.tableSpans[tableID] = splittedSpans{
byAddTable: true,
Expand Down
150 changes: 150 additions & 0 deletions cdc/scheduler/internal/v3/keyspan/splitter_write.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package keyspan

import (
"context"
"encoding/hex"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/pdutil"
"go.uber.org/zap"
)

type writeSplitter struct {
changefeedID model.ChangeFeedID
pdAPIClient pdutil.PDAPIClient
}

func newWriteSplitter(
changefeedID model.ChangeFeedID, pdAPIClient pdutil.PDAPIClient,
) *writeSplitter {
return &writeSplitter{
changefeedID: changefeedID,
pdAPIClient: pdAPIClient,
}
}

func (m *writeSplitter) split(
ctx context.Context, span tablepb.Span, totalCaptures int,
config *config.ChangefeedSchedulerConfig,
) []tablepb.Span {
if config.WriteKeyThreshold == 0 {
return nil
}
regions, err := m.pdAPIClient.ScanRegions(ctx, span)
if err != nil {
// Skip split.
return nil
}
if totalCaptures <= 1 {
return []tablepb.Span{span}
}
info := splitRegionsByWrittenKeys(span.TableID, regions, config.WriteKeyThreshold, totalCaptures)
log.Info("schedulerv3: split span by written keys",
zap.Ints("counts", info.Counts),
zap.Ints("weights", info.Weights),
zap.String("span", span.String()))
return info.Spans
}

type splitRegionsInfo struct {
Counts []int
Weights []int
Spans []tablepb.Span
}

// splitRegionsByWrittenKeys returns a slice of regions that evenly split the range by write keys.
func splitRegionsByWrittenKeys(
tableID model.TableID, regions []pdutil.RegionInfo, writeKeyThreshold int, pages int,
) *splitRegionsInfo {
decodeKey := func(hexkey string) []byte {
key, _ := hex.DecodeString(hexkey)
return key
}
totalWriteNormalized := uint64(0)
totalWrite := totalWriteNormalized
for i := range regions {
totalWrite += regions[i].WrittenKeys
// Override 0 to 1 to reflect the baseline cost of a region.
// Also it makes split evenly when there is no write.
regions[i].WrittenKeys++
totalWriteNormalized += regions[i].WrittenKeys
}
if totalWrite < uint64(writeKeyThreshold) {
return &splitRegionsInfo{
Counts: []int{len(regions)},
Weights: []int{int(totalWriteNormalized)},
Spans: []tablepb.Span{{
TableID: tableID,
StartKey: tablepb.Key(decodeKey(regions[0].StartKey)),
EndKey: tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)),
}},
}
}

writtenKeysPerPage := totalWriteNormalized / uint64(pages)
counts := make([]int, 0, pages)
weights := make([]int, 0, pages)
spans := make([]tablepb.Span, 0, pages)
accWrittenKeys, pageWrittenKeys := uint64(0), uint64(0)
pageStartIdx, pageLastIdx := 0, 0
for i := 1; i < pages; i++ {
for idx := pageStartIdx; idx < len(regions); idx++ {
restPages := pages - i
restRegions := len(regions) - idx
pageLastIdx = idx
currentWrittenKeys := regions[idx].WrittenKeys
if (idx > pageStartIdx) &&
((restPages >= restRegions) ||
(accWrittenKeys+currentWrittenKeys > writtenKeysPerPage)) {
spans = append(spans, tablepb.Span{
TableID: tableID,
StartKey: tablepb.Key(decodeKey(regions[pageStartIdx].StartKey)),
EndKey: tablepb.Key(decodeKey(regions[idx-1].EndKey)),
})
counts = append(counts, idx-pageStartIdx)
weights = append(weights, int(pageWrittenKeys))
pageWrittenKeys = 0
pageStartIdx = idx
writtenKeysPerPage = (totalWriteNormalized - accWrittenKeys) / uint64(restPages)
accWrittenKeys = 0
break
}
pageWrittenKeys += currentWrittenKeys
accWrittenKeys += currentWrittenKeys
}
}
// Always end with the last region.
spans = append(spans, tablepb.Span{
TableID: tableID,
StartKey: tablepb.Key(decodeKey(regions[pageLastIdx].StartKey)),
EndKey: tablepb.Key(decodeKey(regions[len(regions)-1].EndKey)),
})
counts = append(counts, len(regions)-pageLastIdx)
pageWrittenKeys = 0
for idx := pageLastIdx; idx < len(regions); idx++ {
pageWrittenKeys += regions[idx].WrittenKeys
}
weights = append(weights, int(pageWrittenKeys))

return &splitRegionsInfo{
Counts: counts,
Weights: weights,
Spans: spans,
}
}
Loading

0 comments on commit 4891272

Please sign in to comment.