Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

sinkv2,processor: replace table ID with Span #7838

Merged
merged 5 commits into from
Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1189,8 +1189,7 @@ func (p *processor) createTablePipelineImpl(
return nil, errors.Trace(err)
}
} else {
s := p.sinkV2Factory.CreateTableSink(
p.changefeedID, span.TableID, p.metricsTableSinkTotalRows)
s := p.sinkV2Factory.CreateTableSink(p.changefeedID, span, p.metricsTableSinkTotalRows)
table, err = pipeline.NewTableActor(
ctx,
p.upstream,
Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/factory"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/retry"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/prometheus/client_golang/prometheus"
"github.com/tikv/client-go/v2/oracle"
Expand Down Expand Up @@ -639,7 +640,8 @@ func (m *SinkManager) AddTable(tableID model.TableID, startTs model.Ts, targetTs
sinkWrapper := newTableSinkWrapper(
m.changefeedID,
tableID,
m.sinkFactory.CreateTableSink(m.changefeedID, tableID, m.metricsTableSinkTotalRows),
m.sinkFactory.CreateTableSink(
m.changefeedID, spanz.TableIDToComparableSpan(tableID), m.metricsTableSinkTotalRows),
tablepb.TableStatePreparing,
startTs,
targetTs,
Expand Down
28 changes: 19 additions & 9 deletions cdc/processor/sinkmanager/table_sink_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory"
cerrors "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/pingcap/tiflow/pkg/upstream"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -183,7 +184,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndGotSomeFilteredEvents() {
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -289,7 +291,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAbortWhenNoMemAndOneTxnFi
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -394,7 +397,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAbortWhenNoMemAndBlocked(
require.ErrorIs(suite.T(), err, cerrors.ErrFlowControllerAborted)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -520,7 +524,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndOnlyAdvanceTableSinkWhenR
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -647,7 +652,8 @@ func (suite *workerSuite) TestHandleTaskWithoutSplitTxnAndAbortWhenNoMemAndForce
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -774,7 +780,8 @@ func (suite *workerSuite) TestHandleTaskWithoutSplitTxnOnlyAdvanceTableSinkWhenR
require.Equal(suite.T(), context.Canceled, err)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -892,7 +899,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndDoNotAdvanceTableUntilMee
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -976,7 +984,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableUntilTaskIsFi
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, sink := createTableSinkWrapper(changefeedID, tableID)
wrapper, sink := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down Expand Up @@ -1047,7 +1056,8 @@ func (suite *workerSuite) TestHandleTaskWithSplitTxnAndAdvanceTableIfNoWorkload(
require.ErrorIs(suite.T(), err, context.Canceled)
}()

wrapper, _ := createTableSinkWrapper(changefeedID, tableID)
wrapper, _ := createTableSinkWrapper(
changefeedID, spanz.TableIDToComparableSpan(tableID))
lowerBoundPos := engine.Position{
StartTs: 0,
CommitTs: 1,
Expand Down
16 changes: 11 additions & 5 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/tablesink"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -63,14 +64,17 @@ func (m *mockSink) Close() error {
}

//nolint:unparam
func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) {
func createTableSinkWrapper(
changefeedID model.ChangeFeedID, span tablepb.Span,
) (*tableSinkWrapper, *mockSink) {
tableState := tablepb.TableStatePreparing
sink := newMockSink()
innerTableSink := tablesink.New[*model.RowChangedEvent](changefeedID, tableID,
innerTableSink := tablesink.New[*model.RowChangedEvent](
changefeedID, span,
sink, &eventsink.RowChangeEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{}))
wrapper := newTableSinkWrapper(
changefeedID,
tableID,
span.TableID,
innerTableSink,
tableState,
0,
Expand All @@ -82,7 +86,8 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table
func TestTableSinkWrapperClose(t *testing.T) {
t.Parallel()

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
wrapper, _ := createTableSinkWrapper(
model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1))
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
wrapper.close(context.Background())
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
Expand All @@ -91,7 +96,8 @@ func TestTableSinkWrapperClose(t *testing.T) {
func TestUpdateReceivedSorterResolvedTs(t *testing.T) {
t.Parallel()

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
wrapper, _ := createTableSinkWrapper(
model.DefaultChangeFeedID("1"), spanz.TableIDToComparableSpan(1))
wrapper.updateReceivedSorterResolvedTs(100)
require.Equal(t, uint64(100), wrapper.getReceivedSorterResolvedTs())
require.Equal(t, tablepb.TableStatePrepared, wrapper.getState())
Expand Down
9 changes: 6 additions & 3 deletions cdc/sinkv2/eventsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"strings"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/blackhole"
"github.com/pingcap/tiflow/cdc/sinkv2/eventsink/cloudstorage"
Expand Down Expand Up @@ -92,14 +93,16 @@ func New(ctx context.Context,
}

// CreateTableSink creates a TableSink by schema.
func (s *SinkFactory) CreateTableSink(changefeedID model.ChangeFeedID, tableID model.TableID, totalRowsCounter prometheus.Counter) tablesink.TableSink {
func (s *SinkFactory) CreateTableSink(
changefeedID model.ChangeFeedID, span tablepb.Span, totalRowsCounter prometheus.Counter,
) tablesink.TableSink {
switch s.sinkType {
case sink.RowSink:
// We have to indicate the type here, otherwise it can not be compiled.
return tablesink.New[*model.RowChangedEvent](changefeedID, tableID,
return tablesink.New[*model.RowChangedEvent](changefeedID, span,
s.rowSink, &eventsink.RowChangeEventAppender{}, totalRowsCounter)
case sink.TxnSink:
return tablesink.New[*model.SingleTableTxn](changefeedID, tableID,
return tablesink.New[*model.SingleTableTxn](changefeedID, span,
s.txnSink, &eventsink.TxnEventAppender{}, totalRowsCounter)
default:
panic("unknown sink type")
Expand Down
3 changes: 2 additions & 1 deletion cdc/sinkv2/eventsink/factory/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/sink"
"github.com/pingcap/tiflow/pkg/sink/kafka"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -106,7 +107,7 @@ func TestSinkFactory(t *testing.T) {
require.NotNil(t, sinkFactory.rowSink)

tableSink := sinkFactory.CreateTableSink(model.DefaultChangeFeedID("1"),
1, prometheus.NewCounter(prometheus.CounterOpts{}))
spanz.TableIDToComparableSpan(1), prometheus.NewCounter(prometheus.CounterOpts{}))
require.NotNil(t, tableSink, "table sink can be created")

err = sinkFactory.Close()
Expand Down
11 changes: 6 additions & 5 deletions cdc/sinkv2/tablesink/progress_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -56,8 +57,8 @@ type pendingResolvedTs struct {
// Every event is associated with a `eventID` which is a continuous number. `eventID`
// can be regarded as the event's offset in `pendingEvents`.
type progressTracker struct {
// tableID is the table ID of the table sink.
tableID model.TableID
// span is the span of the table sink.
span tablepb.Span

// Internal Buffer size. Modified in tests only.
bufferSize uint64
Expand Down Expand Up @@ -92,13 +93,13 @@ type progressTracker struct {
// newProgressTracker is used to create a new progress tracker.
// The last min resolved ts is set to 0.
// It means that the table sink has not started yet.
func newProgressTracker(tableID model.TableID, bufferSize uint64) *progressTracker {
func newProgressTracker(span tablepb.Span, bufferSize uint64) *progressTracker {
if bufferSize%8 != 0 {
panic("bufferSize must be align to 8 bytes")
}

return &progressTracker{
tableID: tableID,
span: span,
bufferSize: bufferSize / 8,
// It means the start of the table.
// It's Ok to use 0 here.
Expand Down Expand Up @@ -287,7 +288,7 @@ func (r *progressTracker) close(ctx context.Context) {
return
case <-blockTicker.C:
log.Warn("Close process doesn't return in time, may be stuck",
zap.Int64("tableID", r.tableID),
zap.Stringer("span", &r.span),
zap.Int("trackingCount", r.trackingCount()),
zap.Any("lastMinResolvedTs", r.advance()),
)
Expand Down
25 changes: 13 additions & 12 deletions cdc/sinkv2/tablesink/progress_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/pkg/spanz"
"github.com/stretchr/testify/require"
)

Expand All @@ -33,7 +34,7 @@ func (r *progressTracker) pendingResolvedTsEventsCount() int {
func TestNewProgressTracker(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
require.Equal(
t,
uint64(0),
Expand All @@ -45,7 +46,7 @@ func TestNewProgressTracker(t *testing.T) {
func TestAddEvent(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
tracker.addEvent()
tracker.addEvent()
Expand All @@ -56,15 +57,15 @@ func TestAddResolvedTs(t *testing.T) {
t.Parallel()

// There is no event in the tracker.
tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addResolvedTs(model.NewResolvedTs(1))
tracker.addResolvedTs(model.NewResolvedTs(2))
tracker.addResolvedTs(model.NewResolvedTs(3))
require.Equal(t, 0, tracker.trackingCount(), "resolved ts should not be added")
require.Equal(t, uint64(3), tracker.advance().Ts, "lastMinResolvedTs should be 3")

// There is an event in the tracker.
tracker = newProgressTracker(1, defaultBufferSize)
tracker = newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(2))
tracker.addResolvedTs(model.NewResolvedTs(3))
Expand All @@ -77,7 +78,7 @@ func TestRemove(t *testing.T) {
var cb1, cb2, cb4, cb5 func()

// Only event.
tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
cb2 = tracker.addEvent()
tracker.addEvent()
Expand All @@ -86,7 +87,7 @@ func TestRemove(t *testing.T) {
require.Equal(t, 3, tracker.trackingCount(), "not advanced")

// Both event and resolved ts.
tracker = newProgressTracker(1, defaultBufferSize)
tracker = newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 = tracker.addEvent()
cb2 = tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(3))
Expand Down Expand Up @@ -120,7 +121,7 @@ func TestRemove(t *testing.T) {
func TestCloseTracker(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
cb2 := tracker.addEvent()
Expand Down Expand Up @@ -149,7 +150,7 @@ func TestCloseTracker(t *testing.T) {
func TestCloseTrackerCancellable(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
tracker.addEvent()
Expand All @@ -174,7 +175,7 @@ func TestCloseTrackerCancellable(t *testing.T) {
func TestTrackerBufferBoundary(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, 8)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), 8)

cbs := make([]func(), 0)
for i := 0; i < 65; i++ {
Expand Down Expand Up @@ -207,7 +208,7 @@ func TestTrackerBufferBoundary(t *testing.T) {
func TestClosedTrackerDoNotAdvanceCheckpointTs(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
cb2 := tracker.addEvent()
Expand Down Expand Up @@ -242,7 +243,7 @@ func TestClosedTrackerDoNotAdvanceCheckpointTs(t *testing.T) {
func TestOnlyResolvedTsShouldDirectlyAdvanceCheckpointTs(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
cb2 := tracker.addEvent()
Expand Down Expand Up @@ -270,7 +271,7 @@ func TestOnlyResolvedTsShouldDirectlyAdvanceCheckpointTs(t *testing.T) {
func TestShouldDirectlyUpdateResolvedTsIfNoMoreEvents(t *testing.T) {
t.Parallel()

tracker := newProgressTracker(1, defaultBufferSize)
tracker := newProgressTracker(spanz.TableIDToComparableSpan(1), defaultBufferSize)
cb1 := tracker.addEvent()
tracker.addResolvedTs(model.NewResolvedTs(1))
cb2 := tracker.addEvent()
Expand Down
Loading