From 2737fff9c5782467cea5d64463aca6caea8b1c65 Mon Sep 17 00:00:00 2001 From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com> Date: Sun, 26 Dec 2021 19:30:07 +0800 Subject: [PATCH] codec(cdc): fix encoder `max-message-bytes` (#4074) * fix encoder. * add test. * fix test. * fix encoder SetParams. * fix test in json. * update craft test. * fix craft test. * Refine error. * Update cdc/sink/codec/craft.go Co-authored-by: amyangfei * Update cdc/sink/codec/craft.go Co-authored-by: amyangfei * fix test in craft. Co-authored-by: amyangfei --- cdc/sink/codec/craft.go | 13 ++++---- cdc/sink/codec/craft_test.go | 14 ++++---- cdc/sink/codec/json.go | 18 +++++----- cdc/sink/codec/json_test.go | 42 ++++++++++++++++++++---- cdc/sink/mq.go | 2 +- cdc/sink/mq_test.go | 4 +-- cdc/sink/producer/kafka/kafka.go | 8 +++-- cdc/sink/producer/kafka/kafka_test.go | 47 ++++++++++++++++++++------- 8 files changed, 103 insertions(+), 45 deletions(-) diff --git a/cdc/sink/codec/craft.go b/cdc/sink/codec/craft.go index 45228ba90a2..b3f35187a29 100644 --- a/cdc/sink/codec/craft.go +++ b/cdc/sink/codec/craft.go @@ -101,13 +101,14 @@ func (e *CraftEventBatchEncoder) Reset() { // SetParams reads relevant parameters for craft protocol func (e *CraftEventBatchEncoder) SetParams(params map[string]string) error { var err error + maxMessageBytes, ok := params["max-message-bytes"] + if !ok { + return cerror.ErrSinkInvalidConfig.GenWithStack("max-message-bytes not found") + } - e.maxMessageBytes = config.DefaultMaxMessageBytes - if maxMessageBytes, ok := params["max-message-bytes"]; ok { - e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes) - if err != nil { - return cerror.ErrSinkInvalidConfig.Wrap(err) - } + e.maxMessageBytes, err = strconv.Atoi(maxMessageBytes) + if err != nil { + return cerror.WrapError(cerror.ErrSinkInvalidConfig, err) } if e.maxMessageBytes <= 0 || e.maxMessageBytes > math.MaxInt32 { return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", e.maxMessageBytes)) diff --git a/cdc/sink/codec/craft_test.go b/cdc/sink/codec/craft_test.go index 75752168da5..5841632eb1c 100644 --- a/cdc/sink/codec/craft_test.go +++ b/cdc/sink/codec/craft_test.go @@ -139,7 +139,7 @@ func (s *craftBatchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatc func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) { defer testleak.AfterTest(c)() encoder := NewCraftEventBatchEncoder().(*CraftEventBatchEncoder) - err := encoder.SetParams(map[string]string{}) + err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"}) c.Assert(err, check.IsNil) c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize) c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) @@ -158,21 +158,21 @@ func (s *craftBatchSuite) TestParamsEdgeCases(c *check.C) { err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32)}) c.Assert(err, check.NotNil) - err = encoder.SetParams(map[string]string{"max-batch-size": "0"}) + err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32), "max-batch-size": "0"}) c.Assert(err, check.ErrorMatches, ".*invalid.*") - err = encoder.SetParams(map[string]string{"max-batch-size": "-1"}) + err = encoder.SetParams(map[string]string{"max-message-bytes": strconv.Itoa(math.MaxUint32), "max-batch-size": "-1"}) c.Assert(err, check.ErrorMatches, ".*invalid.*") - err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint16)}) + err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint16)}) c.Assert(err, check.IsNil) c.Assert(encoder.maxBatchSize, check.Equals, int(math.MaxUint16)) c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) - err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)}) + err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)}) c.Assert(err, check.NotNil) - err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)}) + err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)}) c.Assert(err, check.NotNil) } @@ -203,7 +203,7 @@ func (s *craftBatchSuite) TestMaxMessageBytes(c *check.C) { func (s *craftBatchSuite) TestMaxBatchSize(c *check.C) { defer testleak.AfterTest(c)() encoder := NewCraftEventBatchEncoder() - err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) + err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": "64"}) c.Check(err, check.IsNil) testEvent := &model.RowChangedEvent{ diff --git a/cdc/sink/codec/json.go b/cdc/sink/codec/json.go index 6ff6d94a3a5..e1b4170d6ef 100644 --- a/cdc/sink/codec/json.go +++ b/cdc/sink/codec/json.go @@ -385,8 +385,8 @@ type JSONEventBatchEncoder struct { maxBatchSize int } -// GetMaxMessageSize is only for unit testing. -func (d *JSONEventBatchEncoder) GetMaxMessageSize() int { +// GetMaxMessageBytes is only for unit testing. +func (d *JSONEventBatchEncoder) GetMaxMessageBytes() int { return d.maxMessageBytes } @@ -615,12 +615,14 @@ func (d *JSONEventBatchEncoder) Reset() { func (d *JSONEventBatchEncoder) SetParams(params map[string]string) error { var err error - d.maxMessageBytes = config.DefaultMaxMessageBytes - if maxMessageBytes, ok := params["max-message-bytes"]; ok { - d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes) - if err != nil { - return cerror.ErrSinkInvalidConfig.Wrap(err) - } + maxMessageBytes, ok := params["max-message-bytes"] + if !ok { + return cerror.ErrSinkInvalidConfig.Wrap(errors.New("max-message-bytes not found")) + } + + d.maxMessageBytes, err = strconv.Atoi(maxMessageBytes) + if err != nil { + return cerror.ErrSinkInvalidConfig.Wrap(err) } if d.maxMessageBytes <= 0 { return cerror.ErrSinkInvalidConfig.Wrap(errors.Errorf("invalid max-message-bytes %d", d.maxMessageBytes)) diff --git a/cdc/sink/codec/json_test.go b/cdc/sink/codec/json_test.go index 1e15e1d7fcb..d7f1499ec30 100644 --- a/cdc/sink/codec/json_test.go +++ b/cdc/sink/codec/json_test.go @@ -14,12 +14,14 @@ package codec import ( + "context" "math" "sort" "strconv" "testing" "github.com/pingcap/check" + "github.com/pingcap/errors" "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tiflow/cdc/model" "github.com/pingcap/tiflow/pkg/config" @@ -201,7 +203,7 @@ func (s *batchSuite) testBatchCodec(c *check.C, newEncoder func() EventBatchEnco func (s *batchSuite) TestParamsEdgeCases(c *check.C) { defer testleak.AfterTest(c)() encoder := NewJSONEventBatchEncoder().(*JSONEventBatchEncoder) - err := encoder.SetParams(map[string]string{}) + err := encoder.SetParams(map[string]string{"max-message-bytes": "10485760"}) c.Assert(err, check.IsNil) c.Assert(encoder.maxBatchSize, check.Equals, DefaultMaxBatchSize) c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) @@ -228,17 +230,43 @@ func (s *batchSuite) TestParamsEdgeCases(c *check.C) { err = encoder.SetParams(map[string]string{"max-batch-size": "-1"}) c.Assert(err, check.ErrorMatches, ".*invalid.*") - err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxInt32)}) + err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxInt32)}) c.Assert(err, check.IsNil) c.Assert(encoder.maxBatchSize, check.Equals, math.MaxInt32) c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) - err = encoder.SetParams(map[string]string{"max-batch-size": strconv.Itoa(math.MaxUint32)}) + err = encoder.SetParams(map[string]string{"max-message-bytes": "10485760", "max-batch-size": strconv.Itoa(math.MaxUint32)}) c.Assert(err, check.IsNil) c.Assert(encoder.maxBatchSize, check.Equals, math.MaxUint32) c.Assert(encoder.maxMessageBytes, check.Equals, config.DefaultMaxMessageBytes) } +func (s *batchSuite) TestSetParams(c *check.C) { + defer testleak.AfterTest(c) + + opts := make(map[string]string) + encoderBuilder := newJSONEventBatchEncoderBuilder(opts) + c.Assert(encoderBuilder, check.NotNil) + encoder, err := encoderBuilder.Build(context.Background()) + c.Assert(encoder, check.IsNil) + c.Assert( + errors.Cause(err), + check.ErrorMatches, + ".*max-message-bytes not found.*", + ) + + opts["max-message-bytes"] = "1" + encoderBuilder = newJSONEventBatchEncoderBuilder(opts) + c.Assert(encoderBuilder, check.NotNil) + encoder, err = encoderBuilder.Build(context.Background()) + c.Assert(err, check.IsNil) + c.Assert(encoder, check.NotNil) + + jsonEncoder, ok := encoder.(*JSONEventBatchEncoder) + c.Assert(ok, check.IsTrue) + c.Assert(jsonEncoder.GetMaxMessageBytes(), check.Equals, 1) +} + func (s *batchSuite) TestMaxMessageBytes(c *check.C) { defer testleak.AfterTest(c)() encoder := NewJSONEventBatchEncoder() @@ -283,9 +311,11 @@ func (s *batchSuite) TestMaxMessageBytes(c *check.C) { func (s *batchSuite) TestMaxBatchSize(c *check.C) { defer testleak.AfterTest(c)() - encoder := NewJSONEventBatchEncoder() - err := encoder.SetParams(map[string]string{"max-batch-size": "64"}) - c.Check(err, check.IsNil) + encoderBuilder := newJSONEventBatchEncoderBuilder(map[string]string{"max-message-bytes": "1048576", "max-batch-size": "64"}) + c.Assert(encoderBuilder, check.NotNil) + encoder, err := encoderBuilder.Build(context.Background()) + c.Assert(err, check.IsNil) + c.Assert(encoder, check.NotNil) testEvent := &model.RowChangedEvent{ CommitTs: 1, diff --git a/cdc/sink/mq.go b/cdc/sink/mq.go index 83d5f846802..9f78bd79f88 100644 --- a/cdc/sink/mq.go +++ b/cdc/sink/mq.go @@ -395,7 +395,7 @@ func newKafkaSaramaSink(ctx context.Context, sinkURI *url.URL, filter *filter.Fi return nil, cerror.ErrKafkaInvalidConfig.GenWithStack("no topic is specified in sink-uri") } - sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, errCh) + sProducer, err := kafka.NewKafkaSaramaProducer(ctx, topic, producerConfig, opts, errCh) if err != nil { return nil, errors.Trace(err) } diff --git a/cdc/sink/mq_test.go b/cdc/sink/mq_test.go index 417049c6552..afa6162a75f 100644 --- a/cdc/sink/mq_test.go +++ b/cdc/sink/mq_test.go @@ -77,7 +77,7 @@ func (s mqSinkSuite) TestKafkaSink(c *check.C) { c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{}) c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1) - c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 1048576) + c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 1048576) // mock kafka broker processes 1 row changed event leader.Returns(prodSuccess) @@ -234,7 +234,7 @@ func (s mqSinkSuite) TestPulsarSinkEncoderConfig(c *check.C) { c.Assert(err, check.IsNil) c.Assert(encoder, check.FitsTypeOf, &codec.JSONEventBatchEncoder{}) c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxBatchSize(), check.Equals, 1) - c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageSize(), check.Equals, 4194304) + c.Assert(encoder.(*codec.JSONEventBatchEncoder).GetMaxMessageBytes(), check.Equals, 4194304) } func (s mqSinkSuite) TestFlushRowChangedEvents(c *check.C) { diff --git a/cdc/sink/producer/kafka/kafka.go b/cdc/sink/producer/kafka/kafka.go index b891a842bd4..c361e2c4678 100644 --- a/cdc/sink/producer/kafka/kafka.go +++ b/cdc/sink/producer/kafka/kafka.go @@ -263,7 +263,7 @@ var ( ) // NewKafkaSaramaProducer creates a kafka sarama producer -func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, errCh chan error) (*kafkaSaramaProducer, error) { +func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, opts map[string]string, errCh chan error) (*kafkaSaramaProducer, error) { log.Info("Starting kafka sarama producer ...", zap.Reflect("config", config)) cfg, err := newSaramaConfigImpl(ctx, config) if err != nil { @@ -280,7 +280,7 @@ func NewKafkaSaramaProducer(ctx context.Context, topic string, config *Config, e } }() - if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config, cfg); err != nil { + if err := validateMaxMessageBytesAndCreateTopic(admin, topic, config, cfg, opts); err != nil { return nil, cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) } @@ -345,7 +345,7 @@ func kafkaClientID(role, captureAddr, changefeedID, configuredClientID string) ( return } -func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config) error { +func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic string, config *Config, saramaConfig *sarama.Config, opts map[string]string) error { topics, err := admin.ListTopics() if err != nil { return cerror.WrapError(cerror.ErrKafkaNewSaramaProducer, err) @@ -367,6 +367,7 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic zap.Int("max-message-bytes", config.MaxMessageBytes)) saramaConfig.Producer.MaxMessageBytes = topicMaxMessageBytes } + opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes) // no need to create the topic, but we would have to log user if they found enter wrong topic name later if config.AutoCreate { @@ -402,6 +403,7 @@ func validateMaxMessageBytesAndCreateTopic(admin kafka.ClusterAdminClient, topic zap.Int("max-message-bytes", config.MaxMessageBytes)) saramaConfig.Producer.MaxMessageBytes = brokerMessageMaxBytes } + opts["max-message-bytes"] = strconv.Itoa(saramaConfig.Producer.MaxMessageBytes) // topic not exists yet, and user does not specify the `partition-num` in the sink uri. if config.PartitionNum == 0 { diff --git a/cdc/sink/producer/kafka/kafka_test.go b/cdc/sink/producer/kafka/kafka_test.go index 137849df796..5a138833b8e 100644 --- a/cdc/sink/producer/kafka/kafka_test.go +++ b/cdc/sink/producer/kafka/kafka_test.go @@ -15,6 +15,7 @@ package kafka import ( "context" + "strconv" "strings" "sync" "testing" @@ -106,9 +107,11 @@ func (s *kafkaSuite) TestNewSaramaProducer(c *check.C) { NewAdminClientImpl = kafka.NewSaramaAdminClient }() - producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(err, check.IsNil) c.Assert(producer.GetPartitionNum(), check.Equals, int32(2)) + c.Assert(opts, check.HasKey, "max-message-bytes") for i := 0; i < 100; i++ { err = producer.AsyncSendMessage(ctx, &codec.MQMessage{ Key: []byte("test-key-1"), @@ -200,8 +203,10 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { config.MaxMessageBytes = adminClient.GetDefaultMaxMessageBytes() cfg, err := newSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg) + opts := make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) c.Assert(err, check.IsNil) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) // When topic exists and max message bytes is not set correctly. // use the smaller one. @@ -209,22 +214,27 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { config.MaxMessageBytes = defaultMaxMessageBytes + 1 cfg, err = newSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) config.MaxMessageBytes = defaultMaxMessageBytes - 1 cfg, err = newSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, adminClient.GetDefaultMockTopicName(), config, cfg, opts) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) // When topic does not exist and auto-create is not enabled. config.AutoCreate = false cfg, err = newSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config, cfg) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "non-exist", config, cfg, opts) c.Assert( errors.Cause(err), check.ErrorMatches, @@ -237,9 +247,11 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { config.MaxMessageBytes = defaultMaxMessageBytes - 1 cfg, err = newSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config, cfg) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-success", config, cfg, opts) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) // When the topic does not exist, use the broker's configuration to create the topic. // It is larger than the value of broker. @@ -247,9 +259,11 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { config.AutoCreate = true cfg, err = newSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config, cfg) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "create-new-fail", config, cfg, opts) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) // When the topic exists, but the topic does not store max message bytes info, // the check of parameter succeeds. @@ -264,9 +278,11 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { } err = adminClient.CreateTopic("test-topic", detail, false) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg, opts) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, config.MaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) // When the topic exists, but the topic does not store max message bytes info, // the check of parameter fails. @@ -274,9 +290,11 @@ func (s *kafkaSuite) TestValidateMaxMessageBytesAndCreateTopic(c *check.C) { config.MaxMessageBytes = defaultMaxMessageBytes + 1 cfg, err = newSaramaConfigImpl(context.Background(), config) c.Assert(err, check.IsNil) - err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg) + opts = make(map[string]string) + err = validateMaxMessageBytesAndCreateTopic(adminClient, "test-topic", config, cfg, opts) c.Assert(err, check.IsNil) c.Assert(cfg.Producer.MaxMessageBytes, check.Equals, defaultMaxMessageBytes) + c.Assert(opts["max-message-bytes"], check.Equals, strconv.Itoa(cfg.Producer.MaxMessageBytes)) } func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { @@ -291,7 +309,8 @@ func (s *kafkaSuite) TestCreateProducerFailed(c *check.C) { defer func() { NewAdminClientImpl = kafka.NewSaramaAdminClient }() - _, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) + opts := make(map[string]string) + _, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) c.Assert(errors.Cause(err), check.ErrorMatches, "invalid version.*") } @@ -338,7 +357,9 @@ func (s *kafkaSuite) TestProducerSendMessageFailed(c *check.C) { }() errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") defer func() { err := producer.Close() c.Assert(err, check.IsNil) @@ -405,7 +426,9 @@ func (s *kafkaSuite) TestProducerDoubleClose(c *check.C) { }() errCh := make(chan error, 1) - producer, err := NewKafkaSaramaProducer(ctx, topic, config, errCh) + opts := make(map[string]string) + producer, err := NewKafkaSaramaProducer(ctx, topic, config, opts, errCh) + c.Assert(opts, check.HasKey, "max-message-bytes") defer func() { err := producer.Close() c.Assert(err, check.IsNil)