diff --git a/cmd/kafka-consumer/main.go b/cmd/kafka-consumer/main.go
index fc05c04d8e9..68c6007b2a5 100644
--- a/cmd/kafka-consumer/main.go
+++ b/cmd/kafka-consumer/main.go
@@ -80,16 +80,17 @@ func init() {
 		File: logPath,
 	})
 	if err != nil {
-		log.Fatal("init logger failed", zap.Error(err))
+		log.Panic("init logger failed", zap.Error(err))
 	}
 
 	upstreamURI, err := url.Parse(upstreamURIStr)
 	if err != nil {
-		log.Fatal("invalid upstream-uri", zap.Error(err))
+		log.Panic("invalid upstream-uri", zap.Error(err))
 	}
 	scheme := strings.ToLower(upstreamURI.Scheme)
 	if scheme != "kafka" {
-		log.Fatal("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`", zap.String("upstream-uri", upstreamURIStr))
+		log.Panic("invalid upstream-uri scheme, the scheme of upstream-uri must be `kafka`",
+			zap.String("upstreamURI", upstreamURIStr))
 	}
 	s := upstreamURI.Query().Get("version")
 	if s != "" {
@@ -106,20 +107,20 @@ func init() {
 
 	config, err := newSaramaConfig()
 	if err != nil {
-		log.Fatal("Error creating sarama config", zap.Error(err))
+		log.Panic("Error creating sarama config", zap.Error(err))
 	}
 
 	s = upstreamURI.Query().Get("partition-num")
 	if s == "" {
 		partition, err := getPartitionNum(kafkaAddrs, kafkaTopic, config)
 		if err != nil {
-			log.Fatal("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
+			log.Panic("can not get partition number", zap.String("topic", kafkaTopic), zap.Error(err))
 		}
 		kafkaPartitionNum = partition
 	} else {
 		c, err := strconv.Atoi(s)
 		if err != nil {
-			log.Fatal("invalid partition-num of upstream-uri")
+			log.Panic("invalid partition-num of upstream-uri")
 		}
 		kafkaPartitionNum = int32(c)
 	}
@@ -128,7 +129,7 @@ func init() {
 	if s != "" {
 		c, err := strconv.Atoi(s)
 		if err != nil {
-			log.Fatal("invalid max-message-bytes of upstream-uri")
+			log.Panic("invalid max-message-bytes of upstream-uri")
 		}
 		log.Info("Setting max-message-bytes", zap.Int("max-message-bytes", c))
 		kafkaMaxMessageBytes = c
@@ -138,7 +139,7 @@
 	if s != "" {
 		c, err := strconv.Atoi(s)
 		if err != nil {
-			log.Fatal("invalid max-batch-size of upstream-uri")
+			log.Panic("invalid max-batch-size of upstream-uri")
 		}
 		log.Info("Setting max-batch-size", zap.Int("max-batch-size", c))
 		kafkaMaxBatchSize = c
@@ -227,24 +228,24 @@ func main() {
 	 */
 	config, err := newSaramaConfig()
 	if err != nil {
-		log.Fatal("Error creating sarama config", zap.Error(err))
+		log.Panic("Error creating sarama config", zap.Error(err))
 	}
 	err = waitTopicCreated(kafkaAddrs, kafkaTopic, config)
 	if err != nil {
-		log.Fatal("wait topic created failed", zap.Error(err))
+		log.Panic("wait topic created failed", zap.Error(err))
 	}
 	/**
 	 * Setup a new Sarama consumer group
 	 */
 	consumer, err := NewConsumer(context.TODO())
 	if err != nil {
-		log.Fatal("Error creating consumer", zap.Error(err))
+		log.Panic("Error creating consumer", zap.Error(err))
 	}
 
 	ctx, cancel := context.WithCancel(context.Background())
 	client, err := sarama.NewConsumerGroup(kafkaAddrs, kafkaGroupID, config)
 	if err != nil {
-		log.Fatal("Error creating consumer group client", zap.Error(err))
+		log.Panic("Error creating consumer group client", zap.Error(err))
 	}
 
 	wg := &sync.WaitGroup{}
@@ -256,7 +257,7 @@
 			// server-side rebalance happens, the consumer session will need to be
 			// recreated to get the new claims
 			if err := client.Consume(ctx, strings.Split(kafkaTopic, ","), consumer); err != nil {
-				log.Fatal("Error from consumer: %v", zap.Error(err))
+				log.Panic("Error from consumer: %v", zap.Error(err))
 			}
 			// check if context was cancelled, signaling that the consumer should stop
 			if ctx.Err() != nil {
@@ -268,7 +269,7 @@
 
 	go func() {
 		if err := consumer.Run(ctx); err != nil {
-			log.Fatal("Error running consumer: %v", zap.Error(err))
+			log.Panic("Error running consumer: %v", zap.Error(err))
 		}
 	}()
 
@@ -286,7 +287,7 @@
 	cancel()
 	wg.Wait()
 	if err = client.Close(); err != nil {
-		log.Fatal("Error closing client", zap.Error(err))
+		log.Panic("Error closing client", zap.Error(err))
 	}
 }
 
@@ -383,9 +384,9 @@ func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim saram
 	if sink == nil {
 		panic("sink should initialized")
 	}
-ClaimMessages:
+
 	for message := range claim.Messages() {
-		log.Info("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
+		log.Debug("Message claimed", zap.Int32("partition", message.Partition), zap.ByteString("key", message.Key), zap.ByteString("value", message.Value))
 		batchDecoder, err := codec.NewJSONEventBatchDecoder(message.Key, message.Value)
 		if err != nil {
 			return errors.Trace(err)
@@ -395,7 +396,7 @@ ClaimMessages:
 		for {
 			tp, hasNext, err := batchDecoder.HasNext()
 			if err != nil {
-				log.Fatal("decode message key failed", zap.Error(err))
+				log.Panic("decode message key failed", zap.Error(err))
 			}
 			if !hasNext {
 				break
@@ -404,29 +405,30 @@ ClaimMessages:
 			counter++
 			// If the message containing only one event exceeds the length limit, CDC will allow it and issue a warning.
 			if len(message.Key)+len(message.Value) > kafkaMaxMessageBytes && counter > 1 {
-				log.Fatal("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
-					zap.Int("recevied-bytes", len(message.Key)+len(message.Value)))
+				log.Panic("kafka max-messages-bytes exceeded", zap.Int("max-message-bytes", kafkaMaxMessageBytes),
+					zap.Int("receviedBytes", len(message.Key)+len(message.Value)))
 			}
 
 			switch tp {
 			case model.MqMessageTypeDDL:
 				ddl, err := batchDecoder.NextDDLEvent()
 				if err != nil {
-					log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
+					log.Panic("decode message value failed", zap.ByteString("value", message.Value))
 				}
 				c.appendDDL(ddl)
 			case model.MqMessageTypeRow:
 				row, err := batchDecoder.NextRowChangedEvent()
 				if err != nil {
-					log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
+					log.Panic("decode message value failed", zap.ByteString("value", message.Value))
 				}
 				globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
 				if row.CommitTs <= globalResolvedTs || row.CommitTs <= sink.resolvedTs {
-					log.Debug("filter fallback row", zap.ByteString("row", message.Key),
+					log.Debug("RowChangedEvent fallback row, ignore it",
+						zap.Uint64("commitTs", row.CommitTs),
 						zap.Uint64("globalResolvedTs", globalResolvedTs),
 						zap.Uint64("sinkResolvedTs", sink.resolvedTs),
-						zap.Int32("partition", partition))
-					break ClaimMessages
+						zap.Int32("partition", partition),
+						zap.ByteString("row", message.Key))
 				}
 				// FIXME: hack to set start-ts in row changed event, as start-ts
 				// is not contained in TiCDC open protocol
@@ -439,7 +441,7 @@ ClaimMessages:
 					c.fakeTableIDGenerator.generateFakeTableID(row.Table.Schema, row.Table.Table, partitionID)
 				err = sink.EmitRowChangedEvents(ctx, row)
 				if err != nil {
-					log.Fatal("emit row changed event failed", zap.Error(err))
+					log.Panic("emit row changed event failed", zap.Error(err))
 				}
 				lastCommitTs, ok := sink.tablesMap.Load(row.Table.TableID)
 				if !ok || lastCommitTs.(uint64) < row.CommitTs {
@@ -448,21 +450,29 @@
 			case model.MqMessageTypeResolved:
 				ts, err := batchDecoder.NextResolvedEvent()
 				if err != nil {
-					log.Fatal("decode message value failed", zap.ByteString("value", message.Value))
+					log.Panic("decode message value failed", zap.ByteString("value", message.Value))
 				}
 				resolvedTs := atomic.LoadUint64(&sink.resolvedTs)
-				if resolvedTs < ts {
+				// `resolvedTs` should be monotonically increasing, it's allowed to receive redandunt one.
+				if ts < resolvedTs {
+					log.Panic("partition resolved ts fallback",
+						zap.Uint64("ts", ts),
+						zap.Uint64("resolvedTs", resolvedTs),
+						zap.Int32("partition", partition))
+				} else if ts > resolvedTs {
 					log.Debug("update sink resolved ts",
 						zap.Uint64("ts", ts),
 						zap.Int32("partition", partition))
 					atomic.StoreUint64(&sink.resolvedTs, ts)
+				} else {
+					log.Info("redundant sink resolved ts", zap.Uint64("ts", ts), zap.Int32("partition", partition))
 				}
 			}
 			session.MarkMessage(message, "")
 		}
 
 		if counter > kafkaMaxBatchSize {
-			log.Fatal("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
+			log.Panic("Open Protocol max-batch-size exceeded", zap.Int("max-batch-size", kafkaMaxBatchSize),
 				zap.Int("actual-batch-size", counter))
 		}
 	}
@@ -477,8 +487,11 @@ func (c *Consumer) appendDDL(ddl *model.DDLEvent) {
 		return
 	}
 	globalResolvedTs := atomic.LoadUint64(&c.globalResolvedTs)
-	if ddl.CommitTs <= globalResolvedTs {
-		log.Error("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
+	if ddl.CommitTs < globalResolvedTs {
+		log.Panic("unexpected ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
+	}
+	if ddl.CommitTs == globalResolvedTs {
+		log.Warn("receive redundant ddl job", zap.Uint64("ddlts", ddl.CommitTs), zap.Uint64("globalResolvedTs", globalResolvedTs))
 		return
 	}
 	c.ddlList = append(c.ddlList, ddl)
@@ -519,14 +532,15 @@ func (c *Consumer) forEachSink(fn func(sink *partitionSink) error) error {
 // Run runs the Consumer
 func (c *Consumer) Run(ctx context.Context) error {
 	var lastGlobalResolvedTs uint64
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		default:
+		case <-ticker.C:
 		}
-		time.Sleep(100 * time.Millisecond)
-		// handle ddl
+		// initialize the `globalResolvedTs` as min of all partition's `ResolvedTs`
 		globalResolvedTs := uint64(math.MaxUint64)
 		err := c.forEachSink(func(sink *partitionSink) error {
 			resolvedTs := atomic.LoadUint64(&sink.resolvedTs)
@@ -538,6 +552,7 @@ func (c *Consumer) Run(ctx context.Context) error {
 		if err != nil {
 			return errors.Trace(err)
 		}
+		// handle ddl
 		todoDDL := c.getFrontDDL()
 		if todoDDL != nil && globalResolvedTs >= todoDDL.CommitTs {
 			// flush DMLs
@@ -560,18 +575,21 @@ func (c *Consumer) Run(ctx context.Context) error {
 		if todoDDL != nil && todoDDL.CommitTs < globalResolvedTs {
 			globalResolvedTs = todoDDL.CommitTs
 		}
-		if lastGlobalResolvedTs == globalResolvedTs {
-			continue
+		if lastGlobalResolvedTs > globalResolvedTs {
+			log.Panic("global ResolvedTs fallback")
 		}
 
-		lastGlobalResolvedTs = globalResolvedTs
-		atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs)
-		log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))
-		err = c.forEachSink(func(sink *partitionSink) error {
-			return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
-		})
-		if err != nil {
-			return errors.Trace(err)
+		if globalResolvedTs > lastGlobalResolvedTs {
+			lastGlobalResolvedTs = globalResolvedTs
+			atomic.StoreUint64(&c.globalResolvedTs, globalResolvedTs)
+			log.Info("update globalResolvedTs", zap.Uint64("ts", globalResolvedTs))
+
+			err = c.forEachSink(func(sink *partitionSink) error {
+				return syncFlushRowChangedEvents(ctx, sink, globalResolvedTs)
+			})
+			if err != nil {
+				return errors.Trace(err)
+			}
 		}
 	}
 }