Skip to content

Commit

Permalink
Merge pull request #90 from robobario/add-auto-configure-flag
Browse files Browse the repository at this point in the history
Add autoConfigure option so user can disable broker autoconfiguration
  • Loading branch information
ozangunalp authored Sep 22, 2023
2 parents 2a21837 + f363a14 commit e5feba8
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 32 deletions.
27 changes: 14 additions & 13 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,20 @@ By default, the `kafka-server` application configures an embedded Kafka Kraft se

Following configuration options are available:

| Key | Description | Default |
|-------------------------------|-----------------------------------------------------------|--------------------|
| `server.kafka-port` | External listener port | 9092 |
| `server.internal-port` | Internal listener port | 9093 |
| `server.controller-port` | Controller listener port | 9094 |
| `server.delete-dirs-on-close` | Whether to delete `log-dir` on application close | false |
| `server.host` | Hostname of listeners | `` (empty string) |
| `server.cluster-id` | Provide `cluster-id`, generated if empty | |
| `server.properties-file` | Path to `server.properties` file | |
| `kafka.log.dir` | Path to `log-dir` directory, will create the directory if | `./target/log-dir` |
| `kafka.advertised.listeners` | Override `advertised.listeners` | |
| `kafka.zookeeper.connect` | When configured the kafka broker starts in zookeeper mode | `` |
| `kafka.*` | Override broker properties | |
| Key | Description | Default |
|-------------------------------|-------------------------------------------------------------------------------------------|--------------------|
| `server.kafka-port` | External listener port | 9092 |
| `server.internal-port` | Internal listener port | 9093 |
| `server.controller-port` | Controller listener port | 9094 |
| `server.delete-dirs-on-close` | Whether to delete `log-dir` on application close | false |
| `server.host` | Hostname of listeners | `` (empty string) |
| `server.cluster-id` | Provide `cluster-id`, generated if empty | |
| `server.properties-file` | Path to `server.properties` file | |
| `server.auto-configure`       | Automatically configure server properties, if false only `server.properties` is respected | true               |
| `kafka.log.dir`               | Path to `log-dir` directory, will create the directory if missing                         | `./target/log-dir` |
| `kafka.advertised.listeners` | Override `advertised.listeners` | |
| `kafka.zookeeper.connect` | When configured the kafka broker starts in zookeeper mode | `` |
| `kafka.*` | Override broker properties | |


You can set configuration options using Java system properties, e.g.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class KafkaNativeContainer extends GenericContainer<KafkaNativeContainer>
private String additionalArgs = null;
private int exposedPort = -1;
private Function<GenericContainer<?>, Consumer<OutputFrame>> outputFrameConsumer;
private boolean autoConfigure = true;

public static DockerImageName imageName(String version) {
return DockerImageName.parse(DEFAULT_REPOSITORY + ":" + version);
Expand Down Expand Up @@ -57,13 +58,19 @@ public KafkaNativeContainer withPort(int fixedPort) {
return self();
}

public KafkaNativeContainer withServerProperties(MountableFile serverPropertiesFile) {
/**
 * Supplies the content of a {@code server.properties} file for the broker.
 * The content is copied into the container at the expected location before startup.
 *
 * @param transferable content of the server.properties file
 * @return this container instance
 */
public KafkaNativeContainer withServerProperties(Transferable transferable) {
    assertNotRunning();
    // Remember that a properties file was provided so the startup command
    // can point the server at it.
    hasServerProperties = true;
    super.withCopyToContainer(transferable, SERVER_PROPERTIES);
    return self();
}

/**
 * Enables or disables the server's automatic broker configuration.
 * The flag is forwarded to the server as {@code -Dserver.auto-configure}
 * when the container starts; when disabled, only a user-supplied
 * {@code server.properties} drives the broker configuration.
 *
 * @param autoConfigure whether the server should auto-configure itself
 * @return this container instance
 */
public KafkaNativeContainer withAutoConfigure(boolean autoConfigure) {
    assertNotRunning();
    this.autoConfigure = autoConfigure;
    return self();
}

public KafkaNativeContainer withAdvertisedListeners(final Function<KafkaNativeContainer, String> provider) {
assertNotRunning();
this.advertisedListenersProvider = provider;
Expand Down Expand Up @@ -96,6 +103,7 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
if (hasServerProperties) {
cmd += " -Dserver.properties-file=" + SERVER_PROPERTIES;
}
cmd += " -Dserver.auto-configure=" + autoConfigure;
if (additionalArgs != null) {
cmd += " " + additionalArgs;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
package com.ozangunalp.kafka.test.container;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.io.File;
import java.lang.reflect.Method;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import com.ozangunalp.kafka.server.Endpoints;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Uuid;
Expand All @@ -21,11 +12,27 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
import org.testcontainers.containers.Network;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.MountableFile;

import com.ozangunalp.kafka.server.Endpoints;
import io.smallrye.reactive.messaging.kafka.companion.KafkaCompanion;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static java.util.regex.Pattern.quote;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class KafkaNativeContainerIT {

Expand All @@ -48,7 +55,7 @@ private KafkaNativeContainer createKafkaNativeContainer(String containerName) {
public void initTopic(TestInfo testInfo) {
String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString());
String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString());
testOutputName = String.format("%s.%s", testInfo.getDisplayName().replaceAll("\\(\\)$", ""),
testOutputName = String.format("%s.%s", testInfo.getDisplayName().replaceAll("\\(\\)$", ""),
OffsetDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH.mm.ss")));
topic = cn + "-" + mn + "-" + UUID.randomUUID().getMostSignificantBits();
}
Expand Down Expand Up @@ -237,6 +244,67 @@ void testKraftClusterBothControllers() throws Exception {
}
}

/**
 * Verifies that a two-node KRaft cluster forms when one node is an
 * auto-configured combined broker+controller and the other is a
 * controller-only node configured purely through a user-supplied
 * server.properties with auto-configuration disabled.
 */
@Test
void testKraftClusterWithOneControllerOnlyNode() throws Exception {
    String clusterId = Uuid.randomUuid().toString();
    String brokerController = "broker-controller";
    String controllerOnly = "controller";
    // Both controllers listen on 9094; quorum voters are addressed by
    // their Docker network aliases.
    String quorumVotes = String.format("1@%s:9094,2@%s:9094", brokerController, controllerOnly);

    try (var network = Network.newNetwork();
            var b1 = createKafkaNativeContainer(brokerController);
            var b2 = createKafkaNativeContainer(controllerOnly)) {

        // Node 1: broker+controller, configured via env, relies on the
        // server's default auto-configuration.
        b1.addEnv("SERVER_CLUSTER_ID", clusterId);
        b1.addEnv("KAFKA_CONTROLLER_QUORUM_VOTERS", quorumVotes);
        b1.withNetworkAliases(brokerController);
        b1.withNetwork(network);
        b1.addEnv("SERVER_HOST", brokerController);
        b1.addEnv("KAFKA_BROKER_ID", "1");

        // Node 2: controller-only. Auto-configuration is turned off so the
        // server respects only the supplied properties file.
        b2.addEnv("SERVER_CLUSTER_ID", clusterId);
        b2.withNetworkAliases(controllerOnly);
        b2.withNetwork(network);
        b2.withAutoConfigure(false);
        Transferable controllerProps = controllerOnlyProperties(quorumVotes, "2");
        b2.withServerProperties(controllerProps);

        Startables.deepStart(b1, b2).get(30, TimeUnit.SECONDS);

        // Only b1 acts as a broker (b2 is process.roles=controller), hence
        // a single expected cluster member.
        verifyClusterMembers(b1, Map.of(), 1);
        checkProduceConsume(b1);
    }
}

/**
 * Builds the content of a server.properties file for a KRaft
 * controller-only node.
 *
 * @param quorumVotes value for {@code controller.quorum.voters},
 *                    e.g. {@code "1@host-a:9094,2@host-b:9094"}
 * @param nodeId      the {@code node.id} of this controller
 * @return the rendered properties, ready to be copied into the container
 */
private Transferable controllerOnlyProperties(String quorumVotes, String nodeId) {
    Properties properties = new Properties();
    // Controller-only role: this node never serves client traffic.
    properties.put("process.roles", "controller");
    properties.put("node.id", nodeId);
    properties.put("controller.quorum.voters", quorumVotes);
    properties.put("listeners", "CONTROLLER://:9094");
    properties.put("controller.listener.names", "CONTROLLER");
    properties.put("num.network.threads", "1");
    properties.put("num.io.threads", "1");
    properties.put("socket.send.buffer.bytes", "102400");
    properties.put("socket.receive.buffer.bytes", "102400");
    properties.put("socket.request.max.bytes", "104857600");
    properties.put("log.dirs", "/tmp/kraft-controller-logs");
    properties.put("num.partitions", "1");
    // Fixed key: was misspelled "num.num.recovery.threads.per.data.dir",
    // which Kafka would have silently ignored as an unknown property.
    properties.put("num.recovery.threads.per.data.dir", "1");
    properties.put("offsets.topic.replication.factor", "1");
    properties.put("transaction.state.log.replication.factor", "1");
    properties.put("transaction.state.log.min.isr", "1");
    properties.put("log.retention.hours", "168");
    properties.put("log.segment.bytes", "1073741824");
    properties.put("log.retention.check.interval.ms", "300000");
    try (StringWriter writer = new StringWriter()) {
        properties.store(writer, null);
        return Transferable.of(writer.toString());
    } catch (IOException e) {
        // Properties.store to a StringWriter should never throw; rethrow
        // unchecked to keep the test helper signature simple.
        throw new RuntimeException(e);
    }
}

@Test
void testKraftClusterOneController() throws Exception {
String clusterId = Uuid.randomUuid().toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class EmbeddedKafkaBroker implements Closeable {
private String clusterId = Uuid.randomUuid().toString();
private final Properties brokerConfig = new Properties();
public SecurityProtocol defaultProtocol = PLAINTEXT;
private boolean autoConfigure = true;

/**
* Configure properties for the broker.
Expand All @@ -59,6 +60,19 @@ public EmbeddedKafkaBroker withConfig(Consumer<Properties> function) {
return this;
}

/**
 * Enables or disables automatic broker configuration. When enabled (the
 * default), the broker is prepared for embedded testing: relevant listeners
 * are exposed, KRaft mode is configured if required, and timeouts are tuned
 * — see {@link BrokerConfig} for the details. When disabled, the caller is
 * expected to supply the full broker configuration themselves.
 *
 * @param autoConfigure whether the broker should auto-configure itself
 * @return this {@link EmbeddedKafkaBroker}
 */
public EmbeddedKafkaBroker withAutoConfigure(boolean autoConfigure) {
    this.autoConfigure = autoConfigure;
    return this;
}

/**
* Configure the port on which the broker will listen.
*
Expand Down Expand Up @@ -172,9 +186,12 @@ public synchronized EmbeddedKafkaBroker start() {
return this;
}

BrokerConfig.providedConfig(brokerConfig);
BrokerConfig.defaultStaticConfig(brokerConfig);
BrokerConfig.defaultCoreConfig(brokerConfig, host, kafkaPort, internalPort, controllerPort, defaultProtocol);
if (autoConfigure) {
LOGGER.info("auto-configuring server");
BrokerConfig.providedConfig(brokerConfig);
BrokerConfig.defaultStaticConfig(brokerConfig);
BrokerConfig.defaultCoreConfig(brokerConfig, host, kafkaPort, internalPort, controllerPort, defaultProtocol);
}

Storage.ensureLogDirExists(brokerConfig);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,7 @@ public interface ServerConfig {

Optional<Path> propertiesFile();

@WithDefault("true")
boolean autoConfigure();

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ void startup(@Observes StartupEvent event) {
.withControllerPort(config.controllerPort())
.withInternalPort(config.internalPort())
.withKafkaHost(config.host().orElse(""))
.withAutoConfigure(config.autoConfigure())
.withConfig(properties -> {
properties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, Reporter.class.getName());
config.propertiesFile().ifPresent(Unchecked.consumer(file ->
Expand Down

0 comments on commit e5feba8

Please sign in to comment.