Fix or suppress other Sonar warnings
guillermocalvo committed Sep 19, 2023
1 parent 815780e commit 1188b37
Showing 5 changed files with 223 additions and 150 deletions.
@@ -81,12 +81,12 @@ final class ConsumerInfo {
this.groupId = groupId;
this.shouldRedeliver = kafkaListener.isTrue("redelivery");
this.offsetStrategy = offsetStrategy;
final Optional<AnnotationValue<ErrorStrategy>> errorStrategy = kafkaListener.getAnnotation("errorStrategy", ErrorStrategy.class);
this.errorStrategy = errorStrategy.map(a -> a.getRequiredValue(ErrorStrategyValue.class)).orElse(ErrorStrategyValue.NONE);
this.retryDelay = errorStrategy.flatMap(a -> a.get("retryDelay", Duration.class)).filter(d -> !d.isZero() && !d.isNegative()).orElse(null);
this.retryCount = errorStrategy.map(a -> a.intValue("retryCount").orElse(ErrorStrategy.DEFAULT_RETRY_COUNT)).orElse(0);
this.shouldHandleAllExceptions = errorStrategy.flatMap(a -> a.booleanValue("handleAllExceptions")).orElse(ErrorStrategy.DEFAULT_HANDLE_ALL_EXCEPTIONS);
this.exceptionTypes = Arrays.stream((Class<? extends Throwable>[]) errorStrategy.map(a -> a.classValues("exceptionTypes")).orElse(ReflectionUtils.EMPTY_CLASS_ARRAY)).toList();
final Optional<AnnotationValue<ErrorStrategy>> errorStrategyAnnotation = kafkaListener.getAnnotation("errorStrategy", ErrorStrategy.class);
this.errorStrategy = errorStrategyAnnotation.map(a -> a.getRequiredValue(ErrorStrategyValue.class)).orElse(ErrorStrategyValue.NONE);
this.retryDelay = errorStrategyAnnotation.flatMap(a -> a.get("retryDelay", Duration.class)).filter(d -> !d.isZero() && !d.isNegative()).orElse(null);
this.retryCount = errorStrategyAnnotation.map(a -> a.intValue("retryCount").orElse(ErrorStrategy.DEFAULT_RETRY_COUNT)).orElse(0);
this.shouldHandleAllExceptions = errorStrategyAnnotation.flatMap(a -> a.booleanValue("handleAllExceptions")).orElse(ErrorStrategy.DEFAULT_HANDLE_ALL_EXCEPTIONS);
this.exceptionTypes = Arrays.stream((Class<? extends Throwable>[]) errorStrategyAnnotation.map(a -> a.classValues("exceptionTypes")).orElse(ReflectionUtils.EMPTY_CLASS_ARRAY)).toList();
this.producerClientId = kafkaListener.stringValue("producerClientId").orElse(null);
this.producerTransactionalId = kafkaListener.stringValue("producerTransactionalId").filter(StringUtils::isNotEmpty).orElse(null);
this.isTransactional = producerTransactionalId != null;
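Note on the ConsumerInfo hunk above: it only renames the local errorStrategy variable to errorStrategyAnnotation, but the values it extracts (errorStrategy, retryDelay, retryCount, handleAllExceptions, exceptionTypes, redelivery, producerClientId, producerTransactionalId) all come from @KafkaListener / @ErrorStrategy annotation metadata. For orientation, a minimal, hypothetical listener exercising those attributes follows — the topic, group id, and handler body are invented for illustration and are not part of this commit; attribute types and defaults should be checked against the micronaut-kafka version in use.

import io.micronaut.configuration.kafka.annotation.ErrorStrategy;
import io.micronaut.configuration.kafka.annotation.ErrorStrategyValue;
import io.micronaut.configuration.kafka.annotation.KafkaListener;
import io.micronaut.configuration.kafka.annotation.Topic;

import java.net.SocketTimeoutException;

// Hypothetical example listener; only the annotation attribute names are taken from the code above.
@KafkaListener(
    groupId = "example-group",
    redelivery = false,
    producerClientId = "example-producer",
    errorStrategy = @ErrorStrategy(
        value = ErrorStrategyValue.RETRY_ON_ERROR,
        retryCount = 3,
        retryDelay = "5s",
        handleAllExceptions = true,
        exceptionTypes = SocketTimeoutException.class
    )
)
class ExampleListener {

    @Topic("example-topic")
    void receive(String message) {
        // Failures thrown here are retried according to the error strategy configured above.
    }
}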
@@ -28,6 +28,7 @@
import io.micronaut.core.async.publisher.Publishers;
import io.micronaut.core.type.Argument;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
@@ -40,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;

import java.nio.charset.StandardCharsets;
@@ -54,14 +56,15 @@
@Internal
final class ConsumerState {

@SuppressWarnings("java:S3416") // Using this logger because ConsumerState used to be an internal class
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerProcessor.class);

private final KafkaConsumerProcessor kafkaConsumerProcessor;
private final ConsumerInfo info;
private final Consumer<?, ?> kafkaConsumer;
final ConsumerInfo info;
final Consumer<?, ?> kafkaConsumer;
private final Object consumerBean;
private final Set<String> subscriptions;
private Set<TopicPartition> assignments;
final Set<String> subscriptions;
Set<TopicPartition> assignments;
private Set<TopicPartition> pausedTopicPartitions;
private Set<TopicPartition> pauseRequests;
@Nullable
@@ -119,26 +122,10 @@ synchronized boolean isPaused(@NonNull Collection<TopicPartition> topicPartition
return pauseRequests.containsAll(topicPartitions) && pausedTopicPartitions.containsAll(topicPartitions);
}

String getClientId() {
return info.clientId;
}

Consumer getKafkaConsumer() {
return kafkaConsumer;
}

boolean isPolling() {
return closedState == ConsumerCloseState.POLLING;
}

Set<String> getSubscriptions() {
return subscriptions;
}

Set<TopicPartition> getAssignments() {
return assignments;
}

void threadPollLoop() {
try (kafkaConsumer) {
//noinspection InfiniteLoopStatement
@@ -327,7 +314,6 @@ private void processIndividually(final ConsumerRecords<?, ?> consumerRecords) {
}

if (info.trackPartitions) {
assert currentOffsets != null;
final TopicPartition topicPartition = new TopicPartition(consumerRecord.topic(), consumerRecord.partition());
final OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(consumerRecord.offset() + 1, null);
currentOffsets.put(topicPartition, offsetAndMetadata);
@@ -390,99 +376,106 @@ private void handleResultFlux(
}

private Publisher<RecordMetadata> sendToDestination(Object value, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
if (!info.sendToTopics.isEmpty()) {
final Object key = consumerRecord.key();
if (value != null) {
final Producer<?, ?> kafkaProducer;
if (info.shouldSendOffsetsToTransaction) {
kafkaProducer = kafkaConsumerProcessor.getTransactionalProducer(
info.producerClientId,
info.producerTransactionalId,
byte[].class,
Object.class
);
if (value == null || info.sendToTopics.isEmpty()) {
return Flux.empty();
}
final Object key = consumerRecord.key();
final Producer<?, ?> kafkaProducer;
if (info.shouldSendOffsetsToTransaction) {
kafkaProducer = kafkaConsumerProcessor.getTransactionalProducer(
info.producerClientId,
info.producerTransactionalId,
byte[].class,
Object.class
);
} else {
kafkaProducer = kafkaConsumerProcessor.getProducer(
Optional.ofNullable(info.producerClientId).orElse(info.groupId),
(Class<?>) (key != null ? key.getClass() : byte[].class),
value.getClass()
);
}
return Flux.create(emitter -> sendToDestination(emitter, kafkaProducer, key, value, consumerRecord, consumerRecords));
}

private void sendToDestination(FluxSink<RecordMetadata> emitter, Producer<?, ?> kafkaProducer, Object key, Object value, ConsumerRecord<?, ?> consumerRecord, ConsumerRecords<?, ?> consumerRecords) {
try {
if (info.shouldSendOffsetsToTransaction) {
beginTransaction(kafkaProducer);
}
sendToDestination(kafkaProducer, new FluxCallback(emitter), key, value, consumerRecord);
if (info.shouldSendOffsetsToTransaction) {
endTransaction(kafkaProducer, consumerRecords);
}
emitter.complete();
} catch (Exception e) {
if (info.shouldSendOffsetsToTransaction) {
abortTransaction(kafkaProducer, e);
}
emitter.error(e);
}
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private void sendToDestination(Producer<?, ?> kafkaProducer, Callback callback, Object key, Object value, ConsumerRecord<?, ?> consumerRecord) {
for (String destinationTopic : info.sendToTopics) {
if (info.returnsManyKafkaMessages) {
final Iterable<KafkaMessage> messages = (Iterable<KafkaMessage>) value;
for (KafkaMessage message : messages) {
final ProducerRecord producerRecord = createFromMessage(destinationTopic, message);
kafkaProducer.send(producerRecord, callback);
}
} else {
final ProducerRecord producerRecord;
if (info.returnsOneKafkaMessage) {
producerRecord = createFromMessage(destinationTopic, (KafkaMessage) value);
} else {
kafkaProducer = kafkaConsumerProcessor.getProducer(
Optional.ofNullable(info.producerClientId).orElse(info.groupId),
(Class<?>) (key != null ? key.getClass() : byte[].class),
value.getClass()
);
producerRecord = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.headers());
}
return Flux.create(emitter -> {
try {
if (info.shouldSendOffsetsToTransaction) {
try {
LOG.trace("Beginning transaction for producer: {}", info.producerTransactionalId);
kafkaProducer.beginTransaction();
} catch (ProducerFencedException e) {
kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
}
}
for (String destinationTopic : info.sendToTopics) {
if (info.returnsManyKafkaMessages) {
Iterable<KafkaMessage> messages = (Iterable<KafkaMessage>) value;
for (KafkaMessage message : messages) {
ProducerRecord record = createFromMessage(destinationTopic, message);
kafkaProducer.send(record, (metadata, exception) -> {
if (exception != null) {
emitter.error(exception);
} else {
emitter.next(metadata);
}
});
}
} else {
ProducerRecord record;
if (info.returnsOneKafkaMessage) {
record = createFromMessage(destinationTopic, (KafkaMessage) value);
} else {
record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.headers());
}
LOG.trace("Sending record: {} for producer: {} {}", record, kafkaProducer, info.producerTransactionalId);
kafkaProducer.send(record, (metadata, exception) -> {
if (exception != null) {
emitter.error(exception);
} else {
emitter.next(metadata);
}
});
}
}
if (info.shouldSendOffsetsToTransaction) {
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : consumerRecords.partitions()) {
List<? extends ConsumerRecord<?, ?>> partitionedRecords = consumerRecords.records(partition);
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}
try {
final String producerTransactionalId = info.producerTransactionalId;
LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", offsetsToCommit, producerTransactionalId, info.groupId);
kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(info.groupId));
LOG.trace("Committing transaction for producer: {}", producerTransactionalId);
kafkaProducer.commitTransaction();
LOG.trace("Committed transaction for producer: {}", producerTransactionalId);
} catch (ProducerFencedException e) {
kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
}
}
emitter.complete();
} catch (Exception e) {
if (info.shouldSendOffsetsToTransaction) {
try {
LOG.trace("Aborting transaction for producer: {} because of error: {}", info.producerTransactionalId, e.getMessage());
kafkaProducer.abortTransaction();
} catch (ProducerFencedException ex) {
kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, ex);
}
}
emitter.error(e);
}
});
LOG.trace("Sending record: {} for producer: {} {}", producerRecord, kafkaProducer, info.producerTransactionalId);
kafkaProducer.send(producerRecord, callback);
}
return Flux.empty();
}
return Flux.empty();
}

private void beginTransaction(Producer<?, ?> kafkaProducer) {
try {
LOG.trace("Beginning transaction for producer: {}", info.producerTransactionalId);
kafkaProducer.beginTransaction();
} catch (ProducerFencedException e) {
kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
}
}

private void endTransaction(Producer<?, ?> kafkaProducer, ConsumerRecords<?, ?> consumerRecords) {
final Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : consumerRecords.partitions()) {
List<? extends ConsumerRecord<?, ?>> partitionedRecords = consumerRecords.records(partition);
long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}
sendOffsetsToTransaction(kafkaProducer, offsetsToCommit);
}

private void abortTransaction(Producer<?, ?> kafkaProducer, Exception e) {
try {
LOG.trace("Aborting transaction for producer: {} because of error: {}", info.producerTransactionalId, e.getMessage());
kafkaProducer.abortTransaction();
} catch (ProducerFencedException ex) {
kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, ex);
}
}

private void sendOffsetsToTransaction(Producer<?, ?> kafkaProducer, Map<TopicPartition, OffsetAndMetadata> offsetsToCommit) {
try {
LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", offsetsToCommit, info.producerTransactionalId, info.groupId);
kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(info.groupId));
LOG.trace("Committing transaction for producer: {}", info.producerTransactionalId);
kafkaProducer.commitTransaction();
LOG.trace("Committed transaction for producer: {}", info.producerTransactionalId);
} catch (ProducerFencedException e) {
kafkaConsumerProcessor.handleProducerFencedException(kafkaProducer, e);
}
}

private Publisher<RecordMetadata> handleSendToError(Throwable error, ConsumerRecord<?, ?> consumerRecord) {
@@ -496,6 +489,7 @@ private Publisher<RecordMetadata> handleSendToError(Throwable error, ConsumerRec
.doOnError(ex -> handleException("Redelivery failed for record [" + consumerRecord + "] with Kafka reactive consumer [" + info.method + "]: " + error.getMessage(), ex, consumerRecord));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private Mono<RecordMetadata> redeliver(ConsumerRecord<?, ?> consumerRecord) {
final Object key = consumerRecord.key();
final Object value = consumerRecord.value();
@@ -515,13 +509,7 @@ private Mono<RecordMetadata> redeliver(ConsumerRecord<?, ?> consumerRecord) {
final ProducerRecord producerRecord = new ProducerRecord(consumerRecord.topic(), consumerRecord.partition(), key, value, consumerRecord.headers());

LOG.trace("Sending record: {} for producer: {} {}", producerRecord, kafkaProducer, info.producerTransactionalId);
return Mono.create(emitter -> kafkaProducer.send(producerRecord, (metadata, exception) -> {
if (exception != null) {
emitter.error(exception);
} else {
emitter.success(metadata);
}
}));
return Mono.create(emitter -> kafkaProducer.send(producerRecord, new MonoCallback(emitter)));
}

private boolean resolveWithErrorStrategy(ConsumerRecord<?, ?> consumerRecord, Throwable e) {
@@ -614,6 +602,7 @@ private OffsetCommitCallback resolveCommitCallback() {
};
}

@SuppressWarnings({ "rawtypes", "unchecked" })
private static ProducerRecord createFromMessage(String topic, KafkaMessage<?, ?> message) {
return new ProducerRecord(
Optional.ofNullable(message.getTopic()).orElse(topic),
@@ -0,0 +1,40 @@
/*
* Copyright 2017-2020 original authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.micronaut.configuration.kafka.processor;

import io.micronaut.core.annotation.Internal;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.publisher.FluxSink;

@Internal
final class FluxCallback implements Callback {

private final FluxSink<RecordMetadata> emitter;

FluxCallback(FluxSink<RecordMetadata> emitter) {
this.emitter = emitter;
}

@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
emitter.error(exception);
} else {
emitter.next(metadata);
}
}
}
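The redeliver(...) hunk above now passes new MonoCallback(emitter) to kafkaProducer.send(...), but the MonoCallback source is among the changed files not expanded on this page. Assuming it mirrors FluxCallback and simply completes a Reactor MonoSink instead of emitting to a FluxSink, it could look like the sketch below — a reconstruction for readability, not the actual file from the commit.

package io.micronaut.configuration.kafka.processor;

import io.micronaut.core.annotation.Internal;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import reactor.core.publisher.MonoSink;

// Hypothetical reconstruction of the MonoCallback adapter referenced in redeliver(...).
@Internal
final class MonoCallback implements Callback {

    private final MonoSink<RecordMetadata> emitter;

    MonoCallback(MonoSink<RecordMetadata> emitter) {
        this.emitter = emitter;
    }

    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            emitter.error(exception);
        } else {
            // A Mono emits at most one value, so the send result completes the sink.
            emitter.success(metadata);
        }
    }
}

Extracting these callbacks into named @Internal classes removes the inline lambdas from the send paths and lets the Flux- and Mono-based flows share the same completion logic, which is consistent with the commit's goal of fixing or suppressing Sonar warnings about the deeply nested code in the original handleResultFlux path.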