From eba7841b8ed134f68715ace70851fe820668f3e3 Mon Sep 17 00:00:00 2001
From: Neil Shen
Date: Tue, 7 Mar 2023 13:42:41 +0800
Subject: [PATCH] scheduler(ticdc): add splitter interface

Signed-off-by: Neil Shen
---
 cdc/owner/changefeed.go                       |   3 +-
 cdc/scheduler/internal/v3/coordinator.go      |   9 +-
 cdc/scheduler/internal/v3/coordinator_test.go |   4 +-
 .../internal/v3/info_provider_test.go         |   4 +-
 .../v3/keyspan/{region_cache.go => mock.go}   |  23 +-
 .../internal/v3/keyspan/reconciler.go         | 141 ++---------
 .../internal/v3/keyspan/reconciler_test.go    | 211 +---------------
 .../v3/keyspan/splitter_region_count.go       | 152 ++++++++++++
 .../v3/keyspan/splitter_region_count_test.go  | 229 ++++++++++++++++++
 cdc/scheduler/rexport.go                      |   8 +-
 10 files changed, 435 insertions(+), 349 deletions(-)
 rename cdc/scheduler/internal/v3/keyspan/{region_cache.go => mock.go} (76%)
 create mode 100644 cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
 create mode 100644 cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go
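Reviewer note (commentary placed between the diffstat and the first diff
header; git am discards this section, so it is not part of the commit): the
patch extracts the region-count based span-splitting logic out of
keyspan.Reconciler and hides it behind a new unexported splitter interface,
whose only implementation so far is regionCountSplitter. NewCoordinator and
NewScheduler now take a single *upstream.Upstream instead of separate
RegionCache and PDClock arguments, and tests build reconcilers through the
new NewReconcilerForTests helper.

As a minimal sketch of the seam this introduces: an alternative splitter only
has to satisfy the interface below. noopSplitter is hypothetical and is not
part of this patch; only the split signature is taken from it verbatim.

	package keyspan

	import (
		"context"

		"github.com/pingcap/tiflow/cdc/processor/tablepb"
		"github.com/pingcap/tiflow/pkg/config"
	)

	// noopSplitter never splits: it returns the input span unchanged,
	// which is the degenerate case of the contract. Implementations must
	// return spans that together cover the whole input span.
	type noopSplitter struct{}

	func (s *noopSplitter) split(
		ctx context.Context, span tablepb.Span, totalCaptures int,
		config *config.ChangefeedSchedulerConfig,
	) []tablepb.Span {
		return []tablepb.Span{span}
	}

Wiring such an implementation in would be a one-line change to the splitter
field assignment in NewReconciler, e.g. splitter: &noopSplitter{}.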
diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go
index 9900b36e5ea..0aaee07698e 100644
--- a/cdc/owner/changefeed.go
+++ b/cdc/owner/changefeed.go
@@ -56,8 +56,7 @@ func newSchedulerFromCtx(
 	ownerRev := ctx.GlobalVars().OwnerRevision
 	captureID := ctx.GlobalVars().CaptureInfo.ID
 	ret, err = scheduler.NewScheduler(
-		ctx, captureID, changeFeedID,
-		messageServer, messageRouter, ownerRev, epoch, up.RegionCache, up.PDClock, cfg)
+		ctx, captureID, changeFeedID, messageServer, messageRouter, ownerRev, epoch, up, cfg)
 	return ret, errors.Trace(err)
 }
 
diff --git a/cdc/scheduler/internal/v3/coordinator.go b/cdc/scheduler/internal/v3/coordinator.go
index f8b9cc60298..18ffbe17793 100644
--- a/cdc/scheduler/internal/v3/coordinator.go
+++ b/cdc/scheduler/internal/v3/coordinator.go
@@ -35,6 +35,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/p2p"
 	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/spanz"
+	"github.com/pingcap/tiflow/pkg/upstream"
 	"github.com/pingcap/tiflow/pkg/version"
 	"go.uber.org/zap"
 )
@@ -77,8 +78,7 @@ func NewCoordinator(
 	messageRouter p2p.MessageRouter,
 	ownerRevision int64,
 	changefeedEpoch uint64,
-	regionCache keyspan.RegionCache,
-	pdClock pdutil.Clock,
+	up *upstream.Upstream,
 	cfg *config.SchedulerConfig,
 ) (internal.Scheduler, error) {
 	trans, err := transport.NewTransport(
@@ -88,9 +88,8 @@ func NewCoordinator(
 	}
 	coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
 	coord.trans = trans
-	coord.reconciler = keyspan.NewReconciler(
-		changefeedID, regionCache, cfg.ChangefeedSettings)
-	coord.pdClock = pdClock
+	coord.reconciler = keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings)
+	coord.pdClock = up.PDClock
 	coord.changefeedEpoch = changefeedEpoch
 	return coord, nil
 }
diff --git a/cdc/scheduler/internal/v3/coordinator_test.go b/cdc/scheduler/internal/v3/coordinator_test.go
index ad6a58de024..7ba9df2d325 100644
--- a/cdc/scheduler/internal/v3/coordinator_test.go
+++ b/cdc/scheduler/internal/v3/coordinator_test.go
@@ -204,8 +204,8 @@ func newTestCoordinator(cfg *config.SchedulerConfig) (*coordinator, *transport.M
 	coord := newCoordinator("a", model.ChangeFeedID{}, 1, cfg)
 	trans := transport.NewMockTrans()
 	coord.trans = trans
-	coord.reconciler = keyspan.NewReconciler(
-		model.ChangeFeedID{}, keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
+	coord.reconciler = keyspan.NewReconcilerForTests(
+		keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
 	return coord, trans
 }
 
diff --git a/cdc/scheduler/internal/v3/info_provider_test.go b/cdc/scheduler/internal/v3/info_provider_test.go
index 4fc64c0169f..73e4ed7a024 100644
--- a/cdc/scheduler/internal/v3/info_provider_test.go
+++ b/cdc/scheduler/internal/v3/info_provider_test.go
@@ -36,8 +36,8 @@ func TestInfoProvider(t *testing.T) {
 	}
 	coord := newCoordinator("a", model.ChangeFeedID{}, 1, cfg)
 	cfg.ChangefeedSettings = config.GetDefaultReplicaConfig().Scheduler
-	coord.reconciler = keyspan.NewReconciler(
-		model.ChangeFeedID{}, keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
+	coord.reconciler = keyspan.NewReconcilerForTests(
+		keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
 	coord.captureM.Captures = map[model.CaptureID]*member.CaptureStatus{
 		"a": {Tables: []tablepb.TableStatus{{
 			Span: tablepb.Span{TableID: 1},
diff --git a/cdc/scheduler/internal/v3/keyspan/region_cache.go b/cdc/scheduler/internal/v3/keyspan/mock.go
similarity index 76%
rename from cdc/scheduler/internal/v3/keyspan/region_cache.go
rename to cdc/scheduler/internal/v3/keyspan/mock.go
index 531d02e7760..88c19522fc0 100644
--- a/cdc/scheduler/internal/v3/keyspan/region_cache.go
+++ b/cdc/scheduler/internal/v3/keyspan/mock.go
@@ -16,7 +16,9 @@ package keyspan
 import (
 	"bytes"
 
+	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/tablepb"
+	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/spanz"
 	"github.com/tikv/client-go/v2/tikv"
 )
@@ -32,16 +34,16 @@
 	LocateRegionByID(bo *tikv.Backoffer, regionID uint64) (*tikv.KeyLocation, error)
 }
 
-// MockCache mocks tikv.RegionCache.
-type MockCache struct {
+// mockCache mocks tikv.RegionCache.
+type mockCache struct {
 	regions *spanz.BtreeMap[uint64]
 }
 
 // NewMockRegionCache returns a new MockCache.
-func NewMockRegionCache() *MockCache { return &MockCache{regions: spanz.NewBtreeMap[uint64]()} }
+func NewMockRegionCache() *mockCache { return &mockCache{regions: spanz.NewBtreeMap[uint64]()} }
 
 // ListRegionIDsInKeyRange lists ids of regions in [startKey,endKey].
-func (m *MockCache) ListRegionIDsInKeyRange(
+func (m *mockCache) ListRegionIDsInKeyRange(
 	bo *tikv.Backoffer, startKey, endKey []byte,
 ) (regionIDs []uint64, err error) {
 	m.regions.Ascend(func(loc tablepb.Span, id uint64) bool {
@@ -56,7 +58,7 @@ func (m *MockCache) ListRegionIDsInKeyRange(
 }
 
 // LocateRegionByID searches for the region with ID.
-func (m *MockCache) LocateRegionByID(
+func (m *mockCache) LocateRegionByID(
 	bo *tikv.Backoffer, regionID uint64,
 ) (loc *tikv.KeyLocation, err error) {
 	m.regions.Ascend(func(span tablepb.Span, id uint64) bool {
@@ -71,3 +73,14 @@ func (m *mockCache) LocateRegionByID(
 	})
 	return
 }
+
+// NewReconcilerForTests returns a Reconciler.
+func NewReconcilerForTests(
+	cache RegionCache, config *config.ChangefeedSchedulerConfig,
+) *Reconciler {
+	return &Reconciler{
+		tableSpans: make(map[int64]splittedSpans),
+		config:     config,
+		splitter:   newRegionCountSplitter(model.ChangeFeedID{}, cache),
+	}
+}
diff --git a/cdc/scheduler/internal/v3/keyspan/reconciler.go b/cdc/scheduler/internal/v3/keyspan/reconciler.go
index d0ac3a1a091..c91c3bc1612 100644
--- a/cdc/scheduler/internal/v3/keyspan/reconciler.go
+++ b/cdc/scheduler/internal/v3/keyspan/reconciler.go
@@ -14,9 +14,7 @@
 package keyspan
 
 import (
-	"bytes"
 	"context"
-	"math"
 
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/model"
@@ -26,11 +24,18 @@ import (
 	"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/spanz"
-	"github.com/tikv/client-go/v2/tikv"
+	"github.com/pingcap/tiflow/pkg/upstream"
 	"go.uber.org/zap"
 )
 
-type splitSpans struct {
+type splitter interface {
+	split(
+		ctx context.Context, span tablepb.Span, totalCaptures int,
+		config *config.ChangefeedSchedulerConfig,
+	) []tablepb.Span
+}
+
+type splittedSpans struct {
 	byAddTable bool
 	spans      []tablepb.Span
 }
@@ -38,24 +43,26 @@ type splitSpans struct {
 // Reconciler reconciles span and table mapping, make sure spans are in
 // a desired state and covers all table ranges.
 type Reconciler struct {
-	tableSpans map[model.TableID]splitSpans
+	tableSpans map[model.TableID]splittedSpans
 	spanCache  []tablepb.Span
 
-	regionCache  RegionCache
 	changefeedID model.ChangeFeedID
 	config       *config.ChangefeedSchedulerConfig
+
+	splitter splitter
 }
 
 // NewReconciler returns a Reconciler.
 func NewReconciler(
-	changefeedID model.ChangeFeedID, regionCache RegionCache,
+	changefeedID model.ChangeFeedID,
+	up *upstream.Upstream,
 	config *config.ChangefeedSchedulerConfig,
 ) *Reconciler {
 	return &Reconciler{
-		tableSpans:   make(map[int64]splitSpans),
-		regionCache:  regionCache,
+		tableSpans:   make(map[int64]splittedSpans),
 		changefeedID: changefeedID,
 		config:       config,
+		splitter:     newRegionCountSplitter(changefeedID, up.RegionCache),
 	}
 }
@@ -101,9 +108,9 @@ func (m *Reconciler) Reconcile(
 			tableSpan := spanz.TableIDToComparableSpan(tableID)
 			spans := []tablepb.Span{tableSpan}
 			if compat.CheckSpanReplicationEnabled() {
-				spans = m.splitSpan(ctx, tableSpan, len(aliveCaptures))
+				spans = m.splitter.split(ctx, tableSpan, len(aliveCaptures), m.config)
 			}
-			m.tableSpans[tableID] = splitSpans{
+			m.tableSpans[tableID] = splittedSpans{
 				byAddTable: true,
 				spans:      spans,
 			}
@@ -135,7 +142,7 @@ func (m *Reconciler) Reconcile(
 				})
 				// TODO: maybe we should split holes too.
 			}
-			m.tableSpans[tableID] = splitSpans{
+			m.tableSpans[tableID] = splittedSpans{
 				byAddTable: false,
 				spans:      spans,
 			}
@@ -184,113 +191,3 @@ func (m *Reconciler) Reconcile(
 	}
 	return m.spanCache
 }
-
-func (m *Reconciler) splitSpan(
-	ctx context.Context, span tablepb.Span, totalCaptures int,
-) []tablepb.Span {
-	bo := tikv.NewBackoffer(ctx, 500)
-	regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey)
-	if err != nil {
-		log.Warn("schedulerv3: list regions failed, skip split span",
-			zap.String("namespace", m.changefeedID.Namespace),
-			zap.String("changefeed", m.changefeedID.ID),
-			zap.Error(err))
-		return []tablepb.Span{span}
-	}
-	if len(regions) <= m.config.RegionThreshold || totalCaptures == 0 {
-		return []tablepb.Span{span}
-	}
-
-	totalRegions := len(regions)
-	if totalRegions == 0 {
-		totalCaptures = 1
-	}
-	stepper := newEvenlySplitStepper(totalCaptures, totalRegions)
-	spans := make([]tablepb.Span, 0, stepper.SpanCount())
-	start, end := 0, stepper.Step()
-	for {
-		startRegion, err := m.regionCache.LocateRegionByID(bo, regions[start])
-		if err != nil {
-			log.Warn("schedulerv3: get regions failed, skip split span",
-				zap.String("namespace", m.changefeedID.Namespace),
-				zap.String("changefeed", m.changefeedID.ID),
-				zap.Error(err))
-			return []tablepb.Span{span}
-		}
-		endRegion, err := m.regionCache.LocateRegionByID(bo, regions[end-1])
-		if err != nil {
-			log.Warn("schedulerv3: get regions failed, skip split span",
-				zap.String("namespace", m.changefeedID.Namespace),
-				zap.String("changefeed", m.changefeedID.ID),
-				zap.Error(err))
-			return []tablepb.Span{span}
-		}
-		if len(spans) > 0 &&
-			bytes.Compare(spans[len(spans)-1].EndKey, startRegion.StartKey) > 0 {
-			log.Warn("schedulerv3: list region out of order detected",
-				zap.String("namespace", m.changefeedID.Namespace),
-				zap.String("changefeed", m.changefeedID.ID),
-				zap.Stringer("lastSpan", &spans[len(spans)-1]),
-				zap.Stringer("region", startRegion))
-			return []tablepb.Span{span}
-		}
-		spans = append(spans, tablepb.Span{
-			TableID:  span.TableID,
-			StartKey: startRegion.StartKey,
-			EndKey:   endRegion.EndKey,
-		})
-
-		if end == len(regions) {
-			break
-		}
-		start = end
-		step := stepper.Step()
-		if end+step < len(regions) {
-			end = end + step
-		} else {
-			end = len(regions)
-		}
-	}
-	// Make sure spans does not exceed [startKey, endKey).
-	spans[0].StartKey = span.StartKey
-	spans[len(spans)-1].EndKey = span.EndKey
-	return spans
-}
-
-type evenlySplitStepper struct {
-	spanCount          int
-	regionPerSpan      int
-	extraRegionPerSpan int
-	remain             int
-}
-
-func newEvenlySplitStepper(totalCaptures int, totalRegion int) evenlySplitStepper {
-	extraRegionPerSpan := 0
-	regionPerSpan, remain := totalRegion/totalCaptures, totalRegion%totalCaptures
-	if regionPerSpan == 0 {
-		regionPerSpan = 1
-		extraRegionPerSpan = 0
-		totalCaptures = totalRegion
-	} else if remain != 0 {
-		// Evenly distributes the remaining regions.
-		extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(totalCaptures)))
-	}
-	return evenlySplitStepper{
-		regionPerSpan:      regionPerSpan,
-		spanCount:          totalCaptures,
-		extraRegionPerSpan: extraRegionPerSpan,
-		remain:             remain,
-	}
-}
-
-func (e *evenlySplitStepper) SpanCount() int {
-	return e.spanCount
-}
-
-func (e *evenlySplitStepper) Step() int {
-	if e.remain <= 0 {
-		return e.regionPerSpan
-	}
-	e.remain = e.remain - e.extraRegionPerSpan
-	return e.regionPerSpan + e.extraRegionPerSpan
-}
diff --git a/cdc/scheduler/internal/v3/keyspan/reconciler_test.go b/cdc/scheduler/internal/v3/keyspan/reconciler_test.go
index 20526a1b195..9da2aeef7b0 100644
--- a/cdc/scheduler/internal/v3/keyspan/reconciler_test.go
+++ b/cdc/scheduler/internal/v3/keyspan/reconciler_test.go
@@ -15,7 +15,6 @@ package keyspan
 
 import (
 	"context"
-	"fmt"
 	"testing"
 
 	"github.com/pingcap/tiflow/cdc/model"
@@ -28,209 +27,9 @@ import (
 
-func TestSplitSpan(t *testing.T) {
-	t.Parallel()
-
-	cache := NewMockRegionCache()
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, 1)
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, 2)
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, 3)
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, 4)
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_4"), EndKey: []byte("t2_2")}, 5)
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t2_2"), EndKey: []byte("t2_3")}, 6)
-
-	cases := []struct {
-		totalCaptures int
-		span          tablepb.Span
-		expectSpans   []tablepb.Span
-	}{
-		{
-			totalCaptures: 7,
-			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
-			expectSpans: []tablepb.Span{
-				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")},   // 1 region
-				{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
-			},
-		},
-		{
-			totalCaptures: 6,
-			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
-			expectSpans: []tablepb.Span{
-				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")},   // 1 region
-				{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
-			},
-		},
-		{
-			totalCaptures: 5,
-			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
-			expectSpans: []tablepb.Span{
-				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")},   // 1 region
-				{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
-				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
-			},
-		},
-		{
-			totalCaptures: 4,
-			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
-			expectSpans: []tablepb.Span{
-				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")},   // 2 region
-				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region - {TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region - {TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region - }, - }, - { - totalCaptures: 3, - span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, - expectSpans: []tablepb.Span{ - {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")}, // 2 region - {TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region - {TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")}, // 1 region - }, - }, - { - totalCaptures: 2, - span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, - expectSpans: []tablepb.Span{ - {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_3")}, // 3 region - {TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t2")}, // 2 region - }, - }, - { - totalCaptures: 1, - span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, - expectSpans: []tablepb.Span{ - {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, // 5 region - }, - }, - } - - for i, cs := range cases { - reconciler := NewReconciler(model.ChangeFeedID{}, cache, &config.ChangefeedSchedulerConfig{ - EnableTableAcrossNodes: true, - RegionThreshold: 1, - }) - spans := reconciler.splitSpan(context.Background(), cs.span, cs.totalCaptures) - require.Equalf(t, cs.expectSpans, spans, "%d %s", i, &cs.span) - } -} - -func TestEvenlySplitSpan(t *testing.T) { - t.Parallel() - - cache := NewMockRegionCache() - totalRegion := 1000 - for i := 0; i < totalRegion; i++ { - cache.regions.ReplaceOrInsert(tablepb.Span{ - StartKey: []byte(fmt.Sprintf("t1_%09d", i)), - EndKey: []byte(fmt.Sprintf("t1_%09d", i+1)), - }, uint64(i+1)) - } - - cases := []struct { - totalCaptures int - expectSpansMin int - expectSpansMax int - }{ - { - totalCaptures: 0, - expectSpansMin: 1000, - expectSpansMax: 1000, - }, - { - totalCaptures: 1, - expectSpansMin: 1000, - expectSpansMax: 1000, - }, - { - totalCaptures: 3, - expectSpansMin: 333, - expectSpansMax: 334, - }, - { - totalCaptures: 7, - expectSpansMin: 142, - expectSpansMax: 143, - }, - { - totalCaptures: 999, - expectSpansMin: 1, - expectSpansMax: 2, - }, - { - totalCaptures: 1000, - expectSpansMin: 1, - expectSpansMax: 1, - }, - { - totalCaptures: 2000, - expectSpansMin: 1, - expectSpansMax: 1, - }, - } - for i, cs := range cases { - reconciler := NewReconciler(model.ChangeFeedID{}, cache, &config.ChangefeedSchedulerConfig{ - EnableTableAcrossNodes: true, - RegionThreshold: 1, - }) - spans := reconciler.splitSpan( - context.Background(), - tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, - cs.totalCaptures, - ) - if cs.totalCaptures == 0 { - require.Equalf(t, 1, len(spans), "%d %v", i, cs) - } else if cs.totalCaptures <= 1000 { - require.Equalf(t, cs.totalCaptures, len(spans), "%d %v", i, cs) - } else { - require.Equalf(t, 1000, len(spans), "%d %v", i, cs) - } - - for _, span := range spans { - start, end := 0, 1000 - if len(span.StartKey) > len("t1") { - _, err := fmt.Sscanf(string(span.StartKey), "t1_%d", &start) - require.Nil(t, err, "%d %v %s", i, cs, span.StartKey) - } - if len(span.EndKey) > len("t2") { - _, err := fmt.Sscanf(string(span.EndKey), "t1_%d", &end) - require.Nil(t, err, "%d %v %s", i, cs, span.EndKey) - } - require.GreaterOrEqual(t, end-start, cs.expectSpansMin, "%d %v", i, cs) - require.LessOrEqual(t, end-start, cs.expectSpansMax, "%d %v", i, cs) - } - } -} - -func TestSplitSpanRegionOutOfOrder(t 
-	t.Parallel()
-
-	cache := NewMockRegionCache()
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, 1)
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_1"), EndKey: []byte("t1_4")}, 2)
-	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, 3)
-
-	reconciler := NewReconciler(model.ChangeFeedID{}, cache, &config.ChangefeedSchedulerConfig{
-		EnableTableAcrossNodes: true,
-		RegionThreshold:        1,
-	})
-	span := tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}
-	spans := reconciler.splitSpan(context.Background(), span, 1)
-	require.Equal(
-		t, []tablepb.Span{{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}}, spans)
-}
 
 func prepareSpanCache(
 	t *testing.T,
 	ss [][3]uint8, // table ID, start key suffix, end key suffix.
-) ([]tablepb.Span, *MockCache) {
+) ([]tablepb.Span, *mockCache) {
 	cache := NewMockRegionCache()
 	allSpan := make([]tablepb.Span, 0)
 	for i, s := range ss {
@@ -283,7 +82,7 @@ func TestReconcile(t *testing.T) {
 
 	// Test 1. changefeed initialization.
 	reps := spanz.NewBtreeMap[*replication.ReplicationSet]()
-	reconciler := NewReconciler(model.ChangeFeedID{}, cache, cfg.ChangefeedSettings)
+	reconciler := NewReconcilerForTests(cache, cfg.ChangefeedSettings)
 	currentTables := &replication.TableRanges{}
 	currentTables.UpdateTables([]model.TableID{1})
 	spans := reconciler.Reconcile(ctx, currentTables, reps, captures, compat)
@@ -295,7 +94,7 @@ func TestReconcile(t *testing.T) {
 	for _, span := range reconciler.tableSpans[1].spans {
 		reps.ReplaceOrInsert(span, nil)
 	}
-	reconciler = NewReconciler(model.ChangeFeedID{}, cache, cfg.ChangefeedSettings)
+	reconciler = NewReconcilerForTests(cache, cfg.ChangefeedSettings)
 	currentTables.UpdateTables([]model.TableID{1})
 	spans = reconciler.Reconcile(ctx, currentTables, reps, captures, compat)
 	require.Equal(t, allSpan[:4], spans)
@@ -391,7 +190,7 @@ func TestCompatDisable(t *testing.T) {
 	require.False(t, cm.CheckSpanReplicationEnabled())
 	ctx := context.Background()
 	reps := spanz.NewBtreeMap[*replication.ReplicationSet]()
-	reconciler := NewReconciler(model.ChangeFeedID{}, cache, cfg.ChangefeedSettings)
+	reconciler := NewReconcilerForTests(cache, cfg.ChangefeedSettings)
 	currentTables := &replication.TableRanges{}
 	currentTables.UpdateTables([]model.TableID{1})
 	spans := reconciler.Reconcile(ctx, currentTables, reps, captures, cm)
@@ -438,7 +237,7 @@ func TestBatchAddRateLimit(t *testing.T) {
 
 	// Add table 2.
 	reps := spanz.NewBtreeMap[*replication.ReplicationSet]()
-	reconciler := NewReconciler(model.ChangeFeedID{}, cache, cfg.ChangefeedSettings)
+	reconciler := NewReconcilerForTests(cache, cfg.ChangefeedSettings)
 	currentTables := &replication.TableRanges{}
 	currentTables.UpdateTables([]model.TableID{2})
 	spans := reconciler.Reconcile(ctx, currentTables, reps, captures, compat)
diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go b/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
new file mode 100644
index 00000000000..49678971c1d
--- /dev/null
+++ b/cdc/scheduler/internal/v3/keyspan/splitter_region_count.go
@@ -0,0 +1,152 @@
+// Copyright 2023 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package keyspan
+
+import (
+	"bytes"
+	"context"
+	"math"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/processor/tablepb"
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/tikv/client-go/v2/tikv"
+	"go.uber.org/zap"
+)
+
+type regionCountSplitter struct {
+	changefeedID model.ChangeFeedID
+	regionCache  RegionCache
+}
+
+func newRegionCountSplitter(
+	changefeedID model.ChangeFeedID, regionCache RegionCache,
+) *regionCountSplitter {
+	return &regionCountSplitter{
+		changefeedID: changefeedID,
+		regionCache:  regionCache,
+	}
+}
+
+func (m *regionCountSplitter) split(
+	ctx context.Context, span tablepb.Span, totalCaptures int,
+	config *config.ChangefeedSchedulerConfig,
+) []tablepb.Span {
+	bo := tikv.NewBackoffer(ctx, 500)
+	regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey)
+	if err != nil {
+		log.Warn("schedulerv3: list regions failed, skip split span",
+			zap.String("namespace", m.changefeedID.Namespace),
+			zap.String("changefeed", m.changefeedID.ID),
+			zap.Error(err))
+		return []tablepb.Span{span}
+	}
+	if len(regions) <= config.RegionThreshold || totalCaptures == 0 {
+		return []tablepb.Span{span}
+	}
+
+	totalRegions := len(regions)
+	if totalRegions == 0 {
+		totalCaptures = 1
+	}
+	stepper := newEvenlySplitStepper(totalCaptures, totalRegions)
+	spans := make([]tablepb.Span, 0, stepper.SpanCount())
+	start, end := 0, stepper.Step()
+	for {
+		startRegion, err := m.regionCache.LocateRegionByID(bo, regions[start])
+		if err != nil {
+			log.Warn("schedulerv3: get regions failed, skip split span",
+				zap.String("namespace", m.changefeedID.Namespace),
+				zap.String("changefeed", m.changefeedID.ID),
+				zap.Error(err))
+			return []tablepb.Span{span}
+		}
+		endRegion, err := m.regionCache.LocateRegionByID(bo, regions[end-1])
+		if err != nil {
+			log.Warn("schedulerv3: get regions failed, skip split span",
+				zap.String("namespace", m.changefeedID.Namespace),
+				zap.String("changefeed", m.changefeedID.ID),
+				zap.Error(err))
+			return []tablepb.Span{span}
+		}
+		if len(spans) > 0 &&
+			bytes.Compare(spans[len(spans)-1].EndKey, startRegion.StartKey) > 0 {
+			log.Warn("schedulerv3: list region out of order detected",
+				zap.String("namespace", m.changefeedID.Namespace),
+				zap.String("changefeed", m.changefeedID.ID),
+				zap.Stringer("lastSpan", &spans[len(spans)-1]),
+				zap.Stringer("region", startRegion))
+			return []tablepb.Span{span}
+		}
+		spans = append(spans, tablepb.Span{
+			TableID:  span.TableID,
+			StartKey: startRegion.StartKey,
+			EndKey:   endRegion.EndKey,
+		})
+
+		if end == len(regions) {
+			break
+		}
+		start = end
+		step := stepper.Step()
+		if end+step < len(regions) {
+			end = end + step
+		} else {
+			end = len(regions)
+		}
+	}
+	// Make sure spans do not exceed [startKey, endKey).
+	spans[0].StartKey = span.StartKey
+	spans[len(spans)-1].EndKey = span.EndKey
+	return spans
+}
+
+type evenlySplitStepper struct {
+	spanCount          int
+	regionPerSpan      int
+	extraRegionPerSpan int
+	remain             int
+}
+
+func newEvenlySplitStepper(totalCaptures int, totalRegion int) evenlySplitStepper {
+	extraRegionPerSpan := 0
+	regionPerSpan, remain := totalRegion/totalCaptures, totalRegion%totalCaptures
+	if regionPerSpan == 0 {
+		regionPerSpan = 1
+		extraRegionPerSpan = 0
+		totalCaptures = totalRegion
+	} else if remain != 0 {
+		// Evenly distributes the remaining regions.
+		extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(totalCaptures)))
+	}
+	return evenlySplitStepper{
+		regionPerSpan:      regionPerSpan,
+		spanCount:          totalCaptures,
+		extraRegionPerSpan: extraRegionPerSpan,
+		remain:             remain,
+	}
+}
+
+func (e *evenlySplitStepper) SpanCount() int {
+	return e.spanCount
+}
+
+func (e *evenlySplitStepper) Step() int {
+	if e.remain <= 0 {
+		return e.regionPerSpan
+	}
+	e.remain = e.remain - e.extraRegionPerSpan
+	return e.regionPerSpan + e.extraRegionPerSpan
+}
diff --git a/cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go b/cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go
new file mode 100644
index 00000000000..5063f1504bb
--- /dev/null
+++ b/cdc/scheduler/internal/v3/keyspan/splitter_region_count_test.go
@@ -0,0 +1,229 @@
+// Copyright 2022 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package keyspan
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/pingcap/tiflow/cdc/model"
+	"github.com/pingcap/tiflow/cdc/processor/tablepb"
+	"github.com/pingcap/tiflow/pkg/config"
+	"github.com/stretchr/testify/require"
+)
+
+func TestRegionCountSplitSpan(t *testing.T) {
+	t.Parallel()
+
+	cache := NewMockRegionCache()
+	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, 1)
+	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, 2)
+	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, 3)
+	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, 4)
+	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_4"), EndKey: []byte("t2_2")}, 5)
+	cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t2_2"), EndKey: []byte("t2_3")}, 6)
+
+	cases := []struct {
+		totalCaptures int
+		span          tablepb.Span
+		expectSpans   []tablepb.Span
+	}{
+		{
+			totalCaptures: 7,
+			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
+			expectSpans: []tablepb.Span{
+				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")},   // 1 region
+				{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
+			},
+		},
+		{
+			totalCaptures: 6,
+			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
+			expectSpans: []tablepb.Span{
+				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")},   // 1 region
+				{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
+			},
+		},
+		{
+			totalCaptures: 5,
+			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
+			expectSpans: []tablepb.Span{
+				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_1")},   // 1 region
+				{TableID: 1, StartKey: []byte("t1_1"), EndKey: []byte("t1_2")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
+			},
+		},
+		{
+			totalCaptures: 4,
+			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
+			expectSpans: []tablepb.Span{
+				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")},   // 2 region
+				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t1_4")}, // 1 region
+				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
+			},
+		},
+		{
+			totalCaptures: 3,
+			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
+			expectSpans: []tablepb.Span{
+				{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_2")},   // 2 region
+				{TableID: 1, StartKey: []byte("t1_2"), EndKey: []byte("t1_4")}, // 2 region
+				{TableID: 1, StartKey: []byte("t1_4"), EndKey: []byte("t2")},   // 1 region
+			},
+		},
+		{
+			totalCaptures: 2,
+			span:          tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")},
[]byte("t2")}, + expectSpans: []tablepb.Span{ + {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t1_3")}, // 3 region + {TableID: 1, StartKey: []byte("t1_3"), EndKey: []byte("t2")}, // 2 region + }, + }, + { + totalCaptures: 1, + span: tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, + expectSpans: []tablepb.Span{ + {TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, // 5 region + }, + }, + } + + for i, cs := range cases { + splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache) + cfg := &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: true, + RegionThreshold: 1, + } + spans := splitter.split(context.Background(), cs.span, cs.totalCaptures, cfg) + require.Equalf(t, cs.expectSpans, spans, "%d %s", i, &cs.span) + } +} + +func TestRegionCountEvenlySplitSpan(t *testing.T) { + t.Parallel() + + cache := NewMockRegionCache() + totalRegion := 1000 + for i := 0; i < totalRegion; i++ { + cache.regions.ReplaceOrInsert(tablepb.Span{ + StartKey: []byte(fmt.Sprintf("t1_%09d", i)), + EndKey: []byte(fmt.Sprintf("t1_%09d", i+1)), + }, uint64(i+1)) + } + + cases := []struct { + totalCaptures int + expectSpansMin int + expectSpansMax int + }{ + { + totalCaptures: 0, + expectSpansMin: 1000, + expectSpansMax: 1000, + }, + { + totalCaptures: 1, + expectSpansMin: 1000, + expectSpansMax: 1000, + }, + { + totalCaptures: 3, + expectSpansMin: 333, + expectSpansMax: 334, + }, + { + totalCaptures: 7, + expectSpansMin: 142, + expectSpansMax: 143, + }, + { + totalCaptures: 999, + expectSpansMin: 1, + expectSpansMax: 2, + }, + { + totalCaptures: 1000, + expectSpansMin: 1, + expectSpansMax: 1, + }, + { + totalCaptures: 2000, + expectSpansMin: 1, + expectSpansMax: 1, + }, + } + for i, cs := range cases { + splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache) + cfg := &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: true, + RegionThreshold: 1, + } + spans := splitter.split( + context.Background(), + tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}, + cs.totalCaptures, + cfg, + ) + if cs.totalCaptures == 0 { + require.Equalf(t, 1, len(spans), "%d %v", i, cs) + } else if cs.totalCaptures <= 1000 { + require.Equalf(t, cs.totalCaptures, len(spans), "%d %v", i, cs) + } else { + require.Equalf(t, 1000, len(spans), "%d %v", i, cs) + } + + for _, span := range spans { + start, end := 0, 1000 + if len(span.StartKey) > len("t1") { + _, err := fmt.Sscanf(string(span.StartKey), "t1_%d", &start) + require.Nil(t, err, "%d %v %s", i, cs, span.StartKey) + } + if len(span.EndKey) > len("t2") { + _, err := fmt.Sscanf(string(span.EndKey), "t1_%d", &end) + require.Nil(t, err, "%d %v %s", i, cs, span.EndKey) + } + require.GreaterOrEqual(t, end-start, cs.expectSpansMin, "%d %v", i, cs) + require.LessOrEqual(t, end-start, cs.expectSpansMax, "%d %v", i, cs) + } + } +} + +func TestSplitSpanRegionOutOfOrder(t *testing.T) { + t.Parallel() + + cache := NewMockRegionCache() + cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_0"), EndKey: []byte("t1_1")}, 1) + cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_1"), EndKey: []byte("t1_4")}, 2) + cache.regions.ReplaceOrInsert(tablepb.Span{StartKey: []byte("t1_2"), EndKey: []byte("t1_3")}, 3) + + splitter := newRegionCountSplitter(model.ChangeFeedID{}, cache) + cfg := &config.ChangefeedSchedulerConfig{ + EnableTableAcrossNodes: true, + RegionThreshold: 1, + } + span := tablepb.Span{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")} + spans := 
+	require.Equal(
+		t, []tablepb.Span{{TableID: 1, StartKey: []byte("t1"), EndKey: []byte("t2")}}, spans)
+}
diff --git a/cdc/scheduler/rexport.go b/cdc/scheduler/rexport.go
index 8b7734845d6..2c68c10f60b 100644
--- a/cdc/scheduler/rexport.go
+++ b/cdc/scheduler/rexport.go
@@ -23,9 +23,8 @@ import (
 	"github.com/pingcap/tiflow/pkg/config"
 	"github.com/pingcap/tiflow/pkg/etcd"
 	"github.com/pingcap/tiflow/pkg/p2p"
-	"github.com/pingcap/tiflow/pkg/pdutil"
+	"github.com/pingcap/tiflow/pkg/upstream"
 	"github.com/prometheus/client_golang/prometheus"
-	"github.com/tikv/client-go/v2/tikv"
 )
 
 // TableExecutor is an abstraction for "Processor".
@@ -87,13 +86,12 @@ func NewScheduler(
 	messageRouter p2p.MessageRouter,
 	ownerRevision int64,
 	changefeedEpoch uint64,
-	regionCache *tikv.RegionCache,
-	pdClock pdutil.Clock,
+	up *upstream.Upstream,
 	cfg *config.SchedulerConfig,
 ) (Scheduler, error) {
 	return v3.NewCoordinator(
 		ctx, captureID, changeFeedID, messageServer, messageRouter, ownerRevision,
-		changefeedEpoch, regionCache, pdClock, cfg)
+		changefeedEpoch, up, cfg)
 }
 
 // InitMetrics registers all metrics used in scheduler