diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 76bdbfb502f..5b03d5b22f5 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink" + "github.com/pingcap/tiflow/pkg/sink/mysql" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/prometheus/client_golang/prometheus" @@ -598,6 +599,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) { return sink.IsMySQLCompatibleScheme(scheme), nil } +// getPullerSplitUpdateMode returns how update kv entries should be split at the puller. +// +// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone, +// which means no update kv entries are split at the puller. +// If the sinkURI is mysql compatible, there are two cases: +// 1. if safe mode is enabled for the sink, it returns PullerSplitUpdateModeAlways, +// which means all update kv entries are split at the puller; +// 2. if safe mode is not enabled for the sink, it returns PullerSplitUpdateModeAtStart, +// which means only update kv entries whose commitTs is older than the replica ts of the sink are split. 
+func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) { + sinkURI, err := url.Parse(sinkURIStr) + if err != nil { + return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err) + } + scheme := sink.GetScheme(sinkURI) + if !sink.IsMySQLCompatibleScheme(scheme) { + return sourcemanager.PullerSplitUpdateModeNone, nil + } + // must be mysql sink + isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config) + if err != nil { + return sourcemanager.PullerSplitUpdateModeNone, err + } + if isSinkInSafeMode { + return sourcemanager.PullerSplitUpdateModeAlways, nil + } + return sourcemanager.PullerSplitUpdateModeAtStart, nil +} + // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick. func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) { if p.initialized.Load() { @@ -657,19 +687,23 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) { return errors.Trace(err) } - isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI) + pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.latestInfo.SinkURI, cfConfig) if err != nil { return errors.Trace(err) } p.sourceManager.r = sourcemanager.New( p.changefeedID, p.upstream, p.mg.r, - sortEngine, util.GetOrZero(cfConfig.BDRMode), - util.GetOrZero(cfConfig.EnableTableMonitor), - isMysqlBackend) + sortEngine, pullerSplitUpdateMode, + util.GetOrZero(cfConfig.BDRMode), + util.GetOrZero(cfConfig.EnableTableMonitor)) p.sourceManager.name = "SourceManager" p.sourceManager.changefeedID = p.changefeedID p.sourceManager.spawn(prcCtx) + isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI) + if err != nil { + return errors.Trace(err) + } p.sinkManager.r = sinkmanager.New( p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream, p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend) diff --git 
a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index b610be1fdf1..a1fd68b9764 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/entry/schema" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/processor/sinkmanager" + "github.com/pingcap/tiflow/cdc/processor/sourcemanager" "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/redo" "github.com/pingcap/tiflow/cdc/scheduler" @@ -41,6 +42,7 @@ import ( redoPkg "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" + "github.com/pingcap/tiflow/pkg/util" "github.com/stretchr/testify/require" ) @@ -803,3 +805,73 @@ func TestProcessorNotInitialized(t *testing.T) { p, _, _ := initProcessor4Test(t, &liveness, false, globalVars, changefeedVars) require.Nil(t, p.WriteDebugInfo(os.Stdout)) } + +func TestGetPullerSplitUpdateMode(t *testing.T) { + testCases := []struct { + sinkURI string + config *config.ReplicaConfig + mode sourcemanager.PullerSplitUpdateMode + }{ + { + sinkURI: "kafka://127.0.0.1:9092/ticdc-test2", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeNone, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false", + config: nil, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: util.AddressOf(true), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: 
util.AddressOf(false), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAtStart, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: util.AddressOf(false), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + { + sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false", + config: &config.ReplicaConfig{ + Sink: &config.SinkConfig{ + SafeMode: util.AddressOf(true), + }, + }, + mode: sourcemanager.PullerSplitUpdateModeAlways, + }, + } + for _, tc := range testCases { + mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config) + require.Nil(t, err) + require.Equal(t, tc.mode, mode) + } +} diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 3afb21f26bb..a47dde6554d 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -35,6 +35,16 @@ import ( const defaultMaxBatchSize = 256 +// PullerSplitUpdateMode is the mode to split update events in puller. +type PullerSplitUpdateMode int32 + +// PullerSplitUpdateMode constants. +const ( + PullerSplitUpdateModeNone PullerSplitUpdateMode = 0 + PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1 + PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2 +) + // SourceManager is the manager of the source engine and puller. type SourceManager struct { ready chan struct{} @@ -51,7 +61,7 @@ type SourceManager struct { // Used to indicate whether the changefeed is in BDR mode. 
bdrMode bool - safeModeAtStart bool + splitUpdateMode PullerSplitUpdateMode enableTableMonitor bool puller *puller.MultiplexingPuller @@ -63,11 +73,11 @@ func New( up *upstream.Upstream, mg entry.MounterGroup, engine sorter.SortEngine, + splitUpdateMode PullerSplitUpdateMode, bdrMode bool, enableTableMonitor bool, - safeModeAtStart bool, ) *SourceManager { - return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor, safeModeAtStart) + return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, enableTableMonitor) } // NewForTest creates a new source manager for testing. @@ -97,9 +107,9 @@ func newSourceManager( up *upstream.Upstream, mg entry.MounterGroup, engine sorter.SortEngine, + splitUpdateMode PullerSplitUpdateMode, bdrMode bool, enableTableMonitor bool, - safeModeAtStart bool, ) *SourceManager { mgr := &SourceManager{ ready: make(chan struct{}), @@ -107,9 +117,9 @@ func newSourceManager( up: up, mg: mg, engine: engine, + splitUpdateMode: splitUpdateMode, bdrMode: bdrMode, enableTableMonitor: enableTableMonitor, - safeModeAtStart: safeModeAtStart, } serverConfig := config.GetGlobalServerConfig() @@ -164,7 +174,21 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo m.engine.AddTable(span, startTs) shouldSplitKVEntry := func(raw *model.RawKVEntry) bool { - return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs) + if raw == nil || !raw.IsUpdate() { + return false + } + switch m.splitUpdateMode { + case PullerSplitUpdateModeNone: + return false + case PullerSplitUpdateModeAlways: + return true + case PullerSplitUpdateModeAtStart: + return isOldUpdateKVEntry(raw, getReplicaTs) + default: + log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode))) + } + log.Panic("Shouldn't reach here") + return false } // Only nil in unit tests. 
diff --git a/pkg/sink/mysql/config.go b/pkg/sink/mysql/config.go index 4adf68639a8..a10c2574264 100644 --- a/pkg/sink/mysql/config.go +++ b/pkg/sink/mysql/config.go @@ -223,24 +223,26 @@ func mergeConfig( urlParameters *urlConfig, ) (*urlConfig, error) { dest := &urlConfig{} - dest.SafeMode = replicaConfig.Sink.SafeMode - if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil { - mConfig := replicaConfig.Sink.MySQLConfig - dest.WorkerCount = mConfig.WorkerCount - dest.MaxTxnRow = mConfig.MaxTxnRow - dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount - dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize - dest.TiDBTxnMode = mConfig.TiDBTxnMode - dest.SSLCa = mConfig.SSLCa - dest.SSLCert = mConfig.SSLCert - dest.SSLKey = mConfig.SSLKey - dest.TimeZone = mConfig.TimeZone - dest.WriteTimeout = mConfig.WriteTimeout - dest.ReadTimeout = mConfig.ReadTimeout - dest.Timeout = mConfig.Timeout - dest.EnableBatchDML = mConfig.EnableBatchDML - dest.EnableMultiStatement = mConfig.EnableMultiStatement - dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement + if replicaConfig != nil && replicaConfig.Sink != nil { + dest.SafeMode = replicaConfig.Sink.SafeMode + if replicaConfig.Sink.MySQLConfig != nil { + mConfig := replicaConfig.Sink.MySQLConfig + dest.WorkerCount = mConfig.WorkerCount + dest.MaxTxnRow = mConfig.MaxTxnRow + dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount + dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize + dest.TiDBTxnMode = mConfig.TiDBTxnMode + dest.SSLCa = mConfig.SSLCa + dest.SSLCert = mConfig.SSLCert + dest.SSLKey = mConfig.SSLKey + dest.TimeZone = mConfig.TimeZone + dest.WriteTimeout = mConfig.WriteTimeout + dest.ReadTimeout = mConfig.ReadTimeout + dest.Timeout = mConfig.Timeout + dest.EnableBatchDML = mConfig.EnableBatchDML + dest.EnableMultiStatement = mConfig.EnableMultiStatement + dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement + } } if err := 
mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil { return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) @@ -248,6 +250,31 @@ func mergeConfig( return dest, nil } +// IsSinkSafeMode reports whether the MySQL-compatible sink is in safe mode; the value comes from merging the sink URI query with replicaConfig and falls back to defaultSafeMode when neither sets it. +func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) { + if sinkURI == nil { + return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI") + } + + scheme := strings.ToLower(sinkURI.Scheme) + if !sink.IsMySQLCompatibleScheme(scheme) { + return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme) + } + req := &http.Request{URL: sinkURI} + urlParameter := &urlConfig{} + if err := binding.Query.Bind(req, urlParameter); err != nil { + return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err) + } + var err error + if urlParameter, err = mergeConfig(replicaConfig, urlParameter); err != nil { + return false, err + } + if urlParameter.SafeMode == nil { + return defaultSafeMode, nil + } + return *urlParameter.SafeMode, nil +} + func getWorkerCount(values *urlConfig, workerCount *int) error { if values.WorkerCount == nil { return nil