cdc: move scheduler and agent to package base #5396

Merged
merged 10 commits on May 13, 2022
36 changes: 29 additions & 7 deletions cdc/owner/changefeed.go
@@ -28,7 +28,7 @@ import (
"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
schedulerv2 "github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/cdc/scheduler"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -39,12 +39,33 @@ import (
"go.uber.org/zap"
)

// newSchedulerV2FromCtx creates a new schedulerV2 from context.
// This function is factored out to facilitate unit testing.
func newSchedulerV2FromCtx(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ownerRev := ctx.GlobalVars().OwnerRevision
ret, err := scheduler.NewScheduler(
ctx, changeFeedID, startTs, messageServer, messageRouter, ownerRev)
if err != nil {
return nil, errors.Trace(err)
}
return ret, nil
}

func newScheduler(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error) {
return newSchedulerV2FromCtx(ctx, startTs)
}

type changefeed struct {
id model.ChangeFeedID
state *orchestrator.ChangefeedReactorState

upStream *upstream.Upstream
scheduler scheduler
scheduler scheduler.Scheduler
barriers *barriers
feedStateManager *feedStateManager
redoManager redo.LogManager
@@ -85,7 +106,7 @@ type changefeed struct {

newDDLPuller func(ctx cdcContext.Context, upStream *upstream.Upstream, startTs uint64) (DDLPuller, error)
newSink func() DDLSink
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler, error)
newScheduler func(ctx cdcContext.Context, startTs uint64) (scheduler.Scheduler, error)
}

func newChangefeed(id model.ChangeFeedID, upStream *upstream.Upstream) *changefeed {
@@ -233,7 +254,8 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed
}

startTime := time.Now()
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(ctx, c.state, c.schema.AllPhysicalTables(), captures)
newCheckpointTs, newResolvedTs, err := c.scheduler.Tick(
ctx, c.state.Status.CheckpointTs, c.schema.AllPhysicalTables(), captures)
costTime := time.Since(startTime)
if costTime > schedulerLogsWarnDuration {
log.Warn("scheduler tick took too long",
@@ -249,7 +271,7 @@ func (c *changefeed) tick(ctx cdcContext.Context, state *orchestrator.Changefeed

// CheckpointCannotProceed implies that not all tables are being replicated normally,
// so in that case there is no need to advance the global watermarks.
if newCheckpointTs != schedulerv2.CheckpointCannotProceed {
if newCheckpointTs != scheduler.CheckpointCannotProceed {
if newResolvedTs > barrierTs {
newResolvedTs = barrierTs
}
@@ -669,8 +691,8 @@ func (c *changefeed) Close(ctx cdcContext.Context) {
}

// GetInfoProvider returns an InfoProvider if one is available.
func (c *changefeed) GetInfoProvider() schedulerv2.InfoProvider {
if provider, ok := c.scheduler.(schedulerv2.InfoProvider); ok {
func (c *changefeed) GetInfoProvider() scheduler.InfoProvider {
if provider, ok := c.scheduler.(scheduler.InfoProvider); ok {
return provider
}
return nil
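Reviewer's aside, not part of the diff: the hunks above change the owner-side contract so that `Tick` receives a plain checkpoint-ts instead of the whole `ChangefeedReactorState`, and `scheduler.CheckpointCannotProceed` acts as a sentinel meaning "do not advance the watermarks this round". Below is a minimal, self-contained sketch of that contract; the type and constant declarations are local stand-ins (the real ones live in `cdc/model` and `cdc/scheduler`, and the concrete sentinel value may differ).

```go
package main

import (
	"context"
	"fmt"
	"math"
)

// Local stand-ins so the sketch compiles on its own; the real definitions
// live in github.com/pingcap/tiflow/cdc/model and cdc/scheduler.
type (
	Ts        = uint64
	TableID   = int64
	CaptureID = string
)

type CaptureInfo struct{ AdvertiseAddr string }

// CheckpointCannotProceed stands in for the sentinel exported by
// cdc/scheduler (the real constant may use a different reserved value).
const CheckpointCannotProceed = Ts(math.MaxUint64)

// Scheduler mirrors the shape of scheduler.Scheduler as used in the
// changefeed.go hunks above.
type Scheduler interface {
	Tick(ctx context.Context, checkpointTs Ts, currentTables []TableID,
		captures map[CaptureID]*CaptureInfo) (newCheckpointTs, newResolvedTs Ts, err error)
}

// advanceWatermarks applies the owner-side rule from the diff: skip the
// update when the scheduler reports CheckpointCannotProceed, and never let
// resolved-ts pass the barrier-ts.
func advanceWatermarks(newCheckpointTs, newResolvedTs, barrierTs Ts) (checkpoint, resolved Ts, ok bool) {
	if newCheckpointTs == CheckpointCannotProceed {
		return 0, 0, false
	}
	if newResolvedTs > barrierTs {
		newResolvedTs = barrierTs
	}
	return newCheckpointTs, newResolvedTs, true
}

func main() {
	cp, rts, ok := advanceWatermarks(105, 130, 120)
	fmt.Println(cp, rts, ok) // 105 120 true

	_, _, ok = advanceWatermarks(CheckpointCannotProceed, 130, 120)
	fmt.Println(ok) // false
}
```

The clamp against `barrierTs` mirrors the check in `changefeed.tick`: even when the scheduler reports progress, resolved-ts is never allowed to pass the barrier.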
11 changes: 7 additions & 4 deletions cdc/owner/changefeed_test.go
@@ -28,6 +28,7 @@ import (
timodel "github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -147,8 +148,8 @@ type mockScheduler struct {
}

func (m *mockScheduler) Tick(
ctx cdcContext.Context,
state *orchestrator.ChangefeedReactorState,
ctx context.Context,
checkpointTs model.Ts,
currentTables []model.TableID,
captures map[model.CaptureID]*model.CaptureInfo,
) (newCheckpointTs, newResolvedTs model.Ts, err error) {
@@ -163,7 +164,7 @@ func (m *mockScheduler) MoveTable(tableID model.TableID, target model.CaptureID)
func (m *mockScheduler) Rebalance() {}

// Close closes the scheduler and releases resources.
func (m *mockScheduler) Close(ctx cdcContext.Context) {}
func (m *mockScheduler) Close(ctx context.Context) {}

func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
*changefeed, *orchestrator.ChangefeedReactorState,
@@ -183,7 +184,9 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T) (
recordDDLHistory: false,
}
})
cf.newScheduler = func(ctx cdcContext.Context, startTs uint64) (scheduler, error) {
cf.newScheduler = func(
ctx cdcContext.Context, startTs uint64,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
}
cf.upStream = upStream
54 changes: 34 additions & 20 deletions cdc/processor/processor.go
@@ -32,6 +32,7 @@ import (
tablepipeline "github.com/pingcap/tiflow/cdc/processor/pipeline"
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/cdc/sink/metrics"
"github.com/pingcap/tiflow/cdc/sorter/memory"
@@ -80,9 +81,9 @@ type processor struct {

lazyInit func(ctx cdcContext.Context) error
createTablePipeline func(ctx cdcContext.Context, tableID model.TableID, replicaInfo *model.TableReplicaInfo) (tablepipeline.TablePipeline, error)
newAgent func(ctx cdcContext.Context) (processorAgent, error)
newAgent func(ctx cdcContext.Context) (scheduler.Agent, error)

agent processorAgent
agent scheduler.Agent
checkpointTs model.Ts
resolvedTs model.Ts

@@ -104,31 +105,37 @@ func (p *processor) checkReadyForMessages() bool {
}

// AddTable implements TableExecutor interface.
func (p *processor) AddTable(ctx cdcContext.Context, tableID model.TableID, startTs model.Ts) (bool, error) {
func (p *processor) AddTable(
ctx context.Context, tableID model.TableID, startTs model.Ts,
) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
}

log.Info("adding table",
zap.Int64("tableID", tableID),
cdcContext.ZapFieldChangefeed(ctx))
err := p.addTable(ctx, tableID, &model.TableReplicaInfo{StartTs: startTs})
zap.Uint64("checkpointTs", startTs),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID))
err := p.addTable(ctx.(cdcContext.Context), tableID, &model.TableReplicaInfo{StartTs: startTs})
if err != nil {
return false, errors.Trace(err)
}
return true, nil
}

// RemoveTable implements TableExecutor interface.
func (p *processor) RemoveTable(ctx cdcContext.Context, tableID model.TableID) (bool, error) {
func (p *processor) RemoveTable(ctx context.Context, tableID model.TableID) (bool, error) {
if !p.checkReadyForMessages() {
return false, nil
}

table, ok := p.tables[tableID]
if !ok {
log.Warn("table which will be deleted is not found",
cdcContext.ZapFieldChangefeed(ctx), zap.Int64("tableID", tableID))
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))
return true, nil
}

@@ -137,7 +144,8 @@ func (p *processor) RemoveTable(ctx cdcContext.Context, tableID model.TableID) (
// We use a Debug log because it is conceivable for the pipeline to block for a legitimate reason,
// and we do not want to alarm the user.
log.Debug("AsyncStop has failed, possible due to a full pipeline",
cdcContext.ZapFieldChangefeed(ctx),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Uint64("checkpointTs", table.CheckpointTs()),
zap.Int64("tableID", tableID))
return false, nil
@@ -146,15 +154,16 @@ func (p *processor) RemoveTable(ctx cdcContext.Context, tableID model.TableID) (
}

// IsAddTableFinished implements TableExecutor interface.
func (p *processor) IsAddTableFinished(ctx cdcContext.Context, tableID model.TableID) bool {
func (p *processor) IsAddTableFinished(ctx context.Context, tableID model.TableID) bool {
if !p.checkReadyForMessages() {
return false
}

table, exist := p.tables[tableID]
if !exist {
log.Panic("table which was added is not found",
cdcContext.ZapFieldChangefeed(ctx),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))
}
localResolvedTs := p.resolvedTs
@@ -176,27 +185,30 @@ func (p *processor) IsAddTableFinished(ctx cdcContext.Context, tableID model.Tab
return false
}
log.Info("Add Table finished",
cdcContext.ZapFieldChangefeed(ctx),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))
return true
}

// IsRemoveTableFinished implements TableExecutor interface.
func (p *processor) IsRemoveTableFinished(ctx cdcContext.Context, tableID model.TableID) bool {
func (p *processor) IsRemoveTableFinished(ctx context.Context, tableID model.TableID) bool {
if !p.checkReadyForMessages() {
return false
}

table, exist := p.tables[tableID]
if !exist {
log.Panic("table which was deleted is not found",
cdcContext.ZapFieldChangefeed(ctx),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))
return true
}
if table.Status() != tablepipeline.TableStatusStopped {
log.Debug("the table is still not stopped",
cdcContext.ZapFieldChangefeed(ctx),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Uint64("checkpointTs", table.CheckpointTs()),
zap.Int64("tableID", tableID))
return false
@@ -206,7 +218,8 @@ func (p *processor) IsRemoveTableFinished(ctx cdcContext.Context, tableID model.
table.Wait()
delete(p.tables, tableID)
log.Info("Remove Table finished",
cdcContext.ZapFieldChangefeed(ctx),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeedID.ID),
zap.Int64("tableID", tableID))

return true
@@ -475,14 +488,14 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
}
opts[metrics.OptCaptureAddr] = ctx.GlobalVars().CaptureInfo.AdvertiseAddr
log.Info("processor try new sink",
zap.String("namespace", p.changefeed.ID.Namespace),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeed.ID.ID))

start := time.Now()
s, err := sink.New(stdCtx, p.changefeed.ID, p.changefeed.Info.SinkURI, p.filter, p.changefeed.Info.Config, opts, errCh)
if err != nil {
log.Info("processor new sink failed",
zap.String("namespace", p.changefeed.ID.Namespace),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeed.ID.ID),
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
@@ -509,10 +522,11 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
return nil
}

func (p *processor) newAgentImpl(ctx cdcContext.Context) (processorAgent, error) {
func (p *processor) newAgentImpl(ctx cdcContext.Context) (scheduler.Agent, error) {
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ret, err := newAgent(ctx, messageServer, messageRouter, p, p.changefeedID)
etcdClient := ctx.GlobalVars().EtcdClient
ret, err := scheduler.NewAgent(ctx, messageServer, messageRouter, etcdClient, p, p.changefeedID)
if err != nil {
return nil, errors.Trace(err)
}
@@ -882,7 +896,7 @@ func (p *processor) flushRedoLogMeta(ctx context.Context) error {

func (p *processor) Close() error {
log.Info("processor closing ...",
zap.String("namespace", p.changefeed.ID.Namespace),
zap.String("namespace", p.changefeedID.Namespace),
zap.String("changefeed", p.changefeed.ID.ID))
for _, tbl := range p.tables {
tbl.Cancel()
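Another reading aid, not code from this PR: the processor now implements the scheduler-side table-executor callbacks with plain `context.Context`. The sketch below redeclares a minimal version of that interface locally (the real `scheduler.TableExecutor` may carry additional methods, such as a checkpoint getter) plus a toy in-memory implementation, just to show the calling protocol implied by the `Is*Finished` methods.

```go
package main

import (
	"context"
	"fmt"
)

// Local stand-ins for model.Ts and model.TableID.
type (
	Ts      = uint64
	TableID = int64
)

// TableExecutor mirrors the callbacks the processor implements in this PR;
// the real interface in cdc/scheduler may have more methods.
type TableExecutor interface {
	AddTable(ctx context.Context, tableID TableID, startTs Ts) (bool, error)
	RemoveTable(ctx context.Context, tableID TableID) (bool, error)
	IsAddTableFinished(ctx context.Context, tableID TableID) bool
	IsRemoveTableFinished(ctx context.Context, tableID TableID) bool
	GetAllCurrentTables() []TableID
}

// toyExecutor is a deliberately trivial implementation: tables become
// "ready" as soon as they are added, which is enough to show the protocol.
type toyExecutor struct {
	tables map[TableID]Ts
}

func (e *toyExecutor) AddTable(_ context.Context, id TableID, startTs Ts) (bool, error) {
	e.tables[id] = startTs
	return true, nil // (false, nil) would mean "cannot accept the request yet"
}

func (e *toyExecutor) RemoveTable(_ context.Context, id TableID) (bool, error) {
	delete(e.tables, id)
	return true, nil
}

func (e *toyExecutor) IsAddTableFinished(_ context.Context, id TableID) bool {
	_, ok := e.tables[id]
	return ok
}

func (e *toyExecutor) IsRemoveTableFinished(_ context.Context, id TableID) bool {
	_, ok := e.tables[id]
	return !ok
}

func (e *toyExecutor) GetAllCurrentTables() []TableID {
	ids := make([]TableID, 0, len(e.tables))
	for id := range e.tables {
		ids = append(ids, id)
	}
	return ids
}

func main() {
	var exec TableExecutor = &toyExecutor{tables: map[TableID]Ts{}}
	ctx := context.Background()
	accepted, _ := exec.AddTable(ctx, 42, 100)
	fmt.Println(accepted, exec.IsAddTableFinished(ctx, 42)) // true true
}
```

Returning `(false, nil)` from AddTable/RemoveTable signals "not accepted yet" rather than an error, which matches the `checkReadyForMessages` guard in the hunks above.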
4 changes: 2 additions & 2 deletions cdc/processor/processor_test.go
@@ -204,14 +204,14 @@ func (s *mockSchemaStorage) DoGC(ts uint64) uint64 {

type mockAgent struct {
// dummy to satisfy the interface
processorAgent
scheduler.Agent

executor scheduler.TableExecutor
lastCheckpointTs model.Ts
isClosed bool
}

func (a *mockAgent) Tick(_ cdcContext.Context) error {
func (a *mockAgent) Tick(_ context.Context) error {
if len(a.executor.GetAllCurrentTables()) == 0 {
return nil
}
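One more note on the test pattern, not part of the change: `mockAgent` embeds the `scheduler.Agent` interface so the struct satisfies the whole interface while overriding only the methods the test exercises; calling an un-overridden method would panic on the nil embedded value. A minimal, self-contained reproduction of that idiom, using a made-up `Worker` interface:

```go
package main

import "fmt"

// Worker is a stand-in interface with more methods than the test needs.
type Worker interface {
	Do() string
	Stop() error
}

// mockWorker embeds the interface so it satisfies Worker without
// implementing every method; only Do is overridden for the test.
type mockWorker struct {
	Worker // nil; calling Stop() through it would panic
}

func (m *mockWorker) Do() string { return "mocked" }

func main() {
	var w Worker = &mockWorker{}
	fmt.Println(w.Do()) // "mocked"; w.Stop() would panic on the nil embedded Worker
}
```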
36 changes: 36 additions & 0 deletions cdc/scheduler/internal/agent.go
@@ -0,0 +1,36 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package internal

import (
"context"

"github.com/pingcap/tiflow/cdc/model"
)

// Agent is an interface for an object inside Processor that is responsible
// for receiving commands from the Owner.
// Ideally, the processor should drive the Agent by calling Tick.
//
// Note that Agent is not thread-safe.
type Agent interface {
// Tick is called periodically by the processor to drive the Agent's internal logic.
Tick(ctx context.Context) error

// GetLastSentCheckpointTs returns the last checkpoint-ts already sent to the Owner.
GetLastSentCheckpointTs() (checkpointTs model.Ts)

// Close closes the messenger and does the necessary cleanup.
Close() error
}
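To make the intended usage concrete — a hedged sketch rather than tiflow code — the loop below shows how a processor-style caller could drive an Agent: call Tick from a single goroutine (matching the note that Agent is not thread-safe), read the last sent checkpoint-ts for reporting, and Close once on shutdown. `noopAgent` and `runAgent` are names invented for the example, and `Ts` stands in for `model.Ts`.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Ts is a local stand-in for model.Ts.
type Ts = uint64

// Agent mirrors the interface defined in cdc/scheduler/internal/agent.go.
type Agent interface {
	Tick(ctx context.Context) error
	GetLastSentCheckpointTs() Ts
	Close() error
}

// noopAgent is a placeholder implementation invented for this sketch.
type noopAgent struct{ checkpointTs Ts }

func (a *noopAgent) Tick(ctx context.Context) error { a.checkpointTs++; return nil }
func (a *noopAgent) GetLastSentCheckpointTs() Ts    { return a.checkpointTs }
func (a *noopAgent) Close() error                   { return nil }

// runAgent drives the agent from a single goroutine, so all Agent calls stay
// on one loop, as the thread-safety note requires.
func runAgent(ctx context.Context, a Agent) error {
	defer a.Close()
	ticker := time.NewTicker(50 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if err := a.Tick(ctx); err != nil {
				return err
			}
			fmt.Println("last sent checkpoint-ts:", a.GetLastSentCheckpointTs())
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
	defer cancel()
	_ = runAgent(ctx, &noopAgent{})
}
```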