Skip to content

Commit

Permalink
sinkmanager(ticdc): consider table state when generate tasks (#7591)
Browse files Browse the repository at this point in the history
ref #5928
  • Loading branch information
Rustin170506 authored Nov 14, 2022
1 parent b098b57 commit e0fda20
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 4 deletions.
2 changes: 2 additions & 0 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ type Manager interface {
// AddTable adds a table(TableSink) to the sink manager.
// Sink manager will create a new table sink and start it.
AddTable(tableID model.TableID, startTs model.Ts, targetTs model.Ts)
// StartTable starts a table sink and makes it replicate data to downstream.
StartTable(tableID model.TableID)
// RemoveTable removes a table(TableSink) from the sink manager.
// Sink manager will stop the table sink and remove it.
RemoveTable(tableID model.TableID) error
Expand Down
31 changes: 31 additions & 0 deletions cdc/processor/sinkmanager/manager_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,10 @@ func (m *ManagerImpl) generateTableSinkFetchTask() error {
case <-m.ctx.Done():
return m.ctx.Err()
case <-m.taskTicker.C:
// No more tables.
if m.progressHeap.len() == 0 {
continue
}
slowestTableProgress := m.progressHeap.pop()
tableID := slowestTableProgress.tableID
tableSink, ok := m.tableSinks.Load(tableID)
Expand All @@ -190,6 +194,19 @@ func (m *ManagerImpl) generateTableSinkFetchTask() error {
// So we do **not** need add it back to the heap.
continue
}
tableState := tableSink.(*tableSinkWrapper).getState()
if tableState < tablepb.TableStateReplicating {
log.Panic("Tables that are not started should not appear in the progress heap",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
// Reaching here means the table sink is stopping or stopped.
// Skip it and do not push it back onto the heap,
// because a table never goes from stopping/stopped back to replicating.
if tableState > tablepb.TableStateReplicating {
continue
}
// We use the barrier ts as the upper bound of the fetch tableSinkTask.
// Because it can not exceed the barrier ts.
// We also need to consider the resolved ts from sorter,
Expand Down Expand Up @@ -273,9 +290,23 @@ func (m *ManagerImpl) AddTable(tableID model.TableID, startTs model.Ts, targetTs
tableID,
m.sinkFactory.CreateTableSink(m.changefeedID, tableID, m.metricsTableSinkTotalRows),
tablepb.TableStatePreparing,
startTs,
targetTs,
)
m.tableSinks.Store(tableID, sinkWrapper)
}

// StartTable sets the table(TableSink) state to replicating.
func (m *ManagerImpl) StartTable(tableID model.TableID) {
tableSink, ok := m.tableSinks.Load(tableID)
if !ok {
log.Panic("Table sink not found when starting table stats",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
zap.Int64("tableID", tableID))
}
tableSink.(*tableSinkWrapper).start()
startTs := tableSink.(*tableSinkWrapper).startTs
initProgress := &progress{
tableID: tableID,
nextLowerBoundPos: sorter.Position{StartTs: startTs - 1, CommitTs: startTs},
Expand Down
27 changes: 26 additions & 1 deletion cdc/processor/sinkmanager/manager_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func TestAddTable(t *testing.T) {
tableSink, ok := manager.tableSinks.Load(tableID)
require.True(t, ok)
require.NotNil(t, tableSink)
require.Equal(t, 0, manager.progressHeap.len(), "Not started table shout not in progress heap")
manager.StartTable(tableID)
require.Equal(t, &progress{
tableID: tableID,
nextLowerBoundPos: sorter.Position{
Expand Down Expand Up @@ -184,6 +186,7 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)
manager.StartTable(tableID)

require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(tableID)
Expand All @@ -208,6 +211,7 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
// So there is possibility that the resolved ts is smaller than the global barrier ts.
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 3)
manager.StartTable(tableID)

require.Eventually(t, func() bool {
tableSink, ok := manager.tableSinks.Load(tableID)
Expand All @@ -228,9 +232,9 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID)

manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)
manager.StartTable(tableID)

require.Eventually(t, func() bool {
s, err := manager.GetTableStats(tableID)
Expand All @@ -239,6 +243,27 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)
}

// TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating adds a table, feeds
// it events, a barrier ts and a sorter resolved ts, but never calls
// StartTable. It then asserts that no memory quota is in use and that the
// table sink's checkpoint ts is still 0.
func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
	t.Parallel()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	info := getChangefeedInfo()
	errCh := make(chan error, 1)
	manager := createManager(t, ctx, model.DefaultChangeFeedID("1"), info, errCh)

	// Register the table and give it data, but deliberately skip StartTable.
	tableID := model.TableID(1)
	manager.AddTable(tableID, 1, 100)
	addTableAndAddEventsToSorterEngine(t, manager.sortEngine, tableID)
	manager.UpdateBarrierTs(4)
	manager.UpdateReceivedSorterResolvedTs(tableID, 5)

	// NOTE(review): these assertions run immediately after setup with no wait;
	// confirm that this is sufficient to catch a task produced by a later tick.
	require.Equal(t, uint64(0), manager.memQuota.getUsedBytes())

	wrapper, found := manager.tableSinks.Load(tableID)
	require.True(t, found)
	require.NotNil(t, wrapper)

	checkpoint := wrapper.(*tableSinkWrapper).getCheckpointTs()
	require.Equal(t, uint64(0), checkpoint.Ts)
}

func TestClose(t *testing.T) {
t.Parallel()

Expand Down
4 changes: 3 additions & 1 deletion cdc/processor/sinkmanager/table_progress_heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ func (p *progressHeap) Len() int {
}

// Less reports whether the entry at index i orders before the entry at
// index j, i.e. whether comparing their nextLowerBoundPos values yields -1.
// Equal or greater positions report false.
func (p *progressHeap) Less(i, j int) bool {
	a := p.heap[i]
	b := p.heap[j]
	return a.nextLowerBoundPos.Compare(b.nextLowerBoundPos) == -1
}

func (p *progressHeap) Swap(i, j int) {
Expand Down
11 changes: 11 additions & 0 deletions cdc/processor/sinkmanager/table_sink_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type tableSinkWrapper struct {
tableSink sinkv2.TableSink
// state used to control the lifecycle of the table.
state *tablepb.TableState
// startTs is the start ts of the table.
startTs model.Ts
// targetTs is the upper bound of the table sink.
targetTs model.Ts
// receivedSorterResolvedTs is the resolved ts received from the sorter.
Expand All @@ -51,22 +53,31 @@ func newTableSinkWrapper(
tableID model.TableID,
tableSink sinkv2.TableSink,
state tablepb.TableState,
startTs model.Ts,
targetTs model.Ts,
) *tableSinkWrapper {
return &tableSinkWrapper{
changefeed: changefeed,
tableID: tableID,
tableSink: tableSink,
state: &state,
startTs: startTs,
targetTs: targetTs,
}
}

// start stores tablepb.TableStateReplicating into the wrapper's state.
func (t *tableSinkWrapper) start() {
t.state.Store(tablepb.TableStateReplicating)
}

// appendRowChangedEvents forwards the given row changed events to the wrapped
// tableSink by calling its AppendRowChangedEvents method.
func (t *tableSinkWrapper) appendRowChangedEvents(events ...*model.RowChangedEvent) {
t.tableSink.AppendRowChangedEvents(events...)
}

// updateReceivedSorterResolvedTs records ts as the latest resolved ts received
// from the sorter. If the table is still in the preparing state and ts is
// greater than the table's startTs, the state is moved to prepared first.
func (t *tableSinkWrapper) updateReceivedSorterResolvedTs(ts model.Ts) {
	// NOTE(review): Load and Store are two separate calls; confirm that no
	// other writer can change the state between them.
	stillPreparing := t.state.Load() == tablepb.TableStatePreparing
	if stillPreparing && ts > t.startTs {
		t.state.Store(tablepb.TableStatePrepared)
	}

	t.receivedSorterResolvedTs.Store(ts)
}

Expand Down
6 changes: 4 additions & 2 deletions cdc/processor/sinkmanager/table_sink_wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (m *mockSink) Close() error {

//nolint:unparam
func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.TableID) (*tableSinkWrapper, *mockSink) {
tableState := tablepb.TableStateReplicating
tableState := tablepb.TableStatePreparing
sink := newMockSink()
innerTableSink := tablesink.New[*model.RowChangedEvent](changefeedID, tableID,
sink, &eventsink.RowChangeEventAppender{}, prometheus.NewCounter(prometheus.CounterOpts{}))
Expand All @@ -59,6 +59,7 @@ func createTableSinkWrapper(changefeedID model.ChangeFeedID, tableID model.Table
tableID,
innerTableSink,
tableState,
0,
100,
)
return wrapper, sink
Expand All @@ -68,7 +69,7 @@ func TestTableSinkWrapperClose(t *testing.T) {
t.Parallel()

wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
require.Equal(t, tablepb.TableStateReplicating, wrapper.getState())
require.Equal(t, tablepb.TableStatePreparing, wrapper.getState())
require.ErrorIs(t, cerror.ErrTableProcessorStoppedSafely, errors.Cause(wrapper.close(context.Background())))
require.Equal(t, tablepb.TableStateStopped, wrapper.getState(), "table sink state should be stopped")
}
Expand All @@ -79,6 +80,7 @@ func TestUpdateReceivedSorterResolvedTs(t *testing.T) {
wrapper, _ := createTableSinkWrapper(model.DefaultChangeFeedID("1"), 1)
wrapper.updateReceivedSorterResolvedTs(100)
require.Equal(t, uint64(100), wrapper.getReceivedSorterResolvedTs())
require.Equal(t, tablepb.TableStatePrepared, wrapper.getState())
}

func TestConvertNilRowChangedEvents(t *testing.T) {
Expand Down

0 comments on commit e0fda20

Please sign in to comment.