From 7bf2a8a64d2beef6dec7701fdd88c0bb9d22b0d9 Mon Sep 17 00:00:00 2001 From: leoppro Date: Wed, 13 May 2020 11:33:47 +0800 Subject: [PATCH] fix race --- cdc/sink/mqProducer/kafka.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/cdc/sink/mqProducer/kafka.go b/cdc/sink/mqProducer/kafka.go index a1868dbfbac..126c6e973c0 100644 --- a/cdc/sink/mqProducer/kafka.go +++ b/cdc/sink/mqProducer/kafka.go @@ -236,6 +236,10 @@ func NewKafkaSaramaProducer(ctx context.Context, address string, topic string, c return k, nil } +func init() { + sarama.MaxRequestSize = 1024 * 1024 * 1024 // 1GB +} + // NewSaramaConfig return the default config and set the according version and metrics func newSaramaConfig(ctx context.Context, c KafkaConfig) (*sarama.Config, error) { config := sarama.NewConfig() @@ -255,7 +259,6 @@ func newSaramaConfig(ctx context.Context, c KafkaConfig) (*sarama.Config, error) config.ClientID = fmt.Sprintf("TiCDC_sarama_producer_%s_%s_%s", role, captureID, changefeedID) config.Version = version - sarama.MaxRequestSize = int32(c.MaxMessageBytes) config.Producer.Flush.MaxMessages = c.MaxMessageBytes config.Metadata.Retry.Max = 20 config.Metadata.Retry.Backoff = 500 * time.Millisecond