Skip to content

Commit c470b97

Browse files
authoredFeb 9, 2023
fix(topicdata): topic data access slow on sort newest (#1371)
- Adding stop condition in newest sort to prevent extra polling - Fixing wrong number of records with pagination close #192
1 parent 9d199bd commit c470b97

File tree

1 file changed

+10
-2
lines changed

1 file changed

+10
-2
lines changed
 

‎src/main/java/org/akhq/repositories/RecordRepository.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,12 @@ private List<Record> consumeNewest(Topic topic, Options options) {
324324
filterMessageLength(current);
325325
list.add(current);
326326
}
327+
328+
// End of the partition, we can stop here
329+
if (record.offset() == topicPartitionOffset.getEnd()) {
330+
emptyPoll = 1;
331+
break;
332+
}
327333
}
328334
}
329335
while (emptyPoll < 1);
@@ -394,8 +400,10 @@ private Optional<OffsetBound> getFirstOffsetForSortOldest(KafkaConsumer<byte[],
394400
private Optional<EndOffsetBound> getOffsetForSortNewest(KafkaConsumer<byte[], byte[]> consumer, Partition partition, Options options, int pollSizePerPartition) {
395401
return getFirstOffset(consumer, partition, options)
396402
.map(first -> {
397-
long last = partition.getLastOffset();
403+
// Take end offset - 1 to get the last record offset
404+
long last = partition.getLastOffset() - 1;
398405

406+
// If there is an after parameter in the request use this one
399407
if (pollSizePerPartition > 0 && options.after.containsKey(partition.getId())) {
400408
last = options.after.get(partition.getId()) - 1;
401409
}
@@ -404,7 +412,7 @@ private Optional<EndOffsetBound> getOffsetForSortNewest(KafkaConsumer<byte[], by
404412
consumer.close();
405413
return null;
406414
} else if (!(last - pollSizePerPartition < first)) {
407-
first = last - pollSizePerPartition;
415+
first = last - pollSizePerPartition + 1;
408416
}
409417

410418
return EndOffsetBound.builder()

0 commit comments

Comments
 (0)