
scheduler(ticdc): replace tableID with Span #7806

Merged: 4 commits, Dec 16, 2022
15 changes: 7 additions & 8 deletions cdc/owner/changefeed.go
@@ -42,7 +42,7 @@ import (
// newSchedulerFromCtx creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
func newSchedulerFromCtx(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
@@ -52,15 +52,14 @@ func newSchedulerFromCtx(
cfg := config.GetGlobalServerConfig().Debug
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, cfg.Scheduler, pdClock)
messageServer, messageRouter, ownerRev, up.RegionCache, up.PDClock, cfg.Scheduler)
return ret, errors.Trace(err)
}

func newScheduler(
ctx cdcContext.Context,
pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, pdClock)
return newSchedulerFromCtx(ctx, up)
}

type changefeed struct {
@@ -127,7 +126,7 @@ type changefeed struct {
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error)
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
@@ -165,7 +164,7 @@ func newChangefeed4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error),
) *changefeed {
c := newChangefeed(id, state, up)
c.newDDLPuller = newDDLPuller
@@ -540,7 +539,7 @@ LOOP:
zap.String("changefeed", c.id.ID))

// create scheduler
c.scheduler, err = c.newScheduler(ctx, c.upstream.PDClock)
c.scheduler, err = c.newScheduler(ctx, c.upstream)
if err != nil {
return errors.Trace(err)
}
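To make the new wiring concrete: the owner now hands the scheduler factory the whole *upstream.Upstream instead of a bare pdutil.Clock, so the coordinator can take both the region cache and the PD clock from it. Below is a minimal sketch of the new call shape, using only identifiers that appear in the diff above; the helper name and the extra parameters exist only to keep the example self-contained.

```go
// Sketch of the new factory shape; mirrors newSchedulerFromCtx above.
// captureID, ownerRev, messageServer and messageRouter are passed in
// explicitly here only so the example compiles on its own.
func newSchedulerSketch(
	ctx cdcContext.Context, up *upstream.Upstream,
	captureID model.CaptureID, ownerRev int64,
	messageServer *p2p.MessageServer, messageRouter p2p.MessageRouter,
) (scheduler.Scheduler, error) {
	changeFeedID := ctx.ChangefeedVars().ID
	cfg := config.GetGlobalServerConfig().Debug
	// New argument order: the upstream's region cache comes before the PD
	// clock, and the scheduler config moves to the end.
	return scheduler.NewScheduler(
		ctx, captureID, changeFeedID,
		messageServer, messageRouter, ownerRev,
		up.RegionCache, up.PDClock, cfg.Scheduler)
}
```

Passing the whole upstream also keeps the factory signature stable if the coordinator needs further upstream resources later.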
3 changes: 1 addition & 2 deletions cdc/owner/changefeed_test.go
@@ -35,7 +35,6 @@ import (
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
@@ -216,7 +215,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
},
// new scheduler
func(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
})
5 changes: 2 additions & 3 deletions cdc/owner/owner_test.go
@@ -32,7 +32,6 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
"github.com/pingcap/tiflow/pkg/orchestrator"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/txnutil/gc"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/version"
@@ -63,7 +62,7 @@ func newOwner4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, pdClock pdutil.Clock) (scheduler.Scheduler, error),
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error),
pdClient pd.Client,
) Owner {
m := upstream.NewManager4Test(pdClient)
@@ -103,7 +102,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
},
// new scheduler
func(
ctx cdcContext.Context, pdClock pdutil.Clock,
ctx cdcContext.Context, up *upstream.Upstream,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
},
42 changes: 30 additions & 12 deletions cdc/scheduler/internal/v3/coordinator.go
@@ -21,7 +21,10 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/scheduler/internal"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/compat"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/keyspan"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/member"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/replication"
"github.com/pingcap/tiflow/cdc/scheduler/internal/v3/scheduler"
@@ -31,6 +34,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/p2p"
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/version"
"go.uber.org/zap"
)
@@ -54,6 +58,8 @@ type coordinator struct {
replicationM *replication.Manager
captureM *member.CaptureManager
schedulerM *scheduler.Manager
reconciler *keyspan.Reconciler
compat *compat.Compat
pdClock pdutil.Clock

lastCollectTime time.Time
@@ -68,8 +74,9 @@ func NewCoordinator(
messageServer *p2p.MessageServer,
messageRouter p2p.MessageRouter,
ownerRevision int64,
cfg *config.SchedulerConfig,
regionCache keyspan.RegionCache,
pdClock pdutil.Clock,
cfg *config.SchedulerConfig,
) (internal.Scheduler, error) {
trans, err := transport.NewTransport(
ctx, changefeedID, transport.SchedulerRole, messageServer, messageRouter)
@@ -78,6 +85,7 @@
}
coord := newCoordinator(captureID, changefeedID, ownerRevision, cfg)
coord.trans = trans
coord.reconciler = keyspan.NewReconciler(changefeedID, regionCache, cfg.RegionPerSpan)
coord.pdClock = pdClock
return coord, nil
}
@@ -100,6 +108,7 @@ func newCoordinator(
captureID, changefeedID, revision, cfg.HeartbeatTick),
schedulerM: scheduler.NewSchedulerManager(changefeedID, cfg),
changefeedID: changefeedID,
compat: compat.New(cfg, map[model.CaptureID]*model.CaptureInfo{}),
}
}

@@ -120,6 +129,7 @@ func (c *coordinator) Tick(
}

// MoveTable implement the scheduler interface
// FIXME: tableID should be Span.
func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
c.mu.Lock()
defer c.mu.Unlock()
@@ -134,7 +144,8 @@ func (c *coordinator) MoveTable(tableID model.TableID, target model.CaptureID) {
return
}

c.schedulerM.MoveTable(tableID, target)
span := spanz.TableIDToComparableSpan(tableID)
c.schedulerM.MoveTable(span, target)
}

// Rebalance implement the scheduler interface
@@ -171,11 +182,13 @@ func (c *coordinator) DrainCapture(target model.CaptureID) (int, error) {
}

var count int
for _, rep := range c.replicationM.ReplicationSets() {
if rep.Primary == target {
count++
}
}
c.replicationM.ReplicationSets().Ascend(
func(_ tablepb.Span, rep *replication.ReplicationSet) bool {
if rep.Primary == target {
count++
}
return true
})

if count == 0 {
log.Info("schedulerv3: drain capture request ignored, "+
@@ -239,12 +252,15 @@ func (c *coordinator) Close(ctx context.Context) {
// ===========

func (c *coordinator) poll(
ctx context.Context,
checkpointTs model.Ts,
currentTables []model.TableID,
ctx context.Context, checkpointTs model.Ts, currentTables []model.TableID,
aliveCaptures map[model.CaptureID]*model.CaptureInfo,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
c.maybeCollectMetrics()
if c.compat.UpdateCaptureInfo(aliveCaptures) {
log.Info("schedulerv3: compat update capture info",
zap.Any("captures", aliveCaptures),
zap.Bool("spanReplicationEnabled", c.compat.CheckSpanReplicationEnabled()))
}

recvMsgs, err := c.recvMsgs(ctx)
if err != nil {
@@ -294,8 +310,9 @@ func (c *coordinator) poll(
// Generate schedule tasks based on the current status.
replications := c.replicationM.ReplicationSets()
runningTasks := c.replicationM.RunningTasks()
currentSpans := c.reconciler.Reconcile(ctx, currentTables, replications, c.compat)
allTasks := c.schedulerM.Schedule(
checkpointTs, currentTables, c.captureM.Captures, replications, runningTasks)
checkpointTs, currentSpans, c.captureM.Captures, replications, runningTasks)

// Handle generated schedule tasks.
msgs, err = c.replicationM.HandleTasks(allTasks)
@@ -329,6 +346,7 @@ func (c *coordinator) recvMsgs(ctx context.Context) ([]*schedulepb.Message, erro
n++
}
}
c.compat.AfterTransportReceive(recvMsgs[:n])
return recvMsgs[:n], nil
}

@@ -353,8 +371,8 @@ func (c *coordinator) sendMsgs(ctx context.Context, msgs []*schedulepb.Message)
ProcessorEpoch: epoch,
}
m.From = c.captureID

}
c.compat.BeforeTransportSend(msgs)
return c.trans.Send(ctx, msgs)
}

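The coordinator changes above follow one pattern: per-table identifiers are widened to tablepb.Span, the replication sets move from a TableID-keyed Go map to a span-keyed container walked with Ascend, a keyspan.Reconciler maps the owner's currentTables onto spans each tick, and the compat layer rewrites messages on both sides of the transport for mixed-version clusters. A small sketch of the first two points, reusing only calls shown in the diff; the surrounding helper function is illustrative, not part of the change.

```go
// Sketch: translate a user-facing table ID into a whole-table span and count
// how many replication sets a capture currently owns. ReplicationSets() is
// assumed to return the span-keyed container used in DrainCapture above.
func moveAndCount(
	c *coordinator, tableID model.TableID, target model.CaptureID,
) int {
	// A whole-table span; finer-grained spans come from the keyspan.Reconciler.
	span := spanz.TableIDToComparableSpan(tableID)
	c.schedulerM.MoveTable(span, target)

	var count int
	c.replicationM.ReplicationSets().Ascend(
		func(_ tablepb.Span, rep *replication.ReplicationSet) bool {
			if rep.Primary == target {
				count++
			}
			return true // continue the in-order walk over all spans
		})
	return count
}
```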
11 changes: 6 additions & 5 deletions cdc/scheduler/internal/v3/coordinator_bench_test.go
@@ -140,11 +140,12 @@ func BenchmarkCoordinatorHeartbeatResponse(b *testing.B) {
tableID := int64(10000 + i)
currentTables = append(currentTables, tableID)
captureID := fmt.Sprint(i % captureCount)
span := tablepb.Span{TableID: tableID}
rep, err := replication.NewReplicationSet(
tableID, 0, map[string]*tablepb.TableStatus{
span, 0, map[string]*tablepb.TableStatus{
captureID: {
TableID: tableID,
State: tablepb.TableStateReplicating,
Span: tablepb.Span{TableID: tableID},
State: tablepb.TableStateReplicating,
},
}, model.ChangeFeedID{})
if err != nil {
@@ -163,8 +164,8 @@ heartbeatResp[captureID].HeartbeatResponse.Tables = append(
heartbeatResp[captureID].HeartbeatResponse.Tables = append(
heartbeatResp[captureID].HeartbeatResponse.Tables,
tablepb.TableStatus{
TableID: tableID,
State: tablepb.TableStateReplicating,
Span: tablepb.Span{TableID: tableID},
State: tablepb.TableStateReplicating,
})
}
recvMsgs := make([]*schedulepb.Message, 0, len(heartbeatResp))
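For completeness, the benchmark above shows how call sites construct replication state after the change: NewReplicationSet now takes a tablepb.Span as its key, and TableStatus reports the Span it covers instead of a bare TableID. A trimmed sketch follows; only TableID is filled in on the span, as in the benchmark, and real callers are assumed to use fully populated spans (for example from spanz.TableIDToComparableSpan).

```go
// Sketch based on the benchmark: both the replication set key and the
// heartbeat TableStatus now carry a tablepb.Span.
span := tablepb.Span{TableID: tableID}
rep, err := replication.NewReplicationSet(
	span, 0, // 0 is the initial checkpoint ts, as in the benchmark
	map[string]*tablepb.TableStatus{
		captureID: {Span: span, State: tablepb.TableStateReplicating},
	}, model.ChangeFeedID{})
if err != nil {
	b.Fatal(err)
}
// The capture reports status for the same span back in its heartbeat.
status := tablepb.TableStatus{Span: span, State: tablepb.TableStateReplicating}
_, _ = rep, status
```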