Skip to content

Commit

Permalink
Consumer timestamp test adjusted
Browse files Browse the repository at this point in the history
  • Loading branch information
Sergey Zimin committed Jan 22, 2019
1 parent e666501 commit ca55d05
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
3 changes: 3 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
request.Version = 1
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}
Expand Down
39 changes: 27 additions & 12 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -999,7 +999,15 @@ func TestConsumerTimestamps(t *testing.T) {
messages []testMessage
expectedTimestamp []time.Time
}{
{V0_8_2_0, false, []testMessage{
{MinVersion, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_9_0_0, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_10_0_0, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
Expand All @@ -1024,26 +1032,33 @@ func TestConsumerTimestamps(t *testing.T) {
var offsetResponseVersion int16
cfg := NewConfig()
cfg.Version = d.kversion
switch d.kversion {
default:
fr = &FetchResponse{}
switch {
case d.kversion.IsAtLeast(V0_11_0_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
}
case V0_10_2_1:
fr.SetLastOffsetDelta("my_topic", 0, 2)
fr.SetLastStableOffset("my_topic", 0, 2)
case d.kversion.IsAtLeast(V0_10_1_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
}
case V0_11_0_0:
offsetResponseVersion = 1
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
default:
var version int16
switch {
case d.kversion.IsAtLeast(V0_10_0_0):
version = 2
case d.kversion.IsAtLeast(V0_9_0_0):
version = 1
}
fr = &FetchResponse{Version: version}
for _, m := range d.messages {
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
}
fr.SetLastOffsetDelta("my_topic", 0, 2)
fr.SetLastStableOffset("my_topic", 0, 2)
}

broker0 := NewMockBroker(t, 0)
Expand Down
11 changes: 6 additions & 5 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"time"
)

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

// The lowest 3 bits contain the compression codec used for the message
const compressionCodecMask int8 = 0x07

// Bit 3 set for "LogAppend" timestamps
const timestampTypeMask = 0x08

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

const (
CompressionNone CompressionCodec = 0
CompressionGZIP CompressionCodec = 1
Expand All @@ -19,8 +22,6 @@ const (
CompressionZSTD CompressionCodec = 4
)

const timestampTypeMask = 0x08

func (cc CompressionCodec) String() string {
return []string{
"none",
Expand Down

0 comments on commit ca55d05

Please sign in to comment.