Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler(ticdc): fix impossible spans panic #7948

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 33 additions & 17 deletions cdc/scheduler/internal/v3/keyspan/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,15 @@ import (
"go.uber.org/zap"
)

// splitSpans is the per-table value stored in Reconciler.tableSpans: the
// spans tracked for one table plus a flag describing how the entry was made.
type splitSpans struct {
// byAddTable is set to true when Reconcile creates the entry for a table
// that has no covered spans in the replications, and to false when the
// entry is built from spans that are already covered (with or without
// holes). Reconcile skips hole filling for entries where it is true.
byAddTable bool
// spans holds the spans recorded for the table; Reconcile appends them
// to the span cache it returns.
spans []tablepb.Span
}

// Reconciler reconciles span and table mapping, make sure spans are in
// a desired state and covers all table ranges.
type Reconciler struct {
tableSpans map[model.TableID][]tablepb.Span
tableSpans map[model.TableID]splitSpans
spanCache []tablepb.Span

regionCache RegionCache
Expand All @@ -44,7 +49,7 @@ func NewReconciler(
maxRegionPerSpan int,
) *Reconciler {
return &Reconciler{
tableSpans: make(map[int64][]tablepb.Span),
tableSpans: make(map[int64]splitSpans),
regionCache: regionCache,
changefeedID: changefeedID,
maxRegionPerSpan: maxRegionPerSpan,
Expand Down Expand Up @@ -81,28 +86,32 @@ func (m *Reconciler) Reconcile(
if len(coveredSpans) == 0 {
// No such spans in replications.
if _, ok := m.tableSpans[tableID]; ok {
// We have seen such spans before, it is impossible.
log.Panic("schedulerv3: impossible spans",
zap.String("changefeed", m.changefeedID.ID),
zap.String("namespace", m.changefeedID.Namespace),
zap.Int64("tableID", tableID),
zap.Stringer("spanStart", &tableStart),
zap.Stringer("spanEnd", &tableEnd))
// And we have seen such spans before, which means these spans have
// not yet been scheduled due to the basic scheduler's batch add task
// rate limit.
continue
}
// And we have not seen such spans before, maybe:
// 1. it's a table being added when starting a changefeed
// or after owner switch.
// 4. it's a new table being created by DDL when a changefeed is running.
tableSpan := spanz.TableIDToComparableSpan(tableID)
spans := []tablepb.Span{tableSpan}
if compat.CheckSpanReplicationEnabled() {
m.tableSpans[tableID] = m.splitSpan(ctx, tableSpan)
} else {
// Do not split table if span replication is not enabled.
m.tableSpans[tableID] = []tablepb.Span{tableSpan}
spans = m.splitSpan(ctx, tableSpan)
}
m.tableSpans[tableID] = splitSpans{
byAddTable: true,
spans: spans,
}
updateCache = true
} else if len(holes) != 0 {
// There are some holes in the table span, maybe:
if spans, ok := m.tableSpans[tableID]; ok && spans.byAddTable {
// These spans were split when the reconciler added the table. Scheduling
// them may still be in progress because of the basic scheduler's rate limit.
continue
}
// 3. owner switch after some captures failed.
log.Info("schedulerv3: detect owner switch after captures fail",
zap.String("changefeed", m.changefeedID.ID),
Expand All @@ -115,14 +124,21 @@ func (m *Reconciler) Reconcile(
zap.Stringer("foundEnd", &coveredSpans[len(coveredSpans)-1]))
for i := range holes {
holes[i].TableID = tableID
m.tableSpans[tableID] = append(coveredSpans, holes[i])
coveredSpans = append(coveredSpans, holes[i])
// TODO: maybe we should split holes too.
}
m.tableSpans[tableID] = splitSpans{
byAddTable: false,
spans: coveredSpans,
}
updateCache = true
} else {
// Found and no hole, maybe:
// 2. owner switch and no capture fails.
m.tableSpans[tableID] = coveredSpans
m.tableSpans[tableID] = splitSpans{
byAddTable: false,
spans: coveredSpans,
}
}
}

Expand Down Expand Up @@ -151,8 +167,8 @@ func (m *Reconciler) Reconcile(

if updateCache {
m.spanCache = make([]tablepb.Span, 0)
for _, spans := range m.tableSpans {
m.spanCache = append(m.spanCache, spans...)
for _, ss := range m.tableSpans {
m.spanCache = append(m.spanCache, ss.spans...)
}
}
return m.spanCache
Expand Down
60 changes: 47 additions & 13 deletions cdc/scheduler/internal/v3/keyspan/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,34 +157,34 @@ func TestReconcile(t *testing.T) {
reconciler := NewReconciler(model.ChangeFeedID{}, cache, cfg.RegionPerSpan)
spans := reconciler.Reconcile(ctx, []model.TableID{1}, reps, compat)
require.Equal(t, allSpan[:4], spans)
require.Equal(t, allSpan[:4], reconciler.tableSpans[1])
require.Equal(t, allSpan[:4], reconciler.tableSpans[1].spans)
require.Equal(t, 1, len(reconciler.tableSpans))

// Test 1. owner switch no capture fails.
for _, span := range reconciler.tableSpans[1] {
for _, span := range reconciler.tableSpans[1].spans {
reps.ReplaceOrInsert(span, nil)
}
reconciler = NewReconciler(model.ChangeFeedID{}, cache, cfg.RegionPerSpan)
spans = reconciler.Reconcile(ctx, []model.TableID{1}, reps, compat)
require.Equal(t, allSpan[:4], spans)
require.Equal(t, allSpan[:4], reconciler.tableSpans[1])
require.Equal(t, allSpan[:4], reconciler.tableSpans[1].spans)
require.Equal(t, 1, len(reconciler.tableSpans))

// Test 3. add table 2.
spans = reconciler.Reconcile(ctx, []model.TableID{1, 2}, reps, compat)
spanz.Sort(spans)
require.Equal(t, allSpan, spans)
require.Equal(t, allSpan[:4], reconciler.tableSpans[1])
require.Equal(t, allSpan[4:], reconciler.tableSpans[2])
require.Equal(t, allSpan[:4], reconciler.tableSpans[1].spans)
require.Equal(t, allSpan[4:], reconciler.tableSpans[2].spans)
require.Equal(t, 2, len(reconciler.tableSpans))

// Test 4. drop table 2.
for _, span := range reconciler.tableSpans[2] {
for _, span := range reconciler.tableSpans[2].spans {
reps.ReplaceOrInsert(span, nil)
}
spans = reconciler.Reconcile(ctx, []model.TableID{1}, reps, compat)
require.Equal(t, allSpan[:4], spans)
require.Equal(t, allSpan[:4], reconciler.tableSpans[1])
require.Equal(t, allSpan[:4], reconciler.tableSpans[1].spans)
require.Equal(t, 1, len(reconciler.tableSpans))

// Test 2. Owner switch and some captures fail.
Expand All @@ -193,8 +193,8 @@ func TestReconcile(t *testing.T) {
spans = reconciler.Reconcile(ctx, []model.TableID{1}, reps, compat)
spanz.Sort(spans)
require.Equal(t, allSpan[:4], spans)
spanz.Sort(reconciler.tableSpans[1])
require.Equal(t, allSpan[:4], reconciler.tableSpans[1])
spanz.Sort(reconciler.tableSpans[1].spans)
require.Equal(t, allSpan[:4], reconciler.tableSpans[1].spans)
require.Equal(t, 1, len(reconciler.tableSpans))

// End spans is missing.
Expand All @@ -203,8 +203,8 @@ func TestReconcile(t *testing.T) {
spans = reconciler.Reconcile(ctx, []model.TableID{1}, reps, compat)
spanz.Sort(spans)
require.Equal(t, allSpan[:4], spans)
spanz.Sort(reconciler.tableSpans[1])
require.Equal(t, allSpan[:4], reconciler.tableSpans[1])
spanz.Sort(reconciler.tableSpans[1].spans)
require.Equal(t, allSpan[:4], reconciler.tableSpans[1].spans)
require.Equal(t, 1, len(reconciler.tableSpans))

// 2 middle spans are missing.
Expand All @@ -221,8 +221,8 @@ func TestReconcile(t *testing.T) {
expectedSpan = append(expectedSpan, allSpan[3])
spanz.Sort(spans)
require.Equal(t, expectedSpan, spans)
spanz.Sort(reconciler.tableSpans[1])
require.Equal(t, expectedSpan, reconciler.tableSpans[1])
spanz.Sort(reconciler.tableSpans[1].spans)
require.Equal(t, expectedSpan, reconciler.tableSpans[1].spans)
require.Equal(t, 1, len(reconciler.tableSpans))
}

Expand Down Expand Up @@ -263,3 +263,37 @@ func TestCompatDisable(t *testing.T) {
require.Equal(t, allSpan[4:], spans[1:])
require.Len(t, spans, 3)
}

// TestBatchAddRateLimit verifies that repeated Reconcile calls for table 2
// keep returning the same spans while the replication map contains none, and
// later only one, of those spans.
func TestBatchAddRateLimit(t *testing.T) {
	t.Parallel()

	allSpan, cache := prepareSpanCache(t, [][3]uint8{
		{2, 0, 2},
		{2, 2, 3},
		{2, 3, 4},
	})

	schedCfg := &config.SchedulerConfig{RegionPerSpan: 1}
	cpt := compat.New(schedCfg, map[string]*model.CaptureInfo{})
	ctx := context.Background()

	reps := spanz.NewMap[*replication.ReplicationSet]()
	reconciler := NewReconciler(model.ChangeFeedID{}, cache, schedCfg.RegionPerSpan)

	// reconcileAndCheck runs one Reconcile round for table 2 and asserts that
	// both the returned spans and the reconciler's own record equal allSpan,
	// with exactly one table tracked.
	reconcileAndCheck := func() {
		t.Helper()
		got := reconciler.Reconcile(ctx, []model.TableID{2}, reps, cpt)
		require.Equal(t, allSpan, got)
		require.Equal(t, allSpan, reconciler.tableSpans[2].spans)
		require.Equal(t, 1, len(reconciler.tableSpans))
	}

	// Round 1: add table 2 while the replication map is still empty.
	reconcileAndCheck()

	// Round 2: simulate the batch add being rate limited, so the replication
	// map is still empty on the next call.
	reconcileAndCheck()

	// Round 3: only the first span has made it into the replication map.
	reps.ReplaceOrInsert(allSpan[0], nil)
	reconcileAndCheck()
}