Skip to content

Commit

Permalink
scheduler(ticdc): add splitter interface
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Mar 13, 2023
1 parent 04f7e22 commit eba7841
Show file tree
Hide file tree
Showing 10 changed files with 435 additions and 349 deletions.
3 changes: 1 addition & 2 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ func newSchedulerFromCtx(
ownerRev := ctx.GlobalVars().OwnerRevision
captureID := ctx.GlobalVars().CaptureInfo.ID
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, epoch, up.RegionCache, up.PDClock, cfg)
ctx, captureID, changeFeedID, messageServer, messageRouter, ownerRev, epoch, up, cfg)
return ret, errors.Trace(err)
}

Expand Down
9 changes: 4 additions & 5 deletions cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -77,8 +78,7 @@ func NewCoordinator(
messageRouter p2p.MessageRouter,
ownerRevision int64,
changefeedEpoch uint64,
regionCache keyspan.RegionCache,
pdClock pdutil.Clock,
up *upstream.Upstream,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
trans, err := transport.NewTransport(
Expand All @@ -88,9 +88,8 @@ func NewCoordinator(
}
coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
coord.trans = trans
coord.reconciler = keyspan.NewReconciler(
changefeedID, regionCache, cfg.ChangefeedSettings)
coord.pdClock = pdClock
coord.reconciler = keyspan.NewReconciler(changefeedID, up, cfg.ChangefeedSettings)
coord.pdClock = up.PDClock
coord.changefeedEpoch = changefeedEpoch
return coord, nil
}
Expand Down
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ func newTestCoordinator(cfg *config.SchedulerConfig) (*coordinator, *transport.M
coord := newCoordinator("a", model.ChangeFeedID{}, 1, cfg)
trans := transport.NewMockTrans()
coord.trans = trans
coord.reconciler = keyspan.NewReconciler(
model.ChangeFeedID{}, keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
coord.reconciler = keyspan.NewReconcilerForTests(
keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
return coord, trans
}

Expand Down
4 changes: 2 additions & 2 deletions cdc/scheduler/internal/v3/info_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ func TestInfoProvider(t *testing.T) {
}
coord := newCoordinator("a", model.ChangeFeedID{}, 1, cfg)
cfg.ChangefeedSettings = config.GetDefaultReplicaConfig().Scheduler
coord.reconciler = keyspan.NewReconciler(
model.ChangeFeedID{}, keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
coord.reconciler = keyspan.NewReconcilerForTests(
keyspan.NewMockRegionCache(), cfg.ChangefeedSettings)
coord.captureM.Captures = map[model.CaptureID]*member.CaptureStatus{
"a": {Tables: []tablepb.TableStatus{{
Span: tablepb.Span{TableID: 1},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package keyspan
import (
"bytes"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/tikv/client-go/v2/tikv"
)
Expand All @@ -32,16 +34,16 @@ type RegionCache interface {
LocateRegionByID(bo *tikv.Backoffer, regionID uint64) (*tikv.KeyLocation, error)
}

// MockCache mocks tikv.RegionCache.
type MockCache struct {
// mockCache mocks tikv.RegionCache.
type mockCache struct {
regions *spanz.BtreeMap[uint64]
}

// NewMockRegionCache returns a new MockCache.
func NewMockRegionCache() *MockCache { return &MockCache{regions: spanz.NewBtreeMap[uint64]()} }
func NewMockRegionCache() *mockCache { return &mockCache{regions: spanz.NewBtreeMap[uint64]()} }

// ListRegionIDsInKeyRange lists ids of regions in [startKey,endKey].
func (m *MockCache) ListRegionIDsInKeyRange(
func (m *mockCache) ListRegionIDsInKeyRange(
bo *tikv.Backoffer, startKey, endKey []byte,
) (regionIDs []uint64, err error) {
m.regions.Ascend(func(loc tablepb.Span, id uint64) bool {
Expand All @@ -56,7 +58,7 @@ func (m *MockCache) ListRegionIDsInKeyRange(
}

// LocateRegionByID searches for the region with ID.
func (m *MockCache) LocateRegionByID(
func (m *mockCache) LocateRegionByID(
bo *tikv.Backoffer, regionID uint64,
) (loc *tikv.KeyLocation, err error) {
m.regions.Ascend(func(span tablepb.Span, id uint64) bool {
Expand All @@ -71,3 +73,14 @@ func (m *MockCache) LocateRegionByID(
})
return
}

// NewReconcilerForTests returns a Reconciler.
func NewReconcilerForTests(
cache RegionCache, config *config.ChangefeedSchedulerConfig,
) *Reconciler {
return &Reconciler{
tableSpans: make(map[int64]splittedSpans),
config: config,
splitter: newRegionCountSplitter(model.ChangeFeedID{}, cache),
}
}
141 changes: 19 additions & 122 deletions cdc/scheduler/internal/v3/keyspan/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
package keyspan

import (
"bytes"
"context"
"math"

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
Expand All @@ -26,36 +24,45 @@ import (
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
"github.com/pingcap/tiflow/pkg/config"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/tikv/client-go/v2/tikv"
"github.com/pingcap/tiflow/pkg/upstream"
"go.uber.org/zap"
)

type splitSpans struct {
type splitter interface {
split(
ctx context.Context, span tablepb.Span, totalCaptures int,
config *config.ChangefeedSchedulerConfig,
) []tablepb.Span
}

type splittedSpans struct {
byAddTable bool
spans []tablepb.Span
}

// Reconciler reconciles span and table mapping, make sure spans are in
// a desired state and covers all table ranges.
type Reconciler struct {
tableSpans map[model.TableID]splitSpans
tableSpans map[model.TableID]splittedSpans
spanCache []tablepb.Span

regionCache RegionCache
changefeedID model.ChangeFeedID
config *config.ChangefeedSchedulerConfig

splitter splitter
}

// NewReconciler returns a Reconciler.
func NewReconciler(
changefeedID model.ChangeFeedID, regionCache RegionCache,
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
config *config.ChangefeedSchedulerConfig,
) *Reconciler {
return &Reconciler{
tableSpans: make(map[int64]splitSpans),
regionCache: regionCache,
tableSpans: make(map[int64]splittedSpans),
changefeedID: changefeedID,
config: config,
splitter: newRegionCountSplitter(changefeedID, up.RegionCache),
}
}

Expand Down Expand Up @@ -101,9 +108,9 @@ func (m *Reconciler) Reconcile(
tableSpan := spanz.TableIDToComparableSpan(tableID)
spans := []tablepb.Span{tableSpan}
if compat.CheckSpanReplicationEnabled() {
spans = m.splitSpan(ctx, tableSpan, len(aliveCaptures))
spans = m.splitter.split(ctx, tableSpan, len(aliveCaptures), m.config)
}
m.tableSpans[tableID] = splitSpans{
m.tableSpans[tableID] = splittedSpans{
byAddTable: true,
spans: spans,
}
Expand Down Expand Up @@ -135,7 +142,7 @@ func (m *Reconciler) Reconcile(
})
// TODO: maybe we should split holes too.
}
m.tableSpans[tableID] = splitSpans{
m.tableSpans[tableID] = splittedSpans{
byAddTable: false,
spans: spans,
}
Expand Down Expand Up @@ -184,113 +191,3 @@ func (m *Reconciler) Reconcile(
}
return m.spanCache
}

func (m *Reconciler) splitSpan(
ctx context.Context, span tablepb.Span, totalCaptures int,
) []tablepb.Span {
bo := tikv.NewBackoffer(ctx, 500)
regions, err := m.regionCache.ListRegionIDsInKeyRange(bo, span.StartKey, span.EndKey)
if err != nil {
log.Warn("schedulerv3: list regions failed, skip split span",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
return []tablepb.Span{span}
}
if len(regions) <= m.config.RegionThreshold || totalCaptures == 0 {
return []tablepb.Span{span}
}

totalRegions := len(regions)
if totalRegions == 0 {
totalCaptures = 1
}
stepper := newEvenlySplitStepper(totalCaptures, totalRegions)
spans := make([]tablepb.Span, 0, stepper.SpanCount())
start, end := 0, stepper.Step()
for {
startRegion, err := m.regionCache.LocateRegionByID(bo, regions[start])
if err != nil {
log.Warn("schedulerv3: get regions failed, skip split span",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
return []tablepb.Span{span}
}
endRegion, err := m.regionCache.LocateRegionByID(bo, regions[end-1])
if err != nil {
log.Warn("schedulerv3: get regions failed, skip split span",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Error(err))
return []tablepb.Span{span}
}
if len(spans) > 0 &&
bytes.Compare(spans[len(spans)-1].EndKey, startRegion.StartKey) > 0 {
log.Warn("schedulerv3: list region out of order detected",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Stringer("lastSpan", &spans[len(spans)-1]),
zap.Stringer("region", startRegion))
return []tablepb.Span{span}
}
spans = append(spans, tablepb.Span{
TableID: span.TableID,
StartKey: startRegion.StartKey,
EndKey: endRegion.EndKey,
})

if end == len(regions) {
break
}
start = end
step := stepper.Step()
if end+step < len(regions) {
end = end + step
} else {
end = len(regions)
}
}
// Make sure spans does not exceed [startKey, endKey).
spans[0].StartKey = span.StartKey
spans[len(spans)-1].EndKey = span.EndKey
return spans
}

type evenlySplitStepper struct {
spanCount int
regionPerSpan int
extraRegionPerSpan int
remain int
}

func newEvenlySplitStepper(totalCaptures int, totalRegion int) evenlySplitStepper {
extraRegionPerSpan := 0
regionPerSpan, remain := totalRegion/totalCaptures, totalRegion%totalCaptures
if regionPerSpan == 0 {
regionPerSpan = 1
extraRegionPerSpan = 0
totalCaptures = totalRegion
} else if remain != 0 {
// Evenly distributes the remaining regions.
extraRegionPerSpan = int(math.Ceil(float64(remain) / float64(totalCaptures)))
}
return evenlySplitStepper{
regionPerSpan: regionPerSpan,
spanCount: totalCaptures,
extraRegionPerSpan: extraRegionPerSpan,
remain: remain,
}
}

func (e *evenlySplitStepper) SpanCount() int {
return e.spanCount
}

func (e *evenlySplitStepper) Step() int {
if e.remain <= 0 {
return e.regionPerSpan
}
e.remain = e.remain - e.extraRegionPerSpan
return e.regionPerSpan + e.extraRegionPerSpan
}
Loading

0 comments on commit eba7841

Please sign in to comment.