Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.

NotifierKafka: fix case when partition is empty (avoid hitting timeout) #1352

Merged
merged 1 commit into from
Jun 21, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions mdata/notifierKafka/notifierKafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ func (c *NotifierKafka) start() {

// | scenario | offsetOldest | offsetNewest | offsetTime
// ------------------------------------------------------------------------------------------------------------------------------
// | new empty partition | error | 0 | error
// | new with messages | 0 | validOffset | validOffset or error if offsetTime is earlier than first message
// | existing with messages | validOffset | validOffset | validOffset
// | existing with no messages | error | validOffset | error
// | new empty partition | 0 | 0 | -1
// | new with messages | 0 | validOffset | validOffset
// | existing with messages | 0 | validOffset | validOffset
// | existing with no messages | 0 | 0 | -1
// ------------------------------------------------------------------------------------------------------------------------------

// getOffsetFor takes an offset string and a partition id, then it tries to get the according offset
Expand Down Expand Up @@ -197,7 +197,13 @@ func (c *NotifierKafka) consumePartition(topic string, partition int32, startOff
messages := pc.Messages()
ticker := time.NewTicker(5 * time.Second)

lastReadOffset := startOffset - 1
var lastReadOffset int64
if startOffset < 0 {
log.Infof("kafka-cluster: empty partition")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should probably mention which partition..

lastReadOffset = -1
} else {
lastReadOffset = startOffset - 1
}
lastAvailableOffsetAtStartup, err := c.getLastAvailableOffset(topic, partition)
if err != nil {
log.Fatalf("kafka-cluster: failed to get newest offset for topic %s part %d: %s", topic, partition, err)
Expand Down