diff --git a/orderer/kafka/util.go b/orderer/kafka/util.go index 7d48c23a713..023ff9bb7c6 100644 --- a/orderer/kafka/util.go +++ b/orderer/kafka/util.go @@ -31,6 +31,9 @@ func newBrokerConfig(conf *config.TopLevel) *sarama.Config { brokerConfig := sarama.NewConfig() brokerConfig.Version = conf.Kafka.Version brokerConfig.Producer.Partitioner = newStaticPartitioner(conf.Kafka.PartitionID) + // set equivalent of kafka producer config max.request.size to the default + // value of a kafka server's socket.request.max.bytes property (100MiB). + brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize) return brokerConfig } diff --git a/orderer/kafka/util_test.go b/orderer/kafka/util_test.go index f5c996f2009..cbd85521909 100644 --- a/orderer/kafka/util_test.go +++ b/orderer/kafka/util_test.go @@ -41,6 +41,47 @@ func TestStaticPartitioner(t *testing.T) { } } +func TestProducerConfigMessageMaxBytes(t *testing.T) { + + topic := testConf.Kafka.Topic + + broker := sarama.NewMockBroker(t, 1000) + broker.SetHandlerByMap(map[string]sarama.MockResponse{ + "MetadataRequest": sarama.NewMockMetadataResponse(t). + SetBroker(broker.Addr(), broker.BrokerID()). + SetLeader(topic, 0, broker.BrokerID()), + "ProduceRequest": sarama.NewMockProduceResponse(t), + }) + + config := newBrokerConfig(testConf) + producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config) + if err != nil { + t.Fatal(err) + } + + testCases := []struct { + name string + size int + err error + }{ + {"TypicalDeploy", 8 * 1024 * 1024, nil}, + {"TooBig", 100*1024*1024 + 1, sarama.ErrMessageSizeTooLarge}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(make([]byte, tc.size))}) + if err != tc.err { + t.Fatal(err) + } + }) + + } + + producer.Close() + broker.Close() +} + func TestNewBrokerConfig(t *testing.T) { topic := testConf.Kafka.Topic