Skip to content

Commit

Permalink
Merge "[FAB-1253] Allow attempt to send >1MB blocks to kafka"
Browse files Browse the repository at this point in the history
  • Loading branch information
Jason Yellick authored and Gerrit Code Review committed Dec 5, 2016
2 parents 18c4999 + 6d03a16 commit 50f5ca9
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 0 deletions.
3 changes: 3 additions & 0 deletions orderer/kafka/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ func newBrokerConfig(conf *config.TopLevel) *sarama.Config {
brokerConfig := sarama.NewConfig()
brokerConfig.Version = conf.Kafka.Version
brokerConfig.Producer.Partitioner = newStaticPartitioner(conf.Kafka.PartitionID)
// set equivalent of kafka producer config max.request.bytes to the deafult
// value of a kafka server's socket.request.max.bytes property (100MiB).
brokerConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
return brokerConfig
}

Expand Down
41 changes: 41 additions & 0 deletions orderer/kafka/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,47 @@ func TestStaticPartitioner(t *testing.T) {
}
}

func TestProducerConfigMessageMaxBytes(t *testing.T) {

topic := testConf.Kafka.Topic

broker := sarama.NewMockBroker(t, 1000)
broker.SetHandlerByMap(map[string]sarama.MockResponse{
"MetadataRequest": sarama.NewMockMetadataResponse(t).
SetBroker(broker.Addr(), broker.BrokerID()).
SetLeader(topic, 0, broker.BrokerID()),
"ProduceRequest": sarama.NewMockProduceResponse(t),
})

config := newBrokerConfig(testConf)
producer, err := sarama.NewSyncProducer([]string{broker.Addr()}, config)
if err != nil {
t.Fatal(err)
}

testCases := []struct {
name string
size int
err error
}{
{"TypicalDeploy", 8 * 1024 * 1024, nil},
{"TooBig", 100*1024*1024 + 1, sarama.ErrMessageSizeTooLarge},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
_, _, err = producer.SendMessage(&sarama.ProducerMessage{Topic: topic, Value: sarama.ByteEncoder(make([]byte, tc.size))})
if err != tc.err {
t.Fatal(err)
}
})

}

producer.Close()
broker.Close()
}

func TestNewBrokerConfig(t *testing.T) {

topic := testConf.Kafka.Topic
Expand Down

0 comments on commit 50f5ca9

Please sign in to comment.