Skip to content
This repository was archived by the owner on Aug 23, 2023. It is now read-only.
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 985f69a

Browse files
committedMar 23, 2018
fixes according to comments
1 parent d924876 commit 985f69a

File tree

1 file changed

+28
-18
lines changed

1 file changed

+28
-18
lines changed
 

‎kafka/consumer.go

+28-18
Original file line numberDiff line numberDiff line change
@@ -303,12 +303,12 @@ func (c *Consumer) startConsumer() error {
303303
var currentOffset int64
304304
switch c.conf.StartAtOffset {
305305
case "oldest":
306-
currentOffset, _, err = c.tryGetOffset(topic, partition, int64(confluent.OffsetBeginning), 3, time.Second)
306+
currentOffset, err = c.tryGetOffset(topic, partition, int64(confluent.OffsetBeginning), 3, time.Second)
307307
if err != nil {
308308
return err
309309
}
310310
case "newest":
311-
_, currentOffset, err = c.tryGetOffset(topic, partition, int64(confluent.OffsetEnd), 3, time.Second)
311+
currentOffset, err = c.tryGetOffset(topic, partition, int64(confluent.OffsetEnd), 3, time.Second)
312312
if err != nil {
313313
return err
314314
}
@@ -318,9 +318,13 @@ func (c *Consumer) startConsumer() error {
318318
return fmt.Errorf("invalid offest format %s: %s", c.conf.StartAtOffset, err)
319319
}
320320
currentOffset = time.Now().Add(-1*offsetDuration).UnixNano() / int64(time.Millisecond)
321-
currentOffset, _, err = c.tryGetOffset(topic, partition, currentOffset, 3, time.Second)
321+
currentOffset, err = c.tryGetOffset(topic, partition, currentOffset, 3, time.Second)
322322
if err != nil {
323-
return err
323+
log.Warn("kafka-consumer: Failed to get specified offset %s, falling back to \"oldest\"", c.conf.StartAtOffset)
324+
currentOffset, err = c.tryGetOffset(topic, partition, int64(confluent.OffsetBeginning), 3, time.Second)
325+
if err != nil {
326+
return err
327+
}
324328
}
325329
}
326330

@@ -344,44 +348,50 @@ func (c *Consumer) startConsumer() error {
344348
return c.consumer.Assign(topicPartitions)
345349
}
346350

347-
func (c *Consumer) tryGetOffset(topic string, partition int32, offsetI int64, attempts int, sleep time.Duration) (int64, int64, error) {
351+
func (c *Consumer) tryGetOffset(topic string, partition int32, offsetI int64, attempts int, sleep time.Duration) (int64, error) {
348352
offset, err := confluent.NewOffset(offsetI)
349353
if err != nil {
350-
return 0, 0, err
354+
return 0, err
351355
}
352356

353-
var val1, val2 int64
357+
var beginning, end int64
354358

355359
attempt := 1
356360
for {
357361
if offset == confluent.OffsetBeginning || offset == confluent.OffsetEnd {
358-
val1, val2, err = c.consumer.QueryWatermarkOffsets(topic, partition, c.conf.MetadataTimeout)
362+
beginning, end, err = c.consumer.QueryWatermarkOffsets(topic, partition, c.conf.MetadataTimeout)
363+
if err == nil {
364+
if offset == confluent.OffsetBeginning {
365+
return beginning, nil
366+
} else {
367+
return end, nil
368+
}
369+
}
359370
} else {
360371
times := []confluent.TopicPartition{{Topic: &topic, Partition: partition, Offset: offset}}
361372
times, err = c.consumer.OffsetsForTimes(times, c.conf.MetadataTimeout)
362-
if err == nil {
363-
if len(times) == 0 {
364-
err = fmt.Errorf("Got 0 topics returned from broker")
373+
if err != nil || len(times) == 0 {
374+
if err == nil {
375+
err = fmt.Errorf("Failed to get offset %d from kafka, falling back to \"oldest\"", offset)
365376
} else {
366-
val1 = int64(times[0].Offset)
377+
err = fmt.Errorf("Failed to get offset %d from kafka, falling back to \"oldest\": %s", offset, err)
367378
}
379+
offset = confluent.OffsetBeginning
380+
} else {
381+
return int64(times[0].Offset), nil
368382
}
369383
}
370384

371-
if err == nil {
372-
return val1, val2, err
373-
}
374-
375385
if attempt >= attempts {
376386
break
377387
}
378388

379-
log.Warn("kafka-consumer: Error when qerying offsets, %d retries left: %s", attempts-attempt, err)
389+
log.Warn("kafka-consumer: Error when querying offsets, %d retries left: %s", attempts-attempt, err)
380390
attempt += 1
381391
time.Sleep(sleep)
382392
}
383393

384-
return 0, 0, fmt.Errorf("Failed to get offset %s of partition %s:%d. %s (attempt %d/%d)", offset.String(), topic, partition, err, attempt, attempts)
394+
return 0, fmt.Errorf("Failed to get offset %s of partition %s:%d. %s (attempt %d/%d)", offset.String(), topic, partition, err, attempt, attempts)
385395
}
386396

387397
func (c *Consumer) Stop() {

0 commit comments

Comments
 (0)
This repository has been archived.