-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support LogAppend timestamps #1258
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -186,9 +186,11 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) | |
} | ||
|
||
type FetchResponse struct { | ||
Blocks map[string]map[int32]*FetchResponseBlock | ||
ThrottleTime time.Duration | ||
Version int16 // v1 requires 0.9+, v2 requires 0.10+ | ||
Blocks map[string]map[int32]*FetchResponseBlock | ||
ThrottleTime time.Duration | ||
Version int16 // v1 requires 0.9+, v2 requires 0.10+ | ||
LogAppendTime bool | ||
Timestamp time.Time | ||
} | ||
|
||
func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) { | ||
|
@@ -355,10 +357,13 @@ func encodeKV(key, value Encoder) ([]byte, []byte) { | |
return kb, vb | ||
} | ||
|
||
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { | ||
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This and below are exported function and this change is going to be a breaking change. Let's see how we can handle them. @bai do you have an idea? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You'll need to keep both exported functions, and make one delegate to the other! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @varun06 , I think I did preserve the original API call reimplementing the original function using this one (https://github.com/Shopify/sarama/pull/1258/files/e1eda41894f763094e70b74f6dbf958c4cbbd315#diff-be942b2617e64413fb251f24e4bd4ea2R388). Same for Am I missing something ? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This name change gonna break for folks who are using exported method "AddMessage". There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The “AddMessage” method is still there (check the link I provided above). Also note that there are multiple tests that would have got broken otherwise. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Apologies, Do we have AddRecord method also? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, “AddRecord” also kept. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool Bean. Let's get it going then. @bai can we merge this please? |
||
frb := r.getOrCreateBlock(topic, partition) | ||
kb, vb := encodeKV(key, value) | ||
msg := &Message{Key: kb, Value: vb} | ||
if r.LogAppendTime { | ||
timestamp = r.Timestamp | ||
} | ||
msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version} | ||
msgBlock := &MessageBlock{Msg: msg, Offset: offset} | ||
if len(frb.RecordsSet) == 0 { | ||
records := newLegacyRecords(&MessageSet{}) | ||
|
@@ -368,18 +373,26 @@ func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Enc | |
set.Messages = append(set.Messages, msgBlock) | ||
} | ||
|
||
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) { | ||
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) { | ||
frb := r.getOrCreateBlock(topic, partition) | ||
kb, vb := encodeKV(key, value) | ||
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset} | ||
if len(frb.RecordsSet) == 0 { | ||
records := newDefaultRecords(&RecordBatch{Version: 2}) | ||
records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp}) | ||
frb.RecordsSet = []*Records{&records} | ||
} | ||
batch := frb.RecordsSet[0].RecordBatch | ||
rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)} | ||
batch.addRecord(rec) | ||
} | ||
|
||
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) { | ||
r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0) | ||
} | ||
|
||
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) { | ||
r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{}) | ||
} | ||
|
||
func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) { | ||
frb := r.getOrCreateBlock(topic, partition) | ||
if len(frb.RecordsSet) == 0 { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should you include other supported versions here?
https://github.com/bai/sarama/blob/master/utils.go#L163
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have adjusted the test to cover all versions for which format of fetch is different (from https://github.com/bai/sarama/blob/0a21d90df4f6266fdf28d603e5ef91f2426c362a/fetch_request.go#L141)