Commit 97d53e1: fix test / deprecations and superfluous Graal metadata
graemerocher committed Nov 18, 2022
1 parent cc55d46 commit 97d53e1
Showing 6 changed files with 32 additions and 46 deletions.
KafkaStreamsHealth.java
@@ -26,8 +26,8 @@
import io.micronaut.management.health.indicator.HealthResult;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.processor.TaskMetadata;
-import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.TaskMetadata;
+import org.apache.kafka.streams.ThreadMetadata;
import org.reactivestreams.Publisher;

import jakarta.inject.Singleton;
@@ -40,7 +40,6 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
-import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
@@ -57,6 +56,7 @@ public class KafkaStreamsHealth implements HealthIndicator {
public static final String ENABLED_PROPERTY = AbstractKafkaConfiguration.PREFIX + ".health.streams.enabled";

private static final String NAME = "kafkaStreams";
+private static final String METADATA_PARTITIONS = "partitions";

private final KafkaStreamsFactory kafkaStreamsFactory;
private final HealthAggregator<?> healthAggregator;
@@ -130,7 +130,7 @@ private Map<String, Object> buildDetails(KafkaStreams kafkaStreams) {
final Map<String, Object> streamDetails = new HashMap<>();

if (kafkaStreams.state().isRunningOrRebalancing()) {
-for (ThreadMetadata metadata : kafkaStreams.localThreadsMetadata()) {
+for (org.apache.kafka.streams.ThreadMetadata metadata : kafkaStreams.metadataForLocalThreads()) {
final Map<String, Object> threadDetails = new HashMap<>();
threadDetails.put("threadName", metadata.threadName());
threadDetails.put("threadState", metadata.threadState());
@@ -179,7 +179,7 @@ private String getApplicationId(final KafkaStreams kafkaStreams) {
private static String getDefaultStreamName(final KafkaStreams kafkaStreams) {
return Optional.ofNullable(kafkaStreams)
.filter(kafkaStreams1 -> kafkaStreams1.state().isRunningOrRebalancing())
-.map(KafkaStreams::localThreadsMetadata)
+.map(KafkaStreams::metadataForLocalThreads)
.map(Collection::stream)
.flatMap(Stream::findFirst)
.map(ThreadMetadata::threadName)
@@ -196,12 +196,12 @@ private static Map<String, Object> taskDetails(Set<TaskMetadata> taskMetadataSet
final Map<String, Object> details = new HashMap<>();
for (TaskMetadata taskMetadata : taskMetadataSet) {
details.put("taskId", taskMetadata.taskId());
if (details.containsKey("partitions")) {
if (details.containsKey(METADATA_PARTITIONS)) {
@SuppressWarnings("unchecked")
-List<String> partitionsInfo = (List<String>) details.get("partitions");
+List<String> partitionsInfo = (List<String>) details.get(METADATA_PARTITIONS);
partitionsInfo.addAll(addPartitionsInfo(taskMetadata));
} else {
details.put("partitions", addPartitionsInfo(taskMetadata));
details.put(METADATA_PARTITIONS, addPartitionsInfo(taskMetadata));
}
}
return details;
@@ -216,7 +216,7 @@ private static Map<String, Object> taskDetails(Set<TaskMetadata> taskMetadataSet
private static List<String> addPartitionsInfo(TaskMetadata metadata) {
return metadata.topicPartitions().stream()
.map(p -> "partition=" + p.partition() + ", topic=" + p.topic())
-.collect(Collectors.toList());
+.toList();
}

/**
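For context: newer Kafka clients (3.0+) moved the thread and task metadata types from org.apache.kafka.streams.processor to the top-level org.apache.kafka.streams package and replaced the deprecated KafkaStreams#localThreadsMetadata() with metadataForLocalThreads(); Stream#toList() (Java 16+) likewise supersedes collect(Collectors.toList()). A minimal sketch of the replacement APIs, with illustrative class, method, and variable names:

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.TaskMetadata;
    import org.apache.kafka.streams.ThreadMetadata;

    final class StreamsMetadataSketch {

        // Illustrative only: mirrors the buildDetails/taskDetails pattern above.
        static Map<String, Object> localThreadDetails(KafkaStreams streams) {
            Map<String, Object> details = new HashMap<>();
            if (streams.state().isRunningOrRebalancing()) {
                // metadataForLocalThreads() replaces the deprecated localThreadsMetadata()
                for (ThreadMetadata thread : streams.metadataForLocalThreads()) {
                    for (TaskMetadata task : thread.activeTasks()) {
                        // Stream#toList() (Java 16+) replaces collect(Collectors.toList())
                        List<String> partitions = task.topicPartitions().stream()
                                .map(p -> "partition=" + p.partition() + ", topic=" + p.topic())
                                .toList();
                        details.put(task.taskId().toString(), partitions);
                    }
                }
            }
            return details;
        }
    }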
KafkaStreamsSpec.groovy
@@ -33,8 +33,8 @@ class KafkaStreamsSpec extends AbstractTestContainersSpec {
def stream = context.getBean(KafkaStreams, Qualifiers.byName('my-stream'))

then:
-stream.config.originals()['application.id'] == myStreamApplicationId
-stream.config.originals()['generic.config'] == "hello"
+stream.applicationConfigs.originals()['application.id'] == myStreamApplicationId
+stream.applicationConfigs.originals()['generic.config'] == "hello"
}

void "test kafka stream application"() {
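(For context: these assertions read the KafkaStreams object's internal StreamsConfig via Groovy property access; the backing field was renamed from config to applicationConfigs in newer Kafka clients, which is presumably what this update tracks.)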
KafkaConsumerFactory.java
@@ -28,7 +28,11 @@
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.StickyAssignor;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.clients.producer.RoundRobinPartitioner;
+import org.apache.kafka.common.security.authenticator.AbstractLogin;
+import org.apache.kafka.common.security.authenticator.DefaultLogin;
+import org.apache.kafka.common.security.authenticator.SaslServerCallbackHandler;
+import org.apache.kafka.common.security.plain.PlainLoginModule;
import org.apache.kafka.common.serialization.*;

import java.util.Optional;
@@ -67,11 +71,18 @@
FloatDeserializer.class,

// partitioners
-DefaultPartitioner.class,
+RoundRobinPartitioner.class,
// assigners
RangeAssignor.class,
RoundRobinAssignor.class,
-StickyAssignor.class
+StickyAssignor.class,
+
+// authentication
+DefaultLogin.class,
+SaslServerCallbackHandler.class,
+PlainLoginModule.class,
+AbstractLogin.DefaultLoginCallbackHandler.class

})
public class KafkaConsumerFactory {

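The classes added above feed the reflection hints declared on KafkaConsumerFactory; the enclosing annotation sits just outside the visible hunk, presumably Micronaut's @TypeHint or an equivalent. A hedged sketch of the pattern, with an illustrative hint class name and a subset of the listed types:

    import io.micronaut.core.annotation.TypeHint;

    import org.apache.kafka.clients.producer.RoundRobinPartitioner;
    import org.apache.kafka.common.security.authenticator.AbstractLogin;
    import org.apache.kafka.common.security.authenticator.DefaultLogin;
    import org.apache.kafka.common.security.plain.PlainLoginModule;

    // Illustrative: reflection metadata declared in code so the Micronaut Graal
    // processor emits it at build time, letting the equivalent hand-written
    // reflect-config.json entries be deleted (see the last file in this commit).
    @TypeHint(
            value = {
                    RoundRobinPartitioner.class,
                    DefaultLogin.class,
                    PlainLoginModule.class,
                    AbstractLogin.DefaultLoginCallbackHandler.class
            },
            accessType = {
                    TypeHint.AccessType.ALL_DECLARED_CONSTRUCTORS,
                    TypeHint.AccessType.ALL_DECLARED_METHODS
            }
    )
    final class KafkaReflectionHints {
    }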
OffsetStrategy.java
@@ -15,6 +15,10 @@
*/
package io.micronaut.configuration.kafka.annotation;

+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;

/**
* <p>An enum representing different strategies for committing offsets to Kafka when using {@link KafkaListener}.</p>
*
@@ -52,7 +56,7 @@ public enum OffsetStrategy {
ASYNC_PER_RECORD,
/**
* Only applicable for transactional processing in combination with {@link io.micronaut.messaging.annotation.SendTo}.
-* Sends offsets to transaction using {@link org.apache.kafka.clients.producer.Producer#sendOffsetsToTransaction(java.util.Map, String)}
+* Sends offsets to transaction using {@link org.apache.kafka.clients.producer.Producer#sendOffsetsToTransaction(Map, ConsumerGroupMetadata)}
*/
SEND_TO_TRANSACTION

KafkaConsumerProcessor.java
@@ -74,6 +74,7 @@
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -794,7 +795,7 @@ record = new ProducerRecord(destinationTopic, null, key, value, consumerRecord.h
}
try {
LOG.trace("Sending offsets: {} to transaction for producer: {} and customer group id: {}", offsetsToCommit, consumerState.producerTransactionalId, consumerState.groupId);
-kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, consumerState.groupId);
+kafkaProducer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(consumerState.groupId));
LOG.trace("Committing transaction for producer: {}", consumerState.producerTransactionalId);
kafkaProducer.commitTransaction();
LOG.trace("Committed transaction for producer: {}", consumerState.producerTransactionalId);
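This change, together with the OffsetStrategy javadoc fix above, tracks the deprecation of Producer#sendOffsetsToTransaction(Map, String) in favor of the ConsumerGroupMetadata overload. A minimal sketch of the transactional flow, assuming a producer on which initTransactions() has already been called; class and parameter names here are illustrative:

    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.common.TopicPartition;

    final class TransactionalOffsetsSketch {

        // Illustrative flow; omits abortTransaction() error handling for brevity.
        static void processAndCommit(Producer<String, String> producer,
                                     Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
                                     String groupId) {
            producer.beginTransaction();
            // ... send the processed output records here ...
            // Drop-in migration used by this commit: wrap the plain group id.
            // (Passing the consumer's groupMetadata() instead enables the stronger
            // KIP-447 fencing when a consumer instance is at hand.)
            producer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(groupId));
            producer.commitTransaction();
        }
    }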
reflect-config.json
@@ -9,36 +9,6 @@
"allDeclaredMethods":true,
"allDeclaredConstructors":true
},
-{
-"name":"org.apache.kafka.clients.consumer.RangeAssignor",
-"allDeclaredMethods":true,
-"allDeclaredConstructors":true
-},
-{
-"name":"org.apache.kafka.clients.producer.internals.DefaultPartitioner",
-"allDeclaredMethods":true,
-"allDeclaredConstructors":true
-},
-{
-"name":"org.apache.kafka.common.security.authenticator.AbstractLogin$DefaultLoginCallbackHandler",
-"allDeclaredMethods":true,
-"allDeclaredConstructors":true
-},
-{
-"name":"org.apache.kafka.common.security.authenticator.DefaultLogin",
-"allDeclaredMethods":true,
-"allDeclaredConstructors":true
-},
-{
-"name":"org.apache.kafka.common.security.authenticator.SaslClientCallbackHandler",
-"allDeclaredMethods":true,
-"allDeclaredConstructors":true
-},
-{
-"name":"org.apache.kafka.common.security.plain.PlainLoginModule",
-"allDeclaredMethods":true,
-"allDeclaredConstructors":true
-},
{
"name":"sun.security.pkcs.SignerInfo[]"
},
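For context: the entries deleted above largely duplicate classes now registered through the annotation-based hints on KafkaConsumerFactory earlier in this commit, which is presumably what makes this hand-written GraalVM reflection metadata superfluous; the DefaultPartitioner entry additionally pointed at an internal class that newer Kafka clients deprecate.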
