From e666501eeb737286da0f2806e8eb74cb5b73e46d Mon Sep 17 00:00:00 2001 From: Sergey Zimin Date: Wed, 16 Jan 2019 15:58:43 -0800 Subject: [PATCH] Tests for timestamps added --- consumer_test.go | 102 ++++++++++++++++++++++++++++++++++++++++++++++ fetch_response.go | 29 +++++++++---- message.go | 3 ++ record_batch.go | 3 ++ 4 files changed, 129 insertions(+), 8 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index 4bd6629080..96f2aae633 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -985,6 +985,108 @@ func TestConsumerExpiryTicker(t *testing.T) { broker0.Close() } +func TestConsumerTimestamps(t *testing.T) { + now := time.Now().Truncate(time.Millisecond) + type testMessage struct { + key Encoder + value Encoder + offset int64 + timestamp time.Time + } + for _, d := range []struct { + kversion KafkaVersion + logAppendTime bool + messages []testMessage + expectedTimestamp []time.Time + }{ + {V0_8_2_0, false, []testMessage{ + {nil, testMsg, 1, now}, + {nil, testMsg, 2, now}, + }, []time.Time{{}, {}}}, + {V0_10_2_1, false, []testMessage{ + {nil, testMsg, 1, now.Add(time.Second)}, + {nil, testMsg, 2, now.Add(2 * time.Second)}, + }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}}, + {V0_10_2_1, true, []testMessage{ + {nil, testMsg, 1, now.Add(time.Second)}, + {nil, testMsg, 2, now.Add(2 * time.Second)}, + }, []time.Time{now, now}}, + {V0_11_0_0, false, []testMessage{ + {nil, testMsg, 1, now.Add(time.Second)}, + {nil, testMsg, 2, now.Add(2 * time.Second)}, + }, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}}, + {V0_11_0_0, true, []testMessage{ + {nil, testMsg, 1, now.Add(time.Second)}, + {nil, testMsg, 2, now.Add(2 * time.Second)}, + }, []time.Time{now, now}}, + } { + var fr *FetchResponse + var offsetResponseVersion int16 + cfg := NewConfig() + cfg.Version = d.kversion + switch d.kversion { + default: + fr = &FetchResponse{} + for _, m := range d.messages { + fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0) + } + case V0_10_2_1: + offsetResponseVersion = 1 + fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now} + for _, m := range d.messages { + fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1) + } + case V0_11_0_0: + offsetResponseVersion = 1 + fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now} + for _, m := range d.messages { + fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp) + } + fr.SetLastOffsetDelta("my_topic", 0, 2) + fr.SetLastStableOffset("my_topic", 0, 2) + } + + broker0 := NewMockBroker(t, 0) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetVersion(offsetResponseVersion). + SetOffset("my_topic", 0, OffsetNewest, 1234). + SetOffset("my_topic", 0, OffsetOldest, 0), + "FetchRequest": NewMockSequence(fr), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, cfg) + if err != nil { + t.Fatal(err) + } + + consumer, err := master.ConsumePartition("my_topic", 0, 1) + if err != nil { + t.Fatal(err) + } + + for i, ts := range d.expectedTimestamp { + select { + case msg := <-consumer.Messages(): + assertMessageOffset(t, msg, int64(i)+1) + if msg.Timestamp != ts { + t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v", + d.kversion, d.logAppendTime, msg.Timestamp, ts) + } + case err := <-consumer.Errors(): + t.Fatal(err) + } + } + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() + } +} + func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) { if msg.Offset != expectedOffset { t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset) diff --git a/fetch_response.go b/fetch_response.go index 90acfc2802..9df99c17eb 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) } type FetchResponse struct { - Blocks map[string]map[int32]*FetchResponseBlock - ThrottleTime time.Duration - Version int16 // v1 requires 0.9+, v2 requires 0.10+ + Blocks map[string]map[int32]*FetchResponseBlock + ThrottleTime time.Duration + Version int16 // v1 requires 0.9+, v2 requires 0.10+ + LogAppendTime bool + Timestamp time.Time } func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) { @@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) { return kb, vb } -func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { +func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) { frb := r.getOrCreateBlock(topic, partition) kb, vb := encodeKV(key, value) - msg := &Message{Key: kb, Value: vb} + if r.LogAppendTime { + timestamp = r.Timestamp + } + msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version} msgBlock := &MessageBlock{Msg: msg, Offset: offset} if len(frb.RecordsSet) == 0 { records := newLegacyRecords(&MessageSet{}) @@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc set.Messages = append(set.Messages, msgBlock) } -func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) { +func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) { frb := r.getOrCreateBlock(topic, partition) kb, vb := encodeKV(key, value) - rec := &Record{Key: kb, Value: vb, OffsetDelta: offset} if len(frb.RecordsSet) == 0 { - records := newDefaultRecords(&RecordBatch{Version: 2}) + records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp}) frb.RecordsSet = []*Records{&records} } batch := frb.RecordsSet[0].RecordBatch + rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)} batch.addRecord(rec) } +func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { + r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0) +} + +func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) { + r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{}) +} + func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) { frb := r.getOrCreateBlock(topic, partition) if len(frb.RecordsSet) == 0 { diff --git a/message.go b/message.go index 6981abf082..a31b67111c 100644 --- a/message.go +++ b/message.go @@ -55,6 +55,9 @@ func (m *Message) encode(pe packetEncoder) error { pe.putInt8(m.Version) attributes := int8(m.Codec) & compressionCodecMask + if m.LogAppendTime { + attributes |= timestampTypeMask + } pe.putInt8(attributes) if m.Version >= 1 { diff --git a/record_batch.go b/record_batch.go index 62e54239aa..a36f7e6296 100644 --- a/record_batch.go +++ b/record_batch.go @@ -202,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 { if b.Control { attr |= controlMask } + if b.LogAppendTime { + attr |= timestampTypeMask + } return attr }