Skip to content

Commit

Permalink
puller(ticdc): always split update kv entries in sink safe mode (#11224)
Browse files Browse the repository at this point in the history
close #11231
  • Loading branch information
lidezhu authored and ti-chi-bot committed Oct 12, 2024
1 parent 92c1bbd commit daf3eb4
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 28 deletions.
42 changes: 38 additions & 4 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tiflow/pkg/pdutil"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/mysql"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/prometheus/client_golang/prometheus"
Expand Down Expand Up @@ -598,6 +599,35 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
return sink.IsMySQLCompatibleScheme(scheme), nil
}

// getPullerSplitUpdateMode returns how to split update kv entries at puller.
//
// If the sinkURI is not mysql compatible, it returns PullerSplitUpdateModeNone
// which means don't split any update kv entries at puller;
// If the sinkURI is mysql compatible, it has the following two cases:
// 1. if the user config safe mode in sink module, it returns PullerSplitUpdateModeAlways,
// which means split all update kv entries at puller;
// 2. if the user does not config safe mode in sink module, it returns PullerSplitUpdateModeAtStart,
// which means split update kv entries whose commitTS is older than the replicate ts of sink.
func getPullerSplitUpdateMode(sinkURIStr string, config *config.ReplicaConfig) (sourcemanager.PullerSplitUpdateMode, error) {
sinkURI, err := url.Parse(sinkURIStr)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
}

Check warning on line 615 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L614-L615

Added lines #L614 - L615 were not covered by tests
scheme := sink.GetScheme(sinkURI)
if !sink.IsMySQLCompatibleScheme(scheme) {
return sourcemanager.PullerSplitUpdateModeNone, nil
}
// must be mysql sink
isSinkInSafeMode, err := mysql.IsSinkSafeMode(sinkURI, config)
if err != nil {
return sourcemanager.PullerSplitUpdateModeNone, err
}

Check warning on line 624 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L623-L624

Added lines #L623 - L624 were not covered by tests
if isSinkInSafeMode {
return sourcemanager.PullerSplitUpdateModeAlways, nil
}
return sourcemanager.PullerSplitUpdateModeAtStart, nil
}

// lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
if p.initialized.Load() {
Expand Down Expand Up @@ -657,19 +687,23 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
return errors.Trace(err)
}

isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
pullerSplitUpdateMode, err := getPullerSplitUpdateMode(p.latestInfo.SinkURI, cfConfig)

Check warning on line 690 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L690

Added line #L690 was not covered by tests
if err != nil {
return errors.Trace(err)
}
p.sourceManager.r = sourcemanager.New(
p.changefeedID, p.upstream, p.mg.r,
sortEngine, util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor),
isMysqlBackend)
sortEngine, pullerSplitUpdateMode,
util.GetOrZero(cfConfig.BDRMode),
util.GetOrZero(cfConfig.EnableTableMonitor))

Check warning on line 698 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L696-L698

Added lines #L696 - L698 were not covered by tests
p.sourceManager.name = "SourceManager"
p.sourceManager.changefeedID = p.changefeedID
p.sourceManager.spawn(prcCtx)

isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
if err != nil {
return errors.Trace(err)
}

Check warning on line 706 in cdc/processor/processor.go

View check run for this annotation

Codecov / codecov/patch

cdc/processor/processor.go#L703-L706

Added lines #L703 - L706 were not covered by tests
p.sinkManager.r = sinkmanager.New(
p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
Expand Down
72 changes: 72 additions & 0 deletions cdc/processor/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/entry/schema"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sinkmanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/scheduler"
Expand All @@ -41,6 +42,7 @@ import (
redoPkg "github.com/pingcap/tiflow/pkg/redo"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/pingcap/tiflow/pkg/util"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -803,3 +805,73 @@ func TestProcessorNotInitialized(t *testing.T) {
p, _, _ := initProcessor4Test(t, &liveness, false, globalVars, changefeedVars)
require.Nil(t, p.WriteDebugInfo(os.Stdout))
}

func TestGetPullerSplitUpdateMode(t *testing.T) {
testCases := []struct {
sinkURI string
config *config.ReplicaConfig
mode sourcemanager.PullerSplitUpdateMode
}{
{
sinkURI: "kafka://127.0.0.1:9092/ticdc-test2",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeNone,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
config: nil,
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAtStart,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=true",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(false),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
{
sinkURI: "mysql://root:test@127.0.0.1:3306/?safe-mode=false",
config: &config.ReplicaConfig{
Sink: &config.SinkConfig{
SafeMode: util.AddressOf(true),
},
},
mode: sourcemanager.PullerSplitUpdateModeAlways,
},
}
for _, tc := range testCases {
mode, err := getPullerSplitUpdateMode(tc.sinkURI, tc.config)
require.Nil(t, err)
require.Equal(t, tc.mode, mode)
}
}
36 changes: 30 additions & 6 deletions cdc/processor/sourcemanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,16 @@ import (

const defaultMaxBatchSize = 256

// PullerSplitUpdateMode is the mode to split update events in puller.
type PullerSplitUpdateMode int32

// PullerSplitUpdateMode constants.
const (
PullerSplitUpdateModeNone PullerSplitUpdateMode = 0
PullerSplitUpdateModeAtStart PullerSplitUpdateMode = 1
PullerSplitUpdateModeAlways PullerSplitUpdateMode = 2
)

// SourceManager is the manager of the source engine and puller.
type SourceManager struct {
ready chan struct{}
Expand All @@ -51,7 +61,7 @@ type SourceManager struct {
// Used to indicate whether the changefeed is in BDR mode.
bdrMode bool

safeModeAtStart bool
splitUpdateMode PullerSplitUpdateMode

enableTableMonitor bool
puller *puller.MultiplexingPuller
Expand All @@ -63,11 +73,11 @@ func New(
up *upstream.Upstream,
mg entry.MounterGroup,
engine sorter.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
return newSourceManager(changefeedID, up, mg, engine, bdrMode, enableTableMonitor, safeModeAtStart)
return newSourceManager(changefeedID, up, mg, engine, splitUpdateMode, bdrMode, enableTableMonitor)
}

// NewForTest creates a new source manager for testing.
Expand Down Expand Up @@ -97,19 +107,19 @@ func newSourceManager(
up *upstream.Upstream,
mg entry.MounterGroup,
engine sorter.SortEngine,
splitUpdateMode PullerSplitUpdateMode,
bdrMode bool,
enableTableMonitor bool,
safeModeAtStart bool,
) *SourceManager {
mgr := &SourceManager{
ready: make(chan struct{}),
changefeedID: changefeedID,
up: up,
mg: mg,
engine: engine,
splitUpdateMode: splitUpdateMode,
bdrMode: bdrMode,
enableTableMonitor: enableTableMonitor,
safeModeAtStart: safeModeAtStart,
}

serverConfig := config.GetGlobalServerConfig()
Expand Down Expand Up @@ -164,7 +174,21 @@ func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs mo
m.engine.AddTable(span, startTs)

shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
if raw == nil || !raw.IsUpdate() {
return false
}
switch m.splitUpdateMode {
case PullerSplitUpdateModeNone:
return false
case PullerSplitUpdateModeAlways:
return true
case PullerSplitUpdateModeAtStart:
return isOldUpdateKVEntry(raw, getReplicaTs)
default:
log.Panic("Unknown split update mode", zap.Int32("mode", int32(m.splitUpdateMode)))
}
log.Panic("Shouldn't reach here")
return false
}

// Only nil in unit tests.
Expand Down
63 changes: 45 additions & 18 deletions pkg/sink/mysql/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,31 +223,58 @@ func mergeConfig(
urlParameters *urlConfig,
) (*urlConfig, error) {
dest := &urlConfig{}
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink != nil && replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
if replicaConfig != nil && replicaConfig.Sink != nil {
dest.SafeMode = replicaConfig.Sink.SafeMode
if replicaConfig.Sink.MySQLConfig != nil {
mConfig := replicaConfig.Sink.MySQLConfig
dest.WorkerCount = mConfig.WorkerCount
dest.MaxTxnRow = mConfig.MaxTxnRow
dest.MaxMultiUpdateRowCount = mConfig.MaxMultiUpdateRowCount
dest.MaxMultiUpdateRowSize = mConfig.MaxMultiUpdateRowSize
dest.TiDBTxnMode = mConfig.TiDBTxnMode
dest.SSLCa = mConfig.SSLCa
dest.SSLCert = mConfig.SSLCert
dest.SSLKey = mConfig.SSLKey
dest.TimeZone = mConfig.TimeZone
dest.WriteTimeout = mConfig.WriteTimeout
dest.ReadTimeout = mConfig.ReadTimeout
dest.Timeout = mConfig.Timeout
dest.EnableBatchDML = mConfig.EnableBatchDML
dest.EnableMultiStatement = mConfig.EnableMultiStatement
dest.EnableCachePreparedStatement = mConfig.EnableCachePreparedStatement
}
}
if err := mergo.Merge(dest, urlParameters, mergo.WithOverride); err != nil {
return nil, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
return dest, nil
}

// IsSinkSafeMode returns whether the sink is in safe mode.
func IsSinkSafeMode(sinkURI *url.URL, replicaConfig *config.ReplicaConfig) (bool, error) {
if sinkURI == nil {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("fail to open MySQL sink, empty SinkURI")
}

Check warning on line 257 in pkg/sink/mysql/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/mysql/config.go#L254-L257

Added lines #L254 - L257 were not covered by tests

scheme := strings.ToLower(sinkURI.Scheme)
if !sink.IsMySQLCompatibleScheme(scheme) {
return false, cerror.ErrMySQLInvalidConfig.GenWithStack("can't create MySQL sink with unsupported scheme: %s", scheme)
}
req := &http.Request{URL: sinkURI}
urlParameter := &urlConfig{}
if err := binding.Query.Bind(req, urlParameter); err != nil {
return false, cerror.WrapError(cerror.ErrMySQLInvalidConfig, err)
}
var err error
if urlParameter, err = mergeConfig(replicaConfig, urlParameter); err != nil {
return false, err
}
if urlParameter.SafeMode == nil {
return defaultSafeMode, nil
}
return *urlParameter.SafeMode, nil

Check warning on line 275 in pkg/sink/mysql/config.go

View check run for this annotation

Codecov / codecov/patch

pkg/sink/mysql/config.go#L259-L275

Added lines #L259 - L275 were not covered by tests
}

func getWorkerCount(values *urlConfig, workerCount *int) error {
if values.WorkerCount == nil {
return nil
Expand Down

0 comments on commit daf3eb4

Please sign in to comment.