pipeline(ticdc): split dataflow in sinkNode (pingcap#5943)
CharlesCheung96 committed Jul 7, 2022
1 parent be67a42 commit 32bed6c
Showing 9 changed files with 126 additions and 117 deletions.
65 changes: 50 additions & 15 deletions cdc/processor/pipeline/sink.go
@@ -23,6 +23,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -79,6 +80,7 @@ type sinkNode struct {
barrierTs model.Ts

flowController tableFlowController
redoManager redo.LogManager

replicaConfig *config.ReplicaConfig
isTableActorMode bool
@@ -92,16 +94,17 @@ func newSinkNode(
targetTs model.Ts,
flowController tableFlowController,
splitTxn bool,
redoManager redo.LogManager,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,
flowController: flowController,
splitTxn: splitTxn,
redoManager: redoManager,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
@@ -159,17 +162,41 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
err = n.stop(ctx)
}
}()

currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := n.getCheckpointTs()
if resolved.Ts > currentBarrierTs {
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved = model.NewResolvedTs(n.targetTs)
}

if n.redoManager != nil && n.redoManager.Enabled() {
// redo log does not support batch resolve mode, hence we
// use `ResolvedMark` to restore a normal resolved ts
resolved = model.NewResolvedTs(resolved.ResolvedMark())
err = n.redoManager.FlushLog(ctx, n.tableID, resolved.Ts)

redoTs := n.redoManager.GetMinResolvedTs()
if redoTs < currentBarrierTs {
log.Debug("redoTs should not less than current barrierTs",
zap.Int64("tableID", n.tableID),
zap.Uint64("redoTs", redoTs),
zap.Uint64("barrierTs", currentBarrierTs))
}

// Fixme(CharlesCheung): remove this check after refactoring redoManager
if resolved.Ts > redoTs {
resolved = model.NewResolvedTs(redoTs)
}
}

if resolved.Ts > currentBarrierTs {
resolved = model.NewResolvedTs(currentBarrierTs)
}

currentCheckpointTs := n.getCheckpointTs()
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}

checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
@@ -200,6 +227,16 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
panic("ProcessorSyncResolvedPreEmit")
})

emitRows := func(rows ...*model.RowChangedEvent) error {
if n.redoManager != nil && n.redoManager.Enabled() {
err := n.redoManager.EmitRowChangedEvents(ctx, n.tableID, rows...)
if err != nil {
return err
}
}
return n.sink.EmitRowChangedEvents(ctx, rows...)
}

if event == nil || event.Row == nil {
log.Warn("skip emit nil event", zap.Any("event", event))
return nil
@@ -225,13 +262,13 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
return errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
return n.sink.EmitRowChangedEvents(ctx, deleteEvent.Row, insertEvent.Row)
return emitRows(deleteEvent.Row, insertEvent.Row)
}
// If the handle key columns are not updated, PreColumns is directly ignored.
event.Row.PreColumns = nil
}

return n.sink.EmitRowChangedEvents(ctx, event.Row)
return emitRows(event.Row)
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
@@ -323,11 +360,9 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

var resolved model.ResolvedTs
resolved := model.NewResolvedTs(event.CRTs)
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
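The two sink.go hunks above are the heart of the change: flushSink now consults the redo manager before flushing the row sink, and emitRowToSink routes rows through a local emitRows helper that writes to the redo log before the table sink, so redo writing happens in the sink node rather than inside the table sink. The sketch below condenses the new flush ordering; the types, names, and uint64 comparisons are simplified stand-ins for illustration, not the actual tiflow API.

```go
package main

import (
	"context"
	"fmt"
)

// redoWriter is a stand-in for the subset of redo.LogManager used by flushSink.
type redoWriter interface {
	Enabled() bool
	FlushLog(ctx context.Context, tableID int64, resolvedTs uint64) error
	GetMinResolvedTs() uint64
}

// flushOrder mirrors the ordering of the new flushSink hunk: cap the resolved
// ts by the target ts, flush the redo log and cap by its minimum resolved ts,
// cap by the barrier ts, and only flush the table sink if the checkpoint has
// not already caught up.
func flushOrder(
	ctx context.Context,
	tableID int64,
	resolvedTs, barrierTs, targetTs, checkpointTs uint64,
	redoMgr redoWriter,
	flushTableSink func(context.Context, uint64) error,
) error {
	if resolvedTs > targetTs {
		resolvedTs = targetTs
	}
	if redoMgr != nil && redoMgr.Enabled() {
		if err := redoMgr.FlushLog(ctx, tableID, resolvedTs); err != nil {
			return err
		}
		// Downstream progress must not run ahead of what the redo log
		// has durably recorded.
		if redoTs := redoMgr.GetMinResolvedTs(); resolvedTs > redoTs {
			resolvedTs = redoTs
		}
	}
	if resolvedTs > barrierTs {
		resolvedTs = barrierTs
	}
	if checkpointTs >= resolvedTs {
		return nil // nothing new to flush
	}
	return flushTableSink(ctx, resolvedTs)
}

func main() {
	err := flushOrder(context.Background(), 1, 120, 100, 200, 90, nil,
		func(_ context.Context, ts uint64) error {
			fmt.Println("flush table sink up to", ts)
			return nil
		})
	if err != nil {
		fmt.Println("flush failed:", err)
	}
}
```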
19 changes: 10 additions & 9 deletions cdc/processor/pipeline/sink_test.go
@@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
@@ -135,7 +136,7 @@ func TestStatus(t *testing.T) {
})

// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

@@ -175,7 +176,7 @@
require.Equal(t, uint64(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

@@ -206,7 +207,7 @@
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop-at-ts command when both resolvedTs and checkpointTs are greater than the stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false)
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

@@ -249,7 +250,7 @@ func TestStopStatus(t *testing.T) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, false)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

@@ -287,7 +288,7 @@ func TestManyTs(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
require.Equal(t, TableStatusInitializing, node.Status())

@@ -449,7 +450,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// empty row, no Columns and PreColumns.
@@ -471,7 +472,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// nil row.
@@ -529,7 +530,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false)
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, false, redo.NewDisabledManager())
require.Nil(t, node.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))

// nil row.
@@ -674,7 +675,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController, false)
sNode := newSinkNode(1, sink, 0, 10, flowController, false, redo.NewDisabledManager())
require.Nil(t, sNode.Init(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil)))
sNode.barrierTs = 10

7 changes: 4 additions & 3 deletions cdc/processor/pipeline/table.go
@@ -184,7 +184,7 @@ func NewTablePipeline(ctx cdcContext.Context,
sink sink.Sink,
targetTs model.Ts,
upstream *upstream.Upstream,
redoLogEnabled bool,
redoManager redo.LogManager,
) TablePipeline {
ctx, cancel := cdcContext.WithCancel(ctx)
changefeed := ctx.ChangefeedVars().ID
@@ -207,7 +207,7 @@
splitTxn := replConfig.Sink.TxnAtomicity.ShouldSplitTxn()

flowController := flowcontrol.NewTableFlowController(perTableMemoryQuota,
redoLogEnabled, splitTxn)
redoManager.Enabled(), splitTxn)
config := ctx.ChangefeedVars().Info.Config
cyclicEnabled := config.Cyclic != nil && config.Cyclic.IsEnabled()
runnerSize := defaultRunnersSize
@@ -218,7 +218,8 @@
p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs,
flowController, mounter, replConfig)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController, splitTxn)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs,
targetTs, flowController, splitTxn, redoManager)

p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName,
changefeed, upstream))
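In the NewTablePipeline hunks above, the `redoLogEnabled bool` parameter is replaced by the `redo.LogManager` itself: the flow controller still only needs the boolean, now derived via `redoManager.Enabled()`, while the sink node receives the full manager. A minimal sketch of that pattern with stand-in types (not the tiflow code):

```go
package main

import "fmt"

// logManager stands in for redo.LogManager; only Enabled() matters here.
type logManager interface {
	Enabled() bool
}

type disabledManager struct{}

func (disabledManager) Enabled() bool { return false }

type sinkNode struct {
	redo logManager // the sink node keeps the manager so it can write redo logs itself
}

type tablePipeline struct {
	flowControlWithRedo bool // flow control only needs the flag
	sink                *sinkNode
}

// newTablePipeline mirrors the new wiring: callers pass the manager once and
// the pipeline derives the boolean where a boolean is enough.
func newTablePipeline(mgr logManager) *tablePipeline {
	return &tablePipeline{
		flowControlWithRedo: mgr.Enabled(),
		sink:                &sinkNode{redo: mgr},
	}
}

func main() {
	p := newTablePipeline(disabledManager{})
	fmt.Println("flow control sees redo enabled:", p.flowControlWithRedo)
}
```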
4 changes: 2 additions & 2 deletions cdc/processor/pipeline/table_actor.go
@@ -324,7 +324,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {

actorSinkNode := newSinkNode(t.tableID, t.tableSink,
t.replicaInfo.StartTs,
t.targetTs, flowController, splitTxn)
t.targetTs, flowController, splitTxn, t.redoManager)
actorSinkNode.initWithReplicaConfig(true, t.replicaConfig)
t.sinkNode = actorSinkNode

@@ -436,7 +436,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// will be able to cooperate replication status directly. Then we will add
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) {
if t.redoManager.Enabled() {
return t.sinkNode.ResolvedTs()
}
return t.sortNode.ResolvedTs()
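The ResolvedTs hunk above swaps the config check `redo.IsConsistentEnabled(...)` for `t.redoManager.Enabled()`: the table actor now asks the manager actually wired into the pipeline, and when redo is enabled it reports the sink node's resolved ts, which after the flushSink change is capped by the redo log's progress; otherwise it reports the sorter node's. A small illustrative sketch, not the tiflow implementation:

```go
package main

import "fmt"

// resolvedTs picks which node's resolved ts the table reports: with redo
// enabled, replication progress follows the sink node (and therefore the redo
// log it flushes); otherwise the sorter node's resolved ts is used.
func resolvedTs(redoEnabled bool, sinkResolvedTs, sorterResolvedTs uint64) uint64 {
	if redoEnabled {
		return sinkResolvedTs
	}
	return sorterResolvedTs
}

func main() {
	fmt.Println(resolvedTs(false, 5, 7)) // 7: follow the sorter when redo is off
	fmt.Println(resolvedTs(true, 5, 7))  // 5: follow the sink when redo is on
}
```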
35 changes: 21 additions & 14 deletions cdc/processor/pipeline/table_actor_test.go
@@ -44,13 +44,14 @@ func TestAsyncStopFailed(t *testing.T) {
}()

tbl := &tableActor{
stopped: 0,
tableID: 1,
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, false),
stopped: 0,
tableID: 1,
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
redoManager: redo.NewDisabledManager(),
}
tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, false, tbl.redoManager)
require.True(t, tbl.AsyncStop(1))

mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
@@ -67,6 +68,7 @@ func TestTableActorInterface(t *testing.T) {
tbl := &tableActor{
markTableID: 2,
tableID: 1,
redoManager: redo.NewDisabledManager(),
sinkNode: sink,
sortNode: sorter,
tableName: "t1",
@@ -90,6 +92,9 @@

require.Equal(t, model.Ts(5), tbl.ResolvedTs())
tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tbl.redoManager, _ = redo.NewMockManager(ctx)
sink.resolvedTs.Store(model.NewResolvedTs(6))
require.Equal(t, model.Ts(6), tbl.ResolvedTs())
}
@@ -105,11 +110,12 @@ func TestTableActorCancel(t *testing.T) {
}()

tbl := &tableActor{
stopped: 0,
tableID: 1,
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
stopped: 0,
tableID: 1,
redoManager: redo.NewDisabledManager(),
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
}
mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
tbl.actorID = actor.ID(1)
@@ -122,7 +128,7 @@
func TestTableActorWait(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
eg, _ := errgroup.WithContext(ctx)
tbl := &tableActor{wg: eg}
tbl := &tableActor{wg: eg, redoManager: redo.NewDisabledManager()}
wg := sync.WaitGroup{}
wg.Add(1)
stopped := false
@@ -140,6 +146,7 @@ func TestHandleError(t *testing.T) {
canceled := false
reporterErr := false
tbl := &tableActor{
redoManager: redo.NewDisabledManager(),
cancel: func() {
canceled = true
},
@@ -403,7 +410,8 @@ func TestTableActorStart(t *testing.T) {
return nil
}
tbl := &tableActor{
globalVars: globalVars,
redoManager: redo.NewDisabledManager(),
globalVars: globalVars,
changefeedVars: &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{
Expand All @@ -414,7 +422,6 @@ func TestTableActorStart(t *testing.T) {
StartTs: 0,
MarkTableID: 1,
},
redoManager: redo.NewDisabledManager(),
replicaConfig: config.GetDefaultReplicaConfig(),
}
require.Nil(t, tbl.start(ctx))
4 changes: 2 additions & 2 deletions cdc/processor/processor.go
@@ -798,7 +798,7 @@ func (p *processor) createTablePipelineImpl(
return nil, errors.Trace(err)
}

s, err := sink.NewTableSink(p.sink, tableID, p.metricsTableSinkTotalRows, p.redoManager)
s, err := sink.NewTableSink(p.sink, tableID, p.metricsTableSinkTotalRows)
if err != nil {
return nil, errors.Trace(err)
}
@@ -828,7 +828,7 @@
s,
p.changefeed.Info.GetTargetTs(),
p.upStream,
p.redoManager.Enabled(),
p.redoManager,
)
}

(The remaining three changed files are not shown in this view.)