scheduler(ticdc): replace tableID with Span (#7806)
ref #7720
overvenus authored Dec 16, 2022
1 parent f2424c7 commit 92772d4
Showing 28 changed files with 1,074 additions and 793 deletions.
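
The theme running through every hunk below: the scheduler stops identifying its unit of work by a bare `model.TableID` and starts using `tablepb.Span`, a key range belonging to a table, so that one table can later be split across several spans. The sketch below illustrates the idea with simplified stand-in types; the key layout and the `Less` ordering are assumptions for illustration, not the real `tablepb`/`spanz` definitions.

```go
package main

import (
	"bytes"
	"fmt"
)

// Span is a simplified stand-in for tablepb.Span: a half-open key range
// [StartKey, EndKey) belonging to one table. The real type is protobuf
// generated and carries more fields than shown here.
type Span struct {
	TableID  int64
	StartKey []byte
	EndKey   []byte
}

// tableSpan builds a span covering a whole table. The key layout below is
// purely illustrative; TiCDC derives real boundaries from TiDB's record-key
// encoding (roughly a "t{tableID}_r" prefix).
func tableSpan(tableID int64) Span {
	return Span{
		TableID:  tableID,
		StartKey: []byte(fmt.Sprintf("t%d_r", tableID)),
		EndKey:   []byte(fmt.Sprintf("t%d_s", tableID)), // next prefix after "_r"
	}
}

// Less orders spans by start key, which is what a span-keyed ordered map
// (a BTree in the real scheduler) needs.
func (s Span) Less(other Span) bool {
	return bytes.Compare(s.StartKey, other.StartKey) < 0
}

func main() {
	s := tableSpan(42)
	fmt.Printf("table %d -> span [%q, %q)\n", s.TableID, s.StartKey, s.EndKey)
}
```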
15 changes: 7 additions & 8 deletions cdc/owner/changefeed.go
@@ -42,7 +42,7 @@ import (
// newSchedulerFromCtx creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
func newSchedulerFromCtx(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
@@ -52,15 +52,14 @@ func newSchedulerFromCtx(
cfg := config.GetGlobalServerConfig().Debug
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, cfg.Scheduler, pdClock)
messageServer, messageRouter, ownerRev, up.RegionCache, up.PDClock, cfg.Scheduler)
return ret, errors.Trace(err)
}

func newScheduler(
ctx cdcContext.Context,
pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, pdClock)
return newSchedulerFromCtx(ctx, up)
}

type changefeed struct {
@@ -127,7 +126,7 @@ type changefeed struct {
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error)
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
@@ -165,7 +164,7 @@ func newChangefeed4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error),
) *changefeed {
c := newChangefeed(id, state, up)
c.newDDLPuller = newDDLPuller
@@ -540,7 +539,7 @@ LOOP:
zap.String("changefeed", c.id.ID))

// create scheduler
c.scheduler, err = c.newScheduler(ctx, c.upstream.PDClock)
c.scheduler, err = c.newScheduler(ctx, c.upstream)
if err != nil {
return errors.Trace(err)
}
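
Note on the hunk above: the changefeed now hands the entire `*upstream.Upstream` to the scheduler factory instead of only its `pdutil.Clock`, because the span-based coordinator also needs the upstream's region cache (see `NewCoordinator` further down). A minimal sketch of that dependency-widening pattern, using hypothetical stand-in types rather than the real tiflow ones:

```go
package main

import "fmt"

// Simplified stand-ins for upstream.Upstream and its members; the real types
// live in pkg/upstream and pkg/pdutil.
type Clock struct{}
type RegionCache struct{}

type Upstream struct {
	PDClock     *Clock
	RegionCache *RegionCache
}

type Scheduler struct {
	clock   *Clock
	regions *RegionCache
}

// Before: the factory only needed the clock.
func newSchedulerOld(clock *Clock) *Scheduler {
	return &Scheduler{clock: clock}
}

// After: the factory takes the whole upstream, so it can also reach the
// region cache needed by the span reconciler.
func newSchedulerNew(up *Upstream) *Scheduler {
	return &Scheduler{clock: up.PDClock, regions: up.RegionCache}
}

func main() {
	up := &Upstream{PDClock: &Clock{}, RegionCache: &RegionCache{}}
	fmt.Printf("old: %+v\nnew: %+v\n", newSchedulerOld(up.PDClock), newSchedulerNew(up))
}
```

Passing the aggregate means the next handle the coordinator needs can be added without touching this signature again.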
3 changes: 1 addition & 2 deletions cdc/owner/changefeed_test.go
@@ -35,7 +35,6 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
@@ -216,7 +215,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
},
// new scheduler
func(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
})
5 changes: 2 additions & 3 deletions cdc/owner/owner_test.go
@@ -32,7 +32,6 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
@@ -63,7 +62,7 @@ func newOwner4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error),
pdClient pd.Client,
) Owner {
m := upstream.NewManager4Test(pdClient)
@@ -103,7 +102,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
},
// new scheduler
func(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
},
42 changes: 30 additions & 12 deletions cdc/scheduler/internal/v3/coordinator.go
@@ -21,7 +21,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/compat"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/keyspan"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/scheduler"
@@ -31,6 +34,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)
@@ -54,6 +58,8 @@ type coordinator struct {
replicationM *replication.Manager
captureM *member.CaptureManager
schedulerM *scheduler.Manager
reconciler *keyspan.Reconciler
compat *compat.Compat
pdClock pdutil.Clock

lastCollectTime time.Time
@@ -68,8 +74,9 @@ func NewCoordinator(
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
regionCache keyspan.RegionCache,
pdClock pdutil.Clock,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
trans, err := transport.NewTransport(
ctx, changefeedID, transport.SchedulerRole, messageServer, messageRouter)
@@ -78,6 +85,7 @@
}
coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
coord.trans = trans
coord.reconciler = keyspan.NewReconciler(changefeedID, regionCache, cfg.RegionPerSpan)
coord.pdClock = pdClock
return coord, nil
}
@@ -100,6 +108,7 @@ func newCoordinator(
captureID, changefeedID, revision, cfg.HeartbeatTick),
schedulerM: scheduler.NewSchedulerManager(changefeedID, cfg),
changefeedID: changefeedID,
compat: compat.New(cfg, map[model.CaptureID]*model.CaptureInfo{}),
}
}

@@ -120,6 +129,7 @@ func (c *coordinator) Tick(
}

// MoveTable implement the scheduler interface
// FIXME: tableID should be Span.
func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -134,7 +144,8 @@ func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
return
}

c.schedulerM.MoveTable(tableID, target)
span := spanz.TableIDToComparableSpan(tableID)
c.schedulerM.MoveTable(span, target)
}

// Rebalance implement the scheduler interface
@@ -171,11 +182,13 @@ func (c *coordinator) DrainCapture(target model.CaptureID) (int, error) {
}

var count int
for _, rep := range c.replicationM.ReplicationSets() {
if rep.Primary == target {
count++
}
}
c.replicationM.ReplicationSets().Ascend(
func(_ tablepb.Span, rep *replication.ReplicationSet) bool {
if rep.Primary == target {
count++
}
return true
})

if count == 0 {
log.Info("schedulerv3: drain capture request ignored, "+
@@ -239,12 +252,15 @@ func (c *coordinator) Close(ctx context.Context) {
// ===========

func (c *coordinator) poll(
ctx context.Context,
checkpointTs model.Ts,
currentTables []model.TableID,
ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
c.maybeCollectMetrics()
if c.compat.UpdateCaptureInfo(aliveCaptures) {
log.Info("schedulerv3: compat update capture info",
zap.Any("captures", aliveCaptures),
zap.Bool("spanReplicationEnabled", c.compat.CheckSpanReplicationEnabled()))
}

recvMsgs, err := c.recvMsgs(ctx)
if err != nil {
@@ -294,8 +310,9 @@ func (c *coordinator) poll(
// Generate schedule tasks based on the current status.
replications := c.replicationM.ReplicationSets()
runningTasks := c.replicationM.RunningTasks()
currentSpans := c.reconciler.Reconcile(ctx, currentTables, replications, c.compat)
allTasks := c.schedulerM.Schedule(
checkpointTs, currentTables, c.captureM.Captures, replications, runningTasks)
checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks)

// Handle generated schedule tasks.
msgs, err = c.replicationM.HandleTasks(allTasks)
@@ -329,6 +346,7 @@ func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, erro
n++
}
}
c.compat.AfterTransportReceive(recvMsgs[:n])
return recvMsgs[:n], nil
}

@@ -353,8 +371,8 @@ func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message)
ProcessorEpoch: epoch,
}
m.From = c.captureID

}
c.compat.BeforeTransportSend(msgs)
return c.trans.Send(ctx, msgs)
}

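
With spans as keys, the replication sets can no longer sit in a plain `map[model.TableID]*ReplicationSet`; the `DrainCapture` hunk above now walks a span-keyed ordered map through an `Ascend` callback. A dependency-free sketch of that traversal, with simplified stand-ins for `tablepb.Span` and `replication.ReplicationSet` (the real scheduler keeps them in a BTree-backed map from the `spanz` package):

```go
package main

import "fmt"

// Simplified stand-ins for tablepb.Span and replication.ReplicationSet.
type Span struct {
	TableID  int64
	StartKey string
	EndKey   string
}

type ReplicationSet struct {
	Span    Span
	Primary string // capture ID currently running this span
}

// SpanMap mimics the Ascend interface of a span-keyed BTree map; a slice kept
// sorted by StartKey is enough to show the traversal pattern.
type SpanMap struct {
	sets []*ReplicationSet // invariant: sorted by Span.StartKey
}

func (m *SpanMap) Ascend(fn func(span Span, rep *ReplicationSet) bool) {
	for _, rep := range m.sets {
		if !fn(rep.Span, rep) {
			return // a callback returning false stops the walk early
		}
	}
}

// countPrimaries mirrors the DrainCapture change: instead of ranging over a
// map keyed by table ID, walk the span-keyed structure in key order.
func countPrimaries(m *SpanMap, target string) int {
	count := 0
	m.Ascend(func(_ Span, rep *ReplicationSet) bool {
		if rep.Primary == target {
			count++
		}
		return true
	})
	return count
}

func main() {
	m := &SpanMap{sets: []*ReplicationSet{
		{Span: Span{TableID: 1, StartKey: "t1_r", EndKey: "t1_s"}, Primary: "capture-0"},
		{Span: Span{TableID: 2, StartKey: "t2_r", EndKey: "t2_s"}, Primary: "capture-1"},
		{Span: Span{TableID: 3, StartKey: "t3_r", EndKey: "t3_s"}, Primary: "capture-0"},
	}}
	fmt.Println("capture-0 primaries:", countPrimaries(m, "capture-0"))
}
```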
11 changes: 6 additions & 5 deletions cdc/scheduler/internal/v3/coordinator_bench_test.go
@@ -140,11 +140,12 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
tableID := int64(10000 + i)
currentTables = append(currentTables, tableID)
captureID := fmt.Sprint(i % captureCount)
span := tablepb.Span{TableID: tableID}
rep, err := replication.NewReplicationSet(
tableID, 0, map[string]*tablepb.TableStatus{
span, 0, map[string]*tablepb.TableStatus{
captureID: {
TableID: tableID,
State: tablepb.TableStateReplicating,
Span: tablepb.Span{TableID: tableID},
State: tablepb.TableStateReplicating,
},
}, model.ChangeFeedID{})
if err != nil {
@@ -163,8 +164,8 @@
heartbeatResp[captureID].HeartbeatResponse.Tables = append(
heartbeatResp[captureID].HeartbeatResponse.Tables,
tablepb.TableStatus{
TableID: tableID,
State: tablepb.TableStateReplicating,
Span: tablepb.Span{TableID: tableID},
State: tablepb.TableStateReplicating,
})
}
recvMsgs := make([]*schedulepb.Message, 0, len(heartbeatResp))
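
Stepping back from the individual hunks: the `keyspan.Reconciler` wired into `NewCoordinator` and `poll` above is what turns the changefeed's table list into the span list handed to the schedulers. The sketch below assumes the simplest policy the `RegionPerSpan` knob suggests, grouping consecutive regions of one table into fixed-size spans; the real reconciler may behave differently, and every name here is illustrative.

```go
package main

import "fmt"

// Simplified stand-in for tablepb.Span; region boundaries stand in for what
// the region cache would return for a table's key range.
type Span struct {
	TableID  int64
	StartKey string
	EndKey   string
}

// splitTableSpan groups consecutive regions of one table into spans of at most
// regionPerSpan regions each. This is an assumption about how the
// cfg.RegionPerSpan knob is meant to be used, not a copy of the real algorithm.
func splitTableSpan(tableID int64, regionBounds []string, regionPerSpan int) []Span {
	// regionBounds are the sorted boundary keys of the table's regions:
	// region i covers [regionBounds[i], regionBounds[i+1]).
	var spans []Span
	regionCount := len(regionBounds) - 1
	for i := 0; i < regionCount; i += regionPerSpan {
		end := i + regionPerSpan
		if end > regionCount {
			end = regionCount
		}
		spans = append(spans, Span{
			TableID:  tableID,
			StartKey: regionBounds[i],
			EndKey:   regionBounds[end],
		})
	}
	return spans
}

func main() {
	// A table covering four regions, grouped two regions per span.
	bounds := []string{"t1_r00", "t1_r10", "t1_r20", "t1_r30", "t1_s"}
	for _, s := range splitTableSpan(1, bounds, 2) {
		fmt.Printf("span [%s, %s)\n", s.StartKey, s.EndKey)
	}
}
```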