diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index d463e6f4b08..1febabe58c5 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -44,6 +44,7 @@ import (
 	"github.com/pingcap/tiflow/pkg/pdutil"
 	"github.com/pingcap/tiflow/pkg/retry"
 	"github.com/pingcap/tiflow/pkg/sink"
+	"github.com/pingcap/tiflow/pkg/sink/mysql"
 	"github.com/pingcap/tiflow/pkg/upstream"
 	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/prometheus/client_golang/prometheus"
@@ -640,6 +641,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
 	return sink.IsMySQLCompatibleScheme(scheme), nil
 }
 
+// getPullerSplitUpdateMode returns how to split update kv entries at puller.
+//
+// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone
+// which means don't split any update kv entries at puller;
+// If the sinkURI is mysql compatible, it has the following two cases:
+// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways,
+//    which means split all update kv entries at puller;
+// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart,
+//    which means split update kv entries whose commitTS is older than the replicate ts of sink.
+func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
+	sinkURI, err := url.Parse(sinkURIStr)
+	if err != nil {
+		return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
+	}
+	scheme := sink.GetScheme(sinkURI)
+	if !sink.IsMySQLCompatibleScheme(scheme) {
+		return sourcemanager.PullerSplitUpdateModeNone, nil
+	}
+	// must be mysql sink
+	isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config)
+	if err != nil {
+		return sourcemanager.PullerSplitUpdateModeNone, err
+	}
+	if isSinkInSafeMode {
+		return sourcemanager.PullerSplitUpdateModeAlways, nil
+	}
+	return sourcemanager.PullerSplitUpdateModeAtStart, nil
+}
+
 // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
 func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
 	if p.initialized {
@@ -696,17 +726,21 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
 		return errors.Trace(err)
 	}
 
-	isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
+	pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.changefeed.Info.SinkURI, p.changefeed.Info.Config)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	p.sourceManager.r = sourcemanager.New(
 		p.changefeedID, p.upstream, p.mg.r,
-		sortEngine, p.changefeed.Info.Config.BDRMode,
-		isMysqlBackend)
+		sortEngine, pullerSplitUpdateMode,
+		p.changefeed.Info.Config.BDRMode)
 	p.sourceManager.name = "SourceManager"
 	p.sourceManager.spawn(stdCtx)
 
+	isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
+	if err != nil {
+		return errors.Trace(err)
+	}
 	p.sinkManager.r = sinkmanager.New(
 		p.changefeedID, p.changefeed.Info,
 		p.upstream, p.ddlHandler.r.schemaStorage,
 		p.redo.r, p.sourceManager.r, isMysqlBackend)
diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go
index 370b8a6b6d1..f54e333f4a4 100644
--- a/cdc/processor/processor_test.go
+++ b/cdc/processor/processor_test.go
@@ -27,6 +27,7 @@ import (
 	"github.com/pingcap/tiflow/cdc/entry/schema"
 	"github.com/pingcap/tiflow/cdc/model"
 	"github.com/pingcap/tiflow/cdc/processor/sinkmanager"
+	"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
 	"github.com/pingcap/tiflow/cdc/processor/tablepb"
 	"github.com/pingcap/tiflow/cdc/redo"
 	"github.com/pingcap/tiflow/cdc/scheduler"
@@ -39,6 +40,7 @@ import (
 	redoPkg "github.com/pingcap/tiflow/pkg/redo"
 	"github.com/pingcap/tiflow/pkg/spanz"
 	"github.com/pingcap/tiflow/pkg/upstream"
+	"github.com/pingcap/tiflow/pkg/util"
 	"github.com/stretchr/testify/require"
 )
 
@@ -751,3 +753,73 @@ func TestProcessorDostNotStuckInInit(t *testing.T) {
 	require.Nil(t, p.Close())
 	tester.MustApplyPatches()
 }
+
+func TestGetPullerSplitUpdateMode(t *testing.T) {
+	testCases := []struct {
+		sinkURI string
+		config  *config.ReplicaConfig
+		mode    sourcemanager.PullerSplitUpdateMode
+	}{
+		{
+			sinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
+			config:  nil,
+			mode:    sourcemanager.PullerSplitUpdateModeNone,
+		},
+		{
+			sinkURI: "mysql://root:test@127.0.0.1:3306/",
+			config:  nil,
+			mode:    sourcemanager.PullerSplitUpdateModeAtStart,
+		},
+		{
+			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
+			config:  nil,
+			mode:    sourcemanager.PullerSplitUpdateModeAlways,
+		},
+		{
+			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
+			config:  nil,
+			mode:    sourcemanager.PullerSplitUpdateModeAtStart,
+		},
+		{
+			sinkURI: "mysql://root:test@127.0.0.1:3306/",
+			config: &config.ReplicaConfig{
+				Sink: &config.SinkConfig{
+					SafeMode: util.AddressOf(true),
+				},
+			},
+			mode: sourcemanager.PullerSplitUpdateModeAlways,
+		},
+		{
+			sinkURI: "mysql://root:test@127.0.0.1:3306/",
+			config: &config.ReplicaConfig{
+				Sink: &config.SinkConfig{
+					SafeMode: util.AddressOf(false),
+				},
+			},
+			mode: sourcemanager.PullerSplitUpdateModeAtStart,
+		},
+		{
+			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
+			config: &config.ReplicaConfig{
+				Sink: &config.SinkConfig{
+					SafeMode: util.AddressOf(false),
+				},
+			},
+			mode: sourcemanager.PullerSplitUpdateModeAlways,
+		},
+		{
+			sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
+			config: &config.ReplicaConfig{
+				Sink: &config.SinkConfig{
+					SafeMode: util.AddressOf(true),
+				},
+			},
+			mode: sourcemanager.PullerSplitUpdateModeAlways,
+		},
+	}
+	for _, tc := range testCases {
+		mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config)
+		require.Nil(t, err)
+		require.Equal(t, tc.mode, mode)
+	}
+}
diff --git a/cdc/processor/sinkmanager/redo_log_worker_test.go b/cdc/processor/sinkmanager/redo_log_worker_test.go
index b7cce3775cb..18cc5beb69f 100644
--- a/cdc/processor/sinkmanager/redo_log_worker_test.go
+++ b/cdc/processor/sinkmanager/redo_log_worker_test.go
@@ -62,7 +62,7 @@ func (suite *redoLogWorkerSuite) createWorker(
 ) (*redoWorker, engine.SortEngine, *mockRedoDMLManager) {
 	sortEngine := memory.New(context.Background())
 	sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
-		&entry.MockMountGroup{}, sortEngine, false, false)
+		&entry.MockMountGroup{}, sortEngine, sourcemanager.PullerSplitUpdateModeNone, false)
 	go func() { _ = sm.Run(ctx) }()
 	// To avoid refund or release panics.
diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go
index 586ce0c138c..9a31a23aacc 100644
--- a/cdc/processor/sinkmanager/table_sink_worker_test.go
+++ b/cdc/processor/sinkmanager/table_sink_worker_test.go
@@ -129,7 +129,7 @@ func (suite *tableSinkWorkerSuite) createWorker(
 ) (*sinkWorker, engine.SortEngine) {
 	sortEngine := memory.New(context.Background())
 	sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
-		&entry.MockMountGroup{}, sortEngine, false, false)
+		&entry.MockMountGroup{}, sortEngine, sourcemanager.PullerSplitUpdateModeNone, false)
 	go func() { sm.Run(ctx) }()
 	// To avoid refund or release panics.
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index b3345ff4387..e7ccf3cdc15 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -32,6 +32,16 @@ import (
 
 const defaultMaxBatchSize = 256
 
+// PullerSplitUpdateMode is the mode to split update events in puller.
+type PullerSplitUpdateMode int32
+
+// PullerSplitUpdateMode constants.
+const (
+	PullerSplitUpdateModeNone    PullerSplitUpdateMode = 0
+	PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1
+	PullerSplitUpdateModeAlways  PullerSplitUpdateMode = 2
+)
+
 // SourceManager is the manager of the source engine and puller.
 type SourceManager struct {
 	ctx context.Context
@@ -50,11 +60,11 @@ type SourceManager struct {
 	pullers spanz.SyncMap
 	// Used to collect errors in running.
 	errChan chan error
+	// Used to specify the behavior of splitting update events in puller.
+	splitUpdateMode PullerSplitUpdateMode
 	// Used to indicate whether the changefeed is in BDR mode.
 	bdrMode bool
 
-	safeModeAtStart bool
-
 	// pullerWrapperCreator is used to create a puller wrapper.
 	// Only used for testing.
 	pullerWrapperCreator func(changefeed model.ChangeFeedID,
@@ -72,8 +82,8 @@ func New(
 	up *upstream.Upstream,
 	mg entry.MounterGroup,
 	engine engine.SortEngine,
+	splitUpdateMode PullerSplitUpdateMode,
 	bdrMode bool,
-	safeModeAtStart bool,
 ) *SourceManager {
 	return &SourceManager{
 		ready: make(chan struct{}),
@@ -82,8 +92,8 @@
 		up:      up,
 		mg:      mg,
 		engine:  engine,
 		errChan: make(chan error, 16),
+		splitUpdateMode: splitUpdateMode,
 		bdrMode: bdrMode,
-		safeModeAtStart: safeModeAtStart,
 		pullerWrapperCreator: pullerwrapper.NewPullerWrapper,
 	}
 }
@@ -117,7 +127,21 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo
 	// Add table to the engine first, so that the engine can receive the events from the puller.
 	m.engine.AddTable(span, startTs)
 	shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
-		return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
+		if raw == nil || !raw.IsUpdate() {
+			return false
+		}
+		switch m.splitUpdateMode {
+		case PullerSplitUpdateModeNone:
+			return false
+		case PullerSplitUpdateModeAlways:
+			return true
+		case PullerSplitUpdateModeAtStart:
+			return isOldUpdateKVEntry(raw, getReplicaTs)
+		default:
+			log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode)))
+		}
+		log.Panic("Shouldn't reach here")
+		return false
 	}
 	p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry)
 	p.Start(m.ctx, m.up, m.engine, m.errChan)
diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go
index 6ad2795a6a4..ffb6ee02479 100644
--- a/pkg/sink/mysql/config.go
+++ b/pkg/sink/mysql/config.go
@@ -212,24 +212,26 @@ func mergeConfig(
 	urlParameters *urlConfig,
 ) (*urlConfig, error) {
 	dest := &urlConfig{}
-	dest.SafeMode = replicaConfig.Sink.SafeMode
-	if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil {
-		mConfig := replicaConfig.Sink.MySQLConfig
-		dest.WorkerCount = mConfig.WorkerCount
-		dest.MaxTxnRow = mConfig.MaxTxnRow
-		dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
-		dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
-		dest.TiDBTxnMode = mConfig.TiDBTxnMode
-		dest.SSLCa = mConfig.SSLCa
-		dest.SSLCert = mConfig.SSLCert
-		dest.SSLKey = mConfig.SSLKey
-		dest.TimeZone = mConfig.TimeZone
-		dest.WriteTimeout = mConfig.WriteTimeout
-		dest.ReadTimeout = mConfig.ReadTimeout
-		dest.Timeout = mConfig.Timeout
-		dest.EnableBatchDML = mConfig.EnableBatchDML
-		dest.EnableMultiStatement = mConfig.EnableMultiStatement
-		dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
+	if replicaConfig != nil && replicaConfig.Sink != nil {
+		dest.SafeMode = replicaConfig.Sink.SafeMode
+		if replicaConfig.Sink.MySQLConfig != nil {
+			mConfig := replicaConfig.Sink.MySQLConfig
+			dest.WorkerCount = mConfig.WorkerCount
+			dest.MaxTxnRow = mConfig.MaxTxnRow
+			dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
+			dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
+			dest.TiDBTxnMode = mConfig.TiDBTxnMode
+			dest.SSLCa = mConfig.SSLCa
+			dest.SSLCert = mConfig.SSLCert
+			dest.SSLKey = mConfig.SSLKey
+			dest.TimeZone = mConfig.TimeZone
+			dest.WriteTimeout = mConfig.WriteTimeout
+			dest.ReadTimeout = mConfig.ReadTimeout
+			dest.Timeout = mConfig.Timeout
+			dest.EnableBatchDML = mConfig.EnableBatchDML
+			dest.EnableMultiStatement = mConfig.EnableMultiStatement
+			dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
+		}
 	}
 	if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil {
 		return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
 	}
@@ -237,6 +239,31 @@
 	return dest, nil
 }
 
+// IsSinkSafeMode returns whether the sink is in safe mode.
+func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) {
+	if sinkURI == nil {
+		return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI")
+	}
+
+	scheme := strings.ToLower(sinkURI.Scheme)
+	if !sink.IsMySQLCompatibleScheme(scheme) {
+		return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme)
+	}
+	req := &http.Request{URL: sinkURI}
+	urlParameter := &urlConfig{}
+	if err := binding.Query.Bind(req, urlParameter); err != nil {
+		return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
+	}
+	var err error
+	if urlParameter, err = mergeConfig(replicaConfig, urlParameter); err != nil {
+		return false, err
+	}
+	if urlParameter.SafeMode == nil {
+		return defaultSafeMode, nil
+	}
+	return *urlParameter.SafeMode, nil
+}
+
 func getWorkerCount(values *urlConfig, workerCount *int) error {
 	if values.WorkerCount == nil {
 		return nil