From 8ac28ac8816142835f1d1fb3d1bd5711b743895a Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Mon, 14 Nov 2022 17:41:20 +0800 Subject: [PATCH 1/2] sinkv2,processor: replace table ID with Span Signed-off-by: Neil Shen --- cdc/processor/processor.go | 3 +- cdc/processor/sinkmanager/manager.go | 4 +- .../sinkmanager/table_sink_worker_test.go | 28 +++++++++---- .../sinkmanager/table_sink_wrapper_test.go | 14 ++++--- cdc/sinkv2/eventsink/factory/factory.go | 9 ++-- cdc/sinkv2/eventsink/factory/factory_test.go | 3 +- cdc/sinkv2/tablesink/progress_tracker.go | 11 ++--- cdc/sinkv2/tablesink/progress_tracker_test.go | 21 +++++----- cdc/sinkv2/tablesink/table_sink_impl.go | 15 +++---- cdc/sinkv2/tablesink/table_sink_impl_test.go | 41 +++++++++++-------- pkg/config/debug.go | 11 +++++ 11 files changed, 101 insertions(+), 59 deletions(-) diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index e0ae90a349e..646eb258969 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -1188,8 +1188,7 @@ func (p *processor) createTablePipelineImpl( return nil, errors.Trace(err) } } else { - s := p.sinkV2Factory.CreateTableSink( - p.changefeedID, span.TableID, p.metricsTableSinkTotalRows) + s := p.sinkV2Factory.CreateTableSink(p.changefeedID, span, p.metricsTableSinkTotalRows) table, err = pipeline.NewTableActor( ctx, p.upstream, diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 1f98b2811fd..7822eea2565 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory" cerrors "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/retry" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" "github.com/prometheus/client_golang/prometheus" "github.com/tikv/client-go/v2/oracle" @@ -639,7 +640,8 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs sinkWrapper := newTableSinkWrapper( m.changefeedID, tableID, - m.sinkFactory.CreateTableSink(m.changefeedID, tableID, m.metricsTableSinkTotalRows), + m.sinkFactory.CreateTableSink( + m.changefeedID, spanz.TableIDToComparableSpan(tableID), m.metricsTableSinkTotalRows), tablepb.TableStatePreparing, startTs, targetTs, diff --git a/cdc/processor/sinkmanager/table_sink_worker_test.go b/cdc/processor/sinkmanager/table_sink_worker_test.go index d91c4459713..bf260159a1f 100644 --- a/cdc/processor/sinkmanager/table_sink_worker_test.go +++ b/cdc/processor/sinkmanager/table_sink_worker_test.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine" "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/pingcap/tiflow/pkg/upstream" "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" @@ -183,7 +184,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndGotSomeFilteredEvents() { require.Equal(suite.T(), context.Canceled, err) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -289,7 +291,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAbortWhenNoMemAndOneTxnFi require.Equal(suite.T(), context.Canceled, err) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -394,7 +397,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAbortWhenNoMemAndBlocked( require.ErrorIs(suite.T(), err, cerrors.ErrFlowControllerAborted) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -520,7 +524,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndOnlyAdvanceTableSinkWhenR require.ErrorIs(suite.T(), err, context.Canceled) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -647,7 +652,8 @@ func (suite *workerSuite) TestHandleTaskWithoutSplitTxnAndAbortWhenNoMemAndForce require.Equal(suite.T(), context.Canceled, err) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -774,7 +780,8 @@ func (suite *workerSuite) TestHandleTaskWithoutSplitTxnOnlyAdvanceTableSinkWhenR require.Equal(suite.T(), context.Canceled, err) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -892,7 +899,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndDoNotAdvanceTableUntilMee require.ErrorIs(suite.T(), err, context.Canceled) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -976,7 +984,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableUntilTaskIsFi require.ErrorIs(suite.T(), err, context.Canceled) }() - wrapper, sink := createTableSinkWrapper(changefeedID, tableID) + wrapper, sink := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, @@ -1047,7 +1056,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload( require.ErrorIs(suite.T(), err, context.Canceled) }() - wrapper, _ := createTableSinkWrapper(changefeedID, tableID) + wrapper, _ := createTableSinkWrapper( + changefeedID, spanz.TableIDToComparableSpan(tableID)) lowerBoundPos := engine.Position{ StartTs: 0, CommitTs: 1, diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 92d27013d8c..8da4c64b356 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -63,14 +64,15 @@ func (m *mockSink) Close() error { } //nolint:unparam -func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) { +func createTableSinkWrapper(changefeedID model.ChangeFeedID, span tablepb.Span) (*tableSinkWrapper, *mockSink) { tableState := tablepb.TableStatePreparing sink := newMockSink() - innerTableSink := tablesink.New[*model.RowChangedEvent](changefeedID, tableID, + innerTableSink := tablesink.New[*model.RowChangedEvent]( + changefeedID, span, sink, &eventsink.RowChangeEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) wrapper := newTableSinkWrapper( changefeedID, - tableID, + span.TableID, innerTableSink, tableState, 0, @@ -82,7 +84,8 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table func TestTableSinkWrapperClose(t *testing.T) { t.Parallel() - wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1) + wrapper, _ := createTableSinkWrapper( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1)) require.Equal(t, tablepb.TableStatePreparing, wrapper.getState()) wrapper.close(context.Background()) require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped") @@ -91,7 +94,8 @@ func TestTableSinkWrapperClose(t *testing.T) { func TestUpdateReceivedSorterResolvedTs(t *testing.T) { t.Parallel() - wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1) + wrapper, _ := createTableSinkWrapper( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1)) wrapper.updateReceivedSorterResolvedTs(100) require.Equal(t, uint64(100), wrapper.getReceivedSorterResolvedTs()) require.Equal(t, tablepb.TableStatePrepared, wrapper.getState()) diff --git a/cdc/sinkv2/eventsink/factory/factory.go b/cdc/sinkv2/eventsink/factory/factory.go index 8faa22980ab..e461e2488e1 100644 --- a/cdc/sinkv2/eventsink/factory/factory.go +++ b/cdc/sinkv2/eventsink/factory/factory.go @@ -18,6 +18,7 @@ import ( "strings" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/blackhole" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink/cloudstorage" @@ -92,14 +93,16 @@ func New(ctx context.Context, } // CreateTableSink creates a TableSink by schema. -func (s *SinkFactory) CreateTableSink(changefeedID model.ChangeFeedID, tableID model.TableID, totalRowsCounter prometheus.Counter) tablesink.TableSink { +func (s *SinkFactory) CreateTableSink( + changefeedID model.ChangeFeedID, span tablepb.Span, totalRowsCounter prometheus.Counter, +) tablesink.TableSink { switch s.sinkType { case sink.RowSink: // We have to indicate the type here, otherwise it can not be compiled. - return tablesink.New[*model.RowChangedEvent](changefeedID, tableID, + return tablesink.New[*model.RowChangedEvent](changefeedID, span, s.rowSink, &eventsink.RowChangeEventAppender{}, totalRowsCounter) case sink.TxnSink: - return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, + return tablesink.New[*model.SingleTableTxn](changefeedID, span, s.txnSink, &eventsink.TxnEventAppender{}, totalRowsCounter) default: panic("unknown sink type") diff --git a/cdc/sinkv2/eventsink/factory/factory_test.go b/cdc/sinkv2/eventsink/factory/factory_test.go index e1958222d87..971808bcb42 100644 --- a/cdc/sinkv2/eventsink/factory/factory_test.go +++ b/cdc/sinkv2/eventsink/factory/factory_test.go @@ -28,6 +28,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/sink" "github.com/pingcap/tiflow/pkg/sink/kafka" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -106,7 +107,7 @@ func TestSinkFactory(t *testing.T) { require.NotNil(t, sinkFactory.rowSink) tableSink := sinkFactory.CreateTableSink(model.DefaultChangeFeedID("1"), - 1, prometheus.NewCounter(prometheus.CounterOpts{})) + spanz.TableIDToComparableSpan(1), prometheus.NewCounter(prometheus.CounterOpts{})) require.NotNil(t, tableSink, "table sink can be created") err = sinkFactory.Close() diff --git a/cdc/sinkv2/tablesink/progress_tracker.go b/cdc/sinkv2/tablesink/progress_tracker.go index d739c144f50..a00d8b13c49 100644 --- a/cdc/sinkv2/tablesink/progress_tracker.go +++ b/cdc/sinkv2/tablesink/progress_tracker.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" "go.uber.org/zap" ) @@ -54,8 +55,8 @@ type pendingResolvedTs struct { // Every event is associated with a `eventID` which is a continuous number. `eventID` // can be regarded as the event's offset in `pendingEvents`. type progressTracker struct { - // tableID is the table ID of the table sink. - tableID model.TableID + // span is the span of the table sink. + span tablepb.Span // Internal Buffer size. Modified in tests only. bufferSize uint64 @@ -90,13 +91,13 @@ type progressTracker struct { // newProgressTracker is used to create a new progress tracker. // The last min resolved ts is set to 0. // It means that the table sink has not started yet. -func newProgressTracker(tableID model.TableID, bufferSize uint64) *progressTracker { +func newProgressTracker(span tablepb.Span, bufferSize uint64) *progressTracker { if bufferSize%8 != 0 { panic("bufferSize must be align to 8 bytes") } return &progressTracker{ - tableID: tableID, + span: span, bufferSize: bufferSize / 8, // It means the start of the table. // It's Ok to use 0 here. @@ -263,7 +264,7 @@ func (r *progressTracker) close(ctx context.Context) { return case <-blockTicker.C: log.Warn("Close process doesn't return in time, may be stuck", - zap.Int64("tableID", r.tableID), + zap.Stringer("span", &r.span), zap.Int("trackingCount", r.trackingCount()), zap.Any("lastMinResolvedTs", r.advance()), ) diff --git a/cdc/sinkv2/tablesink/progress_tracker_test.go b/cdc/sinkv2/tablesink/progress_tracker_test.go index c8a5ce6b95c..88533348d6b 100644 --- a/cdc/sinkv2/tablesink/progress_tracker_test.go +++ b/cdc/sinkv2/tablesink/progress_tracker_test.go @@ -20,13 +20,14 @@ import ( "time" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/stretchr/testify/require" ) func TestNewProgressTracker(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) require.Equal( t, uint64(0), @@ -38,7 +39,7 @@ func TestNewProgressTracker(t *testing.T) { func TestAddEvent(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) tracker.addEvent() tracker.addEvent() tracker.addEvent() @@ -49,7 +50,7 @@ func TestAddResolvedTs(t *testing.T) { t.Parallel() // There is no event in the tracker. - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) tracker.addResolvedTs(model.NewResolvedTs(1)) tracker.addResolvedTs(model.NewResolvedTs(2)) tracker.addResolvedTs(model.NewResolvedTs(3)) @@ -57,7 +58,7 @@ func TestAddResolvedTs(t *testing.T) { require.Equal(t, uint64(3), tracker.advance().Ts, "lastMinResolvedTs should be 3") // There is an event in the tracker. - tracker = newProgressTracker(1, defaultBufferSize) + tracker = newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) tracker.addEvent() tracker.addResolvedTs(model.NewResolvedTs(2)) tracker.addResolvedTs(model.NewResolvedTs(3)) @@ -70,7 +71,7 @@ func TestRemove(t *testing.T) { var cb1, cb2, cb4, cb5 func() // Only event. - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) tracker.addEvent() cb2 = tracker.addEvent() tracker.addEvent() @@ -79,7 +80,7 @@ func TestRemove(t *testing.T) { require.Equal(t, 3, tracker.trackingCount(), "not advanced") // Both event and resolved ts. - tracker = newProgressTracker(1, defaultBufferSize) + tracker = newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) cb1 = tracker.addEvent() cb2 = tracker.addEvent() tracker.addResolvedTs(model.NewResolvedTs(3)) @@ -113,7 +114,7 @@ func TestRemove(t *testing.T) { func TestCloseTracker(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) cb1 := tracker.addEvent() tracker.addResolvedTs(model.NewResolvedTs(1)) cb2 := tracker.addEvent() @@ -142,7 +143,7 @@ func TestCloseTracker(t *testing.T) { func TestCloseTrackerCancellable(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) tracker.addEvent() tracker.addResolvedTs(model.NewResolvedTs(1)) tracker.addEvent() @@ -167,7 +168,7 @@ func TestCloseTrackerCancellable(t *testing.T) { func TestTrackerBufferBoundary(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, 8) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), 8) cbs := make([]func(), 0) for i := 0; i < 65; i++ { @@ -200,7 +201,7 @@ func TestTrackerBufferBoundary(t *testing.T) { func TestClosedTrackerDoNotAdvanceCheckpointTs(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) cb1 := tracker.addEvent() tracker.addResolvedTs(model.NewResolvedTs(1)) cb2 := tracker.addEvent() diff --git a/cdc/sinkv2/tablesink/table_sink_impl.go b/cdc/sinkv2/tablesink/table_sink_impl.go index 2c033db8be6..3e28c509a72 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl.go +++ b/cdc/sinkv2/tablesink/table_sink_impl.go @@ -21,6 +21,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" + "github.com/pingcap/tiflow/cdc/processor/tablepb" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state" "github.com/prometheus/client_golang/prometheus" @@ -36,7 +37,7 @@ var ( // EventTableSink is a table sink that can write events. type EventTableSink[E eventsink.TableEvent] struct { changefeedID model.ChangeFeedID - tableID model.TableID + span tablepb.Span eventID uint64 maxResolvedTs model.ResolvedTs backendSink eventsink.EventSink[E] @@ -53,18 +54,18 @@ type EventTableSink[E eventsink.TableEvent] struct { // New an eventTableSink with given backendSink and event appender. func New[E eventsink.TableEvent]( changefeedID model.ChangeFeedID, - tableID model.TableID, + span tablepb.Span, backendSink eventsink.EventSink[E], appender eventsink.Appender[E], totalRowsCounter prometheus.Counter, ) *EventTableSink[E] { return &EventTableSink[E]{ changefeedID: changefeedID, - tableID: tableID, + span: span, eventID: 0, maxResolvedTs: model.NewResolvedTs(0), backendSink: backendSink, - progressTracker: newProgressTracker(tableID, defaultBufferSize), + progressTracker: newProgressTracker(span, defaultBufferSize), eventAppender: appender, eventBuffer: make([]E, 0, 1024), state: state.TableSinkSinking, @@ -130,7 +131,7 @@ func (e *EventTableSink[E]) Close(ctx context.Context) { log.Warn(fmt.Sprintf("Table sink is already %s", currentState.String()), zap.String("namespace", e.changefeedID.Namespace), zap.String("changefeed", e.changefeedID.ID), - zap.Uint64("tableID", uint64(e.tableID))) + zap.Stringer("span", &e.span)) return } @@ -146,7 +147,7 @@ func (e *EventTableSink[E]) Close(ctx context.Context) { log.Info("Stopping table sink", zap.String("namespace", e.changefeedID.Namespace), zap.String("changefeed", e.changefeedID.ID), - zap.Int64("tableID", e.tableID), + zap.Stringer("span", &e.span), zap.Uint64("checkpointTs", stoppingCheckpointTs.Ts)) e.progressTracker.close(ctx) e.state.Store(state.TableSinkStopped) @@ -154,7 +155,7 @@ func (e *EventTableSink[E]) Close(ctx context.Context) { log.Info("Table sink stopped", zap.String("namespace", e.changefeedID.Namespace), zap.String("changefeed", e.changefeedID.ID), - zap.Int64("tableID", e.tableID), + zap.Stringer("span", &e.span), zap.Uint64("checkpointTs", stoppedCheckpointTs.Ts), zap.Duration("duration", time.Since(start))) } diff --git a/cdc/sinkv2/tablesink/table_sink_impl_test.go b/cdc/sinkv2/tablesink/table_sink_impl_test.go index c78e3606724..4f7e8cb6e54 100644 --- a/cdc/sinkv2/tablesink/table_sink_impl_test.go +++ b/cdc/sinkv2/tablesink/table_sink_impl_test.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/cdc/sinkv2/eventsink" "github.com/pingcap/tiflow/cdc/sinkv2/tablesink/state" + "github.com/pingcap/tiflow/pkg/spanz" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" ) @@ -154,8 +155,9 @@ func TestNewEventTableSink(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) require.Equal(t, uint64(0), tb.eventID, "eventID should start from 0") require.Equal(t, model.NewResolvedTs(0), tb.maxResolvedTs, "maxResolvedTs should start from 0") @@ -170,8 +172,9 @@ func TestAppendRowChangedEvents(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) tb.AppendRowChangedEvents(getTestRows()...) require.Len(t, tb.eventBuffer, 7, "txn event buffer should have 7 txns") @@ -181,8 +184,9 @@ func TestUpdateResolvedTs(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) tb.AppendRowChangedEvents(getTestRows()...) // No event will be flushed. @@ -230,8 +234,9 @@ func TestGetCheckpointTs(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) tb.AppendRowChangedEvents(getTestRows()...) require.Equal(t, model.NewResolvedTs(0), tb.GetCheckpointTs(), "checkpointTs should be 0") @@ -266,8 +271,9 @@ func TestClose(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) tb.AppendRowChangedEvents(getTestRows()...) err := tb.UpdateResolvedTs(model.NewResolvedTs(105)) @@ -294,8 +300,9 @@ func TestCloseCancellable(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) tb.AppendRowChangedEvents(getTestRows()...) err := tb.UpdateResolvedTs(model.NewResolvedTs(105)) @@ -319,8 +326,9 @@ func TestCloseReentrant(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) tb.AppendRowChangedEvents(getTestRows()...) err := tb.UpdateResolvedTs(model.NewResolvedTs(105)) @@ -347,8 +355,9 @@ func TestCheckpointTsFrozenWhenStopping(t *testing.T) { t.Parallel() sink := &mockEventSink{} - tb := New[*model.SingleTableTxn](model.DefaultChangeFeedID("1"), - 1, sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) + tb := New[*model.SingleTableTxn]( + model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1), + sink, &eventsink.TxnEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{})) tb.AppendRowChangedEvents(getTestRows()...) err := tb.UpdateResolvedTs(model.NewResolvedTs(105)) diff --git a/pkg/config/debug.go b/pkg/config/debug.go index e4ed8b8d0ec..54f01687e0a 100644 --- a/pkg/config/debug.go +++ b/pkg/config/debug.go @@ -15,6 +15,7 @@ package config import ( "github.com/pingcap/errors" + cerror "github.com/pingcap/tiflow/pkg/errors" cerrors "github.com/pingcap/tiflow/pkg/errors" ) @@ -61,6 +62,16 @@ func (c *DebugConfig) ValidateAndAdjust() error { if err := c.Scheduler.ValidateAndAdjust(); err != nil { return errors.Trace(err) } + if c.Scheduler.RegionPerSpan != 0 { + if c.EnablePullBasedSink || !c.EnableNewSink { + // TODO: Removing this check once pull based sink is compatible with + // span replication. + return cerror.ErrInvalidServerOption.GenWithStackByArgs( + "enabling span replication requires setting " + + "`debug.enable-new-sink` to be true and " + + "`debug.enable-pull-based-sink` to be false") + } + } if c.EnablePullBasedSink { if !c.EnableDBSorter { return cerrors.ErrInvalidPullBasedSinkConfig.GenWithStackByArgs( From 8eb6833d211c4c3962ac17f5ca6ce3a9c93667f1 Mon Sep 17 00:00:00 2001 From: Neil Shen Date: Tue, 13 Dec 2022 19:22:45 +0800 Subject: [PATCH 2/2] address lints Signed-off-by: Neil Shen --- cdc/processor/sinkmanager/table_sink_wrapper_test.go | 4 +++- cdc/sinkv2/tablesink/progress_tracker_test.go | 4 ++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cdc/processor/sinkmanager/table_sink_wrapper_test.go b/cdc/processor/sinkmanager/table_sink_wrapper_test.go index 8da4c64b356..0662c310c87 100644 --- a/cdc/processor/sinkmanager/table_sink_wrapper_test.go +++ b/cdc/processor/sinkmanager/table_sink_wrapper_test.go @@ -64,7 +64,9 @@ func (m *mockSink) Close() error { } //nolint:unparam -func createTableSinkWrapper(changefeedID model.ChangeFeedID, span tablepb.Span) (*tableSinkWrapper, *mockSink) { +func createTableSinkWrapper( + changefeedID model.ChangeFeedID, span tablepb.Span, +) (*tableSinkWrapper, *mockSink) { tableState := tablepb.TableStatePreparing sink := newMockSink() innerTableSink := tablesink.New[*model.RowChangedEvent]( diff --git a/cdc/sinkv2/tablesink/progress_tracker_test.go b/cdc/sinkv2/tablesink/progress_tracker_test.go index 59f55f888b0..5137f538193 100644 --- a/cdc/sinkv2/tablesink/progress_tracker_test.go +++ b/cdc/sinkv2/tablesink/progress_tracker_test.go @@ -243,7 +243,7 @@ func TestClosedTrackerDoNotAdvanceCheckpointTs(t *testing.T) { func TestOnlyResolvedTsShouldDirectlyAdvanceCheckpointTs(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) cb1 := tracker.addEvent() tracker.addResolvedTs(model.NewResolvedTs(1)) cb2 := tracker.addEvent() @@ -271,7 +271,7 @@ func TestOnlyResolvedTsShouldDirectlyAdvanceCheckpointTs(t *testing.T) { func TestShouldDirectlyUpdateResolvedTsIfNoMoreEvents(t *testing.T) { t.Parallel() - tracker := newProgressTracker(1, defaultBufferSize) + tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize) cb1 := tracker.addEvent() tracker.addResolvedTs(model.NewResolvedTs(1)) cb2 := tracker.addEvent()