Skip to content

Commit

Permalink
Fix messages waiting for empty topic (#60)
Browse files Browse the repository at this point in the history
  • Loading branch information
apetrovs authored Jun 15, 2020
1 parent b60f2a3 commit 5a0b23e
Showing 1 changed file with 2 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@ public void emit(FluxSink<ConsumerRecord<Bytes, Bytes>> sink) {
assignPartitions(consumer);
seekOffsets(consumer);
int pollsCount = 0;
while (!sink.isCancelled() || ++pollsCount > MAX_POLLS_COUNT) {
while (!sink.isCancelled() && ++pollsCount < MAX_POLLS_COUNT) {
ConsumerRecords<Bytes, Bytes> records = consumer.poll(POLL_TIMEOUT_MS);
log.info("{} records polled", records.count());
records.iterator()
.forEachRemaining(sink::next);
}
sink.complete();
} catch (Exception e) {
log.error("Error occurred while consuming records", e);
throw new RuntimeException(e);
Expand Down

0 comments on commit 5a0b23e

Please sign in to comment.