From 3dc25c14e027bc38b5fa05bc140e5f5cf4ca2655 Mon Sep 17 00:00:00 2001 From: david-leifker <114954101+david-leifker@users.noreply.github.com> Date: Thu, 6 Oct 2022 15:34:11 -0500 Subject: [PATCH] test(KafkaEmitter): Enable ability to run test locally (#6123) --- .../client/kafka/KafkaEmitterTest.java | 55 ++++++++++++------- .../kafka/containers/KafkaContainer.java | 25 ++++++--- .../containers/SchemaRegistryContainer.java | 6 +- .../kafka/containers/ZookeeperContainer.java | 2 + 4 files changed, 59 insertions(+), 29 deletions(-) diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java index 2645b114f97655..8940d951bfc10a 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/KafkaEmitterTest.java @@ -4,6 +4,8 @@ import static java.util.Collections.singletonList; import java.io.IOException; +import java.util.Objects; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.stream.Stream; @@ -12,10 +14,11 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.KafkaAdminClient; import org.apache.kafka.clients.admin.NewTopic; -import org.junit.Before; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.errors.TimeoutException; +import org.junit.BeforeClass; import org.junit.Test; import org.testcontainers.containers.Network; -import org.testcontainers.lifecycle.Startables; import org.testng.Assert; import com.linkedin.dataset.DatasetProperties; @@ -37,27 +40,30 @@ public class KafkaEmitterTest { private static ZookeeperContainer zookeeperContainer; private static KafkaContainer kafkaContainer; private static SchemaRegistryContainer schemaRegistryContainer; - private KafkaEmitterConfig config; - private KafkaEmitter emitter; + private static KafkaEmitterConfig config; + private static KafkaEmitter emitter; @SuppressWarnings("resource") - @Before - public void confluentSetup() throws Exception { + @BeforeClass + public static void confluentSetup() throws Exception { network = Network.newNetwork(); zookeeperContainer = new ZookeeperContainer().withNetwork(network); - kafkaContainer = new KafkaContainer(zookeeperContainer.getInternalUrl()).withNetwork(network); - schemaRegistryContainer = new SchemaRegistryContainer(zookeeperContainer.getInternalUrl()).withNetwork(network); - Startables.deepStart(Stream.of(zookeeperContainer, kafkaContainer, schemaRegistryContainer)).join(); - - createKafkaEmitter(); - createTopics(); + kafkaContainer = new KafkaContainer(zookeeperContainer.getInternalUrl()) + .withNetwork(network) + .dependsOn(zookeeperContainer); + schemaRegistryContainer = new SchemaRegistryContainer(zookeeperContainer.getInternalUrl()) + .withNetwork(network) + .dependsOn(zookeeperContainer, kafkaContainer); + schemaRegistryContainer.start(); + + String bootstrap = createTopics(kafkaContainer.getBootstrapServers()); + createKafkaEmitter(bootstrap); registerSchemaRegistryTypes(); - } - public void createKafkaEmitter() throws IOException { + public static void createKafkaEmitter(String bootstrap) throws IOException { KafkaEmitterConfig.KafkaEmitterConfigBuilder builder = KafkaEmitterConfig.builder(); - builder.bootstrap(kafkaContainer.getBootstrapServers()); + builder.bootstrap(bootstrap); builder.schemaRegistryUrl(schemaRegistryContainer.getUrl()); config = builder.build(); emitter = new KafkaEmitter(config); @@ -80,8 +86,11 @@ public void testSend() throws IOException, InterruptedException, ExecutionExcept Assert.assertTrue(response.isSuccess()); } - private AdminClient createAdminClient() { - return KafkaAdminClient.create(emitter.getKafkaConfgiProperties()); + private static AdminClient createAdminClient(String bootstrap) { + // Fail fast + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrap); + return KafkaAdminClient.create(props); } private static void registerSchemaRegistryTypes() throws IOException, RestClientException { @@ -90,11 +99,17 @@ private static void registerSchemaRegistryTypes() throws IOException, RestClient schemaRegistryClient.register(mcpSchema.getFullName(), mcpSchema); } - private void createTopics() throws InterruptedException, ExecutionException { - AdminClient adminClient = createAdminClient(); + private static String createTopics(Stream bootstraps) { short replicationFactor = 1; int partitions = 1; - adminClient.createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get(); + return bootstraps.parallel().map(bootstrap -> { + try { + createAdminClient(bootstrap).createTopics(singletonList(new NewTopic(TOPIC, partitions, replicationFactor))).all().get(); + return bootstrap; + } catch (TimeoutException | InterruptedException | ExecutionException ex) { + return null; + } + }).filter(Objects::nonNull).findFirst().get(); } @SuppressWarnings("rawtypes") diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java index 971782d2b7aa09..47d5162db9c819 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/KafkaContainer.java @@ -2,6 +2,7 @@ import com.github.dockerjava.api.command.InspectContainerResponse; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import org.testcontainers.images.builder.Transferable; import org.testcontainers.utility.TestcontainersConfiguration; @@ -20,6 +21,7 @@ public class KafkaContainer extends GenericContainer { private static final String STARTER_SCRIPT = "/testcontainers_start.sh"; private static final int KAFKA_INTERNAL_PORT = 9092; + private static final int KAFKA_LOCAL_PORT = 9093; public static final int KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT = 29092; @@ -39,15 +41,17 @@ public KafkaContainer(String confluentPlatformVersion, String zookeeperConnect) super(getKafkaContainerImage(confluentPlatformVersion)); this.zookeeperConnect = zookeeperConnect; - withExposedPorts(KAFKA_INTERNAL_PORT); + withExposedPorts(KAFKA_INTERNAL_PORT, KAFKA_LOCAL_PORT); // Use two listeners with different names, it will force Kafka to communicate // with itself via internal // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will // try to use the advertised listener withEnv("KAFKA_LISTENERS", - "PLAINTEXT://0.0.0.0:" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT + ",BROKER://0.0.0.0:" + KAFKA_INTERNAL_PORT); - withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT"); + "PLAINTEXT://0.0.0.0:" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT + + ",BROKER://0.0.0.0:" + KAFKA_INTERNAL_PORT + + ",BROKER_LOCAL://0.0.0.0:" + KAFKA_LOCAL_PORT); + withEnv("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,BROKER_LOCAL:PLAINTEXT"); withEnv("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER"); withEnv("KAFKA_BROKER_ID", "1"); @@ -57,14 +61,15 @@ public KafkaContainer(String confluentPlatformVersion, String zookeeperConnect) withEnv("KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS", "0"); withNetworkAliases(networkAlias); - + waitingFor(new HostPortWaitStrategy()); } - public String getBootstrapServers() { + public Stream getBootstrapServers() { if (port == PORT_NOT_ASSIGNED) { throw new IllegalStateException("You should start Kafka container first"); } - return String.format("PLAINTEXT://%s:%s", getHost(), port); + return Stream.of(String.format("PLAINTEXT://%s:%s", getHost(), port), + String.format("PLAINTEXT://localhost:%s", getMappedPort(KAFKA_LOCAL_PORT))); } @Override @@ -84,10 +89,16 @@ protected void containerIsStarting(InspectContainerResponse containerInfo, boole return; } + // Use two listeners with different names, it will force Kafka to communicate + // with itself via internal + // listener when KAFKA_INTER_BROKER_LISTENER_NAME is set, otherwise Kafka will + // try to use the advertised listener + String command = "#!/bin/bash \n"; command += "export KAFKA_ZOOKEEPER_CONNECT='" + zookeeperConnect + "'\n"; command += "export KAFKA_ADVERTISED_LISTENERS='" + Stream - .concat(Stream.of("PLAINTEXT://" + networkAlias + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT), + .concat(Stream.of("PLAINTEXT://" + networkAlias + ":" + KAFKA_INTERNAL_ADVERTISED_LISTENERS_PORT, + "BROKER_LOCAL://localhost:" + getMappedPort(KAFKA_LOCAL_PORT)), containerInfo.getNetworkSettings().getNetworks().values().stream() .map(it -> "BROKER://" + it.getIpAddress() + ":" + KAFKA_INTERNAL_PORT)) .collect(Collectors.joining(",")) + "'\n"; diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java index 324e74d33e7045..fd20002fb2b146 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/SchemaRegistryContainer.java @@ -4,8 +4,10 @@ import static java.lang.String.format; import java.io.IOException; +import java.time.Duration; + import org.testcontainers.containers.GenericContainer; -import org.testcontainers.containers.wait.strategy.Wait; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; import org.testcontainers.utility.TestcontainersConfiguration; @@ -27,7 +29,7 @@ public SchemaRegistryContainer(String confluentPlatformVersion, String zookeeper withExposedPorts(SCHEMA_REGISTRY_INTERNAL_PORT); withNetworkAliases(networkAlias); - waitingFor(Wait.forHttp("/subjects")); + waitingFor(new HttpWaitStrategy().forPath("/subjects").withStartupTimeout(Duration.ofMinutes(2))); } public String getUrl() { diff --git a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/ZookeeperContainer.java b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/ZookeeperContainer.java index 62ee3765a39b06..5bfc5055df68a7 100644 --- a/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/ZookeeperContainer.java +++ b/metadata-integration/java/datahub-client/src/test/java/datahub/client/kafka/containers/ZookeeperContainer.java @@ -1,6 +1,7 @@ package datahub.client.kafka.containers; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HostPortWaitStrategy; import org.testcontainers.utility.TestcontainersConfiguration; import java.io.IOException; @@ -30,6 +31,7 @@ public ZookeeperContainer(String confluentPlatformVersion) throws IOException { addExposedPort(ZOOKEEPER_INTERNAL_PORT); withNetworkAliases(networkAlias); + waitingFor(new HostPortWaitStrategy()); } public String getInternalUrl() {