Skip to content

Commit

Permalink
remove skipAutoCreate failpoint from kafka test. (pingcap#4673)
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand authored Feb 24, 2022
1 parent 34e5660 commit 19b9177
Show file tree
Hide file tree
Showing 5 changed files with 413 additions and 166 deletions.
243 changes: 122 additions & 121 deletions cdc/sink/producer/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tiflow/cdc/sink/codec"
"github.com/pingcap/tiflow/pkg/config"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/kafka"
"github.com/pingcap/tiflow/pkg/notify"
"github.com/pingcap/tiflow/pkg/security"
"github.com/pingcap/tiflow/pkg/util"
Expand Down Expand Up @@ -397,37 +398,122 @@ func (k *kafkaSaramaProducer) run(ctx context.Context) error {
}
}

func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config) error {
// FIXME: find a way to remove this failpoint for workload the unit test
failpoint.Inject("SkipTopicAutoCreate", func() {
failpoint.Return(nil)
})
admin, err := sarama.NewClusterAdmin(config.BrokerEndpoints, saramaConfig)
var (
newSaramaConfigImpl = newSaramaConfig
// NewAdminClientImpl specifies the build method for the admin client.
NewAdminClientImpl kafka.ClusterAdminClientCreator = kafka.NewSaramaAdminClient
)

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return nil, err
}

admin, err := NewAdminClientImpl(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
defer func() {
if err := admin.Close(); err != nil {
log.Warn("close kafka cluster admin failed", zap.Error(err))
}
}()

if err := validateAndCreateTopic(admin, topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &kafkaSaramaProducer{
asyncClient: asyncClient,
syncClient: syncClient,
topic: topic,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
closing: kafkaProducerRunning,
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
select {
case <-ctx.Done():
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err))
}
}
}()
return k, nil
}

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
}

var (
validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
commonInvalidChar = regexp.MustCompile(`[\?:,"]`)
)

func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) {
if configuredClientID != "" {
clientID = configuredClientID
} else {
clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID)
clientID = commonInvalidChar.ReplaceAllString(clientID, "_")
}
if !validClientID.MatchString(clientID) {
return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID)
}
return
}

func validateAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config) error {
topics, err := admin.ListTopics()
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

info, created := topics[topic]
info, exists := topics[topic]
// once we have found the topic, no matter `auto-create-topic`, make sure user input parameters are valid.
if created {
if exists {
// make sure that producer's `MaxMessageBytes` smaller than topic's `max.message.bytes`
topicMaxMessageBytes, err := getTopicMaxMessageBytes(admin, info)
topicMaxMessageBytesStr, err := getTopicConfig(admin, info, kafka.TopicMaxMessageBytesConfigName,
kafka.BrokerMessageMaxBytesConfigName)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
topicMaxMessageBytes, err := strconv.Atoi(topicMaxMessageBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

if topicMaxMessageBytes < config.MaxMessageBytes {
log.Warn("topic's `max.message.bytes` less than the user set `max-message-bytes`,"+
log.Warn("topic's `max.message.bytes` less than the `max-message-bytes`,"+
"use topic's `max.message.bytes` to initialize the Kafka producer",
zap.Int("max.message.bytes", topicMaxMessageBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
Expand All @@ -440,7 +526,7 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config)
zap.String("topic", topic), zap.Any("detail", info))
}

if err := config.adjustPartitionNum(info.NumPartitions); err != nil {
if err := config.setPartitionNum(info.NumPartitions); err != nil {
return errors.Trace(err)
}

Expand All @@ -451,25 +537,29 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config)
return cerror.ErrKafkaInvalidConfig.GenWithStack("`auto-create-topic` is false, and topic not found")
}

brokerMessageMaxBytes, err := getBrokerMessageMaxBytes(admin)
brokerMessageMaxBytesStr, err := getBrokerConfig(admin, kafka.BrokerMessageMaxBytesConfigName)
if err != nil {
log.Warn("TiCDC cannot find `message.max.bytes` from broker's configuration")
return errors.Trace(err)
}
brokerMessageMaxBytes, err := strconv.Atoi(brokerMessageMaxBytesStr)
if err != nil {
return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

// when create the topic, `max.message.bytes` is decided by the broker,
// it would use broker's `message.max.bytes` to set topic's `max.message.bytes`.
// TiCDC need to make sure that the producer's `MaxMessageBytes` won't larger than
// broker's `message.max.bytes`.
if brokerMessageMaxBytes < config.MaxMessageBytes {
log.Warn("broker's `message.max.bytes` less than the user set `max-message-bytes`,"+
log.Warn("broker's `message.max.bytes` less than the `max-message-bytes`,"+
"use broker's `message.max.bytes` to initialize the Kafka producer",
zap.Int("message.max.bytes", brokerMessageMaxBytes),
zap.Int("max-message-bytes", config.MaxMessageBytes))
saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes
}

// topic not created yet, and user does not specify the `partition-num` in the sink uri.
// topic not exists yet, and user does not specify the `partition-num` in the sink uri.
if config.PartitionNum == 0 {
config.PartitionNum = defaultPartitionNum
log.Warn("partition-num is not set, use the default partition count",
Expand All @@ -492,87 +582,6 @@ func topicPreProcess(topic string, config *Config, saramaConfig *sarama.Config)
return nil
}

var newSaramaConfigImpl = newSaramaConfig

// NewKafkaSaramaProducer creates a kafka sarama producer
func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) {
log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config))
cfg, err := newSaramaConfigImpl(ctx, config)
if err != nil {
return nil, err
}

if err := topicPreProcess(topic, config, cfg); err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
opts["max-message-bytes"] = strconv.Itoa(cfg.Producer.MaxMessageBytes)

asyncClient, err := sarama.NewAsyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
syncClient, err := sarama.NewSyncProducer(config.BrokerEndpoints, cfg)
if err != nil {
return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

notifier := new(notify.Notifier)
flushedReceiver, err := notifier.NewReceiver(50 * time.Millisecond)
if err != nil {
return nil, err
}
k := &kafkaSaramaProducer{
asyncClient: asyncClient,
syncClient: syncClient,
topic: topic,
partitionNum: config.PartitionNum,
partitionOffset: make([]struct {
flushed uint64
sent uint64
}, config.PartitionNum),
flushedNotifier: notifier,
flushedReceiver: flushedReceiver,
closeCh: make(chan struct{}),
failpointCh: make(chan error, 1),
closing: kafkaProducerRunning,
}
go func() {
if err := k.run(ctx); err != nil && errors.Cause(err) != context.Canceled {
select {
case <-ctx.Done():
return
case errCh <- err:
default:
log.Error("error channel is full", zap.Error(err))
}
}
}()
return k, nil
}

func init() {
sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB
}

var (
validClientID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
commonInvalidChar = regexp.MustCompile(`[\?:,"]`)
)

func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) (clientID string, err error) {
if configuredClientID != "" {
clientID = configuredClientID
} else {
clientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureAddr, changefeedID)
clientID = commonInvalidChar.ReplaceAllString(clientID, "_")
}
if !validClientID.MatchString(clientID) {
return "", cerror.ErrKafkaInvalidClientID.GenWithStackByArgs(clientID)
}
return
}

// NewSaramaConfig return the default config and set the according version and metrics
func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
config := sarama.NewConfig()

Expand Down Expand Up @@ -666,50 +675,42 @@ func newSaramaConfig(ctx context.Context, c *Config) (*sarama.Config, error) {
return config, err
}

func getBrokerMessageMaxBytes(admin sarama.ClusterAdmin) (int, error) {
target := "message.max.bytes"
// getBrokerConfig gets broker config by name.
func getBrokerConfig(admin kafka.ClusterAdminClient, brokerConfigName string) (string, error) {
_, controllerID, err := admin.DescribeCluster()
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return "", err
}

configEntries, err := admin.DescribeConfig(sarama.ConfigResource{
Type: sarama.BrokerResource,
Name: strconv.Itoa(int(controllerID)),
ConfigNames: []string{target},
ConfigNames: []string{brokerConfigName},
})
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
return "", err
}

if len(configEntries) == 0 || configEntries[0].Name != target {
return 0, cerror.ErrKafkaNewSaramaProducer.GenWithStack(
"since cannot find the `message.max.bytes` from the broker's configuration, " +
"ticdc decline to create the topic and changefeed to prevent potential error")
if len(configEntries) == 0 || configEntries[0].Name != brokerConfigName {
return "", errors.New(fmt.Sprintf(
"cannot find the `%s` from the broker's configuration", brokerConfigName))
}

result, err := strconv.Atoi(configEntries[0].Value)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}

return result, nil
return configEntries[0].Value, nil
}

func getTopicMaxMessageBytes(admin sarama.ClusterAdmin, info sarama.TopicDetail) (int, error) {
if a, ok := info.ConfigEntries["max.message.bytes"]; ok {
result, err := strconv.Atoi(*a)
if err != nil {
return 0, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err)
}
return result, nil
// getTopicConfig gets topic config by name.
// If the topic does not have this configuration, we will try to get it from the broker's configuration.
// NOTICE: The configuration names of topic and broker may be different for the same configuration.
func getTopicConfig(admin kafka.ClusterAdminClient, detail sarama.TopicDetail, topicConfigName string, brokerConfigName string) (string, error) {
if a, ok := detail.ConfigEntries[topicConfigName]; ok {
return *a, nil
}

return getBrokerMessageMaxBytes(admin)
return getBrokerConfig(admin, brokerConfigName)
}

// adjust the partition-num by the topic's partition count
func (c *Config) adjustPartitionNum(realPartitionCount int32) error {
func (c *Config) setPartitionNum(realPartitionCount int32) error {
// user does not specify the `partition-num` in the sink-uri
if c.PartitionNum == 0 {
c.PartitionNum = realPartitionCount
Expand Down
Loading

0 comments on commit 19b9177

Please sign in to comment.