pipeline(ticdc): split dataflow in sinkNode #5943

Merged · 5 commits · Jun 23, 2022
Changes from 4 commits
65 changes: 49 additions & 16 deletions cdc/processor/pipeline/sink.go
@@ -22,6 +22,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/cdc/sink"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
@@ -73,19 +74,25 @@ type sinkNode struct {
barrierTs model.Ts

flowController tableFlowController
redoManager redo.LogManager

replicaConfig *config.ReplicaConfig
}

func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(
tableID model.TableID, sink sink.Sink,
startTs model.Ts, targetTs model.Ts,
flowController tableFlowController,
redoManager redo.LogManager,
) *sinkNode {
sn := &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,

tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
barrierTs: startTs,
flowController: flowController,
redoManager: redoManager,
}
sn.resolvedTs.Store(model.NewResolvedTs(startTs))
sn.checkpointTs.Store(model.NewResolvedTs(startTs))
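With the redo manager threaded into the constructor, every call site gains one argument. A minimal sketch of the new call shape, mirroring the updated tests further below (identifiers other than newSinkNode and redo.NewDisabledManager are placeholders, not names from this PR):

    node := newSinkNode(
        tableID,                   // model.TableID of the replicated table
        tableSink,                 // downstream sink.Sink
        startTs, targetTs,         // replication window
        flowController,            // per-table flow control
        redo.NewDisabledManager(), // no-op manager when consistent replication is off
    )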
@@ -136,17 +143,35 @@ func (n *sinkNode) flushSink(ctx context.Context, resolved model.ResolvedTs) (er
err = n.stop(ctx)
}
}()

currentBarrierTs := atomic.LoadUint64(&n.barrierTs)
currentCheckpointTs := n.getCheckpointTs()
if resolved.Ts > currentBarrierTs {
resolved = model.NewResolvedTs(currentBarrierTs)
}
if resolved.Ts > n.targetTs {
resolved = model.NewResolvedTs(n.targetTs)
}

if n.redoManager != nil && n.redoManager.Enabled() {
// redo log does not support batch resolve mode
[Review comment from a Member: I think we could explain this in more detail.]
resolved = model.NewResolvedTs(resolved.ResolvedMark())
err = n.redoManager.FlushLog(ctx, n.tableID, resolved.Ts)

redoTs := n.redoManager.GetMinResolvedTs()
if redoTs < currentBarrierTs {
log.Debug("redoTs should not less than current barrierTs",
zap.Int64("tableID", n.tableID),
zap.Uint64("redoTs", redoTs),
zap.Uint64("barrierTs", currentBarrierTs))
}
}

if resolved.Ts > currentBarrierTs {
resolved = model.NewResolvedTs(currentBarrierTs)
}

currentCheckpointTs := n.getCheckpointTs()
if currentCheckpointTs.EqualOrGreater(resolved) {
return nil
}

checkpoint, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolved)
if err != nil {
return errors.Trace(err)
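The reviewer asked for more detail on the batch-resolve comment above. As far as the surrounding code shows, the intent is: the sink can accept a batch-style resolved ts whose batch may be only partially flushed, but the redo log can only persist a plain watermark, so the value is collapsed with ResolvedMark() before FlushLog is called. A hedged sketch of that step, not the PR's exact code, with the assumption about ResolvedMark() spelled out rather than verified against the model package:

    if n.redoManager != nil && n.redoManager.Enabled() {
        // Assumption: ResolvedMark() discards any batch granularity and returns
        // the largest commit ts that is guaranteed fully resolved, which is the
        // only watermark the redo log can record.
        resolved = model.NewResolvedTs(resolved.ResolvedMark())
        if err := n.redoManager.FlushLog(ctx, n.tableID, resolved.Ts); err != nil {
            return errors.Trace(err)
        }
    }

The subsequent clamp to barrierTs and the checkpoint comparison then apply to this collapsed value, which keeps the downstream sink flush at or below what the redo log has already recorded.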
@@ -177,6 +202,16 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
panic("ProcessorSyncResolvedPreEmit")
})

emitRows := func(rows ...*model.RowChangedEvent) error {
if n.redoManager != nil && n.redoManager.Enabled() {
err := n.redoManager.EmitRowChangedEvents(ctx, n.tableID, rows...)
if err != nil {
return err
}
}
return n.sink.EmitRowChangedEvents(ctx, rows...)
}

if event == nil || event.Row == nil {
log.Warn("skip emit nil event", zap.Any("event", event))
return nil
@@ -202,13 +237,13 @@ func (n *sinkNode) emitRowToSink(ctx context.Context, event *model.PolymorphicEv
return errors.Trace(err)
}
// NOTICE: Please do not change the order, the delete event always comes before the insert event.
return n.sink.EmitRowChangedEvents(ctx, deleteEvent.Row, insertEvent.Row)
return emitRows(deleteEvent.Row, insertEvent.Row)
}
// If the handle key columns are not updated, PreColumns is directly ignored.
event.Row.PreColumns = nil
}

return n.sink.EmitRowChangedEvents(ctx, event.Row)
return emitRows(event.Row)
}

// shouldSplitUpdateEvent determines if the split event is needed to align the old format based on
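emitRowToSink now routes every row through the emitRows closure above, which writes to the redo manager (when enabled) before the table sink, so a row reaches the downstream sink only after the redo writer has accepted it. A rough sketch of exercising both paths with the mock manager used elsewhere in this PR; the test doubles come from the *_test.go files below, and extra setup such as registering the table with the manager may be required:

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    redoMgr, _ := redo.NewMockManager(ctx) // error ignored only for brevity
    node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redoMgr)
    // Events handled by node now reach both redoMgr.EmitRowChangedEvents and
    // the mock sink's EmitRowChangedEvents.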
@@ -290,11 +325,9 @@ func (n *sinkNode) HandleMessage(ctx context.Context, msg pmessage.Message) (boo
failpoint.Return(false, errors.New("processor sync resolved injected error"))
})

var resolved model.ResolvedTs
resolved := model.NewResolvedTs(event.CRTs)
if event.Resolved != nil {
resolved = *(event.Resolved)
} else {
resolved = model.NewResolvedTs(event.CRTs)
}

if err := n.flushSink(ctx, resolved); err != nil {
21 changes: 12 additions & 9 deletions cdc/processor/pipeline/sink_test.go
@@ -20,6 +20,7 @@ import (
"time"

"github.com/pingcap/tiflow/cdc/model"
"github.com/pingcap/tiflow/cdc/redo"
"github.com/pingcap/tiflow/pkg/config"
cdcContext "github.com/pingcap/tiflow/pkg/context"
cerrors "github.com/pingcap/tiflow/pkg/errors"
@@ -135,7 +136,7 @@ func TestStatus(t *testing.T) {
})

// test stop at targetTs
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx, pmessage.Message{}, nil).
ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -184,7 +185,7 @@
require.Equal(t, uint64(10), node.CheckpointTs())

// test the stop at ts command
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -222,7 +223,7 @@
require.Equal(t, uint64(2), node.CheckpointTs())

// test the stop at ts command when it is after the resolvedTs and the checkpointTs is greater than the stop ts
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -272,7 +273,9 @@ func TestStopStatus(t *testing.T) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1,
&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100,
&mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -314,7 +317,7 @@ func TestManyTs(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
require.Equal(t, TableStatusInitializing, node.Status())
@@ -487,7 +490,7 @@ func TestIgnoreEmptyRowChangeEvent(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -512,7 +515,7 @@ func TestSplitUpdateEventWhenEnableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -574,7 +577,7 @@ func TestSplitUpdateEventWhenDisableOldValue(t *testing.T) {
},
})
sink := &mockSink{}
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{}, redo.NewDisabledManager())
node.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)

@@ -724,7 +727,7 @@ func TestFlushSinkReleaseFlowController(t *testing.T) {
flowController := &flushFlowController{}
sink := &flushSink{}
// sNode is a sinkNode
sNode := newSinkNode(1, sink, 0, 10, flowController)
sNode := newSinkNode(1, sink, 0, 10, flowController, redo.NewDisabledManager())
sNode.initWithReplicaConfig(pipeline.MockNodeContext4Test(ctx,
pmessage.Message{}, nil).ChangefeedVars().Info.Config)
sNode.barrierTs = 10
36 changes: 18 additions & 18 deletions cdc/processor/pipeline/table_actor.go
@@ -57,8 +57,8 @@ type tableActor struct {
// backend mounter
mounter entry.Mounter
// backend tableSink
tableSink sink.Sink
redoLogEnabled bool
tableSink sink.Sink
redoManager redo.LogManager

pullerNode *pullerNode
sortNode *sorterNode
@@ -103,7 +103,7 @@ func NewTableActor(cdcCtx cdcContext.Context,
tableName string,
replicaInfo *model.TableReplicaInfo,
sink sink.Sink,
redoLogEnabled bool,
redoManager redo.LogManager,
targetTs model.Ts,
) (TablePipeline, error) {
config := cdcCtx.ChangefeedVars().Info.Config
@@ -124,18 +124,18 @@
wg: wg,
cancel: cancel,

tableID: tableID,
markTableID: replicaInfo.MarkTableID,
tableName: tableName,
memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
upstream: up,
mounter: mounter,
replicaInfo: replicaInfo,
replicaConfig: config,
tableSink: sink,
redoLogEnabled: redoLogEnabled,
targetTs: targetTs,
started: false,
tableID: tableID,
markTableID: replicaInfo.MarkTableID,
tableName: tableName,
memoryQuota: serverConfig.GetGlobalServerConfig().PerTableMemoryQuota,
upstream: up,
mounter: mounter,
replicaInfo: replicaInfo,
replicaConfig: config,
tableSink: sink,
redoManager: redoManager,
targetTs: targetTs,
started: false,

changefeedID: changefeedVars.ID,
changefeedVars: changefeedVars,
@@ -279,7 +279,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {
zap.String("tableName", t.tableName),
zap.Uint64("quota", t.memoryQuota))

flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoLogEnabled)
flowController := flowcontrol.NewTableFlowController(t.memoryQuota, t.redoManager.Enabled())
sorterNode := newSorterNode(t.tableName, t.tableID,
t.replicaInfo.StartTs, flowController,
t.mounter, t.replicaConfig,
@@ -315,7 +315,7 @@ func (t *tableActor) start(sdtTableContext context.Context) error {

actorSinkNode := newSinkNode(t.tableID, t.tableSink,
t.replicaInfo.StartTs,
t.targetTs, flowController)
t.targetTs, flowController, t.redoManager)
actorSinkNode.initWithReplicaConfig(t.replicaConfig)
t.sinkNode = actorSinkNode

@@ -389,7 +389,7 @@ func (t *tableActor) ResolvedTs() model.Ts {
// will be able to cooperate replication status directly. Then we will add
// another replication barrier for consistent replication instead of reusing
// the global resolved-ts.
if redo.IsConsistentEnabled(t.replicaConfig.Consistent.Level) {
if t.redoManager.Enabled() {
return t.sinkNode.ResolvedTs()
}
return t.sortNode.ResolvedTs()
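Taken together, the table_actor changes replace the redoLogEnabled flag with the redo.LogManager itself: the flow controller is sized from redoManager.Enabled(), the manager is handed down to the sink node, and ResolvedTs is read from the sink node whenever redo is enabled, instead of re-deriving that decision from the replica config. A hedged sketch of the adjusted caller; the argument list is abbreviated, and the tests below show the full shape with redo.NewDisabledManager():

    tbl, err := NewTableActor(
        cdcCtx, /* ...unchanged leading arguments elided... */
        tableName, replicaInfo, tableSink,
        redoManager, // redo.LogManager replaces the old redoLogEnabled bool
        targetTs,
    )
    if err != nil {
        return errors.Trace(err)
    }

Passing the manager rather than a flag keeps a single source of truth: whether redo is enabled and what it has flushed are both answered by the object the sink node actually writes to.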
38 changes: 23 additions & 15 deletions cdc/processor/pipeline/table_actor_test.go
@@ -44,13 +44,14 @@ func TestAsyncStopFailed(t *testing.T) {
}()

tbl := &tableActor{
stopped: 0,
tableID: 1,
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
sinkNode: newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}),
stopped: 0,
tableID: 1,
router: tableActorRouter,
redoManager: redo.NewDisabledManager(),
cancel: func() {},
reportErr: func(err error) {},
}
tbl.sinkNode = newSinkNode(1, &mockSink{}, 0, 0, &mockFlowController{}, tbl.redoManager)
require.True(t, tbl.AsyncStop(1))

mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
@@ -67,6 +68,7 @@ func TestTableActorInterface(t *testing.T) {
tbl := &tableActor{
markTableID: 2,
tableID: 1,
redoManager: redo.NewDisabledManager(),
sinkNode: sink,
sortNode: sorter,
tableName: "t1",
@@ -90,6 +92,9 @@

require.Equal(t, model.Ts(5), tbl.ResolvedTs())
tbl.replicaConfig.Consistent.Level = string(redo.ConsistentLevelEventual)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
tbl.redoManager, _ = redo.NewMockManager(ctx)
sink.resolvedTs.Store(model.NewResolvedTs(6))
require.Equal(t, model.Ts(6), tbl.ResolvedTs())
}
@@ -105,11 +110,12 @@
}()

tbl := &tableActor{
stopped: 0,
tableID: 1,
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
stopped: 0,
tableID: 1,
redoManager: redo.NewDisabledManager(),
router: tableActorRouter,
cancel: func() {},
reportErr: func(err error) {},
}
mb := actor.NewMailbox[pmessage.Message](actor.ID(1), 0)
tbl.actorID = actor.ID(1)
@@ -122,7 +128,7 @@
func TestTableActorWait(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
eg, _ := errgroup.WithContext(ctx)
tbl := &tableActor{wg: eg}
tbl := &tableActor{wg: eg, redoManager: redo.NewDisabledManager()}
wg := sync.WaitGroup{}
wg.Add(1)
stopped := false
@@ -140,6 +146,7 @@ func TestHandleError(t *testing.T) {
canceled := false
reporterErr := false
tbl := &tableActor{
redoManager: redo.NewDisabledManager(),
cancel: func() {
canceled = true
},
@@ -358,7 +365,7 @@ func TestNewTableActor(t *testing.T) {
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
}, &mockSink{}, false, 10)
}, &mockSink{}, redo.NewDisabledManager(), 10)
require.NotNil(t, tbl)
require.Nil(t, err)
require.NotPanics(t, func() {
@@ -374,7 +381,7 @@
&model.TableReplicaInfo{
StartTs: 0,
MarkTableID: 1,
}, &mockSink{}, false, 10)
}, &mockSink{}, redo.NewDisabledManager(), 10)
require.Nil(t, tbl)
require.NotNil(t, err)

@@ -403,7 +410,8 @@ func TestTableActorStart(t *testing.T) {
return nil
}
tbl := &tableActor{
globalVars: globalVars,
redoManager: redo.NewDisabledManager(),
globalVars: globalVars,
changefeedVars: &cdcContext.ChangefeedVars{
ID: model.DefaultChangeFeedID("changefeed-id-test"),
Info: &model.ChangeFeedInfo{