diff --git a/pom.xml b/pom.xml index a702ea01b2..9ee9d863b2 100644 --- a/pom.xml +++ b/pom.xml @@ -88,7 +88,7 @@ <jboss-log-manager.version>3.0.6.Final</jboss-log-manager.version> - <kafka.version>3.7.1</kafka.version> + <kafka.version>3.9.0</kafka.version> <opentelemetry.instrumentation.version>2.5.0-alpha</opentelemetry.instrumentation.version> <opentelemetry-semconv.version>1.25.0-alpha</opentelemetry-semconv.version> diff --git a/smallrye-reactive-messaging-kafka-test-companion/pom.xml b/smallrye-reactive-messaging-kafka-test-companion/pom.xml index 5cfb71f839..76d462fea6 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/pom.xml +++ b/smallrye-reactive-messaging-kafka-test-companion/pom.xml @@ -59,6 +59,30 @@ <version>${kafka.version}</version> <optional>true</optional> </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-server</artifactId> + <version>${kafka.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-raft</artifactId> + <version>${kafka.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-group-coordinator</artifactId> + <version>${kafka.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka-storage</artifactId> + <version>${kafka.version}</version> + <optional>true</optional> + </dependency> <!-- Optional dependencies for usage in Tests --> <dependency> <groupId>org.junit.jupiter</groupId> diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker.java b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker.java index 5134f6c60d..0b6e9657c7 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker.java +++ b/smallrye-reactive-messaging-kafka-test-companion/src/main/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaBroker.java @@ -2,7 +2,6 @@ import static io.smallrye.reactive.messaging.kafka.companion.test.EmbeddedKafkaBroker.LoggingOutputStream.loggerPrintStream; import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT; -import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION; import java.io.ByteArrayOutputStream; import java.io.Closeable; @@ -17,6 +16,8 @@ import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.TreeSet; import java.util.UUID; import java.util.function.Consumer; import java.util.function.Function; @@ -27,16 +28,20 @@ import org.apache.kafka.common.security.auth.SecurityProtocol; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.metadata.properties.MetaProperties; -import org.apache.kafka.metadata.properties.MetaPropertiesVersion; +import org.apache.kafka.coordinator.group.GroupCoordinatorConfig; +import org.apache.kafka.metadata.storage.Formatter; +import org.apache.kafka.network.SocketServerConfigs; +import org.apache.kafka.raft.QuorumConfig; +import org.apache.kafka.server.config.KRaftConfigs; +import org.apache.kafka.server.config.ReplicationConfigs; +import org.apache.kafka.server.config.ServerConfigs; +import org.apache.kafka.server.config.ServerLogConfigs; +import org.apache.kafka.storage.internals.log.CleanerConfig; import org.jboss.logging.Logger; import kafka.cluster.EndPoint; import kafka.server.KafkaConfig; import kafka.server.KafkaRaftServer; -import kafka.tools.StorageTool; -import scala.collection.immutable.Seq; -import scala.jdk.CollectionConverters; import scala.jdk.javaapi.StreamConverters; /** @@ -194,7 +199,7 @@ public synchronized EmbeddedKafkaBroker start() { brokerConfigModifier.accept(properties); } - if (properties.get(KafkaConfig.LogDirProp()) == null) { + if (properties.get(ServerLogConfigs.LOG_DIR_CONFIG) == null) { createAndSetlogDir(properties); } @@ -296,12 +301,12 @@ public static Endpoint parseEndpoint(String listenerStr) { public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controller, Endpoint internalEndpoint, List<Endpoint> advertisedListeners) { Properties props = new Properties(); - props.put(KafkaConfig.BrokerIdProp(), Integer.toString(nodeId)); + props.put(ServerConfigs.BROKER_ID_CONFIG, Integer.toString(nodeId)); // Configure kraft - props.put(KafkaConfig.ProcessRolesProp(), "broker,controller"); - props.put(KafkaConfig.ControllerListenerNamesProp(), listenerName(controller)); - props.put(KafkaConfig.QuorumVotersProp(), nodeId + "@" + controller.host() + ":" + controller.port()); + props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller"); + props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, listenerName(controller)); + props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, nodeId + "@" + controller.host() + ":" + controller.port()); // Configure listeners Map<String, Endpoint> listeners = advertisedListeners.stream() @@ -314,7 +319,7 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll .map(EmbeddedKafkaBroker::toListenerString) .distinct() .collect(Collectors.joining(",")); - props.put(KafkaConfig.ListenersProp(), listenersString); + props.put(SocketServerConfigs.LISTENERS_CONFIG, listenersString); // Find a PLAINTEXT listener Endpoint plaintextEndpoint = advertisedListeners.stream() @@ -327,7 +332,7 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll .distinct() .collect(Collectors.joining(",")); if (!Utils.isBlank(advertisedListenersString)) { - props.put(KafkaConfig.AdvertisedListenersProp(), advertisedListenersString); + props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, advertisedListenersString); } // Configure security protocol map @@ -335,45 +340,75 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll .map(EmbeddedKafkaBroker::toProtocolMap) .distinct() .collect(Collectors.joining(",")); - props.put(KafkaConfig.ListenerSecurityProtocolMapProp(), securityProtocolMap); - props.put(KafkaConfig.InterBrokerListenerNameProp(), listenerName(plaintextEndpoint)); + props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, securityProtocolMap); + props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, listenerName(plaintextEndpoint)); // Configure static default props - props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000"); - props.put(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE)); - props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000"); - props.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(false)); - props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "100"); - props.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(true)); - props.put(KafkaConfig.LogDeleteDelayMsProp(), "1000"); - props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), "2097152"); - props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp(), String.valueOf(Long.MAX_VALUE)); - props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1"); - props.put(KafkaConfig.OffsetsTopicPartitionsProp(), "5"); - props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0"); - props.put(KafkaConfig.NumPartitionsProp(), "1"); - props.put(KafkaConfig.DefaultReplicationFactorProp(), "1"); + props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1000"); + props.put(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG, String.valueOf(Long.MAX_VALUE)); + props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1000"); + props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(false)); + props.put(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, "100"); + props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(true)); + props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000"); + props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152"); + props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_DOC, String.valueOf(Long.MAX_VALUE)); + props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5"); + props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0"); + props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1"); + props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1"); return props; } public static KafkaConfig formatStorageFromConfig(Properties properties, String clusterId, boolean ignoreFormatted) { KafkaConfig config = KafkaConfig.fromProps(properties, false); - Seq<String> directories = StorageTool.configToLogDirectories(config); - MetaProperties metaProperties = StorageTool.buildMetadataProperties(clusterId, config); - StorageTool.formatCommand(loggerPrintStream(LOGGER), directories, metaProperties, MINIMUM_BOOTSTRAP_VERSION, - ignoreFormatted); + Formatter formatter = new Formatter(); + formatter.setClusterId(clusterId) + .setNodeId(config.nodeId()) + .setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled()) + .setIgnoreFormatted(ignoreFormatted) + .setControllerListenerName(config.controllerListenerNames().head()) + .setMetadataLogDirectory(config.metadataLogDir()); + configToLogDirectories(config).forEach(formatter::addDirectory); + try { + formatter.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } return config; } - public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) { - MetaProperties metaProperties = new MetaProperties.Builder() - .setVersion(MetaPropertiesVersion.V1) - .setClusterId(clusterId) + static Set<String> configToLogDirectories(KafkaConfig config) { + TreeSet<String> dirs = new TreeSet<>(); + config.logDirs().foreach(dirs::add); + String metadataLogDir = config.metadataLogDir(); + if (metadataLogDir != null) { + dirs.add(metadataLogDir); + } + return dirs; + } + + public static void formatStorage(List<String> directories, String controllerListenerName, + String metadataLogDirectory, + String clusterId, int nodeId, boolean ignoreFormatted) { + Formatter formatter = new Formatter(); + formatter.setClusterId(clusterId) .setNodeId(nodeId) - .build(); - Seq<String> dirs = CollectionConverters.ListHasAsScala(directories).asScala().toSeq(); - StorageTool.formatCommand(loggerPrintStream(LOGGER), dirs, metaProperties, MINIMUM_BOOTSTRAP_VERSION, ignoreFormatted); + .setIgnoreFormatted(ignoreFormatted) + .setControllerListenerName(controllerListenerName) + .setMetadataLogDirectory(metadataLogDirectory); + directories.forEach(formatter::addDirectory); + try { + formatter.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) { + formatStorage(directories, "CONTROLLER", directories.get(0), clusterId, nodeId, ignoreFormatted); } public static KafkaRaftServer createServer(final KafkaConfig config) { @@ -383,7 +418,7 @@ public static KafkaRaftServer createServer(final KafkaConfig config) { } private static String getAdvertisedListeners(KafkaConfig config) { - return StreamConverters.asJavaParStream(config.effectiveAdvertisedListeners()) + return StreamConverters.asJavaParStream(config.effectiveAdvertisedBrokerListeners()) .map(EndPoint::connectionString) .collect(Collectors.joining(",")); } @@ -406,7 +441,7 @@ private static int getUnusedPort(int port) { private static void createAndSetlogDir(Properties properties) { try { - properties.put(KafkaConfig.LogDirProp(), + properties.put(ServerLogConfigs.LOG_DIR_CONFIG, Files.createTempDirectory(COMPANION_BROKER_PREFIX + "-" + UUID.randomUUID()).toString()); } catch (IOException e) { throw new UncheckedIOException(e); diff --git a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java index 9f135c621d..4953ee02fa 100644 --- a/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java +++ b/smallrye-reactive-messaging-kafka-test-companion/src/test/java/io/smallrye/reactive/messaging/kafka/companion/test/EmbeddedKafkaTest.java @@ -15,13 +15,13 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.Endpoint; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.server.config.ServerLogConfigs; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion; import io.smallrye.reactive.messaging.kafka.companion.TestTags; -import kafka.server.KafkaConfig; @Tag(TestTags.FLAKY) public class EmbeddedKafkaTest { @@ -42,7 +42,7 @@ void test() { void testWithExistingLogDir(@TempDir File dir) { EmbeddedKafkaBroker broker = new EmbeddedKafkaBroker() .withNodeId(0) - .withAdditionalProperties(props -> props.put(KafkaConfig.LogDirProp(), dir.toPath().toString())) + .withAdditionalProperties(props -> props.put(ServerLogConfigs.LOG_DIR_CONFIG, dir.toPath().toString())) .withDeleteLogDirsOnClose(false); // format storage before starting