ticdc: disable span replication in mysql/tidb changefeed #7909

Merged · 10 commits · Dec 23, 2022
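
Summary (editorial): the change threads a *config.SchedulerConfig from the capture through the owner and processor manager down to each changefeed and processor, and disables span replication for changefeeds whose sink cannot handle it yet — per cdc/sink/validator.go below, only kafka- and blackhole-style sinks are treated as compatible. A minimal sketch of the gating pattern under those assumptions; resolveSchedulerConfig is a hypothetical helper for illustration, not code from the PR:

package example

import (
	"github.com/pingcap/tiflow/cdc/sink"
	"github.com/pingcap/tiflow/pkg/config"
)

// resolveSchedulerConfig copies the shared scheduler config and zeroes
// RegionPerSpan when the sink cannot handle span replication. This mirrors
// the pattern the PR applies in cdc/owner/changefeed.go and cdc/processor/manager.go.
func resolveSchedulerConfig(shared *config.SchedulerConfig, sinkURI string) *config.SchedulerConfig {
	cfg := *shared // copy by value so the server-wide config is not mutated
	if !sink.IsSinkCompatibleWithSpanReplication(sinkURI) {
		cfg.RegionPerSpan = 0 // 0 disables span replication for this changefeed
	}
	return &cfg
}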
8 changes: 5 additions & 3 deletions cdc/capture/capture.go
@@ -126,8 +126,9 @@ type captureImpl struct {
captureInfo *model.CaptureInfo,
upstreamManager *upstream.Manager,
liveness *model.Liveness,
cfg *config.SchedulerConfig,
) processor.Manager
newOwner func(upstreamManager *upstream.Manager) owner.Owner
newOwner func(upstreamManager *upstream.Manager, cfg *config.SchedulerConfig) owner.Owner
}

// NewCapture returns a new Capture instance
@@ -238,7 +239,8 @@ func (c *captureImpl) reset(ctx context.Context) error {
return cerror.WrapError(cerror.ErrNewCaptureFailed, err)
}

c.processorManager = c.newProcessorManager(c.info, c.upstreamManager, &c.liveness)
c.processorManager = c.newProcessorManager(
c.info, c.upstreamManager, &c.liveness, c.config.Debug.Scheduler)
if c.session != nil {
// It can't be handled even after it fails, so we ignore it.
_ = c.session.Close()
@@ -462,7 +464,7 @@ func (c *captureImpl) campaignOwner(ctx cdcContext.Context) error {
zap.String("captureID", c.info.ID),
zap.Int64("ownerRev", ownerRev))

owner := c.newOwner(c.upstreamManager)
owner := c.newOwner(c.upstreamManager, c.config.Debug.Scheduler)
c.setOwner(owner)

globalState := orchestrator.NewGlobalState(c.EtcdClient.GetClusterID())
31 changes: 22 additions & 9 deletions cdc/owner/changefeed.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/puller"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -42,24 +43,23 @@ import (
// newSchedulerFromCtx creates a new scheduler from context.
// This function is factored out to facilitate unit testing.
func newSchedulerFromCtx(
ctx cdcContext.Context, up *upstream.Upstream,
ctx cdcContext.Context, up *upstream.Upstream, cfg *config.SchedulerConfig,
) (ret scheduler.Scheduler, err error) {
changeFeedID := ctx.ChangefeedVars().ID
messageServer := ctx.GlobalVars().MessageServer
messageRouter := ctx.GlobalVars().MessageRouter
ownerRev := ctx.GlobalVars().OwnerRevision
captureID := ctx.GlobalVars().CaptureInfo.ID
cfg := config.GetGlobalServerConfig().Debug
ret, err = scheduler.NewScheduler(
ctx, captureID, changeFeedID,
messageServer, messageRouter, ownerRev, up.RegionCache, up.PDClock, cfg.Scheduler)
messageServer, messageRouter, ownerRev, up.RegionCache, up.PDClock, cfg)
return ret, errors.Trace(err)
}

func newScheduler(
ctx cdcContext.Context, up *upstream.Upstream,
ctx cdcContext.Context, up *upstream.Upstream, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error) {
return newSchedulerFromCtx(ctx, up)
return newSchedulerFromCtx(ctx, up, cfg)
}

type changefeed struct {
@@ -68,6 +68,7 @@ type changefeed struct {
state *orchestrator.ChangefeedReactorState

upstream *upstream.Upstream
cfg *config.SchedulerConfig
scheduler scheduler.Scheduler
// barriers will be created when a changefeed is initialized
// and will be destroyed when a changefeed is closed.
@@ -126,7 +127,9 @@ type changefeed struct {
) (puller.DDLPuller, error)

newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(error)) DDLSink
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error)
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error)

lastDDLTs uint64 // Timestamp of the last executed DDL. Only used for tests.
}
@@ -135,6 +138,7 @@ func newChangefeed(
id model.ChangeFeedID,
state *orchestrator.ChangefeedReactorState,
up *upstream.Upstream,
cfg *config.SchedulerConfig,
) *changefeed {
c := &changefeed{
id: id,
@@ -152,6 +156,7 @@ func newChangefeed(
newSink: newDDLSink,
}
c.newScheduler = newScheduler
c.cfg = cfg
return c
}

@@ -164,9 +169,12 @@ func newChangefeed4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error),
) *changefeed {
c := newChangefeed(id, state, up)
cfg := config.NewDefaultSchedulerConfig()
c := newChangefeed(id, state, up, cfg)
c.newDDLPuller = newDDLPuller
c.newSink = newSink
c.newScheduler = newScheduler
@@ -539,7 +547,12 @@ LOOP:
zap.String("changefeed", c.id.ID))

// create scheduler
c.scheduler, err = c.newScheduler(ctx, c.upstream)
// TODO: Remove the hack once span replication is compatible with all sinks.
cfg := *c.cfg
if !sink.IsSinkCompatibleWithSpanReplication(c.state.Info.SinkURI) {
cfg.RegionPerSpan = 0
}
c.scheduler, err = c.newScheduler(ctx, c.upstream, &cfg)
if err != nil {
return errors.Trace(err)
}
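
Note (editorial): the config is copied by value (cfg := *c.cfg) before RegionPerSpan is zeroed, so the override is scoped to this changefeed's scheduler and the shared config held by the owner stays untouched; cdc/processor/manager.go below repeats the same pattern on the processor side.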
2 changes: 1 addition & 1 deletion cdc/owner/changefeed_test.go
@@ -215,7 +215,7 @@ func createChangefeed4Test(ctx cdcContext.Context, t *testing.T,
},
// new scheduler
func(
ctx cdcContext.Context, up *upstream.Upstream,
ctx cdcContext.Context, up *upstream.Upstream, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
})
11 changes: 9 additions & 2 deletions cdc/owner/owner.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/scheduler"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -131,7 +132,9 @@ type ownerImpl struct {
id model.ChangeFeedID,
state *orchestrator.ChangefeedReactorState,
up *upstream.Upstream,
cfg *config.SchedulerConfig,
) *changefeed
cfg *config.SchedulerConfig

// removedChangefeed is a workload of https://github.com/pingcap/tiflow/issues/7657
// by delaying recreate changefeed with the same ID.
@@ -141,7 +144,10 @@ }
}

// NewOwner creates a new Owner
func NewOwner(upstreamManager *upstream.Manager) Owner {
func NewOwner(
upstreamManager *upstream.Manager,
cfg *config.SchedulerConfig,
) Owner {
return &ownerImpl{
upstreamManager: upstreamManager,
changefeeds: make(map[model.ChangeFeedID]*changefeed),
@@ -150,6 +156,7 @@ func NewOwner(upstreamManager *upstream.Manager) Owner {
logLimiter: rate.NewLimiter(versionInconsistentLogRate, versionInconsistentLogRate),
removedChangefeed: make(map[model.ChangeFeedID]time.Time),
removedSinkURI: make(map[url.URL]time.Time),
cfg: cfg,
}
}

@@ -206,7 +213,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt
upstreamInfo := state.Upstreams[changefeedState.Info.UpstreamID]
up = o.upstreamManager.AddUpstream(upstreamInfo)
}
cfReactor = o.newChangefeed(changefeedID, changefeedState, up)
cfReactor = o.newChangefeed(changefeedID, changefeedState, up, o.cfg)
o.changefeeds[changefeedID] = cfReactor
}
ctx = cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
11 changes: 7 additions & 4 deletions cdc/owner/owner_test.go
@@ -62,17 +62,20 @@ func newOwner4Test(
changefeed model.ChangeFeedID,
) (puller.DDLPuller, error),
newSink func(changefeedID model.ChangeFeedID, info *model.ChangeFeedInfo, reportErr func(err error)) DDLSink,
newScheduler func(ctx cdcContext.Context, up *upstream.Upstream) (scheduler.Scheduler, error),
newScheduler func(
ctx cdcContext.Context, up *upstream.Upstream, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error),
pdClient pd.Client,
) Owner {
m := upstream.NewManager4Test(pdClient)
o := NewOwner(m).(*ownerImpl)
o := NewOwner(m, config.NewDefaultSchedulerConfig()).(*ownerImpl)
// Most tests do not need to test bootstrap.
o.bootstrapped = true
o.newChangefeed = func(
id model.ChangeFeedID,
state *orchestrator.ChangefeedReactorState,
up *upstream.Upstream,
cfg *config.SchedulerConfig,
) *changefeed {
return newChangefeed4Test(id, state, up, newDDLPuller, newSink, newScheduler)
}
@@ -102,7 +105,7 @@ func createOwner4Test(ctx cdcContext.Context, t *testing.T) (*ownerImpl, *orches
},
// new scheduler
func(
ctx cdcContext.Context, up *upstream.Upstream,
ctx cdcContext.Context, up *upstream.Upstream, cfg *config.SchedulerConfig,
) (scheduler.Scheduler, error) {
return &mockScheduler{}, nil
},
@@ -417,7 +420,7 @@ func TestAdminJob(t *testing.T) {
func TestUpdateGCSafePoint(t *testing.T) {
mockPDClient := &gc.MockPDClient{}
m := upstream.NewManager4Test(mockPDClient)
o := NewOwner(m).(*ownerImpl)
o := NewOwner(m, config.NewDefaultSchedulerConfig()).(*ownerImpl)
ctx := cdcContext.NewBackendContext4Test(true)
ctx, cancel := cdcContext.WithCancel(ctx)
defer cancel()
16 changes: 15 additions & 1 deletion cdc/processor/manager.go
@@ -23,6 +23,8 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/orchestrator"
@@ -71,7 +73,9 @@ type managerImpl struct {
model.ChangeFeedID,
*upstream.Upstream,
*model.Liveness,
*config.SchedulerConfig,
) *processor
cfg *config.SchedulerConfig

metricProcessorCloseDuration prometheus.Observer
}
@@ -81,6 +85,7 @@ func NewManager(
captureInfo *model.CaptureInfo,
upstreamManager *upstream.Manager,
liveness *model.Liveness,
cfg *config.SchedulerConfig,
) Manager {
return &managerImpl{
captureInfo: captureInfo,
@@ -90,6 +95,7 @@ func NewManager(
upstreamManager: upstreamManager,
newProcessor: newProcessor,
metricProcessorCloseDuration: processorCloseDuration,
cfg: cfg,
}
}

@@ -118,7 +124,15 @@ func (m *managerImpl) Tick(stdCtx context.Context, state orchestrator.ReactorSta
up = m.upstreamManager.AddUpstream(upstreamInfo)
}
failpoint.Inject("processorManagerHandleNewChangefeedDelay", nil)
p = m.newProcessor(changefeedState, m.captureInfo, changefeedID, up, m.liveness)

// TODO: Remove the hack once span replication is compatible with
// all sinks.
cfg := *m.cfg
if !sink.IsSinkCompatibleWithSpanReplication(changefeedState.Info.SinkURI) {
cfg.RegionPerSpan = 0
}
p = m.newProcessor(
changefeedState, m.captureInfo, changefeedID, up, m.liveness, &cfg)
m.processors[changefeedID] = p
}
ctx := cdcContext.WithChangefeedVars(ctx, &cdcContext.ChangefeedVars{
12 changes: 8 additions & 4 deletions cdc/processor/manager_test.go
@@ -50,15 +50,17 @@ func NewManager4Test(
liveness *model.Liveness,
) *managerImpl {
captureInfo := &model.CaptureInfo{ID: "capture-test", AdvertiseAddr: "127.0.0.1:0000"}
m := NewManager(captureInfo, upstream.NewManager4Test(nil), liveness).(*managerImpl)
cfg := config.NewDefaultSchedulerConfig()
m := NewManager(captureInfo, upstream.NewManager4Test(nil), liveness, cfg).(*managerImpl)
m.newProcessor = func(
state *orchestrator.ChangefeedReactorState,
captureInfo *model.CaptureInfo,
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
liveness *model.Liveness,
cfg *config.SchedulerConfig,
) *processor {
return newProcessor4Test(t, state, captureInfo, createTablePipeline, m.liveness)
return newProcessor4Test(t, state, captureInfo, createTablePipeline, m.liveness, cfg)
}
return m
}
@@ -236,7 +238,8 @@ func TestClose(t *testing.T) {

func TestSendCommandError(t *testing.T) {
liveness := model.LivenessCaptureAlive
m := NewManager(&model.CaptureInfo{ID: "capture-test"}, nil, &liveness).(*managerImpl)
cfg := config.NewDefaultSchedulerConfig()
m := NewManager(&model.CaptureInfo{ID: "capture-test"}, nil, &liveness, cfg).(*managerImpl)
ctx, cancel := context.WithCancel(context.TODO())
cancel()
// Use unbuffered channel to stable test.
@@ -298,7 +301,8 @@ func TestManagerLiveness(t *testing.T) {

func TestQueryTableCount(t *testing.T) {
liveness := model.LivenessCaptureAlive
m := NewManager(&model.CaptureInfo{ID: "capture-test"}, nil, &liveness).(*managerImpl)
cfg := config.NewDefaultSchedulerConfig()
m := NewManager(&model.CaptureInfo{ID: "capture-test"}, nil, &liveness, cfg).(*managerImpl)
ctx := context.TODO()
// Add some tables to processor.
tables := spanz.NewMap[tablepb.TablePipeline]()
3 changes: 3 additions & 0 deletions cdc/processor/processor.go
@@ -94,6 +94,7 @@ type processor struct {
ctx cdcContext.Context, span tablepb.Span, replicaInfo *model.TableReplicaInfo,
) (tablepb.TablePipeline, error)
newAgent func(cdcContext.Context, *model.Liveness) (scheduler.Agent, error)
cfg *config.SchedulerConfig

liveness *model.Liveness
agent scheduler.Agent
@@ -537,6 +538,7 @@ func newProcessor(
changefeedID model.ChangeFeedID,
up *upstream.Upstream,
liveness *model.Liveness,
cfg *config.SchedulerConfig,
) *processor {
p := &processor{
changefeed: state,
@@ -580,6 +582,7 @@ func newProcessor(
p.createTablePipeline = p.createTablePipelineImpl
p.lazyInit = p.lazyInitImpl
p.newAgent = p.newAgentImpl
p.cfg = cfg
return p
}

7 changes: 5 additions & 2 deletions cdc/processor/processor_test.go
@@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
mocksink "github.com/pingcap/tiflow/cdc/sink/mock"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/etcd"
@@ -48,12 +49,13 @@ func newProcessor4Test(
ctx cdcContext.Context, span tablepb.Span, replicaInfo *model.TableReplicaInfo,
) (tablepb.TablePipeline, error),
liveness *model.Liveness,
cfg *config.SchedulerConfig,
) *processor {
up := upstream.NewUpstream4Test(nil)
p := newProcessor(
state,
captureInfo,
model.ChangeFeedID4Test("processor-test", "processor-test"), up, liveness)
model.ChangeFeedID4Test("processor-test", "processor-test"), up, liveness, cfg)
p.lazyInit = func(ctx cdcContext.Context) error {
p.agent = &mockAgent{executor: p}
p.sinkV1 = mocksink.NewNormalMockSink()
@@ -106,7 +108,8 @@ func initProcessor4Test(
changefeed := orchestrator.NewChangefeedReactorState(
etcd.DefaultCDCClusterID, ctx.ChangefeedVars().ID)
captureInfo := &model.CaptureInfo{ID: "capture-test", AdvertiseAddr: "127.0.0.1:0000"}
p := newProcessor4Test(t, changefeed, captureInfo, newMockTablePipeline, liveness)
cfg := config.NewDefaultSchedulerConfig()
p := newProcessor4Test(t, changefeed, captureInfo, newMockTablePipeline, liveness, cfg)

captureID := ctx.GlobalVars().CaptureInfo.ID
changefeedID := ctx.ChangefeedVars().ID
9 changes: 9 additions & 0 deletions cdc/sink/validator.go
@@ -16,6 +16,7 @@ package sink
import (
"context"
"net/url"
"strings"

"github.com/pingcap/tiflow/cdc/contextutil"
"github.com/pingcap/tiflow/cdc/model"
@@ -143,3 +144,11 @@ func checkBDRMode(ctx context.Context, sinkURI *url.URL, replicaConfig *config.R
}
return nil
}

// IsSinkCompatibleWithSpanReplication returns true if the sink uri is
// compatible with span replication.
func IsSinkCompatibleWithSpanReplication(sinkURI string) bool {
u, err := url.Parse(sinkURI)
return err == nil &&
(strings.Contains(u.Scheme, "kafka") || strings.Contains(u.Scheme, "blackhole"))
}
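
For reference, a minimal usage sketch of the helper above (the URIs are illustrative examples, not taken from the PR). Because strings.Contains is applied to the scheme, variants such as kafka+ssl would presumably also be treated as compatible:

package main

import (
	"fmt"

	"github.com/pingcap/tiflow/cdc/sink"
)

func main() {
	// kafka- and blackhole-style sinks are reported as compatible; mysql/tidb
	// sinks are not, so span replication is disabled for those changefeeds.
	for _, uri := range []string{
		"kafka://127.0.0.1:9092/topic", // true
		"blackhole://",                 // true
		"mysql://root@127.0.0.1:3306/", // false
		"tidb://root@127.0.0.1:4000/",  // false
	} {
		fmt.Println(uri, sink.IsSinkCompatibleWithSpanReplication(uri))
	}
}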