Skip to content

Commit

Permalink
sink(ticdc): fix TxnEventAppender.startTs in CreateTableSink (pingcap…
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 authored Apr 19, 2023
1 parent 1f3151f commit 34b7918
Showing 1 changed file with 14 additions and 14 deletions.
28 changes: 14 additions & 14 deletions cdc/sinkv2/eventsink/factory/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,11 +91,8 @@ func New(ctx context.Context,
return s, nil
}

// CreateTableSinkForConsumer creates a TableSink by schema for consumer.
// The difference between CreateTableSink and CreateTableSinkForConsumer is that
// CreateTableSinkForConsumer will not create a new sink for each table.
// NOTICE: This only used for the consumer. Please do not use it in the processor.
func (s *SinkFactory) CreateTableSinkForConsumer(
// CreateTableSink creates a TableSink by schema.
func (s *SinkFactory) CreateTableSink(
changefeedID model.ChangeFeedID,
tableID model.TableID, startTs model.Ts,
totalRowsCounter prometheus.Counter,
Expand All @@ -106,18 +103,18 @@ func (s *SinkFactory) CreateTableSinkForConsumer(
return tablesink.New[*model.RowChangedEvent](changefeedID, tableID, startTs,
s.rowSink, &eventsink.RowChangeEventAppender{}, totalRowsCounter)
case sink.TxnSink:
return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs, s.txnSink,
// IgnoreStartTs is true because the consumer can
// **not** get the start ts of the row changed event.
&eventsink.TxnEventAppender{TableSinkStartTs: startTs, IgnoreStartTs: true},
totalRowsCounter)
return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs,
s.txnSink, &eventsink.TxnEventAppender{TableSinkStartTs: startTs}, totalRowsCounter)
default:
panic("unknown sink type")
}
}

// CreateTableSink creates a TableSink by schema.
func (s *SinkFactory) CreateTableSink(
// CreateTableSinkForConsumer creates a TableSink by schema for consumer.
// The difference between CreateTableSink and CreateTableSinkForConsumer is that
// CreateTableSinkForConsumer will not create a new sink for each table.
// NOTICE: This only used for the consumer. Please do not use it in the processor.
func (s *SinkFactory) CreateTableSinkForConsumer(
changefeedID model.ChangeFeedID,
tableID model.TableID, startTs model.Ts,
totalRowsCounter prometheus.Counter,
Expand All @@ -128,8 +125,11 @@ func (s *SinkFactory) CreateTableSink(
return tablesink.New[*model.RowChangedEvent](changefeedID, tableID, startTs,
s.rowSink, &eventsink.RowChangeEventAppender{}, totalRowsCounter)
case sink.TxnSink:
return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs,
s.txnSink, &eventsink.TxnEventAppender{}, totalRowsCounter)
return tablesink.New[*model.SingleTableTxn](changefeedID, tableID, startTs, s.txnSink,
// IgnoreStartTs is true because the consumer can
// **not** get the start ts of the row changed event.
&eventsink.TxnEventAppender{TableSinkStartTs: startTs, IgnoreStartTs: true},
totalRowsCounter)
default:
panic("unknown sink type")
}
Expand Down

0 comments on commit 34b7918

Please sign in to comment.