Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

processor/sourcemanager(ticdc): move mounted iter into sourceManager #7769

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -846,9 +846,9 @@ func (p *processor) lazyInitImpl(ctx cdcContext.Context) error {
zap.Duration("duration", time.Since(start)))
return errors.Trace(err)
}
p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, sortEngine, p.errCh)
p.sourceManager = sourcemanager.New(p.changefeedID, p.upstream, p.mg, sortEngine, p.errCh)
sinkManager, err := sinkmanager.New(stdCtx, p.changefeedID, p.changefeed.Info, p.upstream, p.redoManager,
sortEngine, p.mg, p.errCh, p.metricsTableSinkTotalRows)
p.sourceManager, p.errCh, p.metricsTableSinkTotalRows)
// Bind them so that sourceManager can notify sinkManager.
p.sourceManager.OnResolve(sinkManager.UpdateReceivedSorterResolvedTs)
if err != nil {
Expand Down
35 changes: 17 additions & 18 deletions cdc/processor/sinkmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/tablepb"
"github.com/pingcap/tiflow/cdc/redo"
Expand Down Expand Up @@ -79,8 +79,8 @@ type SinkManager struct {
eventCache *redoEventCache
// redoManager is used to report the resolved ts of the table if redo log is enabled.
redoManager redo.LogManager
// sortEngine is used by the sink manager to fetch data.
sortEngine engine.SortEngine
// sourceManager is used by the sink manager to fetch data.
sourceManager *sourcemanager.SourceManager

// sinkFactory used to create table sink.
sinkFactory *factory.SinkFactory
Expand Down Expand Up @@ -117,8 +117,7 @@ func New(
changefeedInfo *model.ChangeFeedInfo,
up *upstream.Upstream,
redoManager redo.LogManager,
sortEngine engine.SortEngine,
mg entry.MounterGroup,
sourceManager *sourcemanager.SourceManager,
errChan chan error,
metricsTableSinkTotalRows prometheus.Counter,
) (*SinkManager, error) {
Expand All @@ -134,13 +133,13 @@ func New(

ctx, cancel := context.WithCancel(ctx)
m := &SinkManager{
changefeedID: changefeedID,
ctx: ctx,
cancel: cancel,
up: up,
memQuota: newMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota),
sinkFactory: tableSinkFactory,
sortEngine: sortEngine,
changefeedID: changefeedID,
ctx: ctx,
cancel: cancel,
up: up,
memQuota: newMemQuota(changefeedID, changefeedInfo.Config.MemoryQuota),
sinkFactory: tableSinkFactory,
sourceManager: sourceManager,

engineGCChan: make(chan *gcEvent, defaultEngineGCChanSize),

Expand All @@ -160,7 +159,7 @@ func New(
m.eventCache = newRedoEventCache(changefeedID, changefeedInfo.Config.MemoryQuota/3)
}

m.startWorkers(mg, changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue)
m.startWorkers(changefeedInfo.Config.Sink.TxnAtomicity.ShouldSplitTxn(), changefeedInfo.Config.EnableOldValue)
m.startGenerateTasks()
m.backgroundGC()

Expand All @@ -173,9 +172,9 @@ func New(
}

// start all workers and report the error to the error channel.
func (m *SinkManager) startWorkers(mg entry.MounterGroup, splitTxn bool, enableOldValue bool) {
func (m *SinkManager) startWorkers(splitTxn bool, enableOldValue bool) {
for i := 0; i < sinkWorkerNum; i++ {
w := newSinkWorker(m.changefeedID, mg, m.sortEngine, m.memQuota,
w := newSinkWorker(m.changefeedID, m.sourceManager, m.memQuota,
m.eventCache, splitTxn, enableOldValue)
m.sinkWorkers = append(m.sinkWorkers, w)
m.wg.Add(1)
Expand Down Expand Up @@ -204,7 +203,7 @@ func (m *SinkManager) startWorkers(mg entry.MounterGroup, splitTxn bool, enableO
}

for i := 0; i < redoWorkerNum; i++ {
w := newRedoWorker(m.changefeedID, mg, m.sortEngine, m.memQuota,
w := newRedoWorker(m.changefeedID, m.sourceManager, m.memQuota,
m.redoManager, m.eventCache, splitTxn, enableOldValue)
m.redoWorkers = append(m.redoWorkers, w)
m.wg.Add(1)
Expand Down Expand Up @@ -289,7 +288,7 @@ func (m *SinkManager) backgroundGC() {
zap.String("changefeed", m.changefeedID.ID))
return
case gcEvent := <-m.engineGCChan:
if err := m.sortEngine.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil {
if err := m.sourceManager.CleanByTable(gcEvent.tableID, gcEvent.cleanPos); err != nil {
log.Error("Failed to clean table in sort engine",
zap.String("namespace", m.changefeedID.Namespace),
zap.String("changefeed", m.changefeedID.ID),
Expand Down Expand Up @@ -668,7 +667,7 @@ func (m *SinkManager) GetTableStats(tableID model.TableID) TableStats {
if m.redoManager != nil {
resolvedTs = m.redoManager.GetResolvedTs(tableID)
} else {
resolvedTs = m.sortEngine.GetResolvedTs(tableID)
resolvedTs = m.sourceManager.GetTableResolvedTs(tableID)
}
return TableStats{
CheckpointTs: resolvedMark,
Expand Down
45 changes: 25 additions & 20 deletions cdc/processor/sinkmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/golang/mock/gomock"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/memory"
mockengine "github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine/mock"
Expand Down Expand Up @@ -53,14 +54,16 @@ func createManagerWithMemEngine(
changefeedID model.ChangeFeedID,
changefeedInfo *model.ChangeFeedInfo,
errChan chan error,
) *SinkManager {
) (*SinkManager, engine.SortEngine) {
sortEngine := memory.New(context.Background())
up := upstream.NewUpstream4Test(&mockPD{})
sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan)
manager, err := New(
ctx, changefeedID, changefeedInfo, upstream.NewUpstream4Test(&mockPD{}),
nil, sortEngine, &entry.MockMountGroup{},
ctx, changefeedID, changefeedInfo, up,
nil, sm,
errChan, prometheus.NewCounter(prometheus.CounterOpts{}))
require.NoError(t, err)
return manager
return manager, sortEngine
}

// nolint:revive
Expand All @@ -74,9 +77,11 @@ func createManagerWithMockEngine(
) (*SinkManager, *mockengine.MockSortEngine) {
ctrl := gomock.NewController(t)
sortEngine := mockengine.NewMockSortEngine(ctrl)
up := upstream.NewUpstream4Test(&mockPD{})
sm := sourcemanager.New(changefeedID, up, &entry.MockMountGroup{}, sortEngine, errChan)
manager, err := New(
ctx, changefeedID, changefeedInfo, upstream.NewUpstream4Test(&mockPD{}),
nil, sortEngine, &entry.MockMountGroup{},
ctx, changefeedID, changefeedInfo, up,
nil, sm,
errChan, prometheus.NewCounter(prometheus.CounterOpts{}))
require.NoError(t, err)
return manager, sortEngine
Expand Down Expand Up @@ -160,7 +165,7 @@ func TestAddTable(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
Expand Down Expand Up @@ -190,7 +195,7 @@ func TestRemoveTable(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
Expand All @@ -202,7 +207,7 @@ func TestRemoveTable(t *testing.T) {
require.NotNil(t, tableSink)
err := manager.StartTable(tableID, 0)
require.NoError(t, err)
addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
addTableAndAddEventsToSortEngine(t, e, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)

Expand Down Expand Up @@ -232,7 +237,7 @@ func TestUpdateBarrierTs(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
Expand All @@ -250,14 +255,14 @@ func TestGenerateTableSinkTaskWithBarrierTs(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
}()
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
addTableAndAddEventsToSortEngine(t, e, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)
err := manager.StartTable(tableID, 0)
Expand All @@ -278,14 +283,14 @@ func TestGenerateTableSinkTaskWithResolvedTs(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
}()
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
addTableAndAddEventsToSortEngine(t, e, tableID)
// This would happen when the table just added to this node and redo log is enabled.
// So there is possibility that the resolved ts is smaller than the global barrier ts.
manager.UpdateBarrierTs(4)
Expand All @@ -308,14 +313,14 @@ func TestGetTableStatsToReleaseMemQuota(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
defer func() {
err := manager.Close()
require.NoError(t, err)
}()
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
addTableAndAddEventsToSortEngine(t, e, tableID)

manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)
Expand All @@ -335,10 +340,10 @@ func TestDoNotGenerateTableSinkTaskWhenTableIsNotReplicating(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, e := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
tableID := model.TableID(1)
manager.AddTable(tableID, 1, 100)
addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
addTableAndAddEventsToSortEngine(t, e, tableID)
manager.UpdateBarrierTs(4)
manager.UpdateReceivedSorterResolvedTs(tableID, 5)

Expand All @@ -356,7 +361,7 @@ func TestClose(t *testing.T) {
defer cancel()

changefeedInfo := getChangefeedInfo()
manager := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))
manager, _ := createManagerWithMemEngine(t, ctx, model.DefaultChangeFeedID("1"), changefeedInfo, make(chan error, 1))

err := manager.Close()
require.NoError(t, err)
Expand All @@ -382,7 +387,7 @@ func TestGetTableStats(t *testing.T) {
mockEngine.EXPECT().GetResolvedTs(tableID).AnyTimes()

manager.AddTable(tableID, 1, 100)
addTableAndAddEventsToSortEngine(t, manager.sortEngine, tableID)
addTableAndAddEventsToSortEngine(t, mockEngine, tableID)
// This would happen when the table just added to this node and redo log is enabled.
// So there is possibility that the resolved ts is smaller than the global barrier ts.
manager.UpdateBarrierTs(4)
Expand Down
16 changes: 5 additions & 11 deletions cdc/processor/sinkmanager/redo_log_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/pingcap/tiflow/cdc/redo"
"go.uber.org/zap"
)

type redoWorker struct {
changefeedID model.ChangeFeedID
mg entry.MounterGroup
sortEngine engine.SortEngine
sourceManager *sourcemanager.SourceManager
memQuota *memQuota
redoManager redo.LogManager
eventCache *redoEventCache
Expand All @@ -38,8 +37,7 @@ type redoWorker struct {

func newRedoWorker(
changefeedID model.ChangeFeedID,
mg entry.MounterGroup,
sortEngine engine.SortEngine,
sourceManager *sourcemanager.SourceManager,
quota *memQuota,
redoManager redo.LogManager,
eventCache *redoEventCache,
Expand All @@ -48,8 +46,7 @@ func newRedoWorker(
) *redoWorker {
return &redoWorker{
changefeedID: changefeedID,
mg: mg,
sortEngine: sortEngine,
sourceManager: sourceManager,
memQuota: quota,
redoManager: redoManager,
eventCache: eventCache,
Expand Down Expand Up @@ -164,10 +161,7 @@ func (w *redoWorker) handleTask(ctx context.Context, task *redoTask) error {
}

upperBound := task.getUpperBound()
iter := engine.NewMountedEventIter(
w.sortEngine.FetchByTable(task.tableID, task.lowerBound, upperBound),
w.mg, 256)

iter := w.sourceManager.FetchByTable(task.tableID, task.lowerBound, upperBound)
defer func() {
if err := iter.Close(); err != nil {
log.Error("sink redo worker fails to close iterator",
Expand Down
21 changes: 8 additions & 13 deletions cdc/processor/sinkmanager/table_sink_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,18 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/entry"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager"
"github.com/pingcap/tiflow/cdc/processor/sourcemanager/engine"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)

type sinkWorker struct {
changefeedID model.ChangeFeedID
mg entry.MounterGroup
sortEngine engine.SortEngine
memQuota *memQuota
eventCache *redoEventCache
changefeedID model.ChangeFeedID
sourceManager *sourcemanager.SourceManager
memQuota *memQuota
eventCache *redoEventCache
// splitTxn indicates whether to split the transaction into multiple batches.
splitTxn bool
// enableOldValue indicates whether to enable the old value feature.
Expand All @@ -45,17 +44,15 @@ type sinkWorker struct {
// newWorker creates a new worker.
func newSinkWorker(
changefeedID model.ChangeFeedID,
mg entry.MounterGroup,
sortEngine engine.SortEngine,
sourceManager *sourcemanager.SourceManager,
quota *memQuota,
eventCache *redoEventCache,
splitTxn bool,
enableOldValue bool,
) *sinkWorker {
return &sinkWorker{
changefeedID: changefeedID,
mg: mg,
sortEngine: sortEngine,
sourceManager: sourceManager,
memQuota: quota,
eventCache: eventCache,
splitTxn: splitTxn,
Expand Down Expand Up @@ -145,9 +142,7 @@ func (w *sinkWorker) handleTask(ctx context.Context, task *sinkTask) (err error)

// lowerBound and upperBound are both closed intervals.
allEventSize := 0
iter := engine.NewMountedEventIter(
w.sortEngine.FetchByTable(task.tableID, lowerBound, upperBound),
w.mg, 256)
iter := w.sourceManager.FetchByTable(task.tableID, lowerBound, upperBound)
defer func() {
w.metricRedoEventCacheMiss.Add(float64(allEventSize))
if err := iter.Close(); err != nil {
Expand Down
Loading