
Commit

consumer(ticdc): update the logic of consumer. (pingcap#4129) (pingca…
ti-chi-bot authored Feb 24, 2022
1 parent 220b1d5 commit 34e5660
Showing 8 changed files with 77 additions and 60 deletions.
6 changes: 3 additions & 3 deletions cdc/sink/black_hole.go
@@ -22,10 +22,10 @@ import (
"go.uber.org/zap"
)

// newBlackHoleSink creates a block hole sink
func newBlackHoleSink(ctx context.Context, opts map[string]string) *blackHoleSink {
// newBlackHoleSink creates a black hole sink
func newBlackHoleSink(ctx context.Context) *blackHoleSink {
return &blackHoleSink{
statistics: NewStatistics(ctx, "blackhole", opts),
statistics: NewStatistics(ctx, "blackhole"),
}
}

4 changes: 2 additions & 2 deletions cdc/sink/buffer_sink_test.go
@@ -34,7 +34,7 @@ func TestFlushTable(t *testing.T) {
defer func() {
cancel()
}()
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
require.Equal(t, uint64(5), b.getTableCheckpointTs(2))
require.Nil(t, b.EmitRowChangedEvents(ctx))
tbl1 := &model.TableName{TableID: 1}
@@ -74,7 +74,7 @@ func TestFlushTable(t *testing.T) {

func TestFlushFailed(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
b := newBufferSink(ctx, newBlackHoleSink(ctx, make(map[string]string)), make(chan error), 5, make(chan drawbackMsg))
b := newBufferSink(ctx, newBlackHoleSink(ctx), make(chan error), 5, make(chan drawbackMsg))
checkpoint, err := b.FlushRowChangedEvents(ctx, 3, 8)
require.True(t, checkpoint <= 8)
require.Nil(t, err)
2 changes: 1 addition & 1 deletion cdc/sink/mq.go
@@ -147,7 +147,7 @@ func newMqSink(
resolvedNotifier: notifier,
resolvedReceiver: resolvedReceiver,

statistics: NewStatistics(ctx, "MQ", opts),
statistics: NewStatistics(ctx, "MQ"),
}

go func() {
2 changes: 1 addition & 1 deletion cdc/sink/mysql.go
@@ -577,7 +577,7 @@ func newMySQLSink(
params: params,
filter: filter,
txnCache: common.NewUnresolvedTxnCache(),
statistics: NewStatistics(ctx, "mysql", opts),
statistics: NewStatistics(ctx, "mysql"),
metricConflictDetectDurationHis: metricConflictDetectDurationHis,
metricBucketSizeCounters: metricBucketSizeCounters,
errCh: make(chan error, 1),
2 changes: 1 addition & 1 deletion cdc/sink/mysql_test.go
@@ -50,7 +50,7 @@ func newMySQLSink4Test(ctx context.Context, t *testing.T) *mysqlSink {
return &mysqlSink{
txnCache: common.NewUnresolvedTxnCache(),
filter: f,
statistics: NewStatistics(ctx, "test", make(map[string]string)),
statistics: NewStatistics(ctx, "test"),
params: params,
}
}
2 changes: 1 addition & 1 deletion cdc/sink/sink.go
@@ -65,7 +65,7 @@ func init() {
// register blackhole sink
sinkIniterMap["blackhole"] = func(ctx context.Context, changefeedID model.ChangeFeedID, sinkURI *url.URL,
filter *filter.Filter, config *config.ReplicaConfig, opts map[string]string, errCh chan error) (Sink, error) {
return newBlackHoleSink(ctx, opts), nil
return newBlackHoleSink(ctx), nil
}

// register mysql sink
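The one-line change above is possible because sink construction goes through a registry: sinkIniterMap maps a URI scheme to a constructor, so only the blackhole entry has to follow newBlackHoleSink's new signature. A minimal, self-contained sketch of that registry pattern (the Sink interface, newSink helper, and error text below are simplified stand-ins, not the real cdc/sink definitions):

```go
package main

import (
	"context"
	"fmt"
	"net/url"
)

// Sink is a stand-in for the real sink interface.
type Sink interface{ Name() string }

type blackHoleSink struct{}

func (b *blackHoleSink) Name() string { return "blackhole" }

// sinkIniter mirrors the registry idea: one constructor per URI scheme.
type sinkIniter func(ctx context.Context, sinkURI *url.URL) (Sink, error)

var sinkIniterMap = map[string]sinkIniter{}

func init() {
	// Register the blackhole sink; it no longer needs an opts map because
	// everything it wants (capture addr, changefeed ID) rides on the context.
	sinkIniterMap["blackhole"] = func(ctx context.Context, sinkURI *url.URL) (Sink, error) {
		return &blackHoleSink{}, nil
	}
}

func newSink(ctx context.Context, rawURI string) (Sink, error) {
	u, err := url.Parse(rawURI)
	if err != nil {
		return nil, err
	}
	initer, ok := sinkIniterMap[u.Scheme]
	if !ok {
		return nil, fmt.Errorf("sink scheme %q is not supported", u.Scheme)
	}
	return initer(ctx, u)
}

func main() {
	s, err := newSink(context.Background(), "blackhole://")
	fmt.Println(s.Name(), err)
}
```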
13 changes: 6 additions & 7 deletions cdc/sink/statistics.go
@@ -30,13 +30,12 @@ const (
)

// NewStatistics creates a statistics
func NewStatistics(ctx context.Context, name string, opts map[string]string) *Statistics {
statistics := &Statistics{name: name, lastPrintStatusTime: time.Now()}
if cid, ok := opts[OptChangefeedID]; ok {
statistics.changefeedID = cid
}
if cid, ok := opts[OptCaptureAddr]; ok {
statistics.captureAddr = cid
func NewStatistics(ctx context.Context, name string) *Statistics {
statistics := &Statistics{
name: name,
captureAddr: util.CaptureAddrFromCtx(ctx),
changefeedID: util.ChangefeedIDFromCtx(ctx),
lastPrintStatusTime: time.Now(),
}
statistics.metricExecTxnHis = execTxnHistogram.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
statistics.metricExecDDLHis = execDDLHistogram.WithLabelValues(statistics.captureAddr, statistics.changefeedID)
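This statistics.go change is why the opts parameter disappears from every sink constructor in this commit: the capture address and changefeed ID now travel on the context. A minimal sketch of that context-value pattern (only util.CaptureAddrFromCtx and util.ChangefeedIDFromCtx appear in the real code; the lower-case helpers, key type, and sample values below are illustrative):

```go
package main

import (
	"context"
	"fmt"
)

// An unexported key type prevents collisions with values stored by other packages.
type ctxKey string

const (
	captureAddrKey  ctxKey = "captureAddr"
	changefeedIDKey ctxKey = "changefeedID"
)

// captureAddrFromCtx mirrors the shape of util.CaptureAddrFromCtx:
// read a string value out of the context, defaulting to "" when absent.
func captureAddrFromCtx(ctx context.Context) string {
	addr, _ := ctx.Value(captureAddrKey).(string)
	return addr
}

func changefeedIDFromCtx(ctx context.Context) string {
	id, _ := ctx.Value(changefeedIDKey).(string)
	return id
}

func main() {
	ctx := context.WithValue(context.Background(), captureAddrKey, "127.0.0.1:8300")
	ctx = context.WithValue(ctx, changefeedIDKey, "simple-replication-task")

	// A constructor like NewStatistics can now pull both labels straight
	// from the context instead of threading an opts map through every sink.
	fmt.Println(captureAddrFromCtx(ctx), changefeedIDFromCtx(ctx))
}
```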
106 changes: 62 additions & 44 deletions cmd/kafka-consumer/main.go
@@ -80,16 +80,17 @@ func init() {
File: logPath,
})
if err != nil {
log.Fatal("init logger failed", zap.Error(err))
log.Panic("init logger failed", zap.Error(err))
}

upstreamURI, err := url.Parse(upstreamURIStr)
if err != nil {
log.Fatal("invalid upstream-uri", zap.Error(err))
log.Panic("invalid upstream-uri", zap.Error(err))
}
scheme := strings.ToLower(upstreamURI.Scheme)
if scheme != "kafka" {
log.Fatal("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`", zap.String("upstream-uri", upstreamURIStr))
log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`",
zap.String("upstreamURI", upstreamURIStr))
}
s := upstreamURI.Query().Get("version")
if s != "" {
@@ -106,20 +107,20 @@ func init() {

config, err := newSaramaConfig()
if err != nil {
log.Fatal("Error creating sarama config", zap.Error(err))
log.Panic("Error creating sarama config", zap.Error(err))
}

s = upstreamURI.Query().Get("partition-num")
if s == "" {
partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, config)
if err != nil {
log.Fatal("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
log.Panic("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
}
kafkaPartitionNum = partition
} else {
c, err := strconv.Atoi(s)
if err != nil {
log.Fatal("invalid partition-num of upstream-uri")
log.Panic("invalid partition-num of upstream-uri")
}
kafkaPartitionNum = int32(c)
}
@@ -128,7 +129,7 @@
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
log.Fatal("invalid max-message-bytes of upstream-uri")
log.Panic("invalid max-message-bytes of upstream-uri")
}
log.Info("Setting max-message-bytes", zap.Int("max-message-bytes", c))
kafkaMaxMessageBytes = c
@@ -138,7 +139,7 @@
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
log.Fatal("invalid max-batch-size of upstream-uri")
log.Panic("invalid max-batch-size of upstream-uri")
}
log.Info("Setting max-batch-size", zap.Int("max-batch-size", c))
kafkaMaxBatchSize = c
@@ -227,24 +228,24 @@ func main() {
*/
config, err := newSaramaConfig()
if err != nil {
log.Fatal("Error creating sarama config", zap.Error(err))
log.Panic("Error creating sarama config", zap.Error(err))
}
err = waitTopicCreated(kafkaAddrs, kafkaTopic, config)
if err != nil {
log.Fatal("wait topic created failed", zap.Error(err))
log.Panic("wait topic created failed", zap.Error(err))
}
/**
* Setup a new Sarama consumer group
*/
consumer, err := NewConsumer(context.TODO())
if err != nil {
log.Fatal("Error creating consumer", zap.Error(err))
log.Panic("Error creating consumer", zap.Error(err))
}

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(kafkaAddrs, kafkaGroupID, config)
if err != nil {
log.Fatal("Error creating consumer group client", zap.Error(err))
log.Panic("Error creating consumer group client", zap.Error(err))
}

wg := &sync.WaitGroup{}
@@ -256,7 +257,7 @@ func main() {
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), consumer); err != nil {
log.Fatal("Error from consumer: %v", zap.Error(err))
log.Panic("Error from consumer: %v", zap.Error(err))
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
@@ -268,7 +269,7 @@

go func() {
if err := consumer.Run(ctx); err != nil {
log.Fatal("Error running consumer: %v", zap.Error(err))
log.Panic("Error running consumer: %v", zap.Error(err))
}
}()

@@ -286,7 +287,7 @@ func main() {
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Fatal("Error closing client", zap.Error(err))
log.Panic("Error closing client", zap.Error(err))
}
}

@@ -383,9 +384,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if sink == nil {
panic("sink should initialized")
}
ClaimMessages:

for message := range claim.Messages() {
log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
if err != nil {
return errors.Trace(err)
@@ -395,7 +396,7 @@
for {
tp, hasNext, err := batchDecoder.HasNext()
if err != nil {
log.Fatal("decode message key failed", zap.Error(err))
log.Panic("decode message key failed", zap.Error(err))
}
if !hasNext {
break
@@ -404,29 +405,30 @@
counter++
// If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning.
if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 {
log.Fatal("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
zap.Int("recevied-bytes", len(message.Key)+len(message.Value)))
log.Panic("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
zap.Int("receviedBytes", len(message.Key)+len(message.Value)))
}

switch tp {
case model.MqMessageTypeDDL:
ddl, err := batchDecoder.NextDDLEvent()
if err != nil {
log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
log.Panic("decode message value failed", zap.ByteString("value", message.Value))
}
c.appendDDL(ddl)
case model.MqMessageTypeRow:
row, err := batchDecoder.NextRowChangedEvent()
if err != nil {
log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
log.Panic("decode message value failed", zap.ByteString("value", message.Value))
}
globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
if row.CommitTs <= globalResolvedTs || row.CommitTs <= sink.resolvedTs {
log.Debug("filter fallback row", zap.ByteString("row", message.Key),
log.Debug("RowChangedEvent fallback row, ignore it",
zap.Uint64("commitTs", row.CommitTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("sinkResolvedTs", sink.resolvedTs),
zap.Int32("partition", partition))
break ClaimMessages
zap.Int32("partition", partition),
zap.ByteString("row", message.Key))
}
// FIXME: hack to set start-ts in row changed event, as start-ts
// is not contained in TiCDC open protocol
Expand All @@ -439,7 +441,7 @@ ClaimMessages:
c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
err = sink.EmitRowChangedEvents(ctx, row)
if err != nil {
log.Fatal("emit row changed event failed", zap.Error(err))
log.Panic("emit row changed event failed", zap.Error(err))
}
lastCommitTs, ok := sink.tablesMap.Load(row.Table.TableID)
if !ok || lastCommitTs.(uint64) < row.CommitTs {
Expand All @@ -448,21 +450,29 @@ ClaimMessages:
case model.MqMessageTypeResolved:
ts, err := batchDecoder.NextResolvedEvent()
if err != nil {
log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
log.Panic("decode message value failed", zap.ByteString("value", message.Value))
}
resolvedTs := atomic.LoadUint64(&sink.resolvedTs)
if resolvedTs < ts {
// `resolvedTs` should be monotonically increasing, it's allowed to receive redandunt one.
if ts < resolvedTs {
log.Panic("partition resolved ts fallback",
zap.Uint64("ts", ts),
zap.Uint64("resolvedTs", resolvedTs),
zap.Int32("partition", partition))
} else if ts > resolvedTs {
log.Debug("update sink resolved ts",
zap.Uint64("ts", ts),
zap.Int32("partition", partition))
atomic.StoreUint64(&sink.resolvedTs, ts)
} else {
log.Info("redundant sink resolved ts", zap.Uint64("ts", ts), zap.Int32("partition", partition))
}
}
session.MarkMessage(message, "")
}

if counter > kafkaMaxBatchSize {
log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
log.Panic("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
zap.Int("actual-batch-size", counter))
}
}
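The resolved-ts branch above encodes the invariant the reworked consumer relies on: a partition's resolved timestamp must never move backwards, may be delivered redundantly, and only a strictly larger value advances the sink. A stand-alone sketch of that rule (advanceResolvedTs is an illustrative helper, not part of the consumer):

```go
package main

import (
	"fmt"
	"log"
	"sync/atomic"
)

// advanceResolvedTs applies the same three-way rule as ConsumeClaim:
// a smaller ts is treated as fatal, an equal ts is redundant and ignored,
// and a larger ts moves the watermark forward.
func advanceResolvedTs(current *uint64, ts uint64) bool {
	old := atomic.LoadUint64(current)
	if ts < old {
		// Fallback is never expected; the consumer treats it as a bug.
		log.Panicf("resolved ts fallback: got %d, already at %d", ts, old)
	}
	if ts == old {
		return false // redundant resolved event, safe to ignore
	}
	// Load-then-store is acceptable here because, as in ConsumeClaim,
	// a single goroutine owns each partition's resolved ts.
	atomic.StoreUint64(current, ts)
	return true
}

func main() {
	var resolvedTs uint64 = 100
	fmt.Println(advanceResolvedTs(&resolvedTs, 100)) // false: redundant
	fmt.Println(advanceResolvedTs(&resolvedTs, 105)) // true: watermark advances
	// advanceResolvedTs(&resolvedTs, 90) would panic: fallback is never expected.
}
```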
@@ -477,8 +487,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
return
}
globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
if ddl.CommitTs <= globalResolvedTs {
log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
if ddl.CommitTs < globalResolvedTs {
log.Panic("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
}
if ddl.CommitTs == globalResolvedTs {
log.Warn("receive redundant ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
return
}
c.ddlList = append(c.ddlList, ddl)
@@ -519,14 +532,15 @@ func (c *Consumer) forEachSink(fn func(sink *partitionSink) error) error {
// Run runs the Consumer
func (c *Consumer) Run(ctx context.Context) error {
var lastGlobalResolvedTs uint64
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
case <-ticker.C:
}
time.Sleep(100 * time.Millisecond)
// handle ddl
// initialize the `globalResolvedTs` as min of all partition's `ResolvedTs`
globalResolvedTs := uint64(math.MaxUint64)
err := c.forEachSink(func(sink *partitionSink) error {
resolvedTs := atomic.LoadUint64(&sink.resolvedTs)
@@ -538,6 +552,7 @@ func (c *Consumer) Run(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
// handle ddl
todoDDL := c.getFrontDDL()
if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs {
// flush DMLs
@@ -560,18 +575,21 @@ func (c *Consumer) Run(ctx context.Context) error {
if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs {
globalResolvedTs = todoDDL.CommitTs
}
if lastGlobalResolvedTs == globalResolvedTs {
continue
if lastGlobalResolvedTs > globalResolvedTs {
log.Panic("global ResolvedTs fallback")
}
lastGlobalResolvedTs = globalResolvedTs
atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs)
log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))

err = c.forEachSink(func(sink *partitionSink) error {
return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
})
if err != nil {
return errors.Trace(err)
if globalResolvedTs > lastGlobalResolvedTs {
lastGlobalResolvedTs = globalResolvedTs
atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs)
log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))

err = c.forEachSink(func(sink *partitionSink) error {
return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
})
if err != nil {
return errors.Trace(err)
}
}
}
}
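Run also switches from a select with a default branch plus time.Sleep to a ticker-driven select, so one round of bookkeeping runs roughly every 100ms while context cancellation is still observed promptly. A minimal sketch of that polling pattern (pollLoop and doWork are illustrative stand-ins for the resolved-ts bookkeeping in Run):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// pollLoop wakes up every interval to do one round of work, and returns as
// soon as the context is cancelled -- including while it is waiting.
func pollLoop(ctx context.Context, interval time.Duration, doWork func() error) error {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources on exit

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
		if err := doWork(); err != nil {
			return err
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	err := pollLoop(ctx, 100*time.Millisecond, func() error {
		fmt.Println("one round of resolved-ts bookkeeping")
		return nil
	})
	fmt.Println("loop exited:", err)
}
```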