Skip to content

Commit

Permalink
Remove use of Optional as method parameter.
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremyg484 committed Apr 20, 2023
1 parent 04925ed commit 13e43b3
Showing 1 changed file with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ private void createConsumerThreadPollLoop(final ExecutableMethod<?, ?> method,
}

if (isBatch) {
failed = !processConsumerRecordsAsBatch(consumerState, method, boundArguments, ackArg, consumerRecords);
failed = !processConsumerRecordsAsBatch(consumerState, method, boundArguments, ackArg.orElse(null), consumerRecords);
} else {
failed = !processConsumerRecords(consumerState, method, boundArguments, trackPartitions, ackArg, consumerRecords);
}
Expand Down Expand Up @@ -645,17 +645,17 @@ private Duration computeRetryDelay(ErrorStrategyValue errorStrategy, Duration fi
private boolean processConsumerRecordsAsBatch(final ConsumerState consumerState,
final ExecutableMethod<?, ?> method,
final Map<Argument<?>, Object> boundArguments,
final Optional<Argument<?>> ackArg,
@Nullable final Argument<?> ackArg,
final ConsumerRecords<?, ?> consumerRecords) {
ackArg.ifPresent(argument -> {
if (ackArg != null) {
Map<TopicPartition, OffsetAndMetadata> batchOffsets = new HashMap<>();
for (ConsumerRecord<?, ?> consumerRecord : consumerRecords) {
final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null);
batchOffsets.put(topicPartition, offsetAndMetadata);
}
boundArguments.put(argument, (KafkaAcknowledgement) () -> consumerState.kafkaConsumer.commitSync(batchOffsets));
});
boundArguments.put(ackArg, (KafkaAcknowledgement) () -> consumerState.kafkaConsumer.commitSync(batchOffsets));
};

final ExecutableBinder<ConsumerRecords<?, ?>> batchBinder = new DefaultExecutableBinder<>(boundArguments);
final BoundExecutable boundExecutable = batchBinder.bind(method, batchBinderRegistry, consumerRecords);
Expand Down

0 comments on commit 13e43b3

Please sign in to comment.