Merge branch 'release-5.3' into cherry-pick-4262-to-release-5.3
zhaoxinyu authored Feb 21, 2022
2 parents ab6d309 + 9b6e289 commit 0a38b87
Showing 1 changed file with 62 additions and 44 deletions.
106 changes: 62 additions & 44 deletions cmd/kafka-consumer/main.go
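
Most of this diff replaces `log.Fatal` with `log.Panic` in the kafka-consumer tool. A minimal sketch (not part of the commit) of why that distinction matters, written against `go.uber.org/zap`, which the `pingcap/log` package used in this file wraps: `Fatal` writes the entry and then calls `os.Exit(1)`, skipping deferred cleanup, while `Panic` unwinds the stack, so defers and `recover` still run and a stack trace is preserved.

```go
package main

import "go.uber.org/zap"

func main() {
	logger, _ := zap.NewDevelopment()
	defer logger.Sync() // still runs after a panic, but not after Fatal's os.Exit(1)

	defer func() {
		if r := recover(); r != nil {
			logger.Info("recovered from panic", zap.Any("reason", r))
		}
	}()

	// Panic logs the entry and then panics, so the deferred recover above
	// and the deferred Sync both get a chance to run.
	logger.Panic("something went wrong", zap.String("component", "kafka-consumer"))

	// Fatal would log and then call os.Exit(1), skipping every deferred function.
	// logger.Fatal("unreachable in this sketch")
}
```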
@@ -80,16 +80,17 @@ func init() {
File: logPath,
})
if err != nil {
log.Fatal("init logger failed", zap.Error(err))
log.Panic("init logger failed", zap.Error(err))
}

upstreamURI, err := url.Parse(upstreamURIStr)
if err != nil {
log.Fatal("invalid upstream-uri", zap.Error(err))
log.Panic("invalid upstream-uri", zap.Error(err))
}
scheme := strings.ToLower(upstreamURI.Scheme)
if scheme != "kafka" {
log.Fatal("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`", zap.String("upstream-uri", upstreamURIStr))
log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`",
zap.String("upstreamURI", upstreamURIStr))
}
s := upstreamURI.Query().Get("version")
if s != "" {
@@ -106,20 +107,20 @@ func init() {

config, err := newSaramaConfig()
if err != nil {
log.Fatal("Error creating sarama config", zap.Error(err))
log.Panic("Error creating sarama config", zap.Error(err))
}

s = upstreamURI.Query().Get("partition-num")
if s == "" {
partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, config)
if err != nil {
log.Fatal("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
log.Panic("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
}
kafkaPartitionNum = partition
} else {
c, err := strconv.Atoi(s)
if err != nil {
log.Fatal("invalid partition-num of upstream-uri")
log.Panic("invalid partition-num of upstream-uri")
}
kafkaPartitionNum = int32(c)
}
@@ -128,7 +129,7 @@ func init() {
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
log.Fatal("invalid max-message-bytes of upstream-uri")
log.Panic("invalid max-message-bytes of upstream-uri")
}
log.Info("Setting max-message-bytes", zap.Int("max-message-bytes", c))
kafkaMaxMessageBytes = c
@@ -138,7 +139,7 @@ func init() {
if s != "" {
c, err := strconv.Atoi(s)
if err != nil {
log.Fatal("invalid max-batch-size of upstream-uri")
log.Panic("invalid max-batch-size of upstream-uri")
}
log.Info("Setting max-batch-size", zap.Int("max-batch-size", c))
kafkaMaxBatchSize = c
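
The `init` hunks above all follow the same pattern: read an optional query parameter from the upstream URI, fall back to a default (or to broker metadata) when it is absent, and treat an unparsable value as fatal. A self-contained sketch of that pattern using only the standard library; the URI value is hypothetical and the code is not taken from the commit.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
)

func main() {
	// Hypothetical value of the --upstream-uri flag.
	upstreamURI, err := url.Parse("kafka://127.0.0.1:9092/test-topic?partition-num=3&max-batch-size=4096")
	if err != nil {
		panic(err)
	}
	fmt.Println("scheme:", upstreamURI.Scheme) // must be "kafka"

	// Query().Get returns "" when a parameter is absent, which is why the
	// consumer can fall back to asking the brokers for the partition count.
	if s := upstreamURI.Query().Get("partition-num"); s != "" {
		n, err := strconv.Atoi(s)
		if err != nil {
			panic("invalid partition-num of upstream-uri")
		}
		fmt.Println("partition-num:", n) // 3
	}
	if s := upstreamURI.Query().Get("max-message-bytes"); s == "" {
		fmt.Println("max-message-bytes not set, keeping the default")
	}
}
```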
@@ -227,24 +228,24 @@ func main() {
*/
config, err := newSaramaConfig()
if err != nil {
log.Fatal("Error creating sarama config", zap.Error(err))
log.Panic("Error creating sarama config", zap.Error(err))
}
err = waitTopicCreated(kafkaAddrs, kafkaTopic, config)
if err != nil {
log.Fatal("wait topic created failed", zap.Error(err))
log.Panic("wait topic created failed", zap.Error(err))
}
/**
* Setup a new Sarama consumer group
*/
consumer, err := NewConsumer(context.TODO())
if err != nil {
log.Fatal("Error creating consumer", zap.Error(err))
log.Panic("Error creating consumer", zap.Error(err))
}

ctx, cancel := context.WithCancel(context.Background())
client, err := sarama.NewConsumerGroup(kafkaAddrs, kafkaGroupID, config)
if err != nil {
log.Fatal("Error creating consumer group client", zap.Error(err))
log.Panic("Error creating consumer group client", zap.Error(err))
}

wg := &sync.WaitGroup{}
@@ -256,7 +257,7 @@ func main() {
// server-side rebalance happens, the consumer session will need to be
// recreated to get the new claims
if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), consumer); err != nil {
log.Fatal("Error from consumer: %v", zap.Error(err))
log.Panic("Error from consumer: %v", zap.Error(err))
}
// check if context was cancelled, signaling that the consumer should stop
if ctx.Err() != nil {
@@ -268,7 +269,7 @@ func main() {

go func() {
if err := consumer.Run(ctx); err != nil {
log.Fatal("Error running consumer: %v", zap.Error(err))
log.Panic("Error running consumer: %v", zap.Error(err))
}
}()

@@ -286,7 +287,7 @@ func main() {
cancel()
wg.Wait()
if err = client.Close(); err != nil {
log.Fatal("Error closing client", zap.Error(err))
log.Panic("Error closing client", zap.Error(err))
}
}

@@ -383,9 +384,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
if sink == nil {
panic("sink should initialized")
}
ClaimMessages:

for message := range claim.Messages() {
log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
if err != nil {
return errors.Trace(err)
@@ -395,7 +396,7 @@ ClaimMessages:
for {
tp, hasNext, err := batchDecoder.HasNext()
if err != nil {
log.Fatal("decode message key failed", zap.Error(err))
log.Panic("decode message key failed", zap.Error(err))
}
if !hasNext {
break
@@ -404,29 +405,30 @@ ClaimMessages:
counter++
// If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning.
if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 {
log.Fatal("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
zap.Int("recevied-bytes", len(message.Key)+len(message.Value)))
log.Panic("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
zap.Int("receviedBytes", len(message.Key)+len(message.Value)))
}

switch tp {
case model.MqMessageTypeDDL:
ddl, err := batchDecoder.NextDDLEvent()
if err != nil {
log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
log.Panic("decode message value failed", zap.ByteString("value", message.Value))
}
c.appendDDL(ddl)
case model.MqMessageTypeRow:
row, err := batchDecoder.NextRowChangedEvent()
if err != nil {
log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
log.Panic("decode message value failed", zap.ByteString("value", message.Value))
}
globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
if row.CommitTs <= globalResolvedTs || row.CommitTs <= sink.resolvedTs {
log.Debug("filter fallback row", zap.ByteString("row", message.Key),
log.Debug("RowChangedEvent fallback row, ignore it",
zap.Uint64("commitTs", row.CommitTs),
zap.Uint64("globalResolvedTs", globalResolvedTs),
zap.Uint64("sinkResolvedTs", sink.resolvedTs),
zap.Int32("partition", partition))
break ClaimMessages
zap.Int32("partition", partition),
zap.ByteString("row", message.Key))
}
// FIXME: hack to set start-ts in row changed event, as start-ts
// is not contained in TiCDC open protocol
@@ -439,7 +441,7 @@ ClaimMessages:
c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
err = sink.EmitRowChangedEvents(ctx, row)
if err != nil {
log.Fatal("emit row changed event failed", zap.Error(err))
log.Panic("emit row changed event failed", zap.Error(err))
}
lastCommitTs, ok := sink.tablesMap.Load(row.Table.TableID)
if !ok || lastCommitTs.(uint64) < row.CommitTs {
@@ -448,21 +450,29 @@ ClaimMessages:
case model.MqMessageTypeResolved:
ts, err := batchDecoder.NextResolvedEvent()
if err != nil {
log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
log.Panic("decode message value failed", zap.ByteString("value", message.Value))
}
resolvedTs := atomic.LoadUint64(&sink.resolvedTs)
if resolvedTs < ts {
// `resolvedTs` should be monotonically increasing; it is allowed to receive a redundant one.
if ts < resolvedTs {
log.Panic("partition resolved ts fallback",
zap.Uint64("ts", ts),
zap.Uint64("resolvedTs", resolvedTs),
zap.Int32("partition", partition))
} else if ts > resolvedTs {
log.Debug("update sink resolved ts",
zap.Uint64("ts", ts),
zap.Int32("partition", partition))
atomic.StoreUint64(&sink.resolvedTs, ts)
} else {
log.Info("redundant sink resolved ts", zap.Uint64("ts", ts), zap.Int32("partition", partition))
}
}
session.MarkMessage(message, "")
}

if counter > kafkaMaxBatchSize {
log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
log.Panic("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
zap.Int("actual-batch-size", counter))
}
}
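
The rewritten `MqMessageTypeResolved` branch above replaces the single `resolvedTs < ts` check with three explicit cases: a smaller ts is a fallback and panics, a larger ts advances the per-partition watermark, and an equal ts is merely redundant. A self-contained sketch of that rule; the function and variable names are illustrative, not taken from the consumer.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// advanceResolvedTs applies the rule: fallback is an error, equal is a no-op,
// larger advances the watermark.
func advanceResolvedTs(watermark *uint64, ts uint64) error {
	current := atomic.LoadUint64(watermark)
	switch {
	case ts < current:
		return fmt.Errorf("resolved ts fallback: got %d, watermark is %d", ts, current)
	case ts > current:
		atomic.StoreUint64(watermark, ts)
	}
	return nil // ts == current: redundant, harmless
}

func main() {
	var watermark uint64 = 100
	fmt.Println(advanceResolvedTs(&watermark, 120), watermark) // <nil> 120
	fmt.Println(advanceResolvedTs(&watermark, 120), watermark) // <nil> 120 (redundant)
	fmt.Println(advanceResolvedTs(&watermark, 90), watermark)  // fallback error, watermark stays 120
}
```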
Expand All @@ -477,8 +487,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
return
}
globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
if ddl.CommitTs <= globalResolvedTs {
log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
if ddl.CommitTs < globalResolvedTs {
log.Panic("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
}
if ddl.CommitTs == globalResolvedTs {
log.Warn("receive redundant ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
return
}
c.ddlList = append(c.ddlList, ddl)
@@ -519,14 +532,15 @@ func (c *Consumer) forEachSink(fn func(sink *partitionSink) error) error {
// Run runs the Consumer
func (c *Consumer) Run(ctx context.Context) error {
var lastGlobalResolvedTs uint64
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
case <-ticker.C:
}
time.Sleep(100 * time.Millisecond)
// handle ddl
// initialize the `globalResolvedTs` as the min of all partitions' `ResolvedTs`
globalResolvedTs := uint64(math.MaxUint64)
err := c.forEachSink(func(sink *partitionSink) error {
resolvedTs := atomic.LoadUint64(&sink.resolvedTs)
@@ -538,6 +552,7 @@ func (c *Consumer) Run(ctx context.Context) error {
if err != nil {
return errors.Trace(err)
}
// handle ddl
todoDDL := c.getFrontDDL()
if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs {
// flush DMLs
@@ -560,18 +575,21 @@ func (c *Consumer) Run(ctx context.Context) error {
if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs {
globalResolvedTs = todoDDL.CommitTs
}
if lastGlobalResolvedTs == globalResolvedTs {
continue
if lastGlobalResolvedTs > globalResolvedTs {
log.Panic("global ResolvedTs fallback")
}
lastGlobalResolvedTs = globalResolvedTs
atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs)
log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))

err = c.forEachSink(func(sink *partitionSink) error {
return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
})
if err != nil {
return errors.Trace(err)
if globalResolvedTs > lastGlobalResolvedTs {
lastGlobalResolvedTs = globalResolvedTs
atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs)
log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))

err = c.forEachSink(func(sink *partitionSink) error {
return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
})
if err != nil {
return errors.Trace(err)
}
}
}
}
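
The `Run` hunk above makes two changes: the polling interval moves from a non-blocking `select` plus `time.Sleep` to a `time.Ticker` case inside the `select`, and the flush logic only executes when `globalResolvedTs` actually advances (a regression now panics). A runnable sketch of the resulting loop shape, with a placeholder `poll` callback standing in for the resolved-ts bookkeeping; it is an illustration, not code from the commit.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

func run(ctx context.Context, poll func() error) error {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop() // unlike a bare time.Sleep, a ticker must be stopped to release its resources

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			// One round of work per tick instead of an unconditional sleep.
			if err := poll(); err != nil {
				return err
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 350*time.Millisecond)
	defer cancel()

	err := run(ctx, func() error {
		fmt.Println("tick: recompute globalResolvedTs, flush only if it advanced")
		return nil
	})
	fmt.Println("loop exited:", err) // context deadline exceeded
}
```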
