Merge pull request #1574 from Shopify/diego_zstd-support-kafka-2_1_0_0
Enables zstd (for real this time)
d1egoaz authored Jan 23, 2020
2 parents 7629590 + 37faed7 commit 30e7094
Showing 9 changed files with 115 additions and 28 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
@@ -1,5 +1,13 @@
# Changelog


#### Unreleased

Improvements:
- Enable zstd compression
([1574](https://github.com/Shopify/sarama/pull/1574),
[1582](https://github.com/Shopify/sarama/pull/1582))

#### Version 1.25.0 (2020-01-13)

New Features:
4 changes: 4 additions & 0 deletions config.go
@@ -636,6 +636,10 @@ func (c *Config) Validate() error {
}
}

if c.Producer.Compression == CompressionZSTD && !c.Version.IsAtLeast(V2_1_0_0) {
return ConfigurationError("zstd compression requires Version >= V2_1_0_0")
}

if c.Producer.Idempotent {
if !c.Version.IsAtLeast(V0_11_0_0) {
return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
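
Usage sketch (not part of this commit): a configuration that passes the new check declares the cluster version alongside the codec. All identifiers below are the exported sarama names referenced in this diff.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // zstd requires Kafka >= 2.1.0.0
	config.Producer.Compression = sarama.CompressionZSTD

	// Validate now enforces the version/codec pairing added above.
	if err := config.Validate(); err != nil {
		log.Fatalf("invalid config: %v", err)
	}
	log.Println("zstd producer config accepted")
}
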
12 changes: 12 additions & 0 deletions config_test.go
@@ -405,6 +405,18 @@ func TestLZ4ConfigValidation(t *testing.T) {
}
}

func TestZstdConfigValidation(t *testing.T) {
config := NewConfig()
config.Producer.Compression = CompressionZSTD
if err := config.Validate(); string(err.(ConfigurationError)) != "zstd compression requires Version >= V2_1_0_0" {
t.Error("Expected invalid zstd/kafka version error, got ", err)
}
config.Version = V2_1_0_0
if err := config.Validate(); err != nil {
t.Error("Expected zstd to work, got ", err)
}
}

// This example shows how to integrate with an existing registry as well as publishing metrics
// on the standard output
func ExampleConfig_metrics() {
4 changes: 4 additions & 0 deletions consumer.go
@@ -888,6 +888,10 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}

if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
request.Version = 10
}

for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
}
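
Consumer-side sketch (not part of this commit): declaring V2_1_0_0 on the config is what routes fetches through the v10 FetchRequest path added above. The broker address and topic name are placeholders.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // >= 2.1.0.0 selects FetchRequest v10 in fetchNewMessages

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config) // placeholder broker
	if err != nil {
		log.Fatalln(err)
	}
	defer consumer.Close()

	pc, err := consumer.ConsumePartition("my-topic", 0, sarama.OffsetOldest) // placeholder topic
	if err != nil {
		log.Fatalln(err)
	}
	defer pc.Close()

	for msg := range pc.Messages() {
		log.Printf("offset %d: %s", msg.Offset, msg.Value)
	}
}
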
15 changes: 15 additions & 0 deletions functional_consumer_test.go
@@ -106,6 +106,21 @@ func TestVersionMatrixLZ4(t *testing.T) {
consumeMsgs(t, testVersions, producedMessages)
}

// Support for zstd codec was introduced in v2.1.0.0
func TestVersionMatrixZstd(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

// Produce lots of messages with all possible combinations of supported
// protocol versions, starting with v2.1.0.0 (the first version where zstd was supported)
testVersions := versionRange(V2_1_0_0)
allCodecs := []CompressionCodec{CompressionZSTD}
producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)

// When/Then
consumeMsgs(t, testVersions, producedMessages)
}

func TestVersionMatrixIdempotent(t *testing.T) {
setupFunctionalTest(t)
defer teardownFunctionalTest(t)
2 changes: 2 additions & 0 deletions produce_request.go
@@ -214,6 +214,8 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
return V0_10_0_0
case 3:
return V0_11_0_0
case 7:
return V2_1_0_0
default:
return MinVersion
}
53 changes: 34 additions & 19 deletions produce_response.go
@@ -5,11 +5,27 @@ import (
"time"
)

// Protocol, http://kafka.apache.org/protocol.html
// v1
// v2 = v3 = v4
// v5 = v6 = v7
// Produce Response (Version: 7) => [responses] throttle_time_ms
// responses => topic [partition_responses]
// topic => STRING
// partition_responses => partition error_code base_offset log_append_time log_start_offset
// partition => INT32
// error_code => INT16
// base_offset => INT64
// log_append_time => INT64
// log_start_offset => INT64
// throttle_time_ms => INT32

// partition_responses in protocol
type ProduceResponseBlock struct {
Err KError
Offset int64
// only provided if Version >= 2 and the broker is configured with `LogAppendTime`
Timestamp time.Time
Err KError // v0, error_code
Offset int64 // v0, base_offset
Timestamp time.Time // v2, log_append_time; only set if the broker is configured with `LogAppendTime`
StartOffset int64 // v5, log_start_offset
}

func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -32,6 +48,13 @@ func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err erro
}
}

if version >= 5 {
b.StartOffset, err = pd.getInt64()
if err != nil {
return err
}
}

return nil
}

@@ -49,13 +72,17 @@ func (b *ProduceResponseBlock) encode(pe packetEncoder, version int16) (err erro
pe.putInt64(timestamp)
}

if version >= 5 {
pe.putInt64(b.StartOffset)
}

return nil
}

type ProduceResponse struct {
Blocks map[string]map[int32]*ProduceResponseBlock
Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
Version int16
ThrottleTime time.Duration // only provided if Version >= 1
ThrottleTime time.Duration // v1, throttle_time_ms
}

func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -129,6 +156,7 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
}
}
}

if r.Version >= 1 {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
@@ -143,19 +171,6 @@ func (r *ProduceResponse) version() int16 {
return r.Version
}

func (r *ProduceResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_11_0_0
default:
return MinVersion
}
}

func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
if r.Blocks == nil {
return nil
41 changes: 32 additions & 9 deletions produce_response_test.go
Expand Up @@ -10,8 +10,8 @@ var (
produceResponseNoBlocksV0 = []byte{
0x00, 0x00, 0x00, 0x00}

produceResponseManyBlocksVersions = [][]byte{
{
produceResponseManyBlocksVersions = map[int][]byte{
0: {
0x00, 0x00, 0x00, 0x01,

0x00, 0x03, 'f', 'o', 'o',
@@ -20,7 +20,9 @@ var (
0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
}, {
},

1: {
0x00, 0x00, 0x00, 0x01,

0x00, 0x03, 'f', 'o', 'o',
@@ -31,7 +33,8 @@ var (
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255

0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
}, {
},
2: {
0x00, 0x00, 0x00, 0x01,

0x00, 0x03, 'f', 'o', 'o',
@@ -42,6 +45,20 @@ var (
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 1970 at 00:00:01,000 UTC (LogAppendTime was used)

0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
},
7: { // version 7 adds StartOffset
0x00, 0x00, 0x00, 0x01,

0x00, 0x03, 'f', 'o', 'o',
0x00, 0x00, 0x00, 0x01,

0x00, 0x00, 0x00, 0x01, // Partition 1
0x00, 0x02, // ErrInvalidMessage
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xFF, // Offset 255
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x03, 0xE8, // Timestamp January 1st 1970 at 00:00:01,000 UTC (LogAppendTime was used)
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x32, // StartOffset 50

0x00, 0x00, 0x00, 0x64, // 100 ms throttle time
},
}
@@ -69,14 +86,19 @@ func TestProduceResponseDecode(t *testing.T) {
t.Error("Decoding did not produce a block for foo/1")
} else {
if block.Err != ErrInvalidMessage {
t.Error("Decoding failed for foo/2/Err, got:", int16(block.Err))
t.Error("Decoding failed for foo/1/Err, got:", int16(block.Err))
}
if block.Offset != 255 {
t.Error("Decoding failed for foo/1/Offset, got:", block.Offset)
}
if v >= 2 {
if block.Timestamp != time.Unix(1, 0) {
t.Error("Decoding failed for foo/2/Timestamp, got:", block.Timestamp)
t.Error("Decoding failed for foo/1/Timestamp, got:", block.Timestamp)
}
}
if v >= 7 {
if block.StartOffset != 50 {
t.Error("Decoding failed for foo/1/StartOffset, got:", block.StartOffset)
}
}
}
@@ -95,9 +117,10 @@ func TestProduceResponseEncode(t *testing.T) {

response.Blocks["foo"] = make(map[int32]*ProduceResponseBlock)
response.Blocks["foo"][1] = &ProduceResponseBlock{
Err: ErrInvalidMessage,
Offset: 255,
Timestamp: time.Unix(1, 0),
Err: ErrInvalidMessage,
Offset: 255,
Timestamp: time.Unix(1, 0),
StartOffset: 50,
}
response.ThrottleTime = 100 * time.Millisecond
for v, produceResponseManyBlocks := range produceResponseManyBlocksVersions {
4 changes: 4 additions & 0 deletions produce_set.go
@@ -129,6 +129,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
req.Version = 3
}

if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
req.Version = 7
}

for topic, partitionSets := range ps.msgs {
for partition, set := range partitionSets {
if req.Version >= 3 {
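
End-to-end producer sketch (not part of this commit): with zstd selected and the version declared, each batch built by buildRequest is sent as a v7 ProduceRequest. The broker address and topic are placeholders, and SyncProducer additionally needs Return.Successes.

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_1_0_0 // enables the req.Version = 7 branch above
	config.Producer.Compression = sarama.CompressionZSTD
	config.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, config) // placeholder broker
	if err != nil {
		log.Fatalln(err)
	}
	defer producer.Close()

	// The record batch carrying this message is zstd-compressed and
	// shipped in a v7 ProduceRequest.
	partition, offset, err := producer.SendMessage(&sarama.ProducerMessage{
		Topic: "my-topic", // placeholder topic
		Value: sarama.StringEncoder("hello zstd"),
	})
	if err != nil {
		log.Fatalln(err)
	}
	log.Printf("stored at partition %d, offset %d", partition, offset)
}
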
