Skip to content

Commit

Permalink
sinkv2,processor: replace table ID with Span
Browse files Browse the repository at this point in the history
Signed-off-by: Neil Shen <overvenus@gmail.com>
  • Loading branch information
overvenus committed Dec 12, 2022
1 parent f4ea6ec commit 8ac28ac
Show file tree
Hide file tree
Showing 11 changed files with 101 additions and 59 deletions.
3 changes: 1 addition & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,8 +1188,7 @@ func (p *processor) createTablePipelineImpl(
return nil, errors.Trace(err)
}
} else {
s := p.sinkV2Factory.CreateTableSink(
p.changefeedID, span.TableID, p.metricsTableSinkTotalRows)
s := p.sinkV2Factory.CreateTableSink(p.changefeedID, span, p.metricsTableSinkTotalRows)
table, err = pipeline.NewTableActor(
ctx,
p.upstream,
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -639,7 +640,8 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
tableID,
m.sinkFactory.CreateTableSink(m.changefeedID, tableID, m.metricsTableSinkTotalRows),
m.sinkFactory.CreateTableSink(
m.changefeedID, spanz.TableIDToComparableSpan(tableID), m.metricsTableSinkTotalRows),
tablepb.TableStatePreparing,
startTs,
targetTs,
Expand Down
28 changes: 19 additions & 9 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -183,7 +184,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndGotSomeFilteredEvents() {
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -289,7 +291,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAbortWhenNoMemAndOneTxnFi
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -394,7 +397,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAbortWhenNoMemAndBlocked(
require.ErrorIs(suite.T(), err, cerrors.ErrFlowControllerAborted)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -520,7 +524,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndOnlyAdvanceTableSinkWhenR
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -647,7 +652,8 @@ func (suite *workerSuite) TestHandleTaskWithoutSplitTxnAndAbortWhenNoMemAndForce
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -774,7 +780,8 @@ func (suite *workerSuite) TestHandleTaskWithoutSplitTxnOnlyAdvanceTableSinkWhenR
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -892,7 +899,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndDoNotAdvanceTableUntilMee
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -976,7 +984,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableUntilTaskIsFi
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -1047,7 +1056,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload(
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, _ := createTableSinkWrapper(changefeedID, tableID)
wrapper, _ := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down
14 changes: 9 additions & 5 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -63,14 +64,15 @@ func (m *mockSink) Close() error {
}

//nolint:unparam
func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) {
func createTableSinkWrapper(changefeedID model.ChangeFeedID, span tablepb.Span) (*tableSinkWrapper, *mockSink) {
tableState := tablepb.TableStatePreparing
sink := newMockSink()
innerTableSink := tablesink.New[*model.RowChangedEvent](changefeedID, tableID,
innerTableSink := tablesink.New[*model.RowChangedEvent](
changefeedID, span,
sink, &eventsink.RowChangeEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{}))
wrapper := newTableSinkWrapper(
changefeedID,
tableID,
span.TableID,
innerTableSink,
tableState,
0,
Expand All @@ -82,7 +84,8 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table
func TestTableSinkWrapperClose(t *testing.T) {
t.Parallel()

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
wrapper, _ := createTableSinkWrapper(
model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1))
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
wrapper.close(context.Background())
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
Expand All @@ -91,7 +94,8 @@ func TestTableSinkWrapperClose(t *testing.T) {
func TestUpdateReceivedSorterResolvedTs(t *testing.T) {
t.Parallel()

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
wrapper, _ := createTableSinkWrapper(
model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1))
wrapper.updateReceivedSorterResolvedTs(100)
require.Equal(t, uint64(100), wrapper.getReceivedSorterResolvedTs())
require.Equal(t, tablepb.TableStatePrepared, wrapper.getState())
Expand Down
9 changes: 6 additions & 3 deletions cdc/sinkv2/eventsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/blackhole"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/cloudstorage"
Expand Down Expand Up @@ -92,14 +93,16 @@ func New(ctx context.Context,
}

// CreateTableSink creates a TableSink by schema.
func (s *SinkFactory) CreateTableSink(changefeedID model.ChangeFeedID, tableID model.TableID, totalRowsCounter prometheus.Counter) tablesink.TableSink {
func (s *SinkFactory) CreateTableSink(
changefeedID model.ChangeFeedID, span tablepb.Span, totalRowsCounter prometheus.Counter,
) tablesink.TableSink {
switch s.sinkType {
case sink.RowSink:
// We have to indicate the type here, otherwise it can not be compiled.
return tablesink.New[*model.RowChangedEvent](changefeedID, tableID,
return tablesink.New[*model.RowChangedEvent](changefeedID, span,
s.rowSink, &eventsink.RowChangeEventAppender{}, totalRowsCounter)
case sink.TxnSink:
return tablesink.New[*model.SingleTableTxn](changefeedID, tableID,
return tablesink.New[*model.SingleTableTxn](changefeedID, span,
s.txnSink, &eventsink.TxnEventAppender{}, totalRowsCounter)
default:
panic("unknown sink type")
Expand Down
3 changes: 2 additions & 1 deletion cdc/sinkv2/eventsink/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -106,7 +107,7 @@ func TestSinkFactory(t *testing.T) {
require.NotNil(t, sinkFactory.rowSink)

tableSink := sinkFactory.CreateTableSink(model.DefaultChangeFeedID("1"),
1, prometheus.NewCounter(prometheus.CounterOpts{}))
spanz.TableIDToComparableSpan(1), prometheus.NewCounter(prometheus.CounterOpts{}))
require.NotNil(t, tableSink, "table sink can be created")

err = sinkFactory.Close()
Expand Down
11 changes: 6 additions & 5 deletions cdc/sinkv2/tablesink/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -54,8 +55,8 @@ type pendingResolvedTs struct {
// Every event is associated with a `eventID` which is a continuous number. `eventID`
// can be regarded as the event's offset in `pendingEvents`.
type progressTracker struct {
// tableID is the table ID of the table sink.
tableID model.TableID
// span is the span of the table sink.
span tablepb.Span

// Internal Buffer size. Modified in tests only.
bufferSize uint64
Expand Down Expand Up @@ -90,13 +91,13 @@ type progressTracker struct {
// newProgressTracker is used to create a new progress tracker.
// The last min resolved ts is set to 0.
// It means that the table sink has not started yet.
func newProgressTracker(tableID model.TableID, bufferSize uint64) *progressTracker {
func newProgressTracker(span tablepb.Span, bufferSize uint64) *progressTracker {
if bufferSize%8 != 0 {
panic("bufferSize must be align to 8 bytes")
}

return &progressTracker{
tableID: tableID,
span: span,
bufferSize: bufferSize / 8,
// It means the start of the table.
// It's Ok to use 0 here.
Expand Down Expand Up @@ -263,7 +264,7 @@ func (r *progressTracker) close(ctx context.Context) {
return
case <-blockTicker.C:
log.Warn("Close process doesn't return in time, may be stuck",
zap.Int64("tableID", r.tableID),
zap.Stringer("span", &r.span),
zap.Int("trackingCount", r.trackingCount()),
zap.Any("lastMinResolvedTs", r.advance()),
)
Expand Down
21 changes: 11 additions & 10 deletions cdc/sinkv2/tablesink/progress_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ import (
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
)

func TestNewProgressTracker(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
require.Equal(
t,
uint64(0),
Expand All @@ -38,7 +39,7 @@ func TestNewProgressTracker(t *testing.T) {
func TestAddEvent(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
tracker.addEvent()
tracker.addEvent()
Expand All @@ -49,15 +50,15 @@ func TestAddResolvedTs(t *testing.T) {
t.Parallel()

// There is no event in the tracker.
tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addResolvedTs(model.NewResolvedTs(1))
tracker.addResolvedTs(model.NewResolvedTs(2))
tracker.addResolvedTs(model.NewResolvedTs(3))
require.Equal(t, 0, tracker.trackingCount(), "resolved ts should not be added")
require.Equal(t, uint64(3), tracker.advance().Ts, "lastMinResolvedTs should be 3")

// There is an event in the tracker.
tracker = newProgressTracker(1, defaultBufferSize)
tracker = newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(2))
tracker.addResolvedTs(model.NewResolvedTs(3))
Expand All @@ -70,7 +71,7 @@ func TestRemove(t *testing.T) {
var cb1, cb2, cb4, cb5 func()

// Only event.
tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
cb2 = tracker.addEvent()
tracker.addEvent()
Expand All @@ -79,7 +80,7 @@ func TestRemove(t *testing.T) {
require.Equal(t, 3, tracker.trackingCount(), "not advanced")

// Both event and resolved ts.
tracker = newProgressTracker(1, defaultBufferSize)
tracker = newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 = tracker.addEvent()
cb2 = tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(3))
Expand Down Expand Up @@ -113,7 +114,7 @@ func TestRemove(t *testing.T) {
func TestCloseTracker(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
cb2 := tracker.addEvent()
Expand Down Expand Up @@ -142,7 +143,7 @@ func TestCloseTracker(t *testing.T) {
func TestCloseTrackerCancellable(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
tracker.addEvent()
Expand All @@ -167,7 +168,7 @@ func TestCloseTrackerCancellable(t *testing.T) {
func TestTrackerBufferBoundary(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, 8)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), 8)

cbs := make([]func(), 0)
for i := 0; i < 65; i++ {
Expand Down Expand Up @@ -200,7 +201,7 @@ func TestTrackerBufferBoundary(t *testing.T) {
func TestClosedTrackerDoNotAdvanceCheckpointTs(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
cb2 := tracker.addEvent()
Expand Down
Loading

0 comments on commit 8ac28ac

Please sign in to comment.