Skip to content

Commit

Permalink
Merge pull request #1258 from zimin2000/zimin2000.logappend.time
Browse files Browse the repository at this point in the history
Support LogAppend timestamps
  • Loading branch information
bai authored Feb 6, 2019
2 parents e775ee1 + e1eda41 commit a6c1f7e
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 13 deletions.
15 changes: 13 additions & 2 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,9 +487,13 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
for _, msgBlock := range msgSet.Messages {
for _, msg := range msgBlock.Messages() {
offset := msg.Offset
timestamp := msg.Msg.Timestamp
if msg.Msg.Version >= 1 {
baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
offset += baseOffset
if msg.Msg.LogAppendTime {
timestamp = msgBlock.Msg.Timestamp
}
}
if offset < child.offset {
continue
Expand All @@ -500,7 +504,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
Key: msg.Msg.Key,
Value: msg.Msg.Value,
Offset: offset,
Timestamp: msg.Msg.Timestamp,
Timestamp: timestamp,
BlockTimestamp: msgBlock.Msg.Timestamp,
})
child.offset = offset + 1
Expand All @@ -519,13 +523,17 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
if offset < child.offset {
continue
}
timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
if batch.LogAppendTime {
timestamp = batch.MaxTimestamp
}
messages = append(messages, &ConsumerMessage{
Topic: child.topic,
Partition: child.partition,
Key: rec.Key,
Value: rec.Value,
Offset: offset,
Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
Timestamp: timestamp,
Headers: rec.Headers,
})
child.offset = offset + 1
Expand Down Expand Up @@ -787,6 +795,9 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
}
if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
request.Version = 1
}
if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 2
}
Expand Down
117 changes: 117 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,6 +985,123 @@ func TestConsumerExpiryTicker(t *testing.T) {
broker0.Close()
}

func TestConsumerTimestamps(t *testing.T) {
now := time.Now().Truncate(time.Millisecond)
type testMessage struct {
key Encoder
value Encoder
offset int64
timestamp time.Time
}
for _, d := range []struct {
kversion KafkaVersion
logAppendTime bool
messages []testMessage
expectedTimestamp []time.Time
}{
{MinVersion, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_9_0_0, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_10_0_0, false, []testMessage{
{nil, testMsg, 1, now},
{nil, testMsg, 2, now},
}, []time.Time{{}, {}}},
{V0_10_2_1, false, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
{V0_10_2_1, true, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now, now}},
{V0_11_0_0, false, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now.Add(time.Second), now.Add(2 * time.Second)}},
{V0_11_0_0, true, []testMessage{
{nil, testMsg, 1, now.Add(time.Second)},
{nil, testMsg, 2, now.Add(2 * time.Second)},
}, []time.Time{now, now}},
} {
var fr *FetchResponse
var offsetResponseVersion int16
cfg := NewConfig()
cfg.Version = d.kversion
switch {
case d.kversion.IsAtLeast(V0_11_0_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 4, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddRecordWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp)
}
fr.SetLastOffsetDelta("my_topic", 0, 2)
fr.SetLastStableOffset("my_topic", 0, 2)
case d.kversion.IsAtLeast(V0_10_1_0):
offsetResponseVersion = 1
fr = &FetchResponse{Version: 3, LogAppendTime: d.logAppendTime, Timestamp: now}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 1)
}
default:
var version int16
switch {
case d.kversion.IsAtLeast(V0_10_0_0):
version = 2
case d.kversion.IsAtLeast(V0_9_0_0):
version = 1
}
fr = &FetchResponse{Version: version}
for _, m := range d.messages {
fr.AddMessageWithTimestamp("my_topic", 0, m.key, testMsg, m.offset, m.timestamp, 0)
}
}

broker0 := NewMockBroker(t, 0)
broker0.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetBroker(broker0.Addr(), broker0.BrokerID()).
SetLeader("my_topic", 0, broker0.BrokerID()),
"OffsetRequest": NewMockOffsetResponse(t).
SetVersion(offsetResponseVersion).
SetOffset("my_topic", 0, OffsetNewest, 1234).
SetOffset("my_topic", 0, OffsetOldest, 0),
"FetchRequest": NewMockSequence(fr),
})

master, err := NewConsumer([]string{broker0.Addr()}, cfg)
if err != nil {
t.Fatal(err)
}

consumer, err := master.ConsumePartition("my_topic", 0, 1)
if err != nil {
t.Fatal(err)
}

for i, ts := range d.expectedTimestamp {
select {
case msg := <-consumer.Messages():
assertMessageOffset(t, msg, int64(i)+1)
if msg.Timestamp != ts {
t.Errorf("Wrong timestamp (kversion:%v, logAppendTime:%v): got: %v, want: %v",
d.kversion, d.logAppendTime, msg.Timestamp, ts)
}
case err := <-consumer.Errors():
t.Fatal(err)
}
}

safeClose(t, consumer)
safeClose(t, master)
broker0.Close()
}
}

func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
if msg.Offset != expectedOffset {
t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)
Expand Down
29 changes: 21 additions & 8 deletions fetch_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
}

type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
Version int16 // v1 requires 0.9+, v2 requires 0.10+
LogAppendTime bool
Timestamp time.Time
}

func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
Expand Down Expand Up @@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) {
return kb, vb
}

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
msg := &Message{Key: kb, Value: vb}
if r.LogAppendTime {
timestamp = r.Timestamp
}
msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
msgBlock := &MessageBlock{Msg: msg, Offset: offset}
if len(frb.RecordsSet) == 0 {
records := newLegacyRecords(&MessageSet{})
Expand All @@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc
set.Messages = append(set.Messages, msgBlock)
}

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
frb := r.getOrCreateBlock(topic, partition)
kb, vb := encodeKV(key, value)
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
if len(frb.RecordsSet) == 0 {
records := newDefaultRecords(&RecordBatch{Version: 2})
records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
frb.RecordsSet = []*Records{&records}
}
batch := frb.RecordsSet[0].RecordBatch
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
batch.addRecord(rec)
}

func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}

func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}

func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
frb := r.getOrCreateBlock(topic, partition)
if len(frb.RecordsSet) == 0 {
Expand Down
14 changes: 11 additions & 3 deletions message.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"time"
)

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

// The lowest 3 bits contain the compression codec used for the message
const compressionCodecMask int8 = 0x07

// Bit 3 set for "LogAppend" timestamps
const timestampTypeMask = 0x08

// CompressionCodec represents the various compression codecs recognized by Kafka in messages.
type CompressionCodec int8

const (
CompressionNone CompressionCodec = 0
CompressionGZIP CompressionCodec = 1
Expand All @@ -36,6 +39,7 @@ const CompressionLevelDefault = -1000
type Message struct {
Codec CompressionCodec // codec used to compress the message contents
CompressionLevel int // compression level
LogAppendTime bool // the used timestamp is LogAppendTime
Key []byte // the message key, may be nil
Value []byte // the message contents
Set *MessageSet // the message set a message might wrap
Expand All @@ -52,6 +56,9 @@ func (m *Message) encode(pe packetEncoder) error {
pe.putInt8(m.Version)

attributes := int8(m.Codec) & compressionCodecMask
if m.LogAppendTime {
attributes |= timestampTypeMask
}
pe.putInt8(attributes)

if m.Version >= 1 {
Expand Down Expand Up @@ -108,6 +115,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
return err
}
m.Codec = CompressionCodec(attribute & compressionCodecMask)
m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask

if m.Version == 1 {
if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
Expand Down
5 changes: 5 additions & 0 deletions record_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type RecordBatch struct {
Codec CompressionCodec
CompressionLevel int
Control bool
LogAppendTime bool
LastOffsetDelta int32
FirstTimestamp time.Time
MaxTimestamp time.Time
Expand Down Expand Up @@ -120,6 +121,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
}
b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
b.Control = attributes&controlMask == controlMask
b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask

if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
return err
Expand Down Expand Up @@ -200,6 +202,9 @@ func (b *RecordBatch) computeAttributes() int16 {
if b.Control {
attr |= controlMask
}
if b.LogAppendTime {
attr |= timestampTypeMask
}
return attr
}

Expand Down

0 comments on commit a6c1f7e

Please sign in to comment.