Skip to content

Commit

Permalink
sinkv2(ticdc): make event sink close waiting for all callbacks be cal…
Browse files Browse the repository at this point in the history
…led (#6344)

ref #5928
  • Loading branch information
Rustin170506 authored Jul 20, 2022
1 parent f86dd06 commit 1945f04
Show file tree
Hide file tree
Showing 10 changed files with 158 additions and 104 deletions.
9 changes: 4 additions & 5 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -603,11 +603,10 @@ func (d *DDLEvent) fillPreTableInfo(preTableInfo *TableInfo) {
//msgp:ignore SingleTableTxn
type SingleTableTxn struct {
// data fields of SingleTableTxn
Table *TableName
StartTs uint64
CommitTs uint64
Rows []*RowChangedEvent
ReplicaID uint64
Table *TableName
StartTs uint64
CommitTs uint64
Rows []*RowChangedEvent

// control fields of SingleTableTxn
// FinishWg is a barrier txn, after this txn is received, the worker must
Expand Down
1 change: 0 additions & 1 deletion cdc/processor/pipeline/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ const (
// and replicating it to downstream
TableStateReplicating
// TableStateStopping means the table is stopping, but not guaranteed yet.
// at the moment, this state is not used, only keep aligned with `schedulepb.TableStateStopping`
TableStateStopping
// TableStateStopped means sink stop all works, but the table resource not released yet.
TableStateStopped
Expand Down
6 changes: 3 additions & 3 deletions cdc/sink/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ type preparedDMLs struct {
}

// prepareDMLs converts model.RowChangedEvent list to query string list and args list
func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64, bucket int) *preparedDMLs {
func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, bucket int) *preparedDMLs {
sqls := make([]string, 0, len(rows))
values := make([][]interface{}, 0, len(rows))
replaces := make(map[string][][]interface{})
Expand Down Expand Up @@ -780,7 +780,7 @@ func (s *mysqlSink) prepareDMLs(rows []*model.RowChangedEvent, replicaID uint64,
return dmls
}

func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, replicaID uint64, bucket int) error {
func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent, bucket int) error {
failpoint.Inject("SinkFlushDMLPanic", func() {
time.Sleep(time.Second)
log.Fatal("SinkFlushDMLPanic")
Expand All @@ -792,7 +792,7 @@ func (s *mysqlSink) execDMLs(ctx context.Context, rows []*model.RowChangedEvent,
failpoint.Return(errors.Trace(dmysql.ErrInvalidConn))
})
s.statistics.ObserveRows(rows...)
dmls := s.prepareDMLs(rows, replicaID, bucket)
dmls := s.prepareDMLs(rows, bucket)
log.Debug("prepare DMLs", zap.Any("rows", rows), zap.Strings("sqls", dmls.sqls), zap.Any("values", dmls.values))
if err := s.execDMLWithMaxRetries(ctx, dmls, bucket); err != nil {
log.Error("execute DMLs failed", zap.String("err", err.Error()))
Expand Down
8 changes: 4 additions & 4 deletions cdc/sink/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func TestPrepareDML(t *testing.T) {
defer cancel()
ms := newMySQLSink4Test(ctx, t)
for _, tc := range testCases {
dmls := ms.prepareDMLs(tc.input, 0, 0)
dmls := ms.prepareDMLs(tc.input, 0)
require.Equal(t, tc.expected, dmls)
}
}
Expand Down Expand Up @@ -1333,7 +1333,7 @@ func TestExecDMLRollbackErrDatabaseNotExists(t *testing.T) {

require.Nil(t, err)

err = sink.execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */)
err = sink.execDMLs(ctx, rows, 1 /* bucket */)
require.Equal(t, errDatabaseNotExists, errors.Cause(err))

err = sink.Close(ctx)
Expand Down Expand Up @@ -1410,7 +1410,7 @@ func TestExecDMLRollbackErrTableNotExists(t *testing.T) {

require.Nil(t, err)

err = sink.execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */)
err = sink.execDMLs(ctx, rows, 1 /* bucket */)
require.Equal(t, errTableNotExists, errors.Cause(err))

err = sink.Close(ctx)
Expand Down Expand Up @@ -1492,7 +1492,7 @@ func TestExecDMLRollbackErrRetryable(t *testing.T) {

require.Nil(t, err)

err = sink.execDMLs(ctx, rows, 1 /* replicaID */, 1 /* bucket */)
err = sink.execDMLs(ctx, rows, 1 /* bucket */)
require.Equal(t, errLockDeadlock, errors.Cause(err))

err = sink.Close(ctx)
Expand Down
10 changes: 4 additions & 6 deletions cdc/sink/mysql/mysql_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type mysqlSinkWorker struct {
txnCh chan *model.SingleTableTxn
maxTxnRow int
bucket int
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error
execDMLs func(context.Context, []*model.RowChangedEvent, int) error
metricBucketSize prometheus.Counter
receiver *notify.Receiver
closedCh chan struct{}
Expand All @@ -45,7 +45,7 @@ func newMySQLSinkWorker(
bucket int,
metricBucketSize prometheus.Counter,
receiver *notify.Receiver,
execDMLs func(context.Context, []*model.RowChangedEvent, uint64, int) error,
execDMLs func(context.Context, []*model.RowChangedEvent, int) error,
) *mysqlSinkWorker {
return &mysqlSinkWorker{
txnCh: make(chan *model.SingleTableTxn, 1024),
Expand Down Expand Up @@ -84,7 +84,6 @@ func (w *mysqlSinkWorker) isNormal() bool {
func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
var (
toExecRows []*model.RowChangedEvent
replicaID uint64
txnNum int
)

Expand Down Expand Up @@ -116,7 +115,7 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
if len(toExecRows) == 0 {
return nil
}
err := w.execDMLs(ctx, toExecRows, replicaID, w.bucket)
err := w.execDMLs(ctx, toExecRows, w.bucket)
if err != nil {
txnNum = 0
return err
Expand Down Expand Up @@ -144,14 +143,13 @@ func (w *mysqlSinkWorker) run(ctx context.Context) (err error) {
txn.FinishWg.Done()
continue
}
if txn.ReplicaID != replicaID || len(toExecRows)+len(txn.Rows) > w.maxTxnRow {
if len(toExecRows)+len(txn.Rows) > w.maxTxnRow {
if err := flushRows(); err != nil {
txnNum++
w.hasError.Store(true)
return errors.Trace(err)
}
}
replicaID = txn.ReplicaID
toExecRows = append(toExecRows, txn.Rows...)
txnNum++
case <-w.receiver.C:
Expand Down
123 changes: 50 additions & 73 deletions cdc/sink/mysql/mysql_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,134 +38,115 @@ func TestMysqlSinkWorker(t *testing.T) {
IsPartition: false,
}
testCases := []struct {
txns []*model.SingleTableTxn
expectedOutputRows [][]*model.RowChangedEvent
exportedOutputReplicaIDs []uint64
maxTxnRow int
txns []*model.SingleTableTxn
expectedOutputRows [][]*model.RowChangedEvent
maxTxnRow int
}{
{
txns: []*model.SingleTableTxn{},
maxTxnRow: 4,
maxTxnRow: 1,
},
{
txns: []*model.SingleTableTxn{
{
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
ReplicaID: 1,
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
},
},
expectedOutputRows: [][]*model.RowChangedEvent{{{CommitTs: 1}}},
exportedOutputReplicaIDs: []uint64{1},
maxTxnRow: 2,
expectedOutputRows: [][]*model.RowChangedEvent{{{CommitTs: 1}}},
maxTxnRow: 1,
},
{
txns: []*model.SingleTableTxn{
{
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 1}},
ReplicaID: 1,
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 1}},
},
},
expectedOutputRows: [][]*model.RowChangedEvent{
{{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 1}},
},
exportedOutputReplicaIDs: []uint64{1},
maxTxnRow: 2,
maxTxnRow: 2,
},
{
txns: []*model.SingleTableTxn{
{
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}},
ReplicaID: 1,
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}, {CommitTs: 1}},
},
{
Table: tbl,
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}},
ReplicaID: 1,
Table: tbl,
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}},
},
{
Table: tbl,
CommitTs: 3,
Rows: []*model.RowChangedEvent{{CommitTs: 3}, {CommitTs: 3}},
ReplicaID: 1,
Table: tbl,
CommitTs: 3,
Rows: []*model.RowChangedEvent{{CommitTs: 3}, {CommitTs: 3}},
},
},
expectedOutputRows: [][]*model.RowChangedEvent{
{{CommitTs: 1}, {CommitTs: 1}, {CommitTs: 2}},
{{CommitTs: 3}, {CommitTs: 3}},
},
exportedOutputReplicaIDs: []uint64{1, 1},
maxTxnRow: 4,
maxTxnRow: 4,
},
{
txns: []*model.SingleTableTxn{
{
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
ReplicaID: 1,
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
},
{
Table: tbl,
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}},
ReplicaID: 2,
Table: tbl,
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}},
},
{
Table: tbl,
CommitTs: 3,
Rows: []*model.RowChangedEvent{{CommitTs: 3}},
ReplicaID: 3,
Table: tbl,
CommitTs: 3,
Rows: []*model.RowChangedEvent{{CommitTs: 3}},
},
},
expectedOutputRows: [][]*model.RowChangedEvent{
{{CommitTs: 1}},
{{CommitTs: 2}},
{{CommitTs: 1}, {CommitTs: 2}},
{{CommitTs: 3}},
},
exportedOutputReplicaIDs: []uint64{1, 2, 3},
maxTxnRow: 4,
maxTxnRow: 2,
},
{
txns: []*model.SingleTableTxn{
{
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
ReplicaID: 1,
Table: tbl,
CommitTs: 1,
Rows: []*model.RowChangedEvent{{CommitTs: 1}},
},
{
Table: tbl,
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}, {CommitTs: 2}, {CommitTs: 2}},
ReplicaID: 1,
Table: tbl,
CommitTs: 2,
Rows: []*model.RowChangedEvent{{CommitTs: 2}, {CommitTs: 2}, {CommitTs: 2}},
},
{
Table: tbl,
CommitTs: 3,
Rows: []*model.RowChangedEvent{{CommitTs: 3}},
ReplicaID: 1,
Table: tbl,
CommitTs: 3,
Rows: []*model.RowChangedEvent{{CommitTs: 3}},
},
{
Table: tbl,
CommitTs: 4,
Rows: []*model.RowChangedEvent{{CommitTs: 4}},
ReplicaID: 1,
Table: tbl,
CommitTs: 4,
Rows: []*model.RowChangedEvent{{CommitTs: 4}},
},
},
expectedOutputRows: [][]*model.RowChangedEvent{
{{CommitTs: 1}},
{{CommitTs: 2}, {CommitTs: 2}, {CommitTs: 2}},
{{CommitTs: 3}, {CommitTs: 4}},
},
exportedOutputReplicaIDs: []uint64{1, 1, 1},
maxTxnRow: 2,
maxTxnRow: 3,
},
}
ctx := context.Background()
Expand All @@ -174,18 +155,16 @@ func TestMysqlSinkWorker(t *testing.T) {
for i, tc := range testCases {
cctx, cancel := context.WithCancel(ctx)
var outputRows [][]*model.RowChangedEvent
var outputReplicaIDs []uint64
receiver, err := notifier.NewReceiver(-1)
require.Nil(t, err)
w := newMySQLSinkWorker(tc.maxTxnRow, 1,
metrics.BucketSizeCounter.
WithLabelValues("default", "changefeed", "1"),
receiver,
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
func(ctx context.Context, events []*model.RowChangedEvent, bucket int) error {
rows := make([]*model.RowChangedEvent, len(events))
copy(rows, events)
outputRows = append(outputRows, rows)
outputReplicaIDs = append(outputReplicaIDs, replicaID)
return nil
})
errg, cctx := errgroup.WithContext(cctx)
Expand All @@ -205,8 +184,6 @@ func TestMysqlSinkWorker(t *testing.T) {
require.Equal(t, context.Canceled, errors.Cause(errg.Wait()))
require.Equal(t, tc.expectedOutputRows, outputRows,
fmt.Sprintf("case %v, %s, %s", i, spew.Sdump(outputRows), spew.Sdump(tc.expectedOutputRows)))
require.Equal(t, tc.exportedOutputReplicaIDs, outputReplicaIDs, tc.exportedOutputReplicaIDs,
fmt.Sprintf("case %v, %s, %s", i, spew.Sdump(outputReplicaIDs), spew.Sdump(tc.exportedOutputReplicaIDs)))
}
}

Expand Down Expand Up @@ -264,7 +241,7 @@ func TestMySQLSinkWorkerExitWithError(t *testing.T) {
metrics.
BucketSizeCounter.WithLabelValues("default", "changefeed", "1"),
receiver,
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
func(ctx context.Context, events []*model.RowChangedEvent, bucket int) error {
return errExecFailed
})
errg, cctx := errgroup.WithContext(cctx)
Expand Down Expand Up @@ -341,7 +318,7 @@ func TestMySQLSinkWorkerExitCleanup(t *testing.T) {
metrics.
BucketSizeCounter.WithLabelValues("default", "changefeed", "1"),
receiver,
func(ctx context.Context, events []*model.RowChangedEvent, replicaID uint64, bucket int) error {
func(ctx context.Context, events []*model.RowChangedEvent, bucket int) error {
return errExecFailed
})
errg, cctx := errgroup.WithContext(cctx)
Expand Down
7 changes: 7 additions & 0 deletions cdc/sinkv2/tablesink/event_table_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,14 @@ func (e *eventTableSink[E]) GetCheckpointTs() model.ResolvedTs {
return e.progressTracker.minTs()
}

// Close the table sink and wait for all callbacks be called.
// Notice: It will be blocked until all callbacks be called.
func (e *eventTableSink[E]) Close() {
// TODO: Before we depends on this state,
// we should check the state working well with new scheduler.
// Maybe we only need a sink state(isClosing), not a table state.
e.state.Store(pipeline.TableStateStopping)
e.progressTracker.close()
e.state.Store(pipeline.TableStateStopped)
}

Expand Down
Loading

0 comments on commit 1945f04

Please sign in to comment.