Skip to content

Commit

Permalink
cdc/sink: Refine sink interface (#3613)
Browse files Browse the repository at this point in the history
  • Loading branch information
sdojjy authored Nov 26, 2021
1 parent b2baebd commit d7fa431
Show file tree
Hide file tree
Showing 19 changed files with 71 additions and 64 deletions.
2 changes: 1 addition & 1 deletion cdc/owner/async_sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (m *mockSink) Close(ctx context.Context) error {
return nil
}

func (m *mockSink) Barrier(ctx context.Context) error {
func (m *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down
10 changes: 6 additions & 4 deletions cdc/processor/pipeline/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ func (s *TableStatus) Store(new TableStatus) {
}

type sinkNode struct {
sink sink.Sink
status TableStatus
sink sink.Sink
status TableStatus
tableID model.TableID

resolvedTs model.Ts
checkpointTs model.Ts
Expand All @@ -78,8 +79,9 @@ type sinkNode struct {
flowController tableFlowController
}

func newSinkNode(sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
func newSinkNode(tableID model.TableID, sink sink.Sink, startTs model.Ts, targetTs model.Ts, flowController tableFlowController) *sinkNode {
return &sinkNode{
tableID: tableID,
sink: sink,
status: TableStatusInitializing,
targetTs: targetTs,
Expand Down Expand Up @@ -136,7 +138,7 @@ func (n *sinkNode) flushSink(ctx pipeline.NodeContext, resolvedTs model.Ts) (err
if err := n.emitRow2Sink(ctx); err != nil {
return errors.Trace(err)
}
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, resolvedTs)
checkpointTs, err := n.sink.FlushRowChangedEvents(ctx, n.tableID, resolvedTs)
if err != nil {
return errors.Trace(err)
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/pipeline/sink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func (s *mockSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (s *mockSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *mockSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
s.received = append(s.received, struct {
resolvedTs model.Ts
row *model.RowChangedEvent
Expand All @@ -92,7 +92,7 @@ func (s *mockSink) Close(ctx context.Context) error {
return nil
}

func (s *mockSink) Barrier(ctx context.Context) error {
func (s *mockSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down Expand Up @@ -137,7 +137,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
})

// test stop at targetTs
node := newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node := newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -163,7 +163,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(10))

// test the stop at ts command
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand All @@ -186,7 +186,7 @@ func (s *outputSuite) TestStatus(c *check.C) {
c.Assert(node.CheckpointTs(), check.Equals, uint64(2))

// test the case where the stop at ts command comes after the resolvedTs and checkpointTs is greater than stop ts
node = newSinkNode(&mockSink{}, 0, 10, &mockFlowController{})
node = newSinkNode(1, &mockSink{}, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -223,7 +223,7 @@ func (s *outputSuite) TestStopStatus(c *check.C) {
})

closeCh := make(chan interface{}, 1)
node := newSinkNode(&mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
node := newSinkNode(1, &mockCloseControlSink{mockSink: mockSink{}, closeCh: closeCh}, 0, 100, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)
c.Assert(node.Receive(pipeline.MockNodeContext4Test(ctx,
Expand Down Expand Up @@ -258,7 +258,7 @@ func (s *outputSuite) TestManyTs(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)
c.Assert(node.Status(), check.Equals, TableStatusInitializing)

Expand Down Expand Up @@ -379,7 +379,7 @@ func (s *outputSuite) TestIgnoreEmptyRowChangeEvent(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// empty row, no Columns and PreColumns.
Expand All @@ -399,7 +399,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenEnableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down Expand Up @@ -458,7 +458,7 @@ func (s *outputSuite) TestSplitUpdateEventWhenDisableOldValue(c *check.C) {
},
})
sink := &mockSink{}
node := newSinkNode(sink, 0, 10, &mockFlowController{})
node := newSinkNode(1, sink, 0, 10, &mockFlowController{})
c.Assert(node.Init(pipeline.MockNodeContext4Test(ctx, pipeline.Message{}, nil)), check.IsNil)

// nil row.
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func NewTablePipeline(ctx cdcContext.Context,

p := pipeline.NewPipeline(ctx, 500*time.Millisecond, runnerSize, defaultOutputChannelSize)
sorterNode := newSorterNode(tableName, tableID, replicaInfo.StartTs, flowController, mounter)
sinkNode := newSinkNode(sink, replicaInfo.StartTs, targetTs, flowController)
sinkNode := newSinkNode(tableID, sink, replicaInfo.StartTs, targetTs, flowController)

p.AppendNode(ctx, "puller", newPullerNode(tableID, replicaInfo, tableName))
p.AppendNode(ctx, "sorter", sorterNode)
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/black_hole.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func (b *blackHoleSink) EmitRowChangedEvents(ctx context.Context, rows ...*model
return nil
}

func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *blackHoleSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("BlockHoleSink: FlushRowChangedEvents", zap.Uint64("resolvedTs", resolvedTs))
err := b.statistics.RecordBatchExecution(func() (int, error) {
// TODO: add some random replication latency
Expand Down Expand Up @@ -79,6 +79,6 @@ func (b *blackHoleSink) Close(ctx context.Context) error {
return nil
}

func (b *blackHoleSink) Barrier(ctx context.Context) error {
func (b *blackHoleSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}
5 changes: 3 additions & 2 deletions cdc/sink/buffer_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ func (b *bufferSink) run(ctx context.Context, errCh chan error) {
b.bufferMu.Unlock()

start := time.Now()
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, resolvedTs)
// todo: use real table ID
checkpointTs, err := b.Sink.FlushRowChangedEvents(ctx, 0, resolvedTs)
if err != nil {
if errors.Cause(err) != context.Canceled {
errCh <- err
Expand Down Expand Up @@ -146,7 +147,7 @@ func (b *bufferSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.Ro
return nil
}

func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (b *bufferSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
select {
case <-ctx.Done():
return atomic.LoadUint64(&b.checkpointTs), ctx.Err()
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ func (f *fileSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowC
return f.emitRowChangedEvents(ctx, newTableStream, rows...)
}

func (f *fileSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (f *fileSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
log.Debug("[FlushRowChangedEvents] enter", zap.Uint64("ts", resolvedTs))
return f.flushRowChangedEvents(ctx, resolvedTs)
}
Expand Down Expand Up @@ -349,7 +349,7 @@ func (f *fileSink) Close(ctx context.Context) error {
return nil
}

func (f *fileSink) Barrier(ctx context.Context) error {
func (f *fileSink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in file sink has flushed
// all buffered events forcibly.
return nil
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/cdclog/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func (s *s3Sink) flushLogMeta(ctx context.Context) error {
return cerror.WrapError(cerror.ErrS3SinkWriteStorage, s.storage.WriteFile(ctx, logMetaFile, data))
}

func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (s *s3Sink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
// we should flush all events before resolvedTs; there are two kinds of flush policy
// 1. flush row events to a s3 chunk: if the event size is not enough,
// TODO: when cdc crashed, we should repair these chunks to a complete file
Expand Down Expand Up @@ -347,7 +347,7 @@ func (s *s3Sink) Close(ctx context.Context) error {
return nil
}

func (s *s3Sink) Barrier(ctx context.Context) error {
func (s *s3Sink) Barrier(ctx context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in s3 sink has flushed
// all buffered events forcibly.
return nil
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (m *Manager) getMinEmittedTs() model.Ts {
return minTs
}

func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
func (m *Manager) flushBackendSink(ctx context.Context, tableID model.TableID) (model.Ts, error) {
// NOTICE: All table sinks will try to flush the backend sink,
// which causes a lot of lock contention and blocking in high concurrency cases.
// So here we use flushing as a lightweight lock to reduce the lock contention.
Expand All @@ -119,7 +119,7 @@ func (m *Manager) flushBackendSink(ctx context.Context) (model.Ts, error) {
atomic.StoreInt64(&m.flushing, 0)
}()
minEmittedTs := m.getMinEmittedTs()
checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, minEmittedTs)
checkpointTs, err := m.backendSink.FlushRowChangedEvents(ctx, tableID, minEmittedTs)
if err != nil {
return m.getCheckpointTs(), errors.Trace(err)
}
Expand All @@ -142,7 +142,7 @@ func (m *Manager) destroyTableSink(ctx context.Context, tableID model.TableID) e
return ctx.Err()
case <-callback:
}
return m.backendSink.Barrier(ctx)
return m.backendSink.Barrier(ctx, tableID)
}

func (m *Manager) getCheckpointTs() uint64 {
Expand Down
26 changes: 13 additions & 13 deletions cdc/sink/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *checkSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (c *checkSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (c *checkSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
c.rowsMu.Lock()
defer c.rowsMu.Unlock()
var newRows []*model.RowChangedEvent
Expand Down Expand Up @@ -83,7 +83,7 @@ func (c *checkSink) Close(ctx context.Context) error {
return nil
}

func (c *checkSink) Barrier(ctx context.Context) error {
func (c *checkSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand Down Expand Up @@ -118,7 +118,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
for j := 1; j < rowNum; j++ {
if rand.Intn(10) == 0 {
resolvedTs := lastResolvedTs + uint64(rand.Intn(j-int(lastResolvedTs)))
_, err := tableSink.FlushRowChangedEvents(ctx, resolvedTs)
_, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), resolvedTs)
c.Assert(err, check.IsNil)
lastResolvedTs = resolvedTs
} else {
Expand All @@ -129,7 +129,7 @@ func (s *managerSuite) TestManagerRandom(c *check.C) {
c.Assert(err, check.IsNil)
}
}
_, err := tableSink.FlushRowChangedEvents(ctx, uint64(rowNum))
_, err := tableSink.FlushRowChangedEvents(ctx, model.TableID(i), uint64(rowNum))
c.Assert(err, check.IsNil)
}()
}
Expand Down Expand Up @@ -180,7 +180,7 @@ func (s *managerSuite) TestManagerAddRemoveTable(c *check.C) {
})
c.Assert(err, check.IsNil)
}
_, err := sink.FlushRowChangedEvents(ctx, resolvedTs)
_, err := sink.FlushRowChangedEvents(ctx, sink.(*tableSink).tableID, resolvedTs)
if err != nil {
c.Assert(errors.Cause(err), check.Equals, context.Canceled)
}
Expand Down Expand Up @@ -244,7 +244,7 @@ func (s *managerSuite) TestManagerDestroyTableSink(c *check.C) {
CommitTs: uint64(110),
})
c.Assert(err, check.IsNil)
_, err = tableSink.FlushRowChangedEvents(ctx, 110)
_, err = tableSink.FlushRowChangedEvents(ctx, tableID, 110)
c.Assert(err, check.IsNil)
err = manager.destroyTableSink(ctx, tableID)
c.Assert(err, check.IsNil)
Expand Down Expand Up @@ -295,11 +295,11 @@ func BenchmarkManagerFlushing(b *testing.B) {
// All tables are flushed concurrently, except table 0.
for i := 1; i < goroutineNum; i++ {
i := i
tableSink := tableSinks[i]
tblSink := tableSinks[i]
go func() {
for j := 1; j < rowNum; j++ {
if j%2 == 0 {
_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(j))
_, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(j))
if err != nil {
b.Error(err)
}
Expand All @@ -310,9 +310,9 @@ func BenchmarkManagerFlushing(b *testing.B) {

b.ResetTimer()
// Table 0 flush.
tableSink := tableSinks[0]
tblSink := tableSinks[0]
for i := 0; i < b.N; i++ {
_, err := tableSink.FlushRowChangedEvents(context.Background(), uint64(rowNum))
_, err := tblSink.FlushRowChangedEvents(context.Background(), tblSink.(*tableSink).tableID, uint64(rowNum))
if err != nil {
b.Error(err)
}
Expand Down Expand Up @@ -345,7 +345,7 @@ func (e *errorSink) EmitDDLEvent(ctx context.Context, ddl *model.DDLEvent) error
panic("unreachable")
}

func (e *errorSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (e *errorSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
return 0, errors.New("error in flush row changed events")
}

Expand All @@ -357,7 +357,7 @@ func (e *errorSink) Close(ctx context.Context) error {
return nil
}

func (e *errorSink) Barrier(ctx context.Context) error {
func (e *errorSink) Barrier(ctx context.Context, tableID model.TableID) error {
return nil
}

Expand All @@ -374,7 +374,7 @@ func (s *managerSuite) TestManagerError(c *check.C) {
Table: &model.TableName{TableID: 1},
})
c.Assert(err, check.IsNil)
_, err = sink.FlushRowChangedEvents(ctx, 2)
_, err = sink.FlushRowChangedEvents(ctx, 1, 2)
c.Assert(err, check.IsNil)
err = <-errCh
c.Assert(err.Error(), check.Equals, "error in emit row changed events")
Expand Down
4 changes: 2 additions & 2 deletions cdc/sink/mq.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ func (k *mqSink) EmitRowChangedEvents(ctx context.Context, rows ...*model.RowCha
return nil
}

func (k *mqSink) FlushRowChangedEvents(ctx context.Context, resolvedTs uint64) (uint64, error) {
func (k *mqSink) FlushRowChangedEvents(ctx context.Context, tableID model.TableID, resolvedTs uint64) (uint64, error) {
if resolvedTs <= k.checkpointTs {
return k.checkpointTs, nil
}
Expand Down Expand Up @@ -260,7 +260,7 @@ func (k *mqSink) Close(ctx context.Context) error {
return errors.Trace(err)
}

func (k *mqSink) Barrier(cxt context.Context) error {
func (k *mqSink) Barrier(cxt context.Context, tableID model.TableID) error {
// Barrier does nothing because FlushRowChangedEvents in mq sink has flushed
// all buffered events by force.
return nil
Expand Down
10 changes: 6 additions & 4 deletions cdc/sink/mq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,22 +80,24 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) {

// mock kafka broker processes 1 row changed event
leader.Returns(prodSuccess)
tableID := model.TableID(1)
row := &model.RowChangedEvent{
Table: &model.TableName{
Schema: "test",
Table: "t1",
Schema: "test",
Table: "t1",
TableID: tableID,
},
StartTs: 100,
CommitTs: 120,
Columns: []*model.Column{{Name: "col1", Type: 1, Value: "aa"}},
}
err = sink.EmitRowChangedEvents(ctx, row)
c.Assert(err, check.IsNil)
checkpointTs, err := sink.FlushRowChangedEvents(ctx, uint64(120))
checkpointTs, err := sink.FlushRowChangedEvents(ctx, tableID, uint64(120))
c.Assert(err, check.IsNil)
c.Assert(checkpointTs, check.Equals, uint64(120))
// flush older resolved ts
checkpointTs, err = sink.FlushRowChangedEvents(ctx, uint64(110))
checkpointTs, err = sink.FlushRowChangedEvents(ctx, tableID, uint64(110))
c.Assert(err, check.IsNil)
c.Assert(checkpointTs, check.Equals, uint64(120))

Expand Down
Loading

0 comments on commit d7fa431

Please sign in to comment.