
Commit

Merge pull request #2814 from smallrye/dependabot/maven/main/kafka.version-3.9.0

Bump kafka.version from 3.7.1 to 3.9.0
cescoffier authored Nov 20, 2024
2 parents 1b81d7e + 40d2510 commit b86af3b
Showing 4 changed files with 104 additions and 45 deletions.
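Aside from the version bump in the poms, the Java changes below migrate the embedded broker from the Scala KafkaConfig.*Prop() accessors to the per-area Java config constants, and from StorageTool to the Formatter API. As a minimal sketch of the config-key pattern, assuming Kafka 3.9.0 on the classpath (the constants are the ones used in this diff; the property values are illustrative):

    import java.util.Properties;

    import org.apache.kafka.network.SocketServerConfigs;
    import org.apache.kafka.raft.QuorumConfig;
    import org.apache.kafka.server.config.KRaftConfigs;
    import org.apache.kafka.server.config.ServerLogConfigs;

    public class ConfigMigrationSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            // Kafka 3.7 style (Scala accessors): props.put(KafkaConfig.LogDirProp(), "/tmp/kafka-logs");
            // Kafka 3.9 style (Java constants):
            props.put(ServerLogConfigs.LOG_DIR_CONFIG, "/tmp/kafka-logs");     // log.dir
            props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller"); // process.roles
            props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, "0@localhost:9093");  // controller.quorum.voters
            props.put(SocketServerConfigs.LISTENERS_CONFIG,
                    "PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093"); // listeners
            System.out.println(props);
        }
    }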
pom.xml (1 addition & 1 deletion)

@@ -88,7 +88,7 @@

<jboss-log-manager.version>3.0.6.Final</jboss-log-manager.version>

-<kafka.version>3.7.1</kafka.version>
+<kafka.version>3.9.0</kafka.version>

<opentelemetry.instrumentation.version>2.5.0-alpha</opentelemetry.instrumentation.version>
<opentelemetry-semconv.version>1.25.0-alpha</opentelemetry-semconv.version>
smallrye-reactive-messaging-kafka-test-companion/pom.xml (24 additions & 0 deletions)

@@ -59,6 +59,30 @@
<version>${kafka.version}</version>
<optional>true</optional>
</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-server</artifactId>
+<version>${kafka.version}</version>
+<optional>true</optional>
+</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-raft</artifactId>
+<version>${kafka.version}</version>
+<optional>true</optional>
+</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-group-coordinator</artifactId>
+<version>${kafka.version}</version>
+<optional>true</optional>
+</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-storage</artifactId>
+<version>${kafka.version}</version>
+<optional>true</optional>
+</dependency>
<!-- Optional dependencies for usage in Tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
EmbeddedKafkaBroker.java

@@ -2,7 +2,6 @@

import static io.smallrye.reactive.messaging.kafka.companion.test.EmbeddedKafkaBroker.LoggingOutputStream.loggerPrintStream;
import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
-import static org.apache.kafka.server.common.MetadataVersion.MINIMUM_BOOTSTRAP_VERSION;

import java.io.ByteArrayOutputStream;
import java.io.Closeable;
@@ -17,6 +16,8 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -27,16 +28,20 @@
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.metadata.properties.MetaProperties;
-import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
+import org.apache.kafka.metadata.storage.Formatter;
+import org.apache.kafka.network.SocketServerConfigs;
+import org.apache.kafka.raft.QuorumConfig;
+import org.apache.kafka.server.config.KRaftConfigs;
+import org.apache.kafka.server.config.ReplicationConfigs;
+import org.apache.kafka.server.config.ServerConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
+import org.apache.kafka.storage.internals.log.CleanerConfig;
import org.jboss.logging.Logger;

import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaRaftServer;
-import kafka.tools.StorageTool;
-import scala.collection.immutable.Seq;
-import scala.jdk.CollectionConverters;
import scala.jdk.javaapi.StreamConverters;

/**

@@ -194,7 +199,7 @@ public synchronized EmbeddedKafkaBroker start() {
brokerConfigModifier.accept(properties);
}

-if (properties.get(KafkaConfig.LogDirProp()) == null) {
+if (properties.get(ServerLogConfigs.LOG_DIR_CONFIG) == null) {
createAndSetlogDir(properties);
}

@@ -296,12 +301,12 @@ public static Endpoint parseEndpoint(String listenerStr) {
public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controller, Endpoint internalEndpoint,
List<Endpoint> advertisedListeners) {
Properties props = new Properties();
-props.put(KafkaConfig.BrokerIdProp(), Integer.toString(nodeId));
+props.put(ServerConfigs.BROKER_ID_CONFIG, Integer.toString(nodeId));

// Configure kraft
-props.put(KafkaConfig.ProcessRolesProp(), "broker,controller");
-props.put(KafkaConfig.ControllerListenerNamesProp(), listenerName(controller));
-props.put(KafkaConfig.QuorumVotersProp(), nodeId + "@" + controller.host() + ":" + controller.port());
+props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller");
+props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, listenerName(controller));
+props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, nodeId + "@" + controller.host() + ":" + controller.port());

// Configure listeners
Map<String, Endpoint> listeners = advertisedListeners.stream()
@@ -314,7 +319,7 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll
.map(EmbeddedKafkaBroker::toListenerString)
.distinct()
.collect(Collectors.joining(","));
-props.put(KafkaConfig.ListenersProp(), listenersString);
+props.put(SocketServerConfigs.LISTENERS_CONFIG, listenersString);

// Find a PLAINTEXT listener
Endpoint plaintextEndpoint = advertisedListeners.stream()
@@ -327,53 +332,83 @@ public static Properties createDefaultBrokerConfig(int nodeId, Endpoint controll
.distinct()
.collect(Collectors.joining(","));
if (!Utils.isBlank(advertisedListenersString)) {
-props.put(KafkaConfig.AdvertisedListenersProp(), advertisedListenersString);
+props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, advertisedListenersString);
}

// Configure security protocol map
String securityProtocolMap = listeners.values().stream()
.map(EmbeddedKafkaBroker::toProtocolMap)
.distinct()
.collect(Collectors.joining(","));
-props.put(KafkaConfig.ListenerSecurityProtocolMapProp(), securityProtocolMap);
-props.put(KafkaConfig.InterBrokerListenerNameProp(), listenerName(plaintextEndpoint));
+props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, securityProtocolMap);
+props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, listenerName(plaintextEndpoint));

// Configure static default props
-props.put(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
-props.put(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(), String.valueOf(Long.MAX_VALUE));
-props.put(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
-props.put(KafkaConfig.ControlledShutdownEnableProp(), Boolean.toString(false));
-props.put(KafkaConfig.ControlledShutdownRetryBackoffMsProp(), "100");
-props.put(KafkaConfig.DeleteTopicEnableProp(), Boolean.toString(true));
-props.put(KafkaConfig.LogDeleteDelayMsProp(), "1000");
-props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp(), "2097152");
-props.put(KafkaConfig.LogMessageTimestampDifferenceMaxMsProp(), String.valueOf(Long.MAX_VALUE));
-props.put(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
-props.put(KafkaConfig.OffsetsTopicPartitionsProp(), "5");
-props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp(), "0");
-props.put(KafkaConfig.NumPartitionsProp(), "1");
-props.put(KafkaConfig.DefaultReplicationFactorProp(), "1");
+props.put(ReplicationConfigs.REPLICA_SOCKET_TIMEOUT_MS_CONFIG, "1000");
+props.put(ReplicationConfigs.REPLICA_HIGH_WATERMARK_CHECKPOINT_INTERVAL_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
+props.put(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, "1000");
+props.put(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG, Boolean.toString(false));
+props.put(ServerConfigs.CONTROLLED_SHUTDOWN_RETRY_BACKOFF_MS_CONFIG, "100");
+props.put(ServerConfigs.DELETE_TOPIC_ENABLE_CONFIG, Boolean.toString(true));
+props.put(ServerLogConfigs.LOG_DELETE_DELAY_MS_CONFIG, "1000");
+props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097152");
+props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_DIFFERENCE_MAX_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
+props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, "5");
+props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0");
+props.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, "1");
+props.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, "1");

return props;
}

public static KafkaConfig formatStorageFromConfig(Properties properties, String clusterId, boolean ignoreFormatted) {
KafkaConfig config = KafkaConfig.fromProps(properties, false);
-Seq<String> directories = StorageTool.configToLogDirectories(config);
-MetaProperties metaProperties = StorageTool.buildMetadataProperties(clusterId, config);
-StorageTool.formatCommand(loggerPrintStream(LOGGER), directories, metaProperties, MINIMUM_BOOTSTRAP_VERSION,
-ignoreFormatted);
+Formatter formatter = new Formatter();
+formatter.setClusterId(clusterId)
+.setNodeId(config.nodeId())
+.setUnstableFeatureVersionsEnabled(config.unstableFeatureVersionsEnabled())
+.setIgnoreFormatted(ignoreFormatted)
+.setControllerListenerName(config.controllerListenerNames().head())
+.setMetadataLogDirectory(config.metadataLogDir());
+configToLogDirectories(config).forEach(formatter::addDirectory);
+try {
+formatter.run();
+} catch (Exception e) {
+throw new RuntimeException(e);
+}
return config;
}

-public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) {
-MetaProperties metaProperties = new MetaProperties.Builder()
-.setVersion(MetaPropertiesVersion.V1)
-.setClusterId(clusterId)
-.setNodeId(nodeId)
-.build();
-Seq<String> dirs = CollectionConverters.ListHasAsScala(directories).asScala().toSeq();
-StorageTool.formatCommand(loggerPrintStream(LOGGER), dirs, metaProperties, MINIMUM_BOOTSTRAP_VERSION, ignoreFormatted);
+static Set<String> configToLogDirectories(KafkaConfig config) {
+TreeSet<String> dirs = new TreeSet<>();
+config.logDirs().foreach(dirs::add);
+String metadataLogDir = config.metadataLogDir();
+if (metadataLogDir != null) {
+dirs.add(metadataLogDir);
+}
+return dirs;
+}
+
+public static void formatStorage(List<String> directories, String controllerListenerName,
+String metadataLogDirectory,
+String clusterId, int nodeId, boolean ignoreFormatted) {
+Formatter formatter = new Formatter();
+formatter.setClusterId(clusterId)
+.setNodeId(nodeId)
+.setIgnoreFormatted(ignoreFormatted)
+.setControllerListenerName(controllerListenerName)
+.setMetadataLogDirectory(metadataLogDirectory);
+directories.forEach(formatter::addDirectory);
+try {
+formatter.run();
+} catch (Exception e) {
+throw new RuntimeException(e);
+}
+}
+
+public static void formatStorage(List<String> directories, String clusterId, int nodeId, boolean ignoreFormatted) {
+formatStorage(directories, "CONTROLLER", directories.get(0), clusterId, nodeId, ignoreFormatted);
}

public static KafkaRaftServer createServer(final KafkaConfig config) {
@@ -383,7 +418,7 @@ public static KafkaRaftServer createServer(final KafkaConfig config) {
}

private static String getAdvertisedListeners(KafkaConfig config) {
-return StreamConverters.asJavaParStream(config.effectiveAdvertisedListeners())
+return StreamConverters.asJavaParStream(config.effectiveAdvertisedBrokerListeners())
.map(EndPoint::connectionString)
.collect(Collectors.joining(","));
}
@@ -406,7 +441,7 @@ private static int getUnusedPort(int port) {

private static void createAndSetlogDir(Properties properties) {
try {
-properties.put(KafkaConfig.LogDirProp(),
+properties.put(ServerLogConfigs.LOG_DIR_CONFIG,
Files.createTempDirectory(COMPANION_BROKER_PREFIX + "-" + UUID.randomUUID()).toString());
} catch (IOException e) {
throw new UncheckedIOException(e);
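For orientation, a minimal sketch of calling the reworked formatting API above: the short formatStorage overload added in this commit delegates with "CONTROLLER" as the controller listener name and the first directory as the metadata log dir. The directory, node id, and cluster id below are illustrative; ignoreFormatted = true tolerates an already-formatted directory.

    import java.nio.file.Files;
    import java.util.List;

    import org.apache.kafka.common.Uuid;

    import io.smallrye.reactive.messaging.kafka.companion.test.EmbeddedKafkaBroker;

    public class FormatStorageSketch {
        public static void main(String[] args) throws Exception {
            // Create a fresh log directory and format it for node 0.
            String logDir = Files.createTempDirectory("companion-kafka").toString();
            EmbeddedKafkaBroker.formatStorage(List.of(logDir), Uuid.randomUuid().toString(), 0, true);
        }
    }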
EmbeddedKafkaTest.java

@@ -15,13 +15,13 @@
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.server.config.ServerLogConfigs;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import io.smallrye.reactive.messaging.kafka.companion.TestTags;
-import kafka.server.KafkaConfig;

@Tag(TestTags.FLAKY)
public class EmbeddedKafkaTest {
@@ -42,7 +42,7 @@ void test() {
void testWithExistingLogDir(@TempDir File dir) {
EmbeddedKafkaBroker broker = new EmbeddedKafkaBroker()
.withNodeId(0)
-.withAdditionalProperties(props -> props.put(KafkaConfig.LogDirProp(), dir.toPath().toString()))
+.withAdditionalProperties(props -> props.put(ServerLogConfigs.LOG_DIR_CONFIG, dir.toPath().toString()))
.withDeleteLogDirsOnClose(false);

// format storage before starting
