fix how the kafka offsets get defined in the notifier #1350
Conversation
}
if startOffset < 0 {
	// happens when OffsetOldest or an offsetDuration was used and there is no message in the partition
	startOffset = 0
I confirmed: I don't think we want to set this to 0, because it would just result in an error from sarama: https://github.com/Shopify/sarama/blob/master/consumer.go#L397
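For illustration (this is not code from the PR), a minimal sketch of the failure mode being referenced: sarama refuses a starting offset below the partition's oldest retained offset, so hard-coding 0 on a partition whose log start has moved past 0 fails with ErrOffsetOutOfRange. Here c.client, topic and partition are assumed to come from the surrounding notifier code.

// sketch only, assumes "github.com/Shopify/sarama" and the surrounding notifier context
consumer, err := sarama.NewConsumerFromClient(c.client)
if err != nil {
	log.Fatalf("kafka-cluster: failed to create consumer: %s", err)
}
// if retention has already deleted offset 0, sarama does not fall back to the
// oldest available offset; ConsumePartition returns ErrOffsetOutOfRange instead
_, err = consumer.ConsumePartition(topic, partition, 0)
if err == sarama.ErrOffsetOutOfRange {
	log.Fatalf("kafka-cluster: offset 0 is out of range for %s:%d", topic, partition)
}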
You're right, this change makes sense. However, we also need to update consumePartition() a little. The variable lastReadOffset gets an incorrect value with this patch when startOffset is negative. We need lastReadOffset set to a value compatible with what getLastAvailableOffset() returns when there is no message in the partition (which I think is -1). This should be tested against an empty partition to make sure we don't hit the timeout unnecessarily.
A couple of other issues:
- it's not offsetTime that should be set to sarama.OffsetOldest, but startOffset
- offsetTime should be logged instead of offsetDuration
Finally, I think that if c.client.GetOffset fails we probably want to log.Fatalf anyway. If we don't do that now, it will very likely happen a little later, the next time we call c.client.GetOffset.
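To make the lastReadOffset point concrete, a hedged sketch (names follow this thread, not necessarily the actual code): on an empty partition, GetOffset(OffsetNewest) returns 0 (the offset the next message will get), so the last-read position should be newest-1, i.e. -1, which is what getLastAvailableOffset() is expected to report.

// sketch only: derive a lastReadOffset that is also correct for an empty partition
newest, err := c.client.GetOffset(topic, partition, sarama.OffsetNewest)
if err != nil {
	log.Fatalf("kafka-cluster: failed to get newest offset for %s:%d: %s", topic, partition, err)
}
// empty partition: newest == 0, so lastReadOffset == -1, matching getLastAvailableOffset(),
// and the backlog is considered processed immediately instead of waiting for the timeout
lastReadOffset := newest - 1
_ = lastReadOffset // would be handed to consumePartition in the real code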
it looks like I had not clicked the 'submit review' yesterday...
We will first try to use the offset supplied by the configuration. If that doesn't work we will try all other offset possibilities, in priority from custom, oldest, and then newest. If we still can't find a valid offset we will just crash.
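Roughly, a sketch of that flow under the assumptions in this thread (the offsetCandidate struct and the exact priority handling are illustrative, not the actual patch):

// sketch: query the candidate offsets, then pick the first usable one in priority order;
// the offset supplied by the configuration would normally be tried before these fallbacks
type offsetCandidate struct {
	name        string
	offsetTime  int64
	offsetStart int64
	offsetError error
}
candidates := []offsetCandidate{
	{name: "custom", offsetTime: offsetFromDuration},
	{name: "oldest", offsetTime: sarama.OffsetOldest},
	{name: "newest", offsetTime: sarama.OffsetNewest},
}
for i := range candidates {
	candidates[i].offsetStart, candidates[i].offsetError = c.client.GetOffset(topic, partition, candidates[i].offsetTime)
}
validOffset := false
var startOffset int64
for _, cand := range candidates {
	if cand.offsetError == nil && cand.offsetStart >= 0 {
		startOffset = cand.offsetStart
		validOffset = true
		break
	}
}
if !validOffset {
	log.Fatalf("kafka-cluster: could not find a valid offset for topic %s partition %d", topic, partition)
}
processBacklog.Add(1)
go c.consumePartition(topic, partition, startOffset, processBacklog)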
mdata/notifierKafka/notifierKafka.go (outdated)
processBacklog.Add(1)
go c.consumePartition(topic, partition, startOffset, processBacklog)

// in case we did not originally have a valid offset, we need to re-check here
This comment is a bit confusing. For clarity I think you should check and fail if the offset is still not valid, e.g.

// if we still don't have a valid offset, we can't proceed
if !validOffset {
	log.Fatalf("kafka-cluster: tried all fallbacks, could not find a valid offset for topic: %s using %s\n", topic, offsetStr)
}
processBacklog.Add(1)
go c.consumePartition(topic, partition, startOffset, processBacklog)
mdata/notifierKafka/notifierKafka.go (outdated)
var validOffset bool
var offsetFromDuration int64

offsets := make([]offset, 0, 3)
I think this would be easier to follow by using a map keyed by the offset name, e.g.

offsets := map[string]*offset{
	"oldest": {offsetTime: sarama.OffsetOldest},
	"newest": {offsetTime: sarama.OffsetNewest},
	"custom": {offsetTime: offsetFromDuration},
}

To iterate over them in order you can use

for _, name := range []string{"oldest", "newest", "custom"} {
	tmpOffset, err := c.client.GetOffset(topic, partition, offsets[name].offsetTime)
	offsets[name].offsetStart = tmpOffset
	offsets[name].offsetError = err
}

I think this code might need a large comment block explaining how GetOffset works in the different scenarios, e.g. a new partition with no messages, a partition that has messages but is new, a partition that is old but has no messages, etc. Requesting sarama.OffsetOldest or offsetTime will return the offset number of a message that exists in the log.
I am pretty sure this table is accurate, but we should do some simple testing to verify.
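To spell the scenarios out, here is my (hedged) understanding of what client.GetOffset returns; this is exactly what the testing mentioned above should confirm before it is relied on:

// to verify: expected GetOffset(topic, partition, x) behaviour
//
// x = sarama.OffsetOldest -> log start offset: 0 for a brand new empty partition,
//                            equal to the high watermark when retention has deleted all messages
// x = sarama.OffsetNewest -> high watermark, i.e. the offset the next produced message will get
// x = a timestamp         -> offset of the first message with a timestamp >= x,
//                            or -1 when no such message exists (e.g. an empty partition)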
Add block comment to describe scenarios
Add more error messages
It's looking pretty good so far. We do need to verify the information in the various scenarios.
Should we make this logic generic in some function, so it can be reused in the kafka input?
mdata/notifierKafka/notifierKafka.go (outdated)
}

// get all of the offsets
for _, name := range []string{"newest", "custom", "oldest"} {
I assume that in most cases there won't be an error when querying the offset from Kafka. So is it necessary to always query it three times per partition, even when there's no error? Wouldn't it be more efficient to only query the other possible offsets when the initial one returns an error?
I did think about that, but in this case I don't think it matters. In the grand scheme of things, when we talk about consuming the partitions, a few milliseconds or seconds won't make a difference AFAIK. I'm up for it though.
I tried to keep the logic as it is, while removing unnecessary calls to get offsets and duplicate code
I pushed another commit. I tried to keep the logic unmodified while removing unnecessary calls to get the offset from kafka, and duplicate code.
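Presumably the change looks something like this hedged sketch (names are taken from the thread, not the diff): the configured target is queried first, and Kafka is only asked for the other offsets when that query fails or returns no offset.

// sketch: fall back lazily instead of always querying all three offsets per partition
startOffset := int64(-1)
for _, target := range []int64{offsetTime, sarama.OffsetOldest, sarama.OffsetNewest} {
	o, err := c.client.GetOffset(topic, partition, target)
	if err == nil && o >= 0 {
		startOffset = o
		break
	}
}
if startOffset < 0 {
	log.Fatalf("kafka-cluster: could not get a valid offset for topic %s partition %d", topic, partition)
}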
Ok, so we are hitting timeouts again with that patch merged when there is no message in the partition:
Possible fix: #1352
Today we saw an issue with an instance crashlooping due to an invalid partition offset. I wasn't able to reproduce the issue & verify this fix yet, but I think it would probably make sense to make this change anyway.
When the offset gets set to 0 as it previously was, doesn't that basically guarantee an error due to an invalid offset?