Skip to content

Commit

Permalink
test(KafkaEmitter): Enable ability to run test locally (#6123)
Browse files Browse the repository at this point in the history
  • Loading branch information
david-leifker authored Oct 6, 2022
1 parent 3f84a20 commit 3dc25c1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import static java.util.Collections.singletonList;

import java.io.IOException;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.stream.Stream;
Expand All @@ -12,10 +14,11 @@
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.Before;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.TimeoutException;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.containers.Network;
import org.testcontainers.lifecycle.Startables;
import org.testng.Assert;

import com.linkedin.dataset.DatasetProperties;
Expand All @@ -37,27 +40,30 @@ public class KafkaEmitterTest {
private static ZookeeperContainer zookeeperContainer;
private static KafkaContainer kafkaContainer;
private static SchemaRegistryContainer schemaRegistryContainer;
private KafkaEmitterConfig config;
private KafkaEmitter emitter;
private static KafkaEmitterConfig config;
private static KafkaEmitter emitter;

@SuppressWarnings("resource")
@Before
public void confluentSetup() throws Exception {
@BeforeClass
public static void confluentSetup() throws Exception {
network = Network.newNetwork();
zookeeperContainer = new ZookeeperContainer().withNetwork(network);
kafkaContainer = new KafkaContainer(zookeeperContainer.getInternalUrl()).withNetwork(network);
schemaRegistryContainer = new SchemaRegistryContainer(zookeeperContainer.getInternalUrl()).withNetwork(network);
Startables.deepStart(Stream.of(zookeeperContainer, kafkaContainer, schemaRegistryContainer)).join();

createKafkaEmitter();
createTopics();
kafkaContainer = new KafkaContainer(zookeeperContainer.getInternalUrl())
.withNetwork(network)
.dependsOn(zookeeperContainer);
schemaRegistryContainer = new SchemaRegistryContainer(zookeeperContainer.getInternalUrl())
.withNetwork(network)
.dependsOn(zookeeperContainer, kafkaContainer);
schemaRegistryContainer.start();

String bootstrap = createTopics(kafkaContainer.getBootstrapServers());
createKafkaEmitter(bootstrap);
registerSchemaRegistryTypes();

}

public void createKafkaEmitter() throws IOException {
public static void createKafkaEmitter(String bootstrap) throws IOException {
KafkaEmitterConfig.KafkaEmitterConfigBuilder builder = KafkaEmitterConfig.builder();
builder.bootstrap(kafkaContainer.getBootstrapServers());
builder.bootstrap(bootstrap);
builder.schemaRegistryUrl(schemaRegistryContainer.getUrl());
config = builder.build();
emitter = new KafkaEmitter(config);
Expand All @@ -80,8 +86,11 @@ public void testSend() throws IOException, InterruptedException, ExecutionExcept
Assert.assertTrue(response.isSuccess());
}

private AdminClient createAdminClient() {
return KafkaAdminClient.create(emitter.getKafkaConfgiProperties());
private static AdminClient createAdminClient(String bootstrap) {
// Fail fast
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap);
return KafkaAdminClient.create(props);
}

private static void registerSchemaRegistryTypes() throws IOException, RestClientException {
Expand All @@ -90,11 +99,17 @@ private static void registerSchemaRegistryTypes() throws IOException, RestClient
schemaRegistryClient.register(mcpSchema.getFullName(), mcpSchema);
}

private void createTopics() throws InterruptedException, ExecutionException {
AdminClient adminClient = createAdminClient();
private static String createTopics(Stream<String> bootstraps) {
short replicationFactor = 1;
int partitions = 1;
adminClient.createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstraps.parallel().map(bootstrap -> {
try {
createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get();
return bootstrap;
} catch (TimeoutException | InterruptedException | ExecutionException ex) {
return null;
}
}).filter(Objects::nonNull).findFirst().get();
}

@SuppressWarnings("rawtypes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.github.dockerjava.api.command.InspectContainerResponse;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.TestcontainersConfiguration;

Expand All @@ -20,6 +21,7 @@ public class KafkaContainer extends GenericContainer<KafkaContainer> {
private static final String STARTER_SCRIPT = "/testcontainers_start.sh";

private static final int KAFKA_INTERNAL_PORT = 9092;
private static final int KAFKA_LOCAL_PORT = 9093;

public static final int KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT = 29092;

Expand All @@ -39,15 +41,17 @@ public KafkaContainer(String confluentPlatformVersion, String zookeeperConnect)
super(getKafkaContainerImage(confluentPlatformVersion));

this.zookeeperConnect = zookeeperConnect;
withExposedPorts(KAFKA_INTERNAL_PORT);
withExposedPorts(KAFKA_INTERNAL_PORT, KAFKA_LOCAL_PORT);

// Use two listeners with different names, it will force Kafka to communicate
// with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
// try to use the advertised listener
withEnv("KAFKA_LISTENERS",
"PLAINTEXT://0.0.0.0:" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT + ",BROKER://0.0.0.0:" + KAFKA_INTERNAL_PORT);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT");
"PLAINTEXT://0.0.0.0:" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT
+ ",BROKER://0.0.0.0:" + KAFKA_INTERNAL_PORT
+ ",BROKER_LOCAL://0.0.0.0:" + KAFKA_LOCAL_PORT);
withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,BROKER_LOCAL:PLAINTEXT");
withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER");

withEnv("KAFKA_BROKER_ID", "1");
Expand All @@ -57,14 +61,15 @@ public KafkaContainer(String confluentPlatformVersion, String zookeeperConnect)
withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0");

withNetworkAliases(networkAlias);

waitingFor(new HostPortWaitStrategy());
}

public String getBootstrapServers() {
public Stream<String> getBootstrapServers() {
if (port == PORT_NOT_ASSIGNED) {
throw new IllegalStateException("You should start Kafka container first");
}
return String.format("PLAINTEXT://%s:%s", getHost(), port);
return Stream.of(String.format("PLAINTEXT://%s:%s", getHost(), port),
String.format("PLAINTEXT://localhost:%s", getMappedPort(KAFKA_LOCAL_PORT)));
}

@Override
Expand All @@ -84,10 +89,16 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole
return;
}

// Use two listeners with different names, it will force Kafka to communicate
// with itself via internal
// listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will
// try to use the advertised listener

String command = "#!/bin/bash \n";
command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n";
command += "export KAFKA_ADVERTISED_LISTENERS='" + Stream
.concat(Stream.of("PLAINTEXT://" + networkAlias + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT),
.concat(Stream.of("PLAINTEXT://" + networkAlias + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT,
"BROKER_LOCAL://localhost:" + getMappedPort(KAFKA_LOCAL_PORT)),
containerInfo.getNetworkSettings().getNetworks().values().stream()
.map(it -> "BROKER://" + it.getIpAddress() + ":" + KAFKA_INTERNAL_PORT))
.collect(Collectors.joining(",")) + "'\n";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import static java.lang.String.format;

import java.io.IOException;
import java.time.Duration;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.utility.TestcontainersConfiguration;


Expand All @@ -27,7 +29,7 @@ public SchemaRegistryContainer(String confluentPlatformVersion, String zookeeper
withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT);
withNetworkAliases(networkAlias);

waitingFor(Wait.forHttp("/subjects"));
waitingFor(new HttpWaitStrategy().forPath("/subjects").withStartupTimeout(Duration.ofMinutes(2)));
}

public String getUrl() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package datahub.client.kafka.containers;

import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy;
import org.testcontainers.utility.TestcontainersConfiguration;

import java.io.IOException;
Expand Down Expand Up @@ -30,6 +31,7 @@ public ZookeeperContainer(String confluentPlatformVersion) throws IOException {

addExposedPort(ZOOKEEPER_INTERNAL_PORT);
withNetworkAliases(networkAlias);
waitingFor(new HostPortWaitStrategy());
}

public String getInternalUrl() {
Expand Down

0 comments on commit 3dc25c1

Please sign in to comment.