From 34b79183c77f09bf39a5c5da0b026ecfcb32a35d Mon Sep 17 00:00:00 2001 From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com> Date: Wed, 19 Apr 2023 18:05:20 +0800 Subject: [PATCH] sink(ticdc): fix TxnEventAppender.startTs in CreateTableSink (#8810) close pingcap/tiflow#8805 --- cdc/sinkv2/eventsink/factory/factory.go | 28 ++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/cdc/sinkv2/eventsink/factory/factory.go b/cdc/sinkv2/eventsink/factory/factory.go index f3ba11f8ce5..071b6e994c0 100644 --- a/cdc/sinkv2/eventsink/factory/factory.go +++ b/cdc/sinkv2/eventsink/factory/factory.go @@ -91,11 +91,8 @@ func New(ctx context.Context, return s, nil } -// CreateTableSinkForConsumer creates a TableSink by schema for consumer. -// The difference between CreateTableSink and CreateTableSinkForConsumer is that -// CreateTableSinkForConsumer will not create a new sink for each table. -// NOTICE: This only used for the consumer. Please do not use it in the processor. -func (s *SinkFactory) CreateTableSinkForConsumer( +// CreateTableSink creates a TableSink by schema. +func (s *SinkFactory) CreateTableSink( changefeedID model.ChangeFeedID, tableID model.TableID, startTs model.Ts, totalRowsCounter prometheus.Counter, @@ -106,18 +103,18 @@ func (s *SinkFactory) CreateTableSinkForConsumer( return tablesink.New[*model.RowChangedEvent](changefeedID, tableID, startTs, s.rowSink, &eventsink.RowChangeEventAppender{}, totalRowsCounter) case sink.TxnSink: - return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs, s.txnSink, - // IgnoreStartTs is true because the consumer can - // **not** get the start ts of the row changed event. - &eventsink.TxnEventAppender{TableSinkStartTs: startTs, IgnoreStartTs: true}, - totalRowsCounter) + return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs, + s.txnSink, &eventsink.TxnEventAppender{TableSinkStartTs: startTs}, totalRowsCounter) default: panic("unknown sink type") } } -// CreateTableSink creates a TableSink by schema. -func (s *SinkFactory) CreateTableSink( +// CreateTableSinkForConsumer creates a TableSink by schema for consumer. +// The difference between CreateTableSink and CreateTableSinkForConsumer is that +// CreateTableSinkForConsumer will not create a new sink for each table. +// NOTICE: This only used for the consumer. Please do not use it in the processor. +func (s *SinkFactory) CreateTableSinkForConsumer( changefeedID model.ChangeFeedID, tableID model.TableID, startTs model.Ts, totalRowsCounter prometheus.Counter, @@ -128,8 +125,11 @@ func (s *SinkFactory) CreateTableSink( return tablesink.New[*model.RowChangedEvent](changefeedID, tableID, startTs, s.rowSink, &eventsink.RowChangeEventAppender{}, totalRowsCounter) case sink.TxnSink: - return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs, - s.txnSink, &eventsink.TxnEventAppender{}, totalRowsCounter) + return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs, s.txnSink, + // IgnoreStartTs is true because the consumer can + // **not** get the start ts of the row changed event. + &eventsink.TxnEventAppender{TableSinkStartTs: startTs, IgnoreStartTs: true}, + totalRowsCounter) default: panic("unknown sink type") }