diff --git a/cdc/model/kv.go b/cdc/model/kv.go
index 201c4b4f494..56aceac8077 100644
--- a/cdc/model/kv.go
+++ b/cdc/model/kv.go
@@ -16,6 +16,7 @@ package model
 
 import (
+	"errors"
 	"fmt"
 
 	"github.com/pingcap/tiflow/cdc/processor/tablepb"
@@ -110,3 +111,20 @@ func (v *RawKVEntry) String() string {
 func (v *RawKVEntry) ApproximateDataSize() int64 {
 	return int64(len(v.Key) + len(v.Value) + len(v.OldValue))
 }
+
+// ShouldSplitKVEntry checks whether the raw kv entry should be split.
+type ShouldSplitKVEntry func(raw *RawKVEntry) bool
+
+// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
+func SplitUpdateKVEntry(raw *RawKVEntry) (*RawKVEntry, *RawKVEntry, error) {
+	if raw == nil {
+		return nil, nil, errors.New("nil event cannot be split")
+	}
+	deleteKVEntry := *raw
+	deleteKVEntry.Value = nil
+
+	insertKVEntry := *raw
+	insertKVEntry.OldValue = nil
+
+	return &deleteKVEntry, &insertKVEntry, nil
+}
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 8098bb32419..f63b77fb583 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -215,13 +215,13 @@ func (p *processor) AddTableSpan(
 			zap.Bool("isPrepare", isPrepare))
 	}
 
-	p.sinkManager.r.AddTable(
+	table := p.sinkManager.r.AddTable(
 		span, startTs, p.latestInfo.TargetTs)
 	if p.redo.r.Enabled() {
 		p.redo.r.AddTable(span, startTs)
 	}
 
-	p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs)
+	p.sourceManager.r.AddTable(span, p.getTableName(ctx, span.TableID), startTs, table.GetReplicaTs)
 
 	return true, nil
 }
@@ -478,18 +478,6 @@ func isProcessorIgnorableError(err error) bool {
 	return false
 }
 
-// needPullerSafeModeAtStart returns true if the scheme is mysql compatible.
-// pullerSafeMode means to split all update kv entries whose commitTS
-// is older then the start time of this changefeed.
-func needPullerSafeModeAtStart(sinkURIStr string) (bool, error) {
-	sinkURI, err := url.Parse(sinkURIStr)
-	if err != nil {
-		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
-	}
-	scheme := sink.GetScheme(sinkURI)
-	return sink.IsMySQLCompatibleScheme(scheme), nil
-}
-
 // Tick implements the `orchestrator.State` interface
 // the `info` parameter is sent by metadata store, the `info` must be the latest value snapshot.
 // the `status` parameter is sent by metadata store, the `status` must be the latest value snapshot.
@@ -589,6 +577,16 @@ func (p *processor) tick(ctx cdcContext.Context) (error, error) {
 	return nil, warning
 }
 
+// isMysqlCompatibleBackend returns true if the sinkURIStr is mysql compatible.
+func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
+	sinkURI, err := url.Parse(sinkURIStr)
+	if err != nil {
+		return false, cerror.WrapError(cerror.ErrSinkURIInvalid, err)
+	}
+	scheme := sink.GetScheme(sinkURI)
+	return sink.IsMySQLCompatibleScheme(scheme), nil
+}
+
 // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
 func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
 	if p.initialized.Load() {
@@ -652,21 +650,21 @@ func (p *processor) lazyInitImpl(etcdCtx cdcContext.Context) (err error) {
 		return errors.Trace(err)
 	}
 
-	pullerSafeModeAtStart, err := needPullerSafeModeAtStart(p.latestInfo.SinkURI)
+	isMysqlBackend, err := isMysqlCompatibleBackend(p.latestInfo.SinkURI)
 	if err != nil {
 		return errors.Trace(err)
 	}
 	p.sourceManager.r = sourcemanager.New(
 		p.changefeedID, p.upstream, p.mg.r, sortEngine, util.GetOrZero(cfConfig.BDRMode),
-		pullerSafeModeAtStart)
+		isMysqlBackend)
 	p.sourceManager.name = "SourceManager"
 	p.sourceManager.changefeedID = p.changefeedID
 	p.sourceManager.spawn(prcCtx)
 
 	p.sinkManager.r = sinkmanager.New(
 		p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
-		p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r)
+		p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
 	p.sinkManager.name = "SinkManager"
 	p.sinkManager.changefeedID = p.changefeedID
 	p.sinkManager.spawn(prcCtx)
diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go
index 2ce276a47f4..83ceb618782 100644
--- a/cdc/processor/sinkmanager/manager.go
+++ b/cdc/processor/sinkmanager/manager.go
@@ -130,6 +130,9 @@ type SinkManager struct {
 	// wg is used to wait for all workers to exit.
 	wg sync.WaitGroup
 
+	// isMysqlBackend indicates whether the backend is MySQL compatible.
+	isMysqlBackend bool
+
 	// Metric for table sink.
 	metricsTableSinkTotalRows prometheus.Counter
 
@@ -145,6 +148,7 @@ func New(
 	schemaStorage entry.SchemaStorage,
 	redoDMLMgr redo.DMLManager,
 	sourceManager *sourcemanager.SourceManager,
+	isMysqlBackend bool,
 ) *SinkManager {
 	m := &SinkManager{
 		changefeedID: changefeedID,
@@ -158,7 +162,7 @@ func New(
 		sinkTaskChan:        make(chan *sinkTask),
 		sinkWorkerAvailable: make(chan struct{}, 1),
 		sinkRetry:           retry.NewInfiniteErrorRetry(),
-
+		isMysqlBackend:      isMysqlBackend,
 		metricsTableSinkTotalRows: tablesinkmetrics.TotalRowsCountCounter.
 			WithLabelValues(changefeedID.Namespace, changefeedID.ID),
 
@@ -305,6 +309,11 @@ func (m *SinkManager) Run(ctx context.Context, warnings ...chan<- error) (err er
 			if cerror.IsDupEntryError(err) {
 				return errors.Trace(err)
 			}
+
+			if m.isMysqlBackend {
+				// For MySQL backend, we should restart the sink and let the owner handle the error.
+				return errors.Trace(err)
+			}
 		}
 
 		// If the error is retryable, we should retry to re-establish the internal resources.
@@ -840,7 +849,7 @@ func (m *SinkManager) UpdateBarrierTs(globalBarrierTs model.Ts, tableBarrier map
 }
 
 // AddTable adds a table(TableSink) to the sink manager.
-func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) {
+func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs model.Ts) *tableSinkWrapper {
 	sinkWrapper := newTableSinkWrapper(
 		m.changefeedID,
 		span,
@@ -868,7 +877,6 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
 			zap.String("namespace", m.changefeedID.Namespace),
 			zap.String("changefeed", m.changefeedID.ID),
 			zap.Stringer("span", &span))
-		return
 	}
 	m.sinkMemQuota.AddTable(span)
 	m.redoMemQuota.AddTable(span)
@@ -878,6 +886,7 @@ func (m *SinkManager) AddTable(span tablepb.Span, startTs model.Ts, targetTs mod
 		zap.Stringer("span", &span),
 		zap.Uint64("startTs", startTs),
 		zap.Uint64("version", sinkWrapper.version))
+	return sinkWrapper
 }
 
 // StartTable sets the table(TableSink) state to replicating.
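The hunks above change how the safe-mode split threshold is obtained: instead of a single timestamp captured when the SourceManager starts, each table now exposes its own replicateTs through the GetReplicaTs closure returned by SinkManager.AddTable. The sketch below is not code from this patch; it uses simplified stand-in types (demoTableSink, demoSourceManager, demoRawKVEntry are illustrative names) to show the resulting wiring and the per-event decision.

```go
package main

import (
	"fmt"
	"math"
	"sync/atomic"
)

// demoRawKVEntry is a stripped-down stand-in for model.RawKVEntry.
type demoRawKVEntry struct {
	CRTs   uint64
	update bool
}

func (r *demoRawKVEntry) IsUpdate() bool { return r.update }

// demoTableSink mimics tableSinkWrapper: replicateTs is atomic and stays at
// MaxUint64 until the sink is actually started.
type demoTableSink struct{ replicateTs atomic.Uint64 }

func (t *demoTableSink) GetReplicaTs() uint64 { return t.replicateTs.Load() }
func (t *demoTableSink) start(ts uint64)      { t.replicateTs.Store(ts) }

// demoSourceManager mimics the split decision wired up in SourceManager.AddTable:
// split only when safe mode is on and the update was committed before the
// table's current replicateTs, which is re-read on every event.
type demoSourceManager struct{ safeModeAtStart bool }

func (m *demoSourceManager) shouldSplit(getReplicaTs func() uint64) func(*demoRawKVEntry) bool {
	return func(raw *demoRawKVEntry) bool {
		return m.safeModeAtStart && raw != nil && raw.IsUpdate() && raw.CRTs < getReplicaTs()
	}
}

func main() {
	sink := &demoTableSink{}
	sink.replicateTs.Store(math.MaxUint64) // as in newTableSinkWrapper

	src := &demoSourceManager{safeModeAtStart: true}
	split := src.shouldSplit(sink.GetReplicaTs) // the closure passed via AddTable

	evt := &demoRawKVEntry{CRTs: 150, update: true}
	fmt.Println(split(evt)) // true: sink not started yet, every update counts as old

	sink.start(200)         // sink fetches its replicateTs and stores it
	fmt.Println(split(evt)) // true: 150 < 200, still split in safe mode
	evt.CRTs = 250
	fmt.Println(split(evt)) // false: committed after the table started replicating
}
```

Reading the threshold through a closure rather than a stored field means a later restart of the table sink (which refreshes replicateTs) is picked up automatically by the puller side.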
diff --git a/cdc/processor/sinkmanager/manager_test.go b/cdc/processor/sinkmanager/manager_test.go
index 4b245ea2647..d96902414c7 100644
--- a/cdc/processor/sinkmanager/manager_test.go
+++ b/cdc/processor/sinkmanager/manager_test.go
@@ -129,7 +129,7 @@ func TestAddTable(t *testing.T) {
 	require.Equal(t, 0, manager.sinkProgressHeap.len(),
 		"Not started table shout not in progress heap")
 	err := manager.StartTable(span, 1)
 	require.NoError(t, err)
-	require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs)
+	require.Equal(t, uint64(0x7ffffffffffbffff), tableSink.(*tableSinkWrapper).replicateTs.Load())
 
 	progress := manager.sinkProgressHeap.pop()
 	require.Equal(t, span, progress.span)
@@ -355,7 +355,7 @@ func TestSinkManagerRunWithErrors(t *testing.T) {
 
 	span := spanz.TableIDToComparableSpan(1)
 
-	source.AddTable(span, "test", 100)
+	source.AddTable(span, "test", 100, func() model.Ts { return 0 })
 	manager.AddTable(span, 100, math.MaxUint64)
 	manager.StartTable(span, 100)
 	source.Add(span, model.NewResolvedPolymorphicEvent(0, 101))
diff --git a/cdc/processor/sinkmanager/manager_test_helper.go b/cdc/processor/sinkmanager/manager_test_helper.go
index 73163921dee..2bc8c96b003 100644
--- a/cdc/processor/sinkmanager/manager_test_helper.go
+++ b/cdc/processor/sinkmanager/manager_test_helper.go
@@ -71,7 +71,7 @@ func CreateManagerWithMemEngine(
 	sourceManager.WaitForReady(ctx)
 
 	sinkManager := New(changefeedID, changefeedInfo.SinkURI,
-		changefeedInfo.Config, up, schemaStorage, nil, sourceManager)
+		changefeedInfo.Config, up, schemaStorage, nil, sourceManager, false)
 	go func() { handleError(sinkManager.Run(ctx)) }()
 	sinkManager.WaitForReady(ctx)
 
@@ -92,6 +92,6 @@ func NewManagerWithMemEngine(
 	schemaStorage := &entry.MockSchemaStorage{Resolved: math.MaxUint64}
 	sourceManager := sourcemanager.NewForTest(changefeedID, up, mg, sortEngine, false)
 	sinkManager := New(changefeedID, changefeedInfo.SinkURI,
-		changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager)
+		changefeedInfo.Config, up, schemaStorage, redoMgr, sourceManager, false)
 	return sinkManager, sourceManager, sortEngine
 }
diff --git a/cdc/processor/sinkmanager/redo_log_worker.go b/cdc/processor/sinkmanager/redo_log_worker.go
index 89f2c7d604d..c2b31ca6b36 100644
--- a/cdc/processor/sinkmanager/redo_log_worker.go
+++ b/cdc/processor/sinkmanager/redo_log_worker.go
@@ -141,7 +141,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) (finalErr e
 		// NOTICE: The event can be filtered by the event filter.
 		if e.Row != nil {
 			// For all events, we add table replicate ts, so mysql sink can determine safe-mode.
-			e.Row.ReplicatingTs = task.tableSink.replicateTs
+			e.Row.ReplicatingTs = task.tableSink.replicateTs.Load()
 			x, size = handleRowChangedEvents(w.changefeedID, task.span, e)
 			advancer.appendEvents(x, size)
 		}
diff --git a/cdc/processor/sinkmanager/table_sink_worker.go b/cdc/processor/sinkmanager/table_sink_worker.go
index 42c493f3918..82501caa836 100644
--- a/cdc/processor/sinkmanager/table_sink_worker.go
+++ b/cdc/processor/sinkmanager/table_sink_worker.go
@@ -237,7 +237,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (finalErr e
 		// NOTICE: The event can be filtered by the event filter.
 		if e.Row != nil {
 			// For all rows, we add table replicate ts, so mysql sink can determine safe-mode.
-			e.Row.ReplicatingTs = task.tableSink.replicateTs
+			e.Row.ReplicatingTs = task.tableSink.GetReplicaTs()
 			x, size := handleRowChangedEvents(w.changefeedID, task.span, e)
 			advancer.appendEvents(x, size)
 			allEventSize += size
diff --git a/cdc/processor/sinkmanager/table_sink_wrapper.go b/cdc/processor/sinkmanager/table_sink_wrapper.go
index 0978fd6dd70..420184507db 100644
--- a/cdc/processor/sinkmanager/table_sink_wrapper.go
+++ b/cdc/processor/sinkmanager/table_sink_wrapper.go
@@ -15,6 +15,7 @@ package sinkmanager
 
 import (
 	"context"
+	"math"
 	"sort"
 	"sync"
 	"sync/atomic"
@@ -77,7 +78,7 @@ type tableSinkWrapper struct {
 	receivedSorterResolvedTs atomic.Uint64
 
 	// replicateTs is the ts that the table sink has started to replicate.
-	replicateTs    model.Ts
+	replicateTs    atomic.Uint64
 	genReplicateTs func(ctx context.Context) (model.Ts, error)
 
 	// lastCleanTime indicates the last time the table has been cleaned.
@@ -90,6 +91,11 @@ type tableSinkWrapper struct {
 	rangeEventCountsMu sync.Mutex
 }
 
+// GetReplicaTs returns the replicate ts of the table sink.
+func (t *tableSinkWrapper) GetReplicaTs() model.Ts {
+	return t.replicateTs.Load()
+}
+
 type rangeEventCount struct {
 	// firstPos and lastPos are used to merge many rangeEventCount into one.
 	firstPos engine.Position
@@ -132,31 +138,34 @@ func newTableSinkWrapper(
 
 	res.receivedSorterResolvedTs.Store(startTs)
 	res.barrierTs.Store(startTs)
+	res.replicateTs.Store(math.MaxUint64)
 	return res
 }
 
 func (t *tableSinkWrapper) start(ctx context.Context, startTs model.Ts) (err error) {
-	if t.replicateTs != 0 {
+	if t.replicateTs.Load() != math.MaxUint64 {
 		log.Panic("The table sink has already started",
 			zap.String("namespace", t.changefeed.Namespace),
 			zap.String("changefeed", t.changefeed.ID),
 			zap.Stringer("span", &t.span),
 			zap.Uint64("startTs", startTs),
-			zap.Uint64("oldReplicateTs", t.replicateTs),
+			zap.Uint64("oldReplicateTs", t.replicateTs.Load()),
 		)
 	}
 
 	// FIXME(qupeng): it can be re-fetched later instead of fails.
-	if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
+	ts, err := t.genReplicateTs(ctx)
+	if err != nil {
 		return errors.Trace(err)
 	}
+	t.replicateTs.Store(ts)
 	log.Info("Sink is started",
 		zap.String("namespace", t.changefeed.Namespace),
 		zap.String("changefeed", t.changefeed.ID),
 		zap.Stringer("span", &t.span),
 		zap.Uint64("startTs", startTs),
-		zap.Uint64("replicateTs", t.replicateTs),
+		zap.Uint64("replicateTs", ts),
 	)
 
 	// This start ts maybe greater than the initial start ts of the table sink.
@@ -379,14 +388,16 @@ func (t *tableSinkWrapper) checkTableSinkHealth() (err error) {
 // committed at downstream but we don't know. So we need to update `replicateTs`
 // of the table so that we can re-send those events later.
 func (t *tableSinkWrapper) restart(ctx context.Context) (err error) {
-	if t.replicateTs, err = t.genReplicateTs(ctx); err != nil {
+	ts, err := t.genReplicateTs(ctx)
+	if err != nil {
 		return errors.Trace(err)
 	}
+	t.replicateTs.Store(ts)
 	log.Info("Sink is restarted",
 		zap.String("namespace", t.changefeed.Namespace),
 		zap.String("changefeed", t.changefeed.ID),
 		zap.Stringer("span", &t.span),
-		zap.Uint64("replicateTs", t.replicateTs))
+		zap.Uint64("replicateTs", ts))
 	return nil
 }
diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go
index f2765e0f596..cd5d937c15d 100644
--- a/cdc/processor/sourcemanager/manager.go
+++ b/cdc/processor/sourcemanager/manager.go
@@ -17,7 +17,6 @@ import (
 	"context"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/log"
 	"github.com/pingcap/tiflow/cdc/entry"
 	"github.com/pingcap/tiflow/cdc/kv"
@@ -29,14 +28,10 @@ import (
 	"github.com/pingcap/tiflow/cdc/processor/tablepb"
 	"github.com/pingcap/tiflow/cdc/puller"
 	"github.com/pingcap/tiflow/pkg/config"
-	cerrors "github.com/pingcap/tiflow/pkg/errors"
-	"github.com/pingcap/tiflow/pkg/retry"
 	"github.com/pingcap/tiflow/pkg/spanz"
 	"github.com/pingcap/tiflow/pkg/txnutil"
 	"github.com/pingcap/tiflow/pkg/upstream"
-	"github.com/tikv/client-go/v2/oracle"
 	"github.com/tikv/client-go/v2/tikv"
-	pd "github.com/tikv/pd/client"
 	"go.uber.org/zap"
 )
 
@@ -48,8 +43,7 @@ type pullerWrapperCreator func(
 	tableName string,
 	startTs model.Ts,
 	bdrMode bool,
-	shouldSplitKVEntry pullerwrapper.ShouldSplitKVEntry,
-	splitUpdateKVEntry pullerwrapper.SplitUpdateKVEntry,
+	shouldSplitKVEntry model.ShouldSplitKVEntry,
 ) pullerwrapper.Wrapper
 
 type tablePullers struct {
@@ -78,8 +72,6 @@ type SourceManager struct {
 	engine engine.SortEngine
 	// Used to indicate whether the changefeed is in BDR mode.
 	bdrMode bool
-	// startTs is the timestamp when SourceManager starts.
-	startTs model.Ts
 
 	// if `config.GetGlobalServerConfig().KVClient.EnableMultiplexing` is true `tablePullers`
 	// will be used. Otherwise `multiplexingPuller` will be used instead.
@@ -113,21 +105,8 @@ func NewForTest(
 	return newSourceManager(changefeedID, up, mg, engine, bdrMode, false, false, pullerwrapper.NewPullerWrapperForTest)
 }
 
-func isOldUpdateKVEntry(raw *model.RawKVEntry, thresholdTs model.Ts) bool {
-	return raw != nil && raw.IsUpdate() && raw.CRTs < thresholdTs
-}
-
-func splitUpdateKVEntry(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error) {
-	if raw == nil {
-		return nil, nil, errors.New("nil event cannot be split")
-	}
-	deleteKVEntry := *raw
-	deleteKVEntry.Value = nil
-
-	insertKVEntry := *raw
-	insertKVEntry.OldValue = nil
-
-	return &deleteKVEntry, &insertKVEntry, nil
+func isOldUpdateKVEntry(raw *model.RawKVEntry, getReplicaTs func() model.Ts) bool {
+	return raw != nil && raw.IsUpdate() && raw.CRTs < getReplicaTs()
 }
 
 func newSourceManager(
@@ -158,19 +137,20 @@ func newSourceManager(
 }
 
 // AddTable adds a table to the source manager. Start puller and register table to the engine.
-func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts) {
+func (m *SourceManager) AddTable(span tablepb.Span, tableName string, startTs model.Ts, getReplicaTs func() model.Ts) {
 	// Add table to the engine first, so that the engine can receive the events from the puller.
 	m.engine.AddTable(span, startTs)
 
+	shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
+		return m.safeModeAtStart && isOldUpdateKVEntry(raw, getReplicaTs)
+	}
+
 	if m.multiplexing {
-		m.multiplexingPuller.puller.Subscribe([]tablepb.Span{span}, startTs, tableName)
+		m.multiplexingPuller.puller.Subscribe([]tablepb.Span{span}, startTs, tableName, shouldSplitKVEntry)
 		return
 	}
 
-	shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
-		return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
-	}
-
-	p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry, splitUpdateKVEntry)
+	p := m.tablePullers.pullerWrapperCreator(m.changefeedID, span, tableName, startTs, m.bdrMode, shouldSplitKVEntry)
 	p.Start(m.tablePullers.ctx, m.up, m.engine, m.tablePullers.errChan)
 	m.tablePullers.Store(span, p)
 }
@@ -231,11 +211,6 @@ func (m *SourceManager) GetTableSorterStats(span tablepb.Span) engine.TableStats
 
 // Run implements util.Runnable.
 func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
-	startTs, err := getCurrentTs(ctx, m.up.PDClient)
-	if err != nil {
-		return err
-	}
-	m.startTs = startTs
 	if m.multiplexing {
 		serverConfig := config.GetGlobalServerConfig()
 		grpcPool := sharedconn.NewConnAndClientPool(m.up.SecurityConfig, kv.GetGlobalGrpcMetrics())
@@ -244,14 +219,10 @@ func (m *SourceManager) Run(ctx context.Context, _ ...chan<- error) error {
 			m.up.PDClient, grpcPool, m.up.RegionCache, m.up.PDClock,
 			txnutil.NewLockerResolver(m.up.KVStorage.(tikv.Storage), m.changefeedID),
 		)
-		shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
-			return m.safeModeAtStart && isOldUpdateKVEntry(raw, m.startTs)
-		}
+
 		m.multiplexingPuller.puller = pullerwrapper.NewMultiplexingPullerWrapper(
 			m.changefeedID, client, m.engine,
 			int(serverConfig.KVClient.FrontierConcurrent),
-			shouldSplitKVEntry,
-			splitUpdateKVEntry,
 		)
 
 		close(m.ready)
@@ -309,23 +280,3 @@ func (m *SourceManager) Close() {
 func (m *SourceManager) Add(span tablepb.Span, events ...*model.PolymorphicEvent) {
 	m.engine.Add(span, events...)
 }
-
-func getCurrentTs(ctx context.Context, pdClient pd.Client) (model.Ts, error) {
-	backoffBaseDelayInMs := int64(100)
-	totalRetryDuration := 10 * time.Second
-	var replicateTs model.Ts
-	err := retry.Do(ctx, func() error {
-		phy, logic, err := pdClient.GetTS(ctx)
-		if err != nil {
-			return errors.Trace(err)
-		}
-		replicateTs = oracle.ComposeTS(phy, logic)
-		return nil
-	}, retry.WithBackoffBaseDelay(backoffBaseDelayInMs),
-		retry.WithTotalRetryDuratoin(totalRetryDuration),
-		retry.WithIsRetryableErr(cerrors.IsRetryableError))
-	if err != nil {
-		return model.Ts(0), errors.Trace(err)
-	}
-	return replicateTs, nil
-}
diff --git a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go
index 847eb171594..d8259590f7c 100644
--- a/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/dummy_puller_wrapper.go
@@ -34,8 +34,7 @@ func NewPullerWrapperForTest(
 	tableName string,
 	startTs model.Ts,
 	bdrMode bool,
-	shouldSplitKVEntry ShouldSplitKVEntry,
-	splitUpdateKVEntry SplitUpdateKVEntry,
+	shouldSplitKVEntry model.ShouldSplitKVEntry,
 ) Wrapper {
 	return &dummyPullerWrapper{}
 }
diff --git a/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go b/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go
index 115f02b8297..1a25f0e158e 100644
--- a/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/multiplexing_puller_wrapper.go
@@ -37,10 +37,8 @@ func NewMultiplexingPullerWrapper(
 	client *kv.SharedClient,
 	eventSortEngine engine.SortEngine,
 	frontiers int,
-	shouldSplitKVEntry ShouldSplitKVEntry,
-	splitUpdateKVEntry SplitUpdateKVEntry,
 ) *MultiplexingWrapper {
-	consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span) error {
+	consume := func(ctx context.Context, raw *model.RawKVEntry, spans []tablepb.Span, shouldSplitKVEntry model.ShouldSplitKVEntry) error {
 		if len(spans) > 1 {
 			log.Panic("DML puller subscribes multiple spans",
 				zap.String("namespace", changefeed.Namespace),
@@ -48,7 +46,7 @@ func NewMultiplexingPullerWrapper(
 		}
 		if raw != nil {
 			if shouldSplitKVEntry(raw) {
-				deleteKVEntry, insertKVEntry, err := splitUpdateKVEntry(raw)
+				deleteKVEntry, insertKVEntry, err := model.SplitUpdateKVEntry(raw)
 				if err != nil {
 					return err
 				}
diff --git a/cdc/processor/sourcemanager/puller/puller_wrapper.go b/cdc/processor/sourcemanager/puller/puller_wrapper.go
index 1fa332e0dc3..8dd933abaec 100644
--- a/cdc/processor/sourcemanager/puller/puller_wrapper.go
+++ b/cdc/processor/sourcemanager/puller/puller_wrapper.go
@@ -40,12 +40,6 @@ type Wrapper interface {
 	Close()
 }
 
-// ShouldSplitKVEntry checks whether the raw kv entry should be splitted.
-type ShouldSplitKVEntry func(raw *model.RawKVEntry) bool
-
-// SplitUpdateKVEntry splits the raw kv entry into a delete entry and an insert entry.
-type SplitUpdateKVEntry func(raw *model.RawKVEntry) (*model.RawKVEntry, *model.RawKVEntry, error)
-
 // WrapperImpl is a wrapper of puller used by source manager.
 type WrapperImpl struct {
 	changefeed model.ChangeFeedID
@@ -55,8 +49,7 @@ type WrapperImpl struct {
 	startTs   model.Ts
 	bdrMode   bool
 
-	shouldSplitKVEntry ShouldSplitKVEntry
-	splitUpdateKVEntry SplitUpdateKVEntry
+	shouldSplitKVEntry model.ShouldSplitKVEntry
 
 	// cancel is used to cancel the puller when remove or close the table.
 	cancel context.CancelFunc
@@ -71,8 +64,7 @@ func NewPullerWrapper(
 	tableName string,
 	startTs model.Ts,
 	bdrMode bool,
-	shouldSplitKVEntry ShouldSplitKVEntry,
-	splitUpdateKVEntry SplitUpdateKVEntry,
+	shouldSplitKVEntry model.ShouldSplitKVEntry,
 ) Wrapper {
 	return &WrapperImpl{
 		changefeed: changefeed,
@@ -81,7 +73,6 @@ func NewPullerWrapper(
 		startTs:            startTs,
 		bdrMode:            bdrMode,
 		shouldSplitKVEntry: shouldSplitKVEntry,
-		splitUpdateKVEntry: splitUpdateKVEntry,
 	}
 }
 
@@ -140,7 +131,7 @@ func (n *WrapperImpl) Start(
 				continue
 			}
 			if n.shouldSplitKVEntry(rawKV) {
-				deleteKVEntry, insertKVEntry, err := n.splitUpdateKVEntry(rawKV)
+				deleteKVEntry, insertKVEntry, err := model.SplitUpdateKVEntry(rawKV)
 				if err != nil {
 					return err
 				}
diff --git a/cdc/puller/ddl_puller.go b/cdc/puller/ddl_puller.go
index f3d39316470..2357e87ee2c 100644
--- a/cdc/puller/ddl_puller.go
+++ b/cdc/puller/ddl_puller.go
@@ -612,7 +612,8 @@ func NewDDLJobPuller(
 			pdCli, grpcPool, regionCache, pdClock,
 			txnutil.NewLockerResolver(kvStorage.(tikv.Storage), changefeed),
 		)
-		consume := func(ctx context.Context, raw *model.RawKVEntry, _ []tablepb.Span) error {
+
+		consume := func(ctx context.Context, raw *model.RawKVEntry, _ []tablepb.Span, _ model.ShouldSplitKVEntry) error {
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
@@ -623,7 +624,7 @@ func NewDDLJobPuller(
 		}
 		slots, hasher := 1, func(tablepb.Span, int) int { return 0 }
 		mp.MultiplexingPuller = NewMultiplexingPuller(changefeed, client, consume, slots, hasher, 1)
-		mp.Subscribe(spans, checkpointTs, memorysorter.DDLPullerTableName)
+		mp.Subscribe(spans, checkpointTs, memorysorter.DDLPullerTableName, func(_ *model.RawKVEntry) bool { return false })
 	} else {
 		jobPuller.puller.Puller = New(
 			ctx, pdCli, up.GrpcPool, regionCache, kvStorage, pdClock,
diff --git a/cdc/puller/multiplexing_puller.go b/cdc/puller/multiplexing_puller.go
index 39d3a97c906..603c303cc59 100644
--- a/cdc/puller/multiplexing_puller.go
+++ b/cdc/puller/multiplexing_puller.go
@@ -75,7 +75,7 @@ type tableProgressWithSubID struct {
 type MultiplexingPuller struct {
 	changefeed model.ChangeFeedID
 	client     *kv.SharedClient
-	consume    func(context.Context, *model.RawKVEntry, []tablepb.Span) error
+	consume    func(context.Context, *model.RawKVEntry, []tablepb.Span, model.ShouldSplitKVEntry) error
 	hasher     func(tablepb.Span, int) int
 	frontiers  int
 
@@ -106,7 +106,7 @@ type MultiplexingPuller struct {
 func NewMultiplexingPuller(
 	changefeed model.ChangeFeedID,
 	client *kv.SharedClient,
-	consume func(context.Context, *model.RawKVEntry, []tablepb.Span) error,
+	consume func(context.Context, *model.RawKVEntry, []tablepb.Span, model.ShouldSplitKVEntry) error,
 	workers int,
 	hasher func(tablepb.Span, int) int,
 	frontiers int,
@@ -130,13 +130,23 @@ func NewMultiplexingPuller(
 }
 
 // Subscribe some spans. They will share one same resolved timestamp progress.
-func (p *MultiplexingPuller) Subscribe(spans []tablepb.Span, startTs model.Ts, tableName string) {
+func (p *MultiplexingPuller) Subscribe(
+	spans []tablepb.Span,
+	startTs model.Ts,
+	tableName string,
+	shouldSplitKVEntry model.ShouldSplitKVEntry,
+) {
 	p.subscriptions.Lock()
 	defer p.subscriptions.Unlock()
-	p.subscribe(spans, startTs, tableName)
+	p.subscribe(spans, startTs, tableName, shouldSplitKVEntry)
 }
 
-func (p *MultiplexingPuller) subscribe(spans []tablepb.Span, startTs model.Ts, tableName string) []kv.SubscriptionID {
+func (p *MultiplexingPuller) subscribe(
+	spans []tablepb.Span,
+	startTs model.Ts,
+	tableName string,
+	shouldSplitKVEntry model.ShouldSplitKVEntry,
+) []kv.SubscriptionID {
 	for _, span := range spans {
 		if _, exists := p.subscriptions.n.Get(span); exists {
 			log.Panic("redundant subscription",
@@ -164,7 +174,7 @@ func (p *MultiplexingPuller) subscribe(spans []tablepb.Span, startTs model.Ts, t
 		progress.consume.RLock()
 		defer progress.consume.RUnlock()
 		if !progress.consume.removed {
-			return p.consume(ctx, raw, spans)
+			return p.consume(ctx, raw, spans, shouldSplitKVEntry)
 		}
 		return nil
 	}
diff --git a/cdc/puller/multiplexing_puller_test.go b/cdc/puller/multiplexing_puller_test.go
index 7a3086e5cf0..30f7fd1ae60 100644
--- a/cdc/puller/multiplexing_puller_test.go
+++ b/cdc/puller/multiplexing_puller_test.go
@@ -28,7 +28,7 @@ import (
 
 func newMultiplexingPullerForTest(outputCh chan<- *model.RawKVEntry) *MultiplexingPuller {
 	client := kv.NewSharedClient(model.ChangeFeedID{}, nil, false, nil, nil, nil, nil, nil)
-	consume := func(ctx context.Context, e *model.RawKVEntry, _ []tablepb.Span) error {
+	consume := func(ctx context.Context, e *model.RawKVEntry, _ []tablepb.Span, _ model.ShouldSplitKVEntry) error {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
@@ -81,7 +81,10 @@ func TestMultiplexingPullerResolvedForward(t *testing.T) {
 	spans := []tablepb.Span{spanz.ToSpan([]byte("t_a"), []byte("t_e"))}
 	spans[0].TableID = 1
 
-	subID := puller.subscribe(spans, 996, "test")[0]
+	shouldSplitKVEntry := func(raw *model.RawKVEntry) bool {
+		return false
+	}
+	subID := puller.subscribe(spans, 996, "test", shouldSplitKVEntry)[0]
 	for _, event := range events {
 		puller.inputChs[0] <- kv.MultiplexingEvent{RegionFeedEvent: event, SubscriptionID: subID}
 	}
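For reference, a minimal self-contained sketch of the transformation that model.SplitUpdateKVEntry performs on entries selected by shouldSplitKVEntry: an update carrying both values is rewritten as a delete that keeps only the old value, followed by an insert that keeps only the new value. The struct below is a hypothetical stand-in for model.RawKVEntry, not the real type.

```go
package main

import (
	"errors"
	"fmt"
)

// rawKVEntry is a stripped-down stand-in for model.RawKVEntry.
type rawKVEntry struct {
	Key      []byte
	Value    []byte // new value of the update
	OldValue []byte // previous value of the update
}

// splitUpdateKVEntry mirrors the logic added to cdc/model/kv.go.
func splitUpdateKVEntry(raw *rawKVEntry) (*rawKVEntry, *rawKVEntry, error) {
	if raw == nil {
		return nil, nil, errors.New("nil event cannot be split")
	}
	deleteEntry := *raw
	deleteEntry.Value = nil // the delete half keeps only the old value

	insertEntry := *raw
	insertEntry.OldValue = nil // the insert half keeps only the new value

	return &deleteEntry, &insertEntry, nil
}

func main() {
	update := &rawKVEntry{Key: []byte("k1"), Value: []byte("v2"), OldValue: []byte("v1")}
	del, ins, err := splitUpdateKVEntry(update)
	if err != nil {
		panic(err)
	}
	fmt.Printf("delete: key=%s old=%s\n", del.Key, del.OldValue) // delete: key=k1 old=v1
	fmt.Printf("insert: key=%s new=%s\n", ins.Key, ins.Value)    // insert: key=k1 new=v2
}
```

Emitting delete-then-insert for such old updates is what lets a MySQL-compatible sink replay them safely when the original update may already have been partially applied downstream.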