Skip to content

Commit

Permalink
Produce records with consistent timestamps (#1455)
Browse files Browse the repository at this point in the history
It is possible for the same record to have a different timestamp
depending on where it appears in a produceSet, as the test case in
this commit illustrates.

The problem is that the produceSet's FirstTimestamp and the
record's TimestampDelta are each calculated with nanosecond
precision, and then truncated to millisecond precision during
encoding. This leads to accumulated rounding error when the
original timestamp is later reconstructed.

Instead, truncate all timestamps to millisecond precision before
calculating the FirstTimestamp and TimestampDelta, so that if the
same record is produced multiple times, it will always have the
same timestamp.
  • Loading branch information
stevenjm authored and varun06 committed Dec 11, 2019
1 parent c82acaf commit d514254
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 3 deletions.
3 changes: 2 additions & 1 deletion produce_set.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
}

timestamp := msg.Timestamp
if msg.Timestamp.IsZero() {
if timestamp.IsZero() {
timestamp = time.Now()
}
timestamp = timestamp.Truncate(time.Millisecond)

partitions := ps.msgs[msg.Topic]
if partitions == nil {
Expand Down
61 changes: 59 additions & 2 deletions produce_set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,7 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
}

batch := req.records["t1"][0].RecordBatch
if batch.FirstTimestamp != now {
if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
}
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -334,7 +334,7 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
}

batch := req.records["t1"][0].RecordBatch
if batch.FirstTimestamp != now {
if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
}
if batch.ProducerID != pID {
Expand Down Expand Up @@ -368,3 +368,60 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
}
}
}

func TestProduceSetConsistentTimestamps(t *testing.T) {
parent, ps1 := makeProduceSet()
ps2 := newProduceSet(parent)
parent.conf.Producer.RequiredAcks = WaitForAll
parent.conf.Producer.Timeout = 10 * time.Second
parent.conf.Version = V0_11_0_0

msg1 := &ProducerMessage{
Topic: "t1",
Partition: 0,
Key: StringEncoder(TestMessage),
Value: StringEncoder(TestMessage),
Timestamp: time.Unix(1555718400, 500000000),
sequenceNumber: 123,
}
msg2 := &ProducerMessage{
Topic: "t1",
Partition: 0,
Key: StringEncoder(TestMessage),
Value: StringEncoder(TestMessage),
Timestamp: time.Unix(1555718400, 500900000),
sequenceNumber: 123,
}
msg3 := &ProducerMessage{
Topic: "t1",
Partition: 0,
Key: StringEncoder(TestMessage),
Value: StringEncoder(TestMessage),
Timestamp: time.Unix(1555718400, 600000000),
sequenceNumber: 123,
}

safeAddMessage(t, ps1, msg1)
safeAddMessage(t, ps1, msg3)
req1 := ps1.buildRequest()
if req1.Version != 3 {
t.Error("Wrong request version")
}
batch1 := req1.records["t1"][0].RecordBatch
ft1 := batch1.FirstTimestamp.Unix()*1000 + int64(batch1.FirstTimestamp.Nanosecond()/1000000)
time1 := ft1 + int64(batch1.Records[1].TimestampDelta/time.Millisecond)

safeAddMessage(t, ps2, msg2)
safeAddMessage(t, ps2, msg3)
req2 := ps2.buildRequest()
if req2.Version != 3 {
t.Error("Wrong request version")
}
batch2 := req2.records["t1"][0].RecordBatch
ft2 := batch2.FirstTimestamp.Unix()*1000 + int64(batch2.FirstTimestamp.Nanosecond()/1000000)
time2 := ft2 + int64(batch2.Records[1].TimestampDelta/time.Millisecond)

if time1 != time2 {
t.Errorf("Message timestamps do not match: %v, %v", time1, time2)
}
}

0 comments on commit d514254

Please sign in to comment.