Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

puller(ticdc): always split update kv entries in sink safe mode (#11224) #11655

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ import (
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -640,6 +641,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// getPullerSplitUpdateMode returns how to split update kv entries at puller.
//
// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone
// which means don't split any update kv entries at puller;
// If the sinkURI is mysql compatible, it has the following two cases:
// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways,
// which means split all update kv entries at puller;
// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart,
// which means split update kv entries whose commitTS is older than the replicate ts of sink.
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}
scheme := sink.GetScheme(sinkURI)
if !sink.IsMySQLCompatibleScheme(scheme) {
return sourcemanager.PullerSplitUpdateModeNone, nil
}
// must be mysql sink
isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, err
}
if isSinkInSafeMode {
return sourcemanager.PullerSplitUpdateModeAlways, nil
}
return sourcemanager.PullerSplitUpdateModeAtStart, nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
if p.initialized {
Expand Down Expand Up @@ -696,17 +726,21 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
return errors.Trace(err)
}

isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.changefeed.Info.SinkURI, p.changefeed.Info.Config)
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, p.changefeed.Info.Config.BDRMode,
isMysqlBackend)
sortEngine, pullerSplitUpdateMode,
p.changefeed.Info.Config.BDRMode)
p.sourceManager.name = "SourceManager"
p.sourceManager.spawn(stdCtx)

isMysqlBackend, err := isMysqlCompatibleBackend(p.changefeed.Info.SinkURI)
if err != nil {
return errors.Trace(err)
}
p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.changefeed.Info, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
Expand Down
72 changes: 72 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sinkmanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand All @@ -39,6 +40,7 @@ import (
redoPkg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -751,3 +753,73 @@ func TestProcessorDostNotStuckInInit(t *testing.T) {
require.Nil(t, p.Close())
tester.MustApplyPatches()
}

func TestGetPullerSplitUpdateMode(t *testing.T) {
testCases := []struct {
sinkURI string
config *config.ReplicaConfig
mode sourcemanager.PullerSplitUpdateMode
}{
{
sinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeNone,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
}
for _, tc := range testCases {
mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config)
require.Nil(t, err)
require.Equal(t, tc.mode, mode)
}
}
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/redo_log_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (suite *redoLogWorkerSuite) createWorker(
) (*redoWorker, engine.SortEngine, *mockRedoDMLManager) {
sortEngine := memory.New(context.Background())
sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
&entry.MockMountGroup{}, sortEngine, false, false)
&entry.MockMountGroup{}, sortEngine, sourcemanager.PullerSplitUpdateModeNone, false)
go func() { _ = sm.Run(ctx) }()

// To avoid refund or release panics.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (suite *tableSinkWorkerSuite) createWorker(
) (*sinkWorker, engine.SortEngine) {
sortEngine := memory.New(context.Background())
sm := sourcemanager.New(suite.testChangefeedID, upstream.NewUpstream4Test(&MockPD{}),
&entry.MockMountGroup{}, sortEngine, false, false)
&entry.MockMountGroup{}, sortEngine, sourcemanager.PullerSplitUpdateModeNone, false)
go func() { sm.Run(ctx) }()

// To avoid refund or release panics.
Expand Down
34 changes: 29 additions & 5 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,16 @@ import (

const defaultMaxBatchSize = 256

// PullerSplitUpdateMode is the mode to split update events in puller.
type PullerSplitUpdateMode int32

// PullerSplitUpdateMode constants.
const (
PullerSplitUpdateModeNone PullerSplitUpdateMode = 0
PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1
PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2
)

// SourceManager is the manager of the source engine and puller.
type SourceManager struct {
ctx context.Context
Expand All @@ -50,11 +60,11 @@ type SourceManager struct {
pullers spanz.SyncMap
// Used to collect errors in running.
errChan chan error
// Used to specify the behavior of splitting update events in puller.
splitUpdateMode PullerSplitUpdateMode
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool

safeModeAtStart bool

// pullerWrapperCreator is used to create a puller wrapper.
// Only used for testing.
pullerWrapperCreator func(changefeed model.ChangeFeedID,
Expand All @@ -72,8 +82,8 @@ func New(
up *upstream.Upstream,
mg entry.MounterGroup,
engine engine.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
safeModeAtStart bool,
) *SourceManager {
return &SourceManager{
ready: make(chan struct{}),
Expand All @@ -82,8 +92,8 @@ func New(
mg: mg,
engine: engine,
errChan: make(chan error, 16),
splitUpdateMode: splitUpdateMode,
bdrMode: bdrMode,
safeModeAtStart: safeModeAtStart,
pullerWrapperCreator: pullerwrapper.NewPullerWrapper,
}
}
Expand Down Expand Up @@ -117,7 +127,21 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo
// Add table to the engine first, so that the engine can receive the events from the puller.
m.engine.AddTable(span, startTs)
shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
if raw == nil || !raw.IsUpdate() {
return false
}
switch m.splitUpdateMode {
case PullerSplitUpdateModeNone:
return false
case PullerSplitUpdateModeAlways:
return true
case PullerSplitUpdateModeAtStart:
return isOldUpdateKVEntry(raw, getReplicaTs)
default:
log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode)))
}
log.Panic("Shouldn't reach here")
return false
}
p := m.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry)
p.Start(m.ctx, m.up, m.engine, m.errChan)
Expand Down
63 changes: 45 additions & 18 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,31 +212,58 @@ func mergeConfig(
urlParameters *urlConfig,
) (*urlConfig, error) {
dest := &urlConfig{}
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
if replicaConfig != nil && replicaConfig.Sink != nil {
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
}
}
if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
return dest, nil
}

// IsSinkSafeMode returns whether the sink is in safe mode.
func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) {
if sinkURI == nil {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI")
}

scheme := strings.ToLower(sinkURI.Scheme)
if !sink.IsMySQLCompatibleScheme(scheme) {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme)
}
req := &http.Request{URL: sinkURI}
urlParameter := &urlConfig{}
if err := binding.Query.Bind(req, urlParameter); err != nil {
return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
var err error
if urlParameter, err = mergeConfig(replicaConfig, urlParameter); err != nil {
return false, err
}
if urlParameter.SafeMode == nil {
return defaultSafeMode, nil
}
return *urlParameter.SafeMode, nil
}

func getWorkerCount(values *urlConfig, workerCount *int) error {
if values.WorkerCount == nil {
return nil
Expand Down
Loading