processor/sourcemanager(ticdc): move mounted iter into sourceManager
Rustin170506 committed Dec 1, 2022
1 parent 8579aaf · commit 68c9a88
Showing 7 changed files with 92 additions and 87 deletions.
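This commit moves construction of the mounted event iterator out of the individual sink and redo workers and into the source manager: the mounter group is now handed to sourcemanager.New, and SinkManager reads events through SourceManager instead of holding the sort engine and mounter group itself. The sourcemanager and engine files are among the seven changed files but are not rendered on this page; the following is only a sketch of the API that the call sites below appear to assume (method bodies, unexported field names, and the batch size are assumptions, not the actual implementation):

// Sketch only: cdc/processor/sourcemanager/manager.go is not shown on this page.
// The field names (m.engine, m.mg) and the batch size 256 are assumptions.
func (m *SourceManager) FetchByTable(
	tableID model.TableID, lowerBound, upperBound engine.Position,
) *engine.MountedEventIter {
	// Wrap the raw sort-engine iterator with the mounter group so that callers
	// always receive fully mounted events.
	iter := m.engine.FetchByTable(tableID, lowerBound, upperBound)
	return engine.NewMountedEventIter(iter, m.mg, 256)
}

func (m *SourceManager) CleanByTable(tableID model.TableID, upperBound engine.Position) error {
	return m.engine.CleanByTable(tableID, upperBound)
}

func (m *SourceManager) GetTableResolvedTs(tableID model.TableID) model.Ts {
	return m.engine.GetResolvedTs(tableID)
}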
cdc/processor/processor.go (4 changes: 2 additions & 2 deletions)
@@ -847,9 +847,9 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
- p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, sortEngine, p.errCh)
+ p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg, sortEngine, p.errCh)
sinkManager, err := sinkmanager.New(stdCtx, p.changefeedID, p.changefeed.Info, p.upstream, p.redoManager,
- sortEngine, p.mg, p.errCh, p.metricsTableSinkTotalRows)
+ p.sourceManager, p.errCh, p.metricsTableSinkTotalRows)
// Bind them so that sourceManager can notify sinkManager.
p.sourceManager.OnResolve(sinkManager.UpdateReceivedSorterResolvedTs)
if err != nil {
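With the two replacements above, the initialization in lazyInitImpl reads approximately as follows (reassembled from the added and context lines; surrounding code omitted):

p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg, sortEngine, p.errCh)
sinkManager, err := sinkmanager.New(stdCtx, p.changefeedID, p.changefeed.Info, p.upstream, p.redoManager,
	p.sourceManager, p.errCh, p.metricsTableSinkTotalRows)
// Bind them so that sourceManager can notify sinkManager.
p.sourceManager.OnResolve(sinkManager.UpdateReceivedSorterResolvedTs)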
cdc/processor/sinkmanager/manager.go (35 changes: 17 additions & 18 deletions)
@@ -21,9 +21,9 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
- "github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/pipeline"
+ "github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
@@ -70,8 +70,8 @@ type SinkManager struct {
eventCache *redoEventCache
// redoManager is used to report the resolved ts of the table if redo log is enabled.
redoManager redo.LogManager
- // sortEngine is used by the sink manager to fetch data.
- sortEngine engine.SortEngine
+ // sourceManager is used by the sink manager to fetch data.
+ sourceManager *sourcemanager.SourceManager

// sinkFactory used to create table sink.
sinkFactory *factory.SinkFactory
@@ -108,8 +108,7 @@ func New(
changefeedInfo *model.ChangeFeedInfo,
up *upstream.Upstream,
redoManager redo.LogManager,
- sortEngine engine.SortEngine,
- mg entry.MounterGroup,
+ sourceManager *sourcemanager.SourceManager,
errChan chan error,
metricsTableSinkTotalRows prometheus.Counter,
) (*SinkManager, error) {
@@ -125,13 +124,13 @@

ctx, cancel := context.WithCancel(ctx)
m := &SinkManager{
- changefeedID: changefeedID,
- ctx:          ctx,
- cancel:       cancel,
- up:           up,
- memQuota:     newMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota),
- sinkFactory:  tableSinkFactory,
- sortEngine:   sortEngine,
+ changefeedID:  changefeedID,
+ ctx:           ctx,
+ cancel:        cancel,
+ up:            up,
+ memQuota:      newMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota),
+ sinkFactory:   tableSinkFactory,
+ sourceManager: sourceManager,

engineGCChan: make(chan *gcEvent, defaultEngineGCChanSize),

@@ -151,7 +150,7 @@ func New(
m.eventCache = newRedoEventCache(changefeedID, changefeedInfo.Config.MemoryQuota/3)
}

- m.startWorkers(mg, changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue)
+ m.startWorkers(changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue)
m.startGenerateTasks()
m.backgroundGC()

@@ -164,9 +163,9 @@
}

// start all workers and report the error to the error channel.
- func (m *SinkManager) startWorkers(mg entry.MounterGroup, splitTxn bool, enableOldValue bool) {
+ func (m *SinkManager) startWorkers(splitTxn bool, enableOldValue bool) {
for i := 0; i < sinkWorkerNum; i++ {
- w := newSinkWorker(m.changefeedID, mg, m.sortEngine, m.memQuota,
+ w := newSinkWorker(m.changefeedID, m.sourceManager, m.memQuota,
m.eventCache, splitTxn, enableOldValue)
m.sinkWorkers = append(m.sinkWorkers, w)
m.wg.Add(1)
@@ -195,7 +194,7 @@ func (m *SinkManager) startWorkers(mg entry.MounterGroup, splitTxn bool, enableOldValue bool) {
}

for i := 0; i < redoWorkerNum; i++ {
- w := newRedoWorker(m.changefeedID, mg, m.sortEngine, m.memQuota,
+ w := newRedoWorker(m.changefeedID, m.sourceManager, m.memQuota,
m.redoManager, m.eventCache, splitTxn, enableOldValue)
m.redoWorkers = append(m.redoWorkers, w)
m.wg.Add(1)
@@ -280,7 +279,7 @@ func (m *SinkManager) backgroundGC() {
zap.String("changefeed", m.changefeedID.ID))
return
case gcEvent := <-m.engineGCChan:
- if err := m.sortEngine.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil {
+ if err := m.sourceManager.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil {
log.Error("Failed to clean table in sort engine",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
@@ -659,7 +658,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) pipeline.Stats {
if m.redoManager != nil {
resolvedTs = m.redoManager.GetResolvedTs(tableID)
} else {
- resolvedTs = m.sortEngine.GetResolvedTs(tableID)
+ resolvedTs = m.sourceManager.GetTableResolvedTs(tableID)
}
return pipeline.Stats{
CheckpointTs: resolvedMark,
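Taken together, these hunks replace SinkManager's sortEngine and mounter-group dependencies with a single *sourcemanager.SourceManager. The resulting constructor signature, reassembled from the diff (the two leading parameters are inferred from the call site in processor.go and the ctx usage above, so treat them as a best guess):

func New(
	ctx context.Context,
	changefeedID model.ChangeFeedID,
	changefeedInfo *model.ChangeFeedInfo,
	up *upstream.Upstream,
	redoManager redo.LogManager,
	sourceManager *sourcemanager.SourceManager,
	errChan chan error,
	metricsTableSinkTotalRows prometheus.Counter,
) (*SinkManager, error)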
cdc/processor/sinkmanager/manager_test.go (45 changes: 25 additions & 20 deletions)
@@ -22,6 +22,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
+ "github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory"
mockengine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/mock"
@@ -53,14 +54,16 @@ func createManagerWithMemEngine(
changefeedID model.ChangeFeedID,
changefeedInfo *model.ChangeFeedInfo,
errChan chan error,
- ) *SinkManager {
+ ) (*SinkManager, engine.SortEngine) {
sortEngine := memory.New(context.Background())
+ up := upstream.NewUpstream4Test(&mockPD{})
+ sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan)
manager, err := New(
- ctx, changefeedID, changefeedInfo, upstream.NewUpstream4Test(&mockPD{}),
- nil, sortEngine, &entry.MockMountGroup{},
+ ctx, changefeedID, changefeedInfo, up,
+ nil, sm,
errChan, prometheus.NewCounter(prometheus.CounterOpts{}))
require.NoError(t, err)
- return manager
+ return manager, sortEngine
}

// nolint:revive
@@ -74,9 +77,11 @@ func createManagerWithMockEngine(
) (*SinkManager, *mockengine.MockSortEngine) {
ctrl := gomock.NewController(t)
sortEngine := mockengine.NewMockSortEngine(ctrl)
+ up := upstream.NewUpstream4Test(&mockPD{})
+ sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan)
manager, err := New(
- ctx, changefeedID, changefeedInfo, upstream.NewUpstream4Test(&mockPD{}),
- nil, sortEngine, &entry.MockMountGroup{},
+ ctx, changefeedID, changefeedInfo, up,
+ nil, sm,
errChan, prometheus.NewCounter(prometheus.CounterOpts{}))
require.NoError(t, err)
return manager, sortEngine
@@ -160,7 +165,7 @@ func TestAddTable(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
@@ -190,7 +195,7 @@ func TestRemoveTable(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
@@ -202,7 +207,7 @@
require.NotNil(t, tableSink)
err := manager.StartTable(tableID, 0)
require.NoError(t, err)
- addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
+ addTableAndAddEventsToSortEngine(t, e, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)

@@ -232,7 +237,7 @@ func TestUpdateBarrierTs(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
@@ -250,14 +255,14 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
}()
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
- addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
+ addTableAndAddEventsToSortEngine(t, e, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)
err := manager.StartTable(tableID, 0)
@@ -278,14 +283,14 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
}()
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
- addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
+ addTableAndAddEventsToSortEngine(t, e, tableID)
// This would happen when the table just added to this node and redo log is enabled.
// So there is possibility that the resolved ts is smaller than the global barrier ts.
manager.UpdateBarrierTs(4)
@@ -308,14 +313,14 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
}()
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
- addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
+ addTableAndAddEventsToSortEngine(t, e, tableID)

manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)
@@ -335,10 +340,10 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
- addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
+ addTableAndAddEventsToSortEngine(t, e, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)

@@ -356,7 +361,7 @@ func TestClose(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
- manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
+ manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))

err := manager.Close()
require.NoError(t, err)
@@ -382,7 +387,7 @@ func TestGetTableStats(t *testing.T) {
mockEngine.EXPECT().GetResolvedTs(tableID).AnyTimes()

manager.AddTable(tableID, 1, 100)
- addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
+ addTableAndAddEventsToSortEngine(t, mockEngine, tableID)
// This would happen when the table just added to this node and redo log is enabled.
// So there is possibility that the resolved ts is smaller than the global barrier ts.
manager.UpdateBarrierTs(4)
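The test helpers now construct a real SourceManager (backed by entry.MockMountGroup) and also return the underlying sort engine, so a test can write events straight into the engine while the manager consumes them through the source manager. The typical call pattern, taken from the tests above:

manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
// Events are injected directly into the memory sort engine returned by the helper.
addTableAndAddEventsToSortEngine(t, e, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)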
cdc/processor/sinkmanager/redo_log_worker.go (16 changes: 5 additions & 11 deletions)
@@ -18,17 +18,16 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
- "github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
+ "github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/redo"
"go.uber.org/zap"
)

type redoWorker struct {
changefeedID model.ChangeFeedID
- mg entry.MounterGroup
- sortEngine engine.SortEngine
+ sourceManager *sourcemanager.SourceManager
memQuota *memQuota
redoManager redo.LogManager
eventCache *redoEventCache
@@ -38,8 +37,7 @@

func newRedoWorker(
changefeedID model.ChangeFeedID,
- mg entry.MounterGroup,
- sortEngine engine.SortEngine,
+ sourceManager *sourcemanager.SourceManager,
quota *memQuota,
redoManager redo.LogManager,
eventCache *redoEventCache,
@@ -48,8 +46,7 @@
) *redoWorker {
return &redoWorker{
changefeedID: changefeedID,
- mg: mg,
- sortEngine: sortEngine,
+ sourceManager: sourceManager,
memQuota: quota,
redoManager: redoManager,
eventCache: eventCache,
@@ -164,10 +161,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) error {
}

upperBound := task.getUpperBound()
- iter := engine.NewMountedEventIter(
- w.sortEngine.FetchByTable(task.tableID, task.lowerBound, upperBound),
- w.mg, 256)
-
+ iter := w.sourceManager.FetchByTable(task.tableID, task.lowerBound, upperBound)
defer func() {
if err := iter.Close(); err != nil {
log.Error("sink redo worker fails to close iterator",
cdc/processor/sinkmanager/table_sink_worker.go (21 changes: 8 additions & 13 deletions)
@@ -19,19 +19,18 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
- "github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
+ "github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

type sinkWorker struct {
- changefeedID model.ChangeFeedID
- mg           entry.MounterGroup
- sortEngine   engine.SortEngine
- memQuota     *memQuota
- eventCache   *redoEventCache
+ changefeedID  model.ChangeFeedID
+ sourceManager *sourcemanager.SourceManager
+ memQuota      *memQuota
+ eventCache    *redoEventCache
// splitTxn indicates whether to split the transaction into multiple batches.
splitTxn bool
// enableOldValue indicates whether to enable the old value feature.
Expand All @@ -45,17 +44,15 @@ type sinkWorker struct {
// newWorker creates a new worker.
func newSinkWorker(
changefeedID model.ChangeFeedID,
- mg entry.MounterGroup,
- sortEngine engine.SortEngine,
+ sourceManager *sourcemanager.SourceManager,
quota *memQuota,
eventCache *redoEventCache,
splitTxn bool,
enableOldValue bool,
) *sinkWorker {
return &sinkWorker{
changefeedID: changefeedID,
- mg: mg,
- sortEngine: sortEngine,
+ sourceManager: sourceManager,
memQuota: quota,
eventCache: eventCache,
splitTxn: splitTxn,
@@ -145,9 +142,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (err error) {

// lowerBound and upperBound are both closed intervals.
allEventSize := 0
- iter := engine.NewMountedEventIter(
- w.sortEngine.FetchByTable(task.tableID, lowerBound, upperBound),
- w.mg, 256)
+ iter := w.sourceManager.FetchByTable(task.tableID, lowerBound, upperBound)
defer func() {
w.metricRedoEventCacheMiss.Add(float64(allEventSize))
if err := iter.Close(); err != nil {
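In both workers the explicit call to engine.NewMountedEventIter disappears; the iterator returned by sourceManager.FetchByTable is consumed the same way as before. The consuming loop looks roughly like the following (paraphrased from the existing worker code, which this diff does not touch; the exact control flow in handleTask differs):

for {
	e, pos, err := iter.Next(ctx)
	if err != nil {
		return errors.Trace(err)
	}
	if e == nil {
		// The iterator is exhausted up to upperBound.
		break
	}
	// e is already mounted by the MounterGroup inside SourceManager, so the
	// worker can hand it to the table sink or the redo log manager directly.
	_ = pos // pos is the last read position, used for checkpointing and GC
}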
